You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/09/03 19:40:29 UTC
[2/6] git commit: ACCUMULO-3097 Add some class-level javadoc for some
iterators.
ACCUMULO-3097 Add some class-level javadoc for some iterators.
Formatter also fixed some whitespace issues.
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/b0c3ba8c
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/b0c3ba8c
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/b0c3ba8c
Branch: refs/heads/1.6.1-SNAPSHOT
Commit: b0c3ba8c5a6654f4409172202d793e9e9dd5d423
Parents: f838cd9
Author: Josh Elser <el...@apache.org>
Authored: Wed Sep 3 13:24:44 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Wed Sep 3 13:29:15 2014 -0400
----------------------------------------------------------------------
.../core/iterators/SkippingIterator.java | 13 +++-
.../core/iterators/system/HeapIterator.java | 4 +
.../system/SourceSwitchingIterator.java | 77 +++++++++++---------
.../iterators/system/SynchronizedIterator.java | 23 +++---
4 files changed, 66 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/b0c3ba8c/core/src/main/java/org/apache/accumulo/core/iterators/SkippingIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/SkippingIterator.java b/core/src/main/java/org/apache/accumulo/core/iterators/SkippingIterator.java
index a73d997..9c18590 100644
--- a/core/src/main/java/org/apache/accumulo/core/iterators/SkippingIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/SkippingIterator.java
@@ -22,20 +22,25 @@ import java.util.Collection;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Range;
+/**
+ * Every call to {@link #next()} and {@link #seek(Range, Collection, boolean)} calls the parent's implementation and then calls the implementation's
+ * {@link #consume()}. This provides a cleaner way for implementers to write code that will read one to many key-value pairs, as opposed to returning every
+ * key-value pair seen.
+ */
public abstract class SkippingIterator extends WrappingIterator {
-
+
@Override
public void next() throws IOException {
super.next();
consume();
}
-
+
protected abstract void consume() throws IOException;
-
+
@Override
public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
super.seek(range, columnFamilies, inclusive);
consume();
}
-
+
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/b0c3ba8c/core/src/main/java/org/apache/accumulo/core/iterators/system/HeapIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/system/HeapIterator.java b/core/src/main/java/org/apache/accumulo/core/iterators/system/HeapIterator.java
index 4b26dcc..8f2f66c 100644
--- a/core/src/main/java/org/apache/accumulo/core/iterators/system/HeapIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/system/HeapIterator.java
@@ -24,6 +24,10 @@ import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+/**
+ * Constructs a {@link PriorityQueue} of multiple SortedKeyValueIterators. Provides a simple way to interact with multiple SortedKeyValueIterators in sorted
+ * order.
+ */
public abstract class HeapIterator implements SortedKeyValueIterator<Key,Value> {
private PriorityQueue<SortedKeyValueIterator<Key,Value>> heap;
private SortedKeyValueIterator<Key,Value> topIdx = null;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/b0c3ba8c/core/src/main/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIterator.java b/core/src/main/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIterator.java
index 46d2007..33d0ebf 100644
--- a/core/src/main/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIterator.java
@@ -32,33 +32,38 @@ import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+/**
+ * A SortedKeyValueIterator which presents a view over some section of data, regardless of whether or not it is backed by memory (InMemoryMap) or an RFile
+ * (InMemoryMap that was minor compacted to a file). Clients reading from a table that has data in memory should not see interruption in their scan when that
+ * data is minor compacted. This iterator is designed to manage this behind the scene.
+ */
public class SourceSwitchingIterator implements SortedKeyValueIterator<Key,Value>, InterruptibleIterator {
-
+
public interface DataSource {
boolean isCurrent();
-
+
DataSource getNewDataSource();
-
+
DataSource getDeepCopyDataSource(IteratorEnvironment env);
-
+
SortedKeyValueIterator<Key,Value> iterator() throws IOException;
}
-
+
private DataSource source;
private SortedKeyValueIterator<Key,Value> iter;
-
+
private Key key;
private Value val;
-
+
private Range range;
private boolean inclusive;
private Collection<ByteSequence> columnFamilies;
-
+
private boolean onlySwitchAfterRow;
private AtomicBoolean iflag;
-
+
private final List<SourceSwitchingIterator> copies;
-
+
private SourceSwitchingIterator(DataSource source, boolean onlySwitchAfterRow, List<SourceSwitchingIterator> copies, AtomicBoolean iflag) {
this.source = source;
this.onlySwitchAfterRow = onlySwitchAfterRow;
@@ -66,51 +71,51 @@ public class SourceSwitchingIterator implements SortedKeyValueIterator<Key,Value
this.iflag = iflag;
copies.add(this);
}
-
+
public SourceSwitchingIterator(DataSource source, boolean onlySwitchAfterRow) {
this(source, onlySwitchAfterRow, Collections.synchronizedList(new ArrayList<SourceSwitchingIterator>()), null);
}
-
+
public SourceSwitchingIterator(DataSource source) {
this(source, false);
}
-
+
@Override
public synchronized SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
return new SourceSwitchingIterator(source.getDeepCopyDataSource(env), onlySwitchAfterRow, copies, iflag);
}
-
+
@Override
public Key getTopKey() {
return key;
}
-
+
@Override
public Value getTopValue() {
return val;
}
-
+
@Override
public boolean hasTop() {
return key != null;
}
-
+
@Override
public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
throw new UnsupportedOperationException();
}
-
+
@Override
public void next() throws IOException {
readNext(false);
}
-
+
private synchronized void readNext(boolean initialSeek) throws IOException {
-
+
// check of initialSeek second is intentional so that it does not short
// circuit the call to switchSource
boolean seekNeeded = (!onlySwitchAfterRow && switchSource()) || initialSeek;
-
+
if (seekNeeded)
if (initialSeek)
iter.seek(range, columnFamilies, inclusive);
@@ -123,11 +128,11 @@ public class SourceSwitchingIterator implements SortedKeyValueIterator<Key,Value
iter.seek(new Range(key.followingKey(PartialKey.ROW), true, range.getEndKey(), range.isEndKeyInclusive()), columnFamilies, inclusive);
}
}
-
+
if (iter.hasTop()) {
Key nextKey = iter.getTopKey();
Value nextVal = iter.getTopValue();
-
+
try {
key = (Key) nextKey.clone();
} catch (CloneNotSupportedException e) {
@@ -139,62 +144,62 @@ public class SourceSwitchingIterator implements SortedKeyValueIterator<Key,Value
val = null;
}
}
-
+
private boolean switchSource() throws IOException {
while (!source.isCurrent()) {
source = source.getNewDataSource();
iter = source.iterator();
if (iflag != null)
((InterruptibleIterator) iter).setInterruptFlag(iflag);
-
+
return true;
}
-
+
return false;
}
-
+
@Override
public synchronized void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
this.range = range;
this.inclusive = inclusive;
this.columnFamilies = columnFamilies;
-
+
if (iter == null) {
iter = source.iterator();
if (iflag != null)
((InterruptibleIterator) iter).setInterruptFlag(iflag);
}
-
+
readNext(true);
}
-
+
private synchronized void _switchNow() throws IOException {
if (onlySwitchAfterRow)
throw new IllegalStateException("Can only switch on row boundries");
-
+
if (switchSource()) {
if (key != null) {
iter.seek(new Range(key, true, range.getEndKey(), range.isEndKeyInclusive()), columnFamilies, inclusive);
}
}
}
-
+
public void switchNow() throws IOException {
synchronized (copies) {
for (SourceSwitchingIterator ssi : copies)
ssi._switchNow();
}
}
-
+
@Override
public synchronized void setInterruptFlag(AtomicBoolean flag) {
if (copies.size() != 1)
throw new IllegalStateException("setInterruptFlag() called after deep copies made " + copies.size());
-
+
this.iflag = flag;
if (iter != null)
((InterruptibleIterator) iter).setInterruptFlag(flag);
-
+
}
-
+
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/b0c3ba8c/core/src/main/java/org/apache/accumulo/core/iterators/system/SynchronizedIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/system/SynchronizedIterator.java b/core/src/main/java/org/apache/accumulo/core/iterators/system/SynchronizedIterator.java
index 2657bab..ca7cd86 100644
--- a/core/src/main/java/org/apache/accumulo/core/iterators/system/SynchronizedIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/system/SynchronizedIterator.java
@@ -28,50 +28,51 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
/***
- * SynchronizedIterator: wrap a SortedKeyValueIterator so that all of its methods are synchronized
+ * Wraps a SortedKeyValueIterator so that all of its methods are synchronized. The intent is that user iterators which are multi-threaded have the possibility
+ * to call parent methods concurrently. The SynchronizedIterators aims to reduce the likelihood of unwanted concurrent access.
*/
public class SynchronizedIterator<K extends WritableComparable<?>,V extends Writable> implements SortedKeyValueIterator<K,V> {
-
+
private SortedKeyValueIterator<K,V> source = null;
-
+
@Override
public synchronized void init(SortedKeyValueIterator<K,V> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
this.source = source;
source.init(source, options, env);
}
-
+
@Override
public synchronized boolean hasTop() {
return source.hasTop();
}
-
+
@Override
public synchronized void next() throws IOException {
source.next();
}
-
+
@Override
public synchronized void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
source.seek(range, columnFamilies, inclusive);
}
-
+
@Override
public synchronized K getTopKey() {
return source.getTopKey();
}
-
+
@Override
public synchronized V getTopValue() {
return source.getTopValue();
}
-
+
@Override
public synchronized SortedKeyValueIterator<K,V> deepCopy(IteratorEnvironment env) {
return new SynchronizedIterator<K,V>(source.deepCopy(env));
}
-
+
public SynchronizedIterator() {}
-
+
public SynchronizedIterator(SortedKeyValueIterator<K,V> source) {
this.source = source;
}