You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@fluo.apache.org by kt...@apache.org on 2016/07/15 15:31:42 UTC
[2/3] incubator-fluo git commit: fixes #639 vastly improved the
scanner API
fixes #639 vastly improved the scanner API
Project: http://git-wip-us.apache.org/repos/asf/incubator-fluo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-fluo/commit/23374784
Tree: http://git-wip-us.apache.org/repos/asf/incubator-fluo/tree/23374784
Diff: http://git-wip-us.apache.org/repos/asf/incubator-fluo/diff/23374784
Branch: refs/heads/master
Commit: 233747847831efc4dad6cc030a895860ead7d71e
Parents: e72a931
Author: Keith Turner <kt...@apache.org>
Authored: Wed Jul 13 18:09:15 2016 -0400
Committer: Keith Turner <kt...@apache.org>
Committed: Fri Jul 15 11:27:13 2016 -0400
----------------------------------------------------------------------
.../apache/fluo/api/client/SnapshotBase.java | 43 +++-
.../fluo/api/client/scanner/CellScanner.java | 25 ++
.../fluo/api/client/scanner/ColumnScanner.java | 30 +++
.../fluo/api/client/scanner/RowScanner.java | 23 ++
.../api/client/scanner/RowScannerBuilder.java | 26 ++
.../fluo/api/client/scanner/ScannerBuilder.java | 61 +++++
.../fluo/api/config/ScannerConfiguration.java | 98 --------
.../org/apache/fluo/api/data/ColumnValue.java | 76 ++++++
.../fluo/api/iterator/ColumnIterator.java | 31 ---
.../apache/fluo/api/iterator/RowIterator.java | 30 ---
.../apache/fluo/cluster/runner/AppRunner.java | 107 ++++----
.../apache/fluo/cluster/runner/ScanTest.java | 8 +-
.../fluo/core/impl/ColumnIteratorImpl.java | 90 -------
.../apache/fluo/core/impl/RowIteratorImpl.java | 77 ------
.../apache/fluo/core/impl/SnapshotScanner.java | 244 +++++++++++--------
.../apache/fluo/core/impl/TransactionImpl.java | 61 +++--
.../fluo/core/impl/scanner/CellScannerImpl.java | 57 +++++
.../core/impl/scanner/ColumnScannerImpl.java | 71 ++++++
.../fluo/core/impl/scanner/RowScannerImpl.java | 50 ++++
.../core/impl/scanner/ScannerBuilderImpl.java | 90 +++++++
.../fluo/core/log/TracingTransaction.java | 9 +-
.../core/config/ScannerConfigurationTest.java | 92 -------
.../org/apache/fluo/integration/ITBase.java | 21 +-
.../org/apache/fluo/integration/ITBaseImpl.java | 1 +
.../fluo/integration/TestTransaction.java | 7 +-
.../apache/fluo/integration/impl/FluoIT.java | 31 +--
.../apache/fluo/integration/impl/ScannerIT.java | 217 +++++++++++++++++
.../fluo/integration/impl/StochasticBankIT.java | 24 +-
.../integration/impl/WeakNotificationIT.java | 21 +-
.../apache/fluo/integration/impl/WorkerIT.java | 17 +-
.../fluo/mapreduce/FluoEntryInputFormat.java | 45 ++--
.../fluo/mapreduce/FluoRowInputFormat.java | 37 +--
pom.xml | 4 +-
33 files changed, 1082 insertions(+), 742 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/api/src/main/java/org/apache/fluo/api/client/SnapshotBase.java
----------------------------------------------------------------------
diff --git a/modules/api/src/main/java/org/apache/fluo/api/client/SnapshotBase.java b/modules/api/src/main/java/org/apache/fluo/api/client/SnapshotBase.java
index 778aa94..79e1cef 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/client/SnapshotBase.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/client/SnapshotBase.java
@@ -19,11 +19,11 @@ import java.util.Collection;
import java.util.Map;
import java.util.Set;
-import org.apache.fluo.api.config.ScannerConfiguration;
+import org.apache.fluo.api.client.scanner.ScannerBuilder;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.data.RowColumn;
-import org.apache.fluo.api.iterator.RowIterator;
+import org.apache.fluo.api.data.Span;
/**
* Allows users to read from a Fluo table at a certain point in time
@@ -57,9 +57,44 @@ public interface SnapshotBase {
Map<Bytes, Map<Column, Bytes>> get(Collection<RowColumn> rowColumns);
/**
- * Retrieves a {@link RowIterator} with the given {@link ScannerConfiguration}
+ * This method is the starting point for constructing a scanner. Scanners can be constructed over
+ * a {@link Span} and/or with a subset of columns. Below is a simple example of building a scanner.
+ *
+ * <pre>
+ * {@code
+ * Transaction tx = ...;
+ * Span span = Span.exact("row4");
+ * Column col1 = new Column("fam1","qual1");
+ * Column col2 = new Column("fam1","qual2");
+ *
+ * //create a scanner over row4 fetching columns fam1:qual1 and fam1:qual2
+ * CellScanner cs = tx.scanner().over(span).fetch(col1,col2).build();
+ * for(RowColumnValue rcv : cs) {
+ * //do stuff with rcv
+ * }
+ * }
+ * </pre>
+ *
+ * <p>
+ * The following example shows how to build a row scanner.
+ *
+ * <pre>
+ * {@code
+ * // tx, span, col1 and col2 are defined as in the previous example
+ * RowScanner rs = tx.scanner().over(span).fetch(col1, col2).byRow().build();
+ * for (ColumnScanner colScanner : rs) {
+ * Bytes row = colScanner.getRow();
+ * for (ColumnValue cv : colScanner) {
+ * // do stuff with the columns and values in the row
+ * }
+ * }
+ * }
+ * </pre>
+ *
+ * @return A scanner builder.
*/
- RowIterator get(ScannerConfiguration config);
+
+ ScannerBuilder scanner();
/**
* Wrapper for {@link #get(Collection)} that uses Strings. All strings are encoded and decoded
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/api/src/main/java/org/apache/fluo/api/client/scanner/CellScanner.java
----------------------------------------------------------------------
diff --git a/modules/api/src/main/java/org/apache/fluo/api/client/scanner/CellScanner.java b/modules/api/src/main/java/org/apache/fluo/api/client/scanner/CellScanner.java
new file mode 100644
index 0000000..b2bf50e
--- /dev/null
+++ b/modules/api/src/main/java/org/apache/fluo/api/client/scanner/CellScanner.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.api.client.scanner;
+
+import org.apache.fluo.api.data.RowColumnValue;
+
+/**
+ * @since 1.0.0
+ */
+public interface CellScanner extends Iterable<RowColumnValue> {
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/api/src/main/java/org/apache/fluo/api/client/scanner/ColumnScanner.java
----------------------------------------------------------------------
diff --git a/modules/api/src/main/java/org/apache/fluo/api/client/scanner/ColumnScanner.java b/modules/api/src/main/java/org/apache/fluo/api/client/scanner/ColumnScanner.java
new file mode 100644
index 0000000..9e790ea
--- /dev/null
+++ b/modules/api/src/main/java/org/apache/fluo/api/client/scanner/ColumnScanner.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.api.client.scanner;
+
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.ColumnValue;
+
+/**
+ * @since 1.0.0
+ */
+public interface ColumnScanner extends Iterable<ColumnValue> {
+
+ /**
+ * @return the row for all column values
+ */
+ Bytes getRow();
+}
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/api/src/main/java/org/apache/fluo/api/client/scanner/RowScanner.java
----------------------------------------------------------------------
diff --git a/modules/api/src/main/java/org/apache/fluo/api/client/scanner/RowScanner.java b/modules/api/src/main/java/org/apache/fluo/api/client/scanner/RowScanner.java
new file mode 100644
index 0000000..e65816b
--- /dev/null
+++ b/modules/api/src/main/java/org/apache/fluo/api/client/scanner/RowScanner.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.api.client.scanner;
+
+/**
+ * @since 1.0.0
+ */
+public interface RowScanner extends Iterable<ColumnScanner> {
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/api/src/main/java/org/apache/fluo/api/client/scanner/RowScannerBuilder.java
----------------------------------------------------------------------
diff --git a/modules/api/src/main/java/org/apache/fluo/api/client/scanner/RowScannerBuilder.java b/modules/api/src/main/java/org/apache/fluo/api/client/scanner/RowScannerBuilder.java
new file mode 100644
index 0000000..fa7c60e
--- /dev/null
+++ b/modules/api/src/main/java/org/apache/fluo/api/client/scanner/RowScannerBuilder.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.api.client.scanner;
+
+/**
+ * @since 1.0.0
+ */
+public interface RowScannerBuilder {
+ /**
+ * @return a new scanner created with any previously set restrictions
+ */
+ RowScanner build();
+}
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/api/src/main/java/org/apache/fluo/api/client/scanner/ScannerBuilder.java
----------------------------------------------------------------------
diff --git a/modules/api/src/main/java/org/apache/fluo/api/client/scanner/ScannerBuilder.java b/modules/api/src/main/java/org/apache/fluo/api/client/scanner/ScannerBuilder.java
new file mode 100644
index 0000000..1217aa0
--- /dev/null
+++ b/modules/api/src/main/java/org/apache/fluo/api/client/scanner/ScannerBuilder.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.api.client.scanner;
+
+import java.util.Collection;
+
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.Span;
+
+/**
+ * @since 1.0.0
+ */
+public interface ScannerBuilder {
+ /**
+ * @param span restrict the scanner to data within span
+ * @return self
+ */
+ ScannerBuilder over(Span span);
+
+
+ /**
+ * Passing in a Column with only the family set will fetch the entire column family.
+ *
+ * @param columns restrict the scanner to only these columns
+ * @return self
+ */
+ ScannerBuilder fetch(Column... columns);
+
+ /**
+ * Passing in a Column with only the family set will fetch the entire column family.
+ *
+ * @param columns restrict the scanner to only these columns
+ * @return self
+ */
+ ScannerBuilder fetch(Collection<Column> columns);
+
+ /**
+ * @return a new scanner created with any previously set restrictions
+ */
+ CellScanner build();
+
+ /**
+ * Call this to build a row scanner.
+ *
+ * @return a row scanner builder using any previously set restrictions
+ */
+ RowScannerBuilder byRow();
+}
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/api/src/main/java/org/apache/fluo/api/config/ScannerConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/api/src/main/java/org/apache/fluo/api/config/ScannerConfiguration.java b/modules/api/src/main/java/org/apache/fluo/api/config/ScannerConfiguration.java
deleted file mode 100644
index 3a73b50..0000000
--- a/modules/api/src/main/java/org/apache/fluo/api/config/ScannerConfiguration.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.api.config;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Objects;
-import java.util.Set;
-
-import org.apache.fluo.api.client.SnapshotBase;
-import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.data.Span;
-
-/**
- * Contains configuration for a {@link org.apache.fluo.api.client.Snapshot} scanner. Passed to
- * {@link SnapshotBase#get(ScannerConfiguration)}.
- *
- * @since 1.0.0
- */
-public class ScannerConfiguration implements Cloneable {
-
- private Span span = new Span();
- private Set<Column> columns = new HashSet<>();
-
- /**
- * Sets {@link Span} for ScannerConfiguration
- */
- public ScannerConfiguration setSpan(Span span) {
- Objects.requireNonNull(span);
- this.span = span;
- return this;
- }
-
- /**
- * Retrieves {@link Span} for ScannerConfiguration
- */
- public Span getSpan() {
- return span;
- }
-
- /**
- * List of all {@link Column}s that scanner will retrieve
- */
- public Set<Column> getColumns() {
- return Collections.unmodifiableSet(columns);
- }
-
- /**
- * Configures scanner to retrieve column with the given family
- */
- public ScannerConfiguration fetchColumnFamily(Bytes fam) {
- Objects.requireNonNull(fam);
- columns.add(new Column(fam));
- return this;
- }
-
- /**
- * Configures scanner to retrieve column with the given family and qualifier
- */
- public ScannerConfiguration fetchColumn(Bytes fam, Bytes qual) {
- Objects.requireNonNull(fam);
- Objects.requireNonNull(qual);
- columns.add(new Column(fam, qual));
- return this;
- }
-
- /**
- * Clears all fetched column settings
- */
- public void clearColumns() {
- columns.clear();
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public Object clone() throws CloneNotSupportedException {
- ScannerConfiguration sc = (ScannerConfiguration) super.clone();
-
- sc.columns = (Set<Column>) ((HashSet<Column>) columns).clone();
- sc.span = span;
-
- return sc;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/api/src/main/java/org/apache/fluo/api/data/ColumnValue.java
----------------------------------------------------------------------
diff --git a/modules/api/src/main/java/org/apache/fluo/api/data/ColumnValue.java b/modules/api/src/main/java/org/apache/fluo/api/data/ColumnValue.java
new file mode 100644
index 0000000..23a3741
--- /dev/null
+++ b/modules/api/src/main/java/org/apache/fluo/api/data/ColumnValue.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.api.data;
+
+import java.io.Serializable;
+
+/**
+ * @since 1.0.0
+ */
+
+public class ColumnValue implements Serializable, Comparable<ColumnValue> {
+ private static final long serialVersionUID = 1L;
+
+ private Column column;
+ private Bytes val;
+
+ public ColumnValue(Column col, Bytes val) {
+ this.column = col;
+ this.val = val;
+ }
+
+ public ColumnValue(Column col, String val) {
+ this.column = col;
+ this.val = Bytes.of(val);
+ }
+
+ public Column getColumn() {
+ return column;
+ }
+
+ public Bytes getValue() {
+ return val;
+ }
+
+ @Override
+ public int compareTo(ColumnValue o) {
+ int comp = column.compareTo(o.column);
+ if (comp == 0) {
+ comp = val.compareTo(o.val);
+ }
+ return comp;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof ColumnValue) {
+ ColumnValue ocv = (ColumnValue) o;
+ return column.equals(ocv.column) && val.equals(ocv.val);
+ }
+
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return column.hashCode() + 31 * val.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return column + " " + val;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/api/src/main/java/org/apache/fluo/api/iterator/ColumnIterator.java
----------------------------------------------------------------------
diff --git a/modules/api/src/main/java/org/apache/fluo/api/iterator/ColumnIterator.java b/modules/api/src/main/java/org/apache/fluo/api/iterator/ColumnIterator.java
deleted file mode 100644
index be04738..0000000
--- a/modules/api/src/main/java/org/apache/fluo/api/iterator/ColumnIterator.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.api.iterator;
-
-import java.util.Iterator;
-import java.util.Map.Entry;
-
-import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.api.data.Column;
-
-/**
- * Iterator for Fluo columns
- *
- * @since 1.0.0
- */
-public interface ColumnIterator extends Iterator<Entry<Column, Bytes>> {
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/api/src/main/java/org/apache/fluo/api/iterator/RowIterator.java
----------------------------------------------------------------------
diff --git a/modules/api/src/main/java/org/apache/fluo/api/iterator/RowIterator.java b/modules/api/src/main/java/org/apache/fluo/api/iterator/RowIterator.java
deleted file mode 100644
index 21e53b1..0000000
--- a/modules/api/src/main/java/org/apache/fluo/api/iterator/RowIterator.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.api.iterator;
-
-import java.util.Iterator;
-import java.util.Map.Entry;
-
-import org.apache.fluo.api.data.Bytes;
-
-/**
- * Iterator for Fluo rows
- *
- * @since 1.0.0
- */
-public interface RowIterator extends Iterator<Entry<Bytes, ColumnIterator>> {
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/AppRunner.java
----------------------------------------------------------------------
diff --git a/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/AppRunner.java b/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/AppRunner.java
index ca9afe0..d40ff9e 100644
--- a/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/AppRunner.java
+++ b/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/AppRunner.java
@@ -17,7 +17,8 @@ package org.apache.fluo.cluster.runner;
import java.lang.reflect.Method;
import java.util.Arrays;
-import java.util.Map;
+import java.util.Collection;
+import java.util.HashSet;
import javax.inject.Provider;
@@ -35,14 +36,13 @@ import org.apache.fluo.accumulo.format.FluoFormatter;
import org.apache.fluo.api.client.FluoClient;
import org.apache.fluo.api.client.FluoFactory;
import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.client.scanner.CellScanner;
import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.api.config.ScannerConfiguration;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.RowColumnValue;
import org.apache.fluo.api.data.Span;
import org.apache.fluo.api.exceptions.FluoException;
-import org.apache.fluo.api.iterator.ColumnIterator;
-import org.apache.fluo.api.iterator.RowIterator;
import org.apache.fluo.cluster.util.FluoYarnConfig;
import org.apache.fluo.core.impl.Environment;
import org.apache.fluo.core.impl.Notification;
@@ -68,9 +68,8 @@ public abstract class AppRunner {
this.scriptName = scriptName;
}
- public static ScannerConfiguration buildScanConfig(ScanOptions options) {
- ScannerConfiguration scanConfig = new ScannerConfiguration();
-
+ public static Span getSpan(ScanOptions options) {
+ Span span = new Span();
if ((options.getExactRow() != null)
&& ((options.getStartRow() != null) || (options.getEndRow() != null) || (options
.getRowPrefix() != null))) {
@@ -87,35 +86,43 @@ public abstract class AppRunner {
// configure span of scanner
if (options.getExactRow() != null) {
- scanConfig.setSpan(Span.exact(options.getExactRow()));
+ span = Span.exact(options.getExactRow());
} else if (options.getRowPrefix() != null) {
- scanConfig.setSpan(Span.prefix(options.getRowPrefix()));
+ span = Span.prefix(options.getRowPrefix());
} else {
if ((options.getStartRow() != null) && (options.getEndRow() != null)) {
- scanConfig.setSpan(new Span(options.getStartRow(), true, options.getEndRow(), true));
+ span = new Span(options.getStartRow(), true, options.getEndRow(), true);
} else if (options.getStartRow() != null) {
- scanConfig.setSpan(new Span(Bytes.of(options.getStartRow()), true, Bytes.EMPTY, true));
+ span = new Span(Bytes.of(options.getStartRow()), true, Bytes.EMPTY, true);
} else if (options.getEndRow() != null) {
- scanConfig.setSpan(new Span(Bytes.EMPTY, true, Bytes.of(options.getEndRow()), true));
+ span = new Span(Bytes.EMPTY, true, Bytes.of(options.getEndRow()), true);
}
}
+ return span;
+ }
+
+ public static Collection<Column> getColumns(ScanOptions options) {
+ Collection<Column> columns = new HashSet<>();
+
// configure columns of scanner
for (String column : options.getColumns()) {
String[] colFields = column.split(":");
if (colFields.length == 1) {
- scanConfig.fetchColumnFamily(Bytes.of(colFields[0]));
+ columns.add(new Column(colFields[0]));
} else if (colFields.length == 2) {
- scanConfig.fetchColumn(Bytes.of(colFields[0]), Bytes.of(colFields[1]));
+ columns.add(new Column(colFields[0], colFields[1]));
} else {
throw new IllegalArgumentException("Failed to scan! Column '" + column
+ "' has too many fields (indicated by ':')");
}
}
- return scanConfig;
+ return columns;
}
+
+
public long scan(FluoConfiguration config, String[] args) {
ScanOptions options = new ScanOptions();
JCommander jcommand = new JCommander(options);
@@ -148,46 +155,46 @@ public abstract class AppRunner {
try (FluoClient client = FluoFactory.newClient(sConfig)) {
try (Snapshot s = client.newSnapshot()) {
- ScannerConfiguration scanConfig = null;
+ Span span = null;
+ Collection<Column> columns = null;
try {
- scanConfig = buildScanConfig(options);
+ span = getSpan(options);
+ columns = getColumns(options);
} catch (IllegalArgumentException e) {
System.err.println(e.getMessage());
System.exit(-1);
}
- RowIterator iter = s.get(scanConfig);
-
- if (!iter.hasNext()) {
- System.out.println("\nNo data found\n");
- }
+ CellScanner cellScanner = s.scanner().over(span).fetch(columns).build();
StringBuilder sb = new StringBuilder();
- while (iter.hasNext() && !System.out.checkError()) {
- Map.Entry<Bytes, ColumnIterator> rowEntry = iter.next();
- ColumnIterator citer = rowEntry.getValue();
- while (citer.hasNext() && !System.out.checkError()) {
- Map.Entry<Column, Bytes> colEntry = citer.next();
- if (options.hexEncNonAscii) {
- sb.setLength(0);
- Hex.encNonAscii(sb, rowEntry.getKey());
- sb.append(" ");
- Hex.encNonAscii(sb, colEntry.getKey(), " ");
- sb.append("\t");
- Hex.encNonAscii(sb, colEntry.getValue());
- System.out.println(sb.toString());
- } else {
- sb.setLength(0);
- sb.append(rowEntry.getKey());
- sb.append(" ");
- sb.append(colEntry.getKey());
- sb.append("\t");
- sb.append(colEntry.getValue());
- System.out.println(sb.toString());
- }
- entriesFound++;
+ for (RowColumnValue rcv : cellScanner) {
+ if (options.hexEncNonAscii) {
+ sb.setLength(0);
+ Hex.encNonAscii(sb, rcv.getRow());
+ sb.append(" ");
+ Hex.encNonAscii(sb, rcv.getColumn(), " ");
+ sb.append("\t");
+ Hex.encNonAscii(sb, rcv.getValue());
+ System.out.println(sb.toString());
+ } else {
+ sb.setLength(0);
+ sb.append(rcv.getsRow());
+ sb.append(" ");
+ sb.append(rcv.getColumn());
+ sb.append("\t");
+ sb.append(rcv.getsValue());
+ System.out.println(sb.toString());
+ }
+ entriesFound++;
+ if (System.out.checkError()) {
+ break;
}
}
+
+ if (entriesFound == 0) {
+ System.out.println("\nNo data found\n");
+ }
} catch (FluoException e) {
System.out.println("Scan failed - " + e.getMessage());
}
@@ -201,9 +208,11 @@ public abstract class AppRunner {
Connector conn = AccumuloUtil.getConnector(sConfig);
- ScannerConfiguration scanConfig = null;
+ Span span = null;
+ Collection<Column> columns = null;
try {
- scanConfig = buildScanConfig(options);
+ span = getSpan(options);
+ columns = getColumns(options);
} catch (IllegalArgumentException e) {
System.err.println(e.getMessage());
System.exit(-1);
@@ -213,8 +222,8 @@ public abstract class AppRunner {
try {
Scanner scanner = conn.createScanner(sConfig.getAccumuloTable(), Authorizations.EMPTY);
- scanner.setRange(SpanUtil.toRange(scanConfig.getSpan()));
- for (Column col : scanConfig.getColumns()) {
+ scanner.setRange(SpanUtil.toRange(span));
+ for (Column col : columns) {
if (col.isQualifierSet()) {
scanner
.fetchColumn(ByteUtil.toText(col.getFamily()), ByteUtil.toText(col.getQualifier()));
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/cluster/src/test/java/org/apache/fluo/cluster/runner/ScanTest.java
----------------------------------------------------------------------
diff --git a/modules/cluster/src/test/java/org/apache/fluo/cluster/runner/ScanTest.java b/modules/cluster/src/test/java/org/apache/fluo/cluster/runner/ScanTest.java
index 2d7c17d..a84e8ea 100644
--- a/modules/cluster/src/test/java/org/apache/fluo/cluster/runner/ScanTest.java
+++ b/modules/cluster/src/test/java/org/apache/fluo/cluster/runner/ScanTest.java
@@ -16,10 +16,10 @@
package org.apache.fluo.cluster.runner;
import com.beust.jcommander.JCommander;
-import org.apache.fluo.api.config.ScannerConfiguration;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.data.RowColumn;
import org.apache.fluo.api.data.Span;
+import org.apache.fluo.core.impl.SnapshotScanner;
import org.junit.Assert;
import org.junit.Test;
@@ -28,16 +28,16 @@ import org.junit.Test;
*/
public class ScanTest {
- private ScannerConfiguration parseArgs(String args) {
+ private SnapshotScanner.Opts parseArgs(String args) {
ScanOptions options = new ScanOptions();
JCommander jcommand = new JCommander(options);
jcommand.parse(args.split(" "));
- return AppRunner.buildScanConfig(options);
+ return new SnapshotScanner.Opts(AppRunner.getSpan(options), AppRunner.getColumns(options));
}
@Test
public void testValidInput() {
- ScannerConfiguration config;
+ SnapshotScanner.Opts config;
config = parseArgs("");
Assert.assertEquals(RowColumn.EMPTY, config.getSpan().getStart());
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/core/src/main/java/org/apache/fluo/core/impl/ColumnIteratorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/ColumnIteratorImpl.java b/modules/core/src/main/java/org/apache/fluo/core/impl/ColumnIteratorImpl.java
deleted file mode 100644
index 266df69..0000000
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/ColumnIteratorImpl.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.core.impl;
-
-import java.util.Iterator;
-import java.util.Map.Entry;
-
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Value;
-import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.iterator.ColumnIterator;
-import org.apache.fluo.core.util.ByteUtil;
-
-/**
- * Implementation of Column Iterator
- */
-public class ColumnIteratorImpl implements ColumnIterator {
-
- private Iterator<Entry<Key, Value>> scanner;
- private Entry<Key, Value> firstEntry;
-
- ColumnIteratorImpl(Iterator<Entry<Key, Value>> scanner) {
- this(null, scanner);
- }
-
- ColumnIteratorImpl(Entry<Key, Value> firstEntry, Iterator<Entry<Key, Value>> cols) {
- this.firstEntry = firstEntry;
- this.scanner = cols;
- }
-
- @Override
- public boolean hasNext() {
- return firstEntry != null || scanner.hasNext();
- }
-
- // TODO create custom class to return instead of entry
- @Override
- public Entry<Column, Bytes> next() {
- Entry<Key, Value> entry;
- if (firstEntry != null) {
- entry = firstEntry;
- firstEntry = null;
- } else {
- entry = scanner.next();
- }
- final Bytes cf = ByteUtil.toBytes(entry.getKey().getColumnFamilyData());
- final Bytes cq = ByteUtil.toBytes(entry.getKey().getColumnQualifierData());
- final Bytes cv = ByteUtil.toBytes(entry.getKey().getColumnVisibilityData());
-
- final Column col = new Column(cf, cq, cv);
- final Bytes val = Bytes.of(entry.getValue().get());
-
- return new Entry<Column, Bytes>() {
-
- @Override
- public Bytes setValue(Bytes value) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Bytes getValue() {
- return val;
- }
-
- @Override
- public Column getKey() {
- return col;
- }
- };
- }
-
- @Override
- public void remove() {
- scanner.remove();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/core/src/main/java/org/apache/fluo/core/impl/RowIteratorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/RowIteratorImpl.java b/modules/core/src/main/java/org/apache/fluo/core/impl/RowIteratorImpl.java
deleted file mode 100644
index efdefeb..0000000
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/RowIteratorImpl.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.core.impl;
-
-import java.util.Iterator;
-import java.util.Map.Entry;
-
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Value;
-import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.api.iterator.ColumnIterator;
-import org.apache.fluo.api.iterator.RowIterator;
-
-/**
- * Implementation of RowIterator
- */
-public class RowIteratorImpl implements RowIterator {
-
- private final org.apache.accumulo.core.client.RowIterator rowIter;
-
- RowIteratorImpl(Iterator<Entry<Key, Value>> scanner) {
- rowIter = new org.apache.accumulo.core.client.RowIterator(scanner);
- }
-
- @Override
- public boolean hasNext() {
- return rowIter.hasNext();
- }
-
- // TODO create custom class to return instead of entry
- @Override
- public Entry<Bytes, ColumnIterator> next() {
- Iterator<Entry<Key, Value>> cols = rowIter.next();
-
- Entry<Key, Value> entry = cols.next();
-
- final Bytes row = Bytes.of(entry.getKey().getRowData().toArray());
- final ColumnIterator coliter = new ColumnIteratorImpl(entry, cols);
-
- return new Entry<Bytes, ColumnIterator>() {
-
- @Override
- public Bytes getKey() {
- return row;
- }
-
- @Override
- public ColumnIterator getValue() {
- return coliter;
- }
-
- @Override
- public ColumnIterator setValue(ColumnIterator value) {
- throw new UnsupportedOperationException();
- }
- };
-
- }
-
- @Override
- public void remove() {
- rowIter.remove();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/core/src/main/java/org/apache/fluo/core/impl/SnapshotScanner.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/SnapshotScanner.java b/modules/core/src/main/java/org/apache/fluo/core/impl/SnapshotScanner.java
index 2b9959d..cd2e008 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/SnapshotScanner.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/SnapshotScanner.java
@@ -16,12 +16,13 @@
package org.apache.fluo.core.impl;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
-import java.util.Set;
+import com.google.common.collect.ImmutableSet;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.ScannerBase;
@@ -30,7 +31,6 @@ import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.fluo.accumulo.iterators.SnapshotIterator;
import org.apache.fluo.accumulo.util.ColumnConstants;
-import org.apache.fluo.api.config.ScannerConfiguration;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.data.RowColumn;
import org.apache.fluo.api.data.Span;
@@ -41,45 +41,41 @@ import org.apache.fluo.core.util.UtilWaitThread;
/**
* Allows users to iterate over entries of a {@link org.apache.fluo.api.client.Snapshot}
*/
-public class SnapshotScanner implements Iterator<Entry<Key, Value>> {
+public class SnapshotScanner implements Iterable<Entry<Key, Value>> {
+
+ /**
+ * Immutable options for a SnapshotScanner
+ */
+ public static final class Opts {
+ private final Span span;
+ private final Collection<Column> columns;
+
+ public Opts(Span span, Collection<Column> columns) {
+ this.span = span;
+ this.columns = ImmutableSet.copyOf(columns);
+ }
+
+ public Span getSpan() {
+ return span;
+ }
+
+ public Collection<Column> getColumns() {
+ return columns;
+ }
+ }
private final long startTs;
private final Environment env;
private final TxStats stats;
-
- private Iterator<Entry<Key, Value>> iterator;
- private Entry<Key, Value> next;
- private ScannerConfiguration config;
+ private final Opts config;
static final long INITIAL_WAIT_TIME = 50;
// TODO make configurable
static final long MAX_WAIT_TIME = 60000;
- public SnapshotScanner(Environment env, ScannerConfiguration config, long startTs, TxStats stats) {
- this.env = env;
- this.config = config;
- this.startTs = startTs;
- this.stats = stats;
- setUpIterator();
- }
- private void setUpIterator() {
- Scanner scanner;
- try {
- scanner = env.getConnector().createScanner(env.getTable(), env.getAuthorizations());
- } catch (TableNotFoundException e) {
- throw new RuntimeException(e);
- }
- scanner.clearColumns();
- scanner.clearScanIterators();
- scanner.setRange(SpanUtil.toRange(config.getSpan()));
-
- setupScanner(scanner, config.getColumns(), startTs);
- this.iterator = scanner.iterator();
- }
-
- static void setupScanner(ScannerBase scanner, Set<Column> columns, long startTs) {
+ static void setupScanner(ScannerBase scanner, Collection<Column> columns, long startTs) {
for (Column col : columns) {
if (col.isQualifierSet()) {
scanner.fetchColumn(ByteUtil.toText(col.getFamily()), ByteUtil.toText(col.getQualifier()));
@@ -93,119 +89,153 @@ public class SnapshotScanner implements Iterator<Entry<Key, Value>> {
scanner.addScanIterator(iterConf);
}
- @Override
- public boolean hasNext() {
- if (next == null) {
- next = getNext();
+ private class SnapIter implements Iterator<Entry<Key, Value>> {
+
+ private Iterator<Entry<Key, Value>> iterator;
+ private Entry<Key, Value> next;
+ private Opts snapIterConfig;
+
+ SnapIter(Opts config) {
+ this.snapIterConfig = config;
+ setUpIterator();
}
- return next != null;
- }
+ private void setUpIterator() {
+ Scanner scanner;
+ try {
+ scanner = env.getConnector().createScanner(env.getTable(), env.getAuthorizations());
+ } catch (TableNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+ scanner.clearColumns();
+ scanner.clearScanIterators();
+ scanner.setRange(SpanUtil.toRange(snapIterConfig.getSpan()));
- @Override
- public Entry<Key, Value> next() {
- if (!hasNext()) {
- throw new NoSuchElementException();
+ setupScanner(scanner, snapIterConfig.getColumns(), startTs);
+
+ this.iterator = scanner.iterator();
}
- Entry<Key, Value> tmp = next;
- next = null;
- return tmp;
- }
+ @Override
+ public boolean hasNext() {
+ if (next == null) {
+ next = getNext();
+ }
- private void resetScanner(Span span) {
- try {
- config = (ScannerConfiguration) config.clone();
- } catch (CloneNotSupportedException e) {
- throw new RuntimeException(e);
+ return next != null;
}
- config.setSpan(span);
- setUpIterator();
- }
+ @Override
+ public Entry<Key, Value> next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+
+ Entry<Key, Value> tmp = next;
+ next = null;
+ return tmp;
+ }
- public void resolveLock(Entry<Key, Value> lockEntry) {
+ private void resetScanner(Span span) {
+ snapIterConfig = new Opts(span, snapIterConfig.columns);
+ setUpIterator();
+ }
- // read ahead a little bit looking for other locks to resolve
+ public void resolveLock(Entry<Key, Value> lockEntry) {
- long startTime = System.currentTimeMillis();
- long waitTime = INITIAL_WAIT_TIME;
+ // read ahead a little bit looking for other locks to resolve
- List<Entry<Key, Value>> locks = new ArrayList<>();
- locks.add(lockEntry);
- int amountRead = 0;
- int numRead = 0;
+ long startTime = System.currentTimeMillis();
+ long waitTime = INITIAL_WAIT_TIME;
- RowColumn origEnd = config.getSpan().getEnd();
- boolean isEndInclusive = config.getSpan().isEndInclusive();
+ List<Entry<Key, Value>> locks = new ArrayList<>();
+ locks.add(lockEntry);
+ int amountRead = 0;
+ int numRead = 0;
- while (true) {
- while (iterator.hasNext()) {
- Entry<Key, Value> entry = iterator.next();
+ RowColumn origEnd = snapIterConfig.getSpan().getEnd();
+ boolean isEndInclusive = snapIterConfig.getSpan().isEndInclusive();
- long colType = entry.getKey().getTimestamp() & ColumnConstants.PREFIX_MASK;
+ while (true) {
+ while (iterator.hasNext()) {
+ Entry<Key, Value> entry = iterator.next();
- if (colType == ColumnConstants.LOCK_PREFIX) {
- locks.add(entry);
- }
+ long colType = entry.getKey().getTimestamp() & ColumnConstants.PREFIX_MASK;
- amountRead += entry.getKey().getSize() + entry.getValue().getSize();
- numRead++;
+ if (colType == ColumnConstants.LOCK_PREFIX) {
+ locks.add(entry);
+ }
- if (numRead > 100 || amountRead > 1 << 12) {
- break;
+ amountRead += entry.getKey().getSize() + entry.getValue().getSize();
+ numRead++;
+
+ if (numRead > 100 || amountRead > 1 << 12) {
+ break;
+ }
}
- }
- boolean resolvedLocks = LockResolver.resolveLocks(env, startTs, stats, locks, startTime);
+ boolean resolvedLocks = LockResolver.resolveLocks(env, startTs, stats, locks, startTime);
- if (!resolvedLocks) {
- UtilWaitThread.sleep(waitTime);
- stats.incrementLockWaitTime(waitTime);
- waitTime = Math.min(MAX_WAIT_TIME, waitTime * 2);
+ if (!resolvedLocks) {
+ UtilWaitThread.sleep(waitTime);
+ stats.incrementLockWaitTime(waitTime);
+ waitTime = Math.min(MAX_WAIT_TIME, waitTime * 2);
- RowColumn start = SpanUtil.toRowColumn(locks.get(0).getKey());
- RowColumn end = SpanUtil.toRowColumn(locks.get(locks.size() - 1).getKey()).following();
+ RowColumn start = SpanUtil.toRowColumn(locks.get(0).getKey());
+ RowColumn end = SpanUtil.toRowColumn(locks.get(locks.size() - 1).getKey()).following();
- resetScanner(new Span(start, true, end, false));
+ resetScanner(new Span(start, true, end, false));
- locks.clear();
+ locks.clear();
- } else {
- break;
+ } else {
+ break;
+ }
}
- }
- RowColumn start = SpanUtil.toRowColumn(lockEntry.getKey());
+ RowColumn start = SpanUtil.toRowColumn(lockEntry.getKey());
- resetScanner(new Span(start, true, origEnd, isEndInclusive));
- }
+ resetScanner(new Span(start, true, origEnd, isEndInclusive));
+ }
- public Entry<Key, Value> getNext() {
- mloop: while (true) {
- // its possible a next could exist then be rolled back
- if (!iterator.hasNext()) {
- return null;
- }
+ public Entry<Key, Value> getNext() {
+ mloop: while (true) {
+ // its possible a next could exist then be rolled back
+ if (!iterator.hasNext()) {
+ return null;
+ }
- Entry<Key, Value> entry = iterator.next();
+ Entry<Key, Value> entry = iterator.next();
- long colType = entry.getKey().getTimestamp() & ColumnConstants.PREFIX_MASK;
+ long colType = entry.getKey().getTimestamp() & ColumnConstants.PREFIX_MASK;
- if (colType == ColumnConstants.LOCK_PREFIX) {
- resolveLock(entry);
- continue mloop;
- } else if (colType == ColumnConstants.DATA_PREFIX) {
- stats.incrementEntriesReturned(1);
- return entry;
- } else {
- throw new IllegalArgumentException("Unexpected column type " + colType);
+ if (colType == ColumnConstants.LOCK_PREFIX) {
+ resolveLock(entry);
+ continue mloop;
+ } else if (colType == ColumnConstants.DATA_PREFIX) {
+ stats.incrementEntriesReturned(1);
+ return entry;
+ } else {
+ throw new IllegalArgumentException("Unexpected column type " + colType);
+ }
}
}
+
+ @Override
+ public void remove() {
+ iterator.remove();
+ }
+ }
+
+ SnapshotScanner(Environment env, Opts config, long startTs, TxStats stats) {
+ this.env = env;
+ this.config = config;
+ this.startTs = startTs;
+ this.stats = stats;
}
@Override
- public void remove() {
- iterator.remove();
+ public Iterator<Entry<Key, Value>> iterator() {
+ return new SnapIter(config);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
index 3b0b85c..c4c429e 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
@@ -29,6 +29,7 @@ import java.util.Set;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
@@ -53,22 +54,23 @@ import org.apache.fluo.accumulo.util.ColumnConstants;
import org.apache.fluo.accumulo.values.DelLockValue;
import org.apache.fluo.accumulo.values.LockValue;
import org.apache.fluo.api.client.Snapshot;
-import org.apache.fluo.api.config.ScannerConfiguration;
+import org.apache.fluo.api.client.scanner.ScannerBuilder;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.ColumnValue;
import org.apache.fluo.api.data.RowColumn;
import org.apache.fluo.api.data.Span;
import org.apache.fluo.api.exceptions.AlreadySetException;
import org.apache.fluo.api.exceptions.CommitException;
import org.apache.fluo.api.exceptions.FluoException;
-import org.apache.fluo.api.iterator.ColumnIterator;
-import org.apache.fluo.api.iterator.RowIterator;
import org.apache.fluo.core.async.AsyncCommitObserver;
import org.apache.fluo.core.async.AsyncConditionalWriter;
import org.apache.fluo.core.async.AsyncTransaction;
import org.apache.fluo.core.async.SyncCommitObserver;
import org.apache.fluo.core.exceptions.AlreadyAcknowledgedException;
import org.apache.fluo.core.exceptions.StaleScanException;
+import org.apache.fluo.core.impl.scanner.ColumnScannerImpl;
+import org.apache.fluo.core.impl.scanner.ScannerBuilderImpl;
import org.apache.fluo.core.oracle.Stamp;
import org.apache.fluo.core.util.ColumnUtil;
import org.apache.fluo.core.util.ConditionalFlutation;
@@ -207,36 +209,46 @@ public class TransactionImpl implements AsyncTransaction, Snapshot {
return ret;
}
- @Override
- public RowIterator get(ScannerConfiguration config) {
- checkIfOpen();
- return getImpl(config);
- }
-
private Map<Column, Bytes> getImpl(Bytes row, Set<Column> columns) {
// TODO push visibility filtering to server side?
env.getSharedResources().getVisCache().validate(columns);
- ScannerConfiguration config = new ScannerConfiguration();
- config.setSpan(Span.exact(row));
+ boolean shouldCopy = false;
+
for (Column column : columns) {
- config.fetchColumn(column.getFamily(), column.getQualifier());
+ if (column.isVisibilitySet()) {
+ shouldCopy = true;
+ }
}
- RowIterator iter = getImpl(config);
+ SnapshotScanner.Opts opts;
+ if (shouldCopy) {
+ HashSet<Column> cols = new HashSet<Column>();
+ for (Column column : columns) {
+ if (column.isVisibilitySet()) {
+ cols.add(new Column(column.getFamily(), column.getQualifier()));
+ } else {
+ cols.add(column);
+ }
+ }
+ opts = new SnapshotScanner.Opts(Span.exact(row), columns);
+ } else {
+ opts = new SnapshotScanner.Opts(Span.exact(row), columns);
+ }
Map<Column, Bytes> ret = new HashMap<>();
- while (iter.hasNext()) {
- Entry<Bytes, ColumnIterator> entry = iter.next();
- ColumnIterator citer = entry.getValue();
- while (citer.hasNext()) {
- Entry<Column, Bytes> centry = citer.next();
- if (columns.contains(centry.getKey())) {
- ret.put(centry.getKey(), centry.getValue());
+ Iterable<ColumnValue> scanner =
+ Iterables.transform(new SnapshotScanner(env, opts, startTs, stats), ColumnScannerImpl.E2CV);
+ for (ColumnValue cv : scanner) {
+ if (shouldCopy) {
+ if (columns.contains(cv.getColumn())) {
+ ret.put(cv.getColumn(), cv.getValue());
}
+ } else {
+ ret.put(cv.getColumn(), cv.getValue());
}
}
@@ -246,8 +258,10 @@ public class TransactionImpl implements AsyncTransaction, Snapshot {
return ret;
}
- private RowIterator getImpl(ScannerConfiguration config) {
- return new RowIteratorImpl(new SnapshotScanner(this.env, config, startTs, stats));
+ @Override
+ public ScannerBuilder scanner() {
+ checkIfOpen();
+ return new ScannerBuilderImpl(this);
}
private void updateColumnsRead(Bytes row, Set<Column> columns) {
@@ -1193,4 +1207,7 @@ public class TransactionImpl implements AsyncTransaction, Snapshot {
cd.commitObserver.committed();
}
+ public SnapshotScanner newSnapshotScanner(Span span, Collection<Column> columns) {
+ return new SnapshotScanner(env, new SnapshotScanner.Opts(span, columns), startTs, stats);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/CellScannerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/CellScannerImpl.java b/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/CellScannerImpl.java
new file mode 100644
index 0000000..06e68d6
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/CellScannerImpl.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.core.impl.scanner;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.fluo.api.client.scanner.CellScanner;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.RowColumnValue;
+import org.apache.fluo.core.util.ByteUtil;
+
+/**
+ * A {@link CellScanner} that presents Accumulo key/value entries as {@link RowColumnValue}s.
+ * Each call to {@link #iterator()} obtains an iterator from the wrapped iterable and converts
+ * its entries through {@code Iterators.transform}.
+ */
+public class CellScannerImpl implements CellScanner {
+
+  /** Converts one Accumulo entry into the row, column and value stored in it. */
+  private static final Function<Entry<Key, Value>, RowColumnValue> TO_ROW_COLUMN_VALUE =
+      new Function<Entry<Key, Value>, RowColumnValue>() {
+        @Override
+        public RowColumnValue apply(Entry<Key, Value> kv) {
+          Key key = kv.getKey();
+          Bytes row = ByteUtil.toBytes(key.getRowData());
+          Column column = new Column(ByteUtil.toBytes(key.getColumnFamilyData()),
+              ByteUtil.toBytes(key.getColumnQualifierData()),
+              ByteUtil.toBytes(key.getColumnVisibilityData()));
+          return new RowColumnValue(row, column, Bytes.of(kv.getValue().get()));
+        }
+      };
+
+  private final Iterable<Entry<Key, Value>> entries;
+
+  CellScannerImpl(Iterable<Entry<Key, Value>> snapshot) {
+    this.entries = snapshot;
+  }
+
+  @Override
+  public Iterator<RowColumnValue> iterator() {
+    Iterator<Entry<Key, Value>> source = entries.iterator();
+    return Iterators.transform(source, TO_ROW_COLUMN_VALUE);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/ColumnScannerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/ColumnScannerImpl.java b/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/ColumnScannerImpl.java
new file mode 100644
index 0000000..c85dfeb
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/ColumnScannerImpl.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.core.impl.scanner;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.fluo.api.client.scanner.ColumnScanner;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.ColumnValue;
+import org.apache.fluo.core.util.ByteUtil;
+
+/**
+ * A {@link ColumnScanner} over an iterator of Accumulo key/value entries. The row reported by
+ * {@link #getRow()} is read from the first entry, and the entries themselves can be iterated
+ * exactly once.
+ */
+public class ColumnScannerImpl implements ColumnScanner {
+
+  /** Converts an Accumulo entry into a {@link ColumnValue}; the row of the key is not used. */
+  public static final Function<Entry<Key, Value>, ColumnValue> E2CV =
+      new Function<Entry<Key, Value>, ColumnValue>() {
+        @Override
+        public ColumnValue apply(Entry<Key, Value> entry) {
+          Bytes cf = ByteUtil.toBytes(entry.getKey().getColumnFamilyData());
+          Bytes cq = ByteUtil.toBytes(entry.getKey().getColumnQualifierData());
+          Bytes cv = ByteUtil.toBytes(entry.getKey().getColumnVisibilityData());
+          Column col = new Column(cf, cq, cv);
+          Bytes val = Bytes.of(entry.getValue().get());
+          return new ColumnValue(col, val);
+        }
+      };
+
+  private final Bytes row;
+  private final Iterator<ColumnValue> iter;
+  // the single underlying iterator is handed out at most once, see iterator()
+  private boolean gotIter = false;
+
+  /**
+   * @param e entries to scan, expected to all belong to one row; it must have at least one entry
+   *        because the row is taken from the first one
+   */
+  ColumnScannerImpl(Iterator<Entry<Key, Value>> e) {
+    PeekingIterator<Entry<Key, Value>> peekingIter = Iterators.peekingIterator(e);
+    // peek instead of next so the first entry is still returned by the column iterator
+    row = ByteUtil.toBytes(peekingIter.peek().getKey().getRowData());
+    iter = Iterators.transform(peekingIter, E2CV);
+  }
+
+  /**
+   * Returns the iterator over this row's columns.
+   *
+   * @throws IllegalStateException if this method was already called on this scanner
+   */
+  @Override
+  public Iterator<ColumnValue> iterator() {
+    Preconditions.checkState(!gotIter,
+        "Unfortunately this implementation only supports getting the iterator once");
+    gotIter = true;
+    return iter;
+  }
+
+  @Override
+  public Bytes getRow() {
+    return row;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/RowScannerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/RowScannerImpl.java b/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/RowScannerImpl.java
new file mode 100644
index 0000000..4a9eb38
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/RowScannerImpl.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.core.impl.scanner;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
+import org.apache.accumulo.core.client.RowIterator;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.fluo.api.client.scanner.ColumnScanner;
+import org.apache.fluo.api.client.scanner.RowScanner;
+
+/**
+ * A {@link RowScanner} that feeds the entries of a snapshot through Accumulo's
+ * {@link RowIterator} and exposes each per-row iterator as a {@link ColumnScanner}.
+ */
+public class RowScannerImpl implements RowScanner {
+
+  /** Wraps the entry iterator of one row in a {@link ColumnScannerImpl}. */
+  private static final Function<Iterator<Entry<Key, Value>>, ColumnScanner> TO_COLUMN_SCANNER =
+      new Function<Iterator<Entry<Key, Value>>, ColumnScanner>() {
+        @Override
+        public ColumnScanner apply(Iterator<Entry<Key, Value>> rowEntries) {
+          return new ColumnScannerImpl(rowEntries);
+        }
+      };
+
+  private final Iterable<Entry<Key, Value>> entries;
+
+  RowScannerImpl(Iterable<Entry<Key, Value>> snapshot) {
+    this.entries = snapshot;
+  }
+
+  @Override
+  public Iterator<ColumnScanner> iterator() {
+    return Iterators.transform(new RowIterator(entries.iterator()), TO_COLUMN_SCANNER);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/ScannerBuilderImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/ScannerBuilderImpl.java b/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/ScannerBuilderImpl.java
new file mode 100644
index 0000000..8c833d5
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/ScannerBuilderImpl.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.core.impl.scanner;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Objects;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+import org.apache.fluo.api.client.scanner.CellScanner;
+import org.apache.fluo.api.client.scanner.RowScanner;
+import org.apache.fluo.api.client.scanner.RowScannerBuilder;
+import org.apache.fluo.api.client.scanner.ScannerBuilder;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.Span;
+import org.apache.fluo.core.impl.SnapshotScanner;
+import org.apache.fluo.core.impl.TransactionImpl;
+
+/**
+ * {@link ScannerBuilder} that records a span and a collection of columns and then asks a
+ * {@link TransactionImpl} for a snapshot scanner over them. Until configured otherwise it uses
+ * an empty span and no columns.
+ */
+public class ScannerBuilderImpl implements ScannerBuilder {
+
+  private static final Span EMPTY_SPAN = new Span();
+
+  private final TransactionImpl tx;
+  private Span span = EMPTY_SPAN;
+  private Collection<Column> columns = Collections.emptyList();
+
+  public ScannerBuilderImpl(TransactionImpl tx) {
+    this.tx = tx;
+  }
+
+  @Override
+  public ScannerBuilder over(Span span) {
+    this.span = Objects.requireNonNull(span);
+    return this;
+  }
+
+  /** Rejects columns that have a visibility set, then remembers the requested columns. */
+  private void useColumns(Collection<Column> requested) {
+    for (Column candidate : requested) {
+      Preconditions.checkArgument(!candidate.isVisibilitySet(),
+          "Fetching visibility is not currently supported");
+    }
+    this.columns = requested;
+  }
+
+  @Override
+  public ScannerBuilder fetch(Collection<Column> columns) {
+    Objects.requireNonNull(columns);
+    ImmutableSet<Column> copy = ImmutableSet.copyOf(columns);
+    useColumns(copy);
+    return this;
+  }
+
+  @Override
+  public ScannerBuilder fetch(Column... columns) {
+    Objects.requireNonNull(columns);
+    ImmutableSet<Column> copy = ImmutableSet.copyOf(columns);
+    useColumns(copy);
+    return this;
+  }
+
+  @Override
+  public CellScanner build() {
+    return new CellScannerImpl(tx.newSnapshotScanner(span, columns));
+  }
+
+  @Override
+  public RowScannerBuilder byRow() {
+    return new RowScannerBuilder() {
+      @Override
+      public RowScanner build() {
+        // span and columns are read when build() runs, not when byRow() was called
+        return new RowScannerImpl(tx.newSnapshotScanner(span, columns));
+      }
+    };
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/core/src/main/java/org/apache/fluo/core/log/TracingTransaction.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/log/TracingTransaction.java b/modules/core/src/main/java/org/apache/fluo/core/log/TracingTransaction.java
index da6b8dd..fe5c21d 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/log/TracingTransaction.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/log/TracingTransaction.java
@@ -23,14 +23,13 @@ import java.util.Set;
import com.google.common.base.Function;
import com.google.common.collect.Iterators;
import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.client.scanner.ScannerBuilder;
import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.api.config.ScannerConfiguration;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.data.RowColumn;
import org.apache.fluo.api.exceptions.AlreadySetException;
import org.apache.fluo.api.exceptions.CommitException;
-import org.apache.fluo.api.iterator.RowIterator;
import org.apache.fluo.core.async.AsyncCommitObserver;
import org.apache.fluo.core.async.AsyncTransaction;
import org.apache.fluo.core.impl.Notification;
@@ -179,10 +178,10 @@ public class TracingTransaction implements AsyncTransaction, Snapshot {
}
@Override
- public RowIterator get(ScannerConfiguration config) {
+ public ScannerBuilder scanner() { // traces the call with txid, then delegates to tx
// TODO log something better (see fluo-425)
- log.trace("txid: {} get(ScannerConfiguration)", txid);
- return tx.get(config);
+ log.trace("txid: {} newScanner()", txid);
+ return tx.scanner(); // NOTE(review): trace text says newScanner() but method is scanner()
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/core/src/test/java/org/apache/fluo/core/config/ScannerConfigurationTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/core/config/ScannerConfigurationTest.java b/modules/core/src/test/java/org/apache/fluo/core/config/ScannerConfigurationTest.java
deleted file mode 100644
index f3cc7af..0000000
--- a/modules/core/src/test/java/org/apache/fluo/core/config/ScannerConfigurationTest.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.core.config;
-
-import org.apache.fluo.api.config.ScannerConfiguration;
-import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.data.Span;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Unit test for ScannerConfiguration class
- */
-public class ScannerConfigurationTest {
-
- @Test
- public void testSetGet() {
-
- ScannerConfiguration config = new ScannerConfiguration();
- Assert.assertEquals(new Span(), config.getSpan());
- Assert.assertEquals(0, config.getColumns().size());
-
- config = new ScannerConfiguration();
- config.setSpan(Span.exact("row1"));
- Assert.assertEquals(Span.exact("row1"), config.getSpan());
- Assert.assertEquals(0, config.getColumns().size());
-
- config = new ScannerConfiguration();
- config.fetchColumnFamily(Bytes.of("cf1"));
- Assert.assertEquals(1, config.getColumns().size());
- Assert.assertEquals(new Column("cf1"), config.getColumns().iterator().next());
-
- config = new ScannerConfiguration();
- config.fetchColumn(Bytes.of("cf2"), Bytes.of("cq2"));
- Assert.assertEquals(1, config.getColumns().size());
- Assert.assertEquals(new Column("cf2", "cq2"), config.getColumns().iterator().next());
-
- config = new ScannerConfiguration();
- config.fetchColumnFamily(Bytes.of("a"));
- config.fetchColumnFamily(Bytes.of("b"));
- config.fetchColumnFamily(Bytes.of("a"));
- Assert.assertEquals(2, config.getColumns().size());
-
- config.clearColumns();
- Assert.assertEquals(0, config.getColumns().size());
- }
-
- @Test
- public void testNullSet() {
-
- ScannerConfiguration config = new ScannerConfiguration();
-
- try {
- config.setSpan(null);
- Assert.fail();
- } catch (NullPointerException e) {
- }
-
- try {
- config.fetchColumnFamily(null);
- Assert.fail();
- } catch (NullPointerException e) {
- }
-
- try {
- config.fetchColumn(null, Bytes.of("qual"));
- Assert.fail();
- } catch (NullPointerException e) {
- }
-
- try {
- config.fetchColumn(Bytes.of("fam"), null);
- Assert.fail();
- } catch (NullPointerException e) {
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/integration/src/test/java/org/apache/fluo/integration/ITBase.java
----------------------------------------------------------------------
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/ITBase.java b/modules/integration/src/test/java/org/apache/fluo/integration/ITBase.java
index 4cdf146..5e2a7f9 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/ITBase.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/ITBase.java
@@ -18,7 +18,6 @@ package org.apache.fluo.integration;
import java.io.File;
import java.util.Collections;
import java.util.List;
-import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.accumulo.core.client.Connector;
@@ -32,11 +31,7 @@ import org.apache.fluo.api.client.FluoClient;
import org.apache.fluo.api.client.Snapshot;
import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.api.config.ObserverConfiguration;
-import org.apache.fluo.api.config.ScannerConfiguration;
-import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.iterator.ColumnIterator;
-import org.apache.fluo.api.iterator.RowIterator;
+import org.apache.fluo.api.data.RowColumnValue;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -99,18 +94,12 @@ public class ITBase {
protected void printSnapshot() throws Exception {
try (Snapshot s = client.newSnapshot()) {
- RowIterator iter = s.get(new ScannerConfiguration());
-
System.out.println("== snapshot start ==");
- while (iter.hasNext()) {
- Entry<Bytes, ColumnIterator> rowEntry = iter.next();
- ColumnIterator citer = rowEntry.getValue();
- while (citer.hasNext()) {
- Entry<Column, Bytes> colEntry = citer.next();
- System.out.println(rowEntry.getKey() + " " + colEntry.getKey() + "\t"
- + colEntry.getValue());
- }
+ // print one line per cell from a scanner built with no span or column restriction
+ for (RowColumnValue rcv : s.scanner().build()) {
+ System.out.println(rcv.getRow() + " " + rcv.getColumn() + "\t" + rcv.getValue());
}
+
System.out.println("=== snapshot end ===");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/integration/src/test/java/org/apache/fluo/integration/ITBaseImpl.java
----------------------------------------------------------------------
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/ITBaseImpl.java b/modules/integration/src/test/java/org/apache/fluo/integration/ITBaseImpl.java
index e2ed47a..3a541d9 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/ITBaseImpl.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/ITBaseImpl.java
@@ -77,6 +77,7 @@ public class ITBaseImpl extends ITBase {
config.setTransactionRollbackTime(1, TimeUnit.SECONDS);
config.addObservers(getObservers());
config.setProperty(FluoConfigurationImpl.ZK_UPDATE_PERIOD_PROP, "1000");
+ config.setMiniStartAccumulo(false);
try (FluoAdmin admin = FluoFactory.newAdmin(config)) {
InitOpts opts = new InitOpts().setClearZookeeper(true).setClearTable(true);
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/integration/src/test/java/org/apache/fluo/integration/TestTransaction.java
----------------------------------------------------------------------
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/TestTransaction.java b/modules/integration/src/test/java/org/apache/fluo/integration/TestTransaction.java
index b64cda5..ceda193 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/TestTransaction.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/TestTransaction.java
@@ -31,14 +31,13 @@ import org.apache.fluo.accumulo.iterators.NotificationIterator;
import org.apache.fluo.accumulo.util.ColumnConstants;
import org.apache.fluo.accumulo.util.NotificationUtil;
import org.apache.fluo.api.client.TransactionBase;
-import org.apache.fluo.api.config.ScannerConfiguration;
+import org.apache.fluo.api.client.scanner.ScannerBuilder;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.data.RowColumn;
import org.apache.fluo.api.data.Span;
import org.apache.fluo.api.exceptions.AlreadySetException;
import org.apache.fluo.api.exceptions.CommitException;
-import org.apache.fluo.api.iterator.RowIterator;
import org.apache.fluo.core.exceptions.AlreadyAcknowledgedException;
import org.apache.fluo.core.impl.Environment;
import org.apache.fluo.core.impl.Notification;
@@ -205,8 +204,8 @@ public class TestTransaction implements TransactionBase {
}
@Override
- public RowIterator get(ScannerConfiguration config) {
- return tx.get(config);
+ public ScannerBuilder scanner() { // delegates to tx's scanner builder unchanged
+ return tx.scanner();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/integration/src/test/java/org/apache/fluo/integration/impl/FluoIT.java
----------------------------------------------------------------------
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/FluoIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/FluoIT.java
index 321374b..086f1d9 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/FluoIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/FluoIT.java
@@ -29,15 +29,14 @@ import org.apache.fluo.api.client.FluoClient;
import org.apache.fluo.api.client.FluoFactory;
import org.apache.fluo.api.client.Snapshot;
import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.client.scanner.CellScanner;
import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.api.config.ObserverConfiguration;
-import org.apache.fluo.api.config.ScannerConfiguration;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.RowColumnValue;
import org.apache.fluo.api.data.Span;
import org.apache.fluo.api.exceptions.CommitException;
-import org.apache.fluo.api.iterator.ColumnIterator;
-import org.apache.fluo.api.iterator.RowIterator;
import org.apache.fluo.api.observer.AbstractObserver;
import org.apache.fluo.core.exceptions.AlreadyAcknowledgedException;
import org.apache.fluo.core.impl.Environment;
@@ -469,14 +468,11 @@ public class FluoIT extends ITBaseImpl {
tx3.done();
HashSet<Column> columns = new HashSet<>();
- RowIterator riter =
- tx2.get(new ScannerConfiguration().setSpan(Span.exact(Bytes.of("d00001"),
- new Column(Bytes.of("outlink")))));
- while (riter.hasNext()) {
- ColumnIterator citer = riter.next().getValue();
- while (citer.hasNext()) {
- columns.add(citer.next().getKey());
- }
+
+ CellScanner cellScanner =
+ tx2.scanner().over(Span.exact(Bytes.of("d00001"))).fetch(new Column("outlink")).build();
+ for (RowColumnValue rcv : cellScanner) {
+ columns.add(rcv.getColumn());
}
tx2.done();
@@ -490,15 +486,12 @@ public class FluoIT extends ITBaseImpl {
TestTransaction tx4 = new TestTransaction(env);
columns.clear();
- riter =
- tx4.get(new ScannerConfiguration().setSpan(Span.exact(Bytes.of("d00001"),
- new Column(Bytes.of("outlink")))));
- while (riter.hasNext()) {
- ColumnIterator citer = riter.next().getValue();
- while (citer.hasNext()) {
- columns.add(citer.next().getKey());
- }
+ cellScanner =
+ tx4.scanner().over(Span.exact(Bytes.of("d00001"))).fetch(new Column("outlink")).build();
+ for (RowColumnValue rcv : cellScanner) {
+ columns.add(rcv.getColumn());
}
+
expected.add(new Column("outlink", "http://z.com"));
expected.remove(new Column("outlink", "http://b.com"));
Assert.assertEquals(expected, columns);
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/integration/src/test/java/org/apache/fluo/integration/impl/ScannerIT.java
----------------------------------------------------------------------
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/ScannerIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/ScannerIT.java
new file mode 100644
index 0000000..bdf473d
--- /dev/null
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/ScannerIT.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.integration.impl;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.Iterables;
+import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.client.Transaction;
+import org.apache.fluo.api.client.scanner.CellScanner;
+import org.apache.fluo.api.client.scanner.ColumnScanner;
+import org.apache.fluo.api.client.scanner.RowScanner;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.ColumnValue;
+import org.apache.fluo.api.data.RowColumnValue;
+import org.apache.fluo.api.data.Span;
+import org.apache.fluo.integration.ITBaseImpl;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ScannerIT extends ITBaseImpl { // integration tests for the scanner builder API
+ /** Accepts cells whose column equals the column given at construction. */
+ static class ColumnPredicate implements Predicate<RowColumnValue> {
+ Column c;
+
+ ColumnPredicate(Column c) {
+ this.c = c;
+ }
+
+ @Override
+ public boolean apply(RowColumnValue input) {
+ return input.getColumn().equals(c);
+ }
+ }
+ /** Accepts cells whose column family equals the family given at construction. */
+ static class FamilyPredicate implements Predicate<RowColumnValue> {
+ Bytes fam;
+
+ FamilyPredicate(String family) {
+ this.fam = Bytes.of(family);
+ }
+
+ @Override
+ public boolean apply(RowColumnValue input) {
+ return input.getColumn().getFamily().equals(fam);
+ }
+ }
+ /** Accepts cells whose row equals the row given at construction. */
+ static class RowPredicate implements Predicate<RowColumnValue> {
+ Bytes row;
+
+ RowPredicate(String row) {
+ this.row = Bytes.of(row);
+ }
+
+ @Override
+ public boolean apply(RowColumnValue input) {
+ return input.getRow().equals(row);
+ }
+ }
+ /** Checks that span and fetch restrictions return the same cells as filtering the data set. */
+ @Test
+ public void testFiltering() {
+ Set<RowColumnValue> expected = genData();
+
+ HashSet<RowColumnValue> expectedR2 = new HashSet<>();
+ Iterables.addAll(expectedR2, Iterables.filter(expected, new RowPredicate("r2")));
+ Assert.assertEquals(2, expectedR2.size());
+
+
+ HashSet<RowColumnValue> expectedR2c = new HashSet<>();
+ Iterables.addAll(
+ expectedR2c,
+ Iterables.filter(expected,
+ Predicates.and(new RowPredicate("r2"), new ColumnPredicate(new Column("f1", "q2")))));
+ Assert.assertEquals(1, expectedR2c.size());
+
+ HashSet<RowColumnValue> expectedC = new HashSet<>();
+ Iterables.addAll(expectedC,
+ Iterables.filter(expected, new ColumnPredicate(new Column("f1", "q1"))));
+ Assert.assertEquals(2, expectedC.size());
+
+ HashSet<RowColumnValue> expectedCF = new HashSet<>();
+ Iterables.addAll(expectedCF, Iterables.filter(expected, new FamilyPredicate("f2")));
+ Assert.assertEquals(2, expectedCF.size());
+
+ HashSet<RowColumnValue> expectedCols = new HashSet<>();
+ Iterables.addAll(expectedCols, Iterables.filter(expected, Predicates.or(new ColumnPredicate(
+ new Column("f2", "q5")), new ColumnPredicate(new Column("f1", "q1")))));
+ Assert.assertEquals(3, expectedCols.size());
+ // each scan below is compared against the matching expected subset computed above
+ try (Snapshot snap = client.newSnapshot()) {
+ HashSet<RowColumnValue> actual = new HashSet<>();
+ Iterables.addAll(actual, snap.scanner().over(Span.exact("r2")).build());
+ Assert.assertEquals(expectedR2, actual);
+
+ actual.clear();
+ Iterables.addAll(actual, snap.scanner().over(Span.exact("r2")).fetch(new Column("f1", "q2"))
+ .build());
+ Assert.assertEquals(expectedR2c, actual);
+
+ actual.clear();
+ Iterables.addAll(actual, snap.scanner().fetch(new Column("f1", "q1")).build());
+ Assert.assertEquals(expectedC, actual);
+
+ actual.clear();
+ Iterables.addAll(actual, snap.scanner().fetch(new Column("f2")).build());
+ Assert.assertEquals(expectedCF, actual);
+
+ actual.clear();
+ Iterables.addAll(actual, snap.scanner().fetch(new Column("f2", "q5"), new Column("f1", "q1"))
+ .build());
+ Assert.assertEquals(expectedCols, actual);
+ }
+
+ }
+ /** Checks that two iterators from one RowScanner each see all data when advanced together. */
+ @Test
+ public void testMultipleIteratorsFromSameRowScanner() {
+ Set<RowColumnValue> expected = genData();
+
+ try (Snapshot snap = client.newSnapshot()) {
+ RowScanner rowScanner = snap.scanner().byRow().build();
+
+ Iterator<ColumnScanner> iter1 = rowScanner.iterator();
+ Iterator<ColumnScanner> iter2 = rowScanner.iterator();
+
+ HashSet<RowColumnValue> actual1 = new HashSet<>();
+ HashSet<RowColumnValue> actual2 = new HashSet<>();
+
+ while (iter1.hasNext()) {
+ ColumnScanner cs1 = iter1.next();
+
+ Assert.assertTrue(iter2.hasNext());
+ ColumnScanner cs2 = iter2.next();
+
+ for (ColumnValue cv : cs1) {
+ actual1.add(new RowColumnValue(cs1.getRow(), cv.getColumn(), cv.getValue()));
+ }
+
+ for (ColumnValue cv : cs2) {
+ actual2.add(new RowColumnValue(cs2.getRow(), cv.getColumn(), cv.getValue()));
+ }
+ }
+
+ Assert.assertFalse(iter2.hasNext());
+
+ Assert.assertEquals(expected, actual1);
+ Assert.assertEquals(expected, actual2);
+ }
+ }
+ /** Checks that two iterators from one CellScanner each see all data when interleaved. */
+ @Test
+ public void testMultipleIteratorsFromSameIterable() {
+
+ Set<RowColumnValue> expected = genData();
+
+ try (Snapshot snap = client.newSnapshot()) {
+ CellScanner cellScanner = snap.scanner().build();
+ // grab two iterators from the same iterable and iterate over them in interleaved fashion
+ Iterator<RowColumnValue> iter1 = cellScanner.iterator();
+ Iterator<RowColumnValue> iter2 = cellScanner.iterator();
+
+ HashSet<RowColumnValue> actual1 = new HashSet<>();
+ HashSet<RowColumnValue> actual2 = new HashSet<>();
+
+ while (iter1.hasNext()) {
+ Assert.assertTrue(iter2.hasNext());
+ actual1.add(iter1.next());
+ actual2.add(iter2.next());
+ }
+
+ Assert.assertFalse(iter2.hasNext());
+
+ Assert.assertEquals(expected, actual1);
+ Assert.assertEquals(expected, actual2);
+ }
+ }
+ /** Writes five fixed cells in one committed transaction and returns them as a set. */
+ private Set<RowColumnValue> genData() {
+ Set<RowColumnValue> expected = new HashSet<>();
+ expected.add(new RowColumnValue("r1", new Column("f1", "q1"), "v1"));
+ expected.add(new RowColumnValue("r1", new Column("f2", "q3"), "v2"));
+ expected.add(new RowColumnValue("r2", new Column("f1", "q1"), "v3"));
+ expected.add(new RowColumnValue("r2", new Column("f1", "q2"), "v4"));
+ expected.add(new RowColumnValue("r4", new Column("f2", "q5"), "v5"));
+
+ Assert.assertEquals(5, expected.size());
+
+ try (Transaction tx = client.newTransaction()) {
+ for (RowColumnValue rcv : expected) {
+ tx.set(rcv.getRow(), rcv.getColumn(), rcv.getValue());
+ }
+ tx.commit();
+ }
+
+ return expected;
+ }
+}