You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by bi...@apache.org on 2011/10/18 16:24:06 UTC

svn commit: r1185680 - in /incubator/accumulo/trunk/src: core/src/main/java/org/apache/accumulo/core/client/ core/src/main/java/org/apache/accumulo/core/client/mapreduce/ core/src/main/java/org/apache/accumulo/core/util/ core/src/test/java/org/apache/a...

Author: billie
Date: Tue Oct 18 14:24:05 2011
New Revision: 1185680

URL: http://svn.apache.org/viewvc?rev=1185680&view=rev
Log:
ACCUMULO-44 redesigned RowIterator and updated classes that use it

Added:
    incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/util/PeekingIterator.java
      - copied, changed from r1183357, incubator/accumulo/trunk/src/examples/src/main/java/org/apache/accumulo/examples/filedata/PeekingIterator.java
Removed:
    incubator/accumulo/trunk/src/examples/src/main/java/org/apache/accumulo/examples/filedata/PeekingIterator.java
Modified:
    incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/RowIterator.java
    incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java
    incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
    incubator/accumulo/trunk/src/core/src/test/java/org/apache/accumulo/core/client/RowIteratorTest.java
    incubator/accumulo/trunk/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormatTest.java
    incubator/accumulo/trunk/src/examples/src/main/java/org/apache/accumulo/examples/filedata/ChunkInputFormat.java
    incubator/accumulo/trunk/src/examples/src/main/java/org/apache/accumulo/examples/filedata/ChunkInputStream.java
    incubator/accumulo/trunk/src/examples/src/main/java/org/apache/accumulo/examples/filedata/FileDataQuery.java
    incubator/accumulo/trunk/src/examples/src/test/java/org/apache/accumulo/examples/filedata/ChunkInputStreamTest.java
    incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/master/Master.java
    incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/CompactRange.java
    incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/concurrent/IsolatedScan.java

Modified: incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/RowIterator.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/RowIterator.java?rev=1185680&r1=1185679&r2=1185680&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/RowIterator.java (original)
+++ incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/RowIterator.java Tue Oct 18 14:24:05 2011
@@ -1,73 +1,131 @@
 package org.apache.accumulo.core.client;
 
-import java.util.ArrayList;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map.Entry;
 import java.util.NoSuchElementException;
 
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.util.PeekingIterator;
 import org.apache.hadoop.io.Text;
 
-
 /**
- * Group Key/Value pairs into lists corresponding to rows.
- *
- * Obviously, do not use this if your row will not fit into memory.
+ * Group Key/Value pairs into Iterators over rows.
  */
-public class RowIterator implements Iterator<List<Entry<Key, Value>>> {
-    
-    private final Iterator<Entry<Key, Value>> impl;
-    private Entry<Key, Value> nextRow = null;
+public class RowIterator implements Iterator<Iterator<Entry<Key, Value>>> {
     
     /**
+     * Iterate over entries in a single row.
+     */
+    private static class SingleRowIter implements Iterator<Entry<Key,Value>> {
+        private PeekingIterator<Entry<Key, Value>> source;
+        private Text currentRow = null;
+        private long count = 0;
+        private boolean disabled = false;
+
+        /**
+         * SingleRowIter must be passed a PeekingIterator so that it can peek at 
+         * the next entry to see if it belongs in the current row or not.
+         */
+        public SingleRowIter(PeekingIterator<Entry<Key,Value>> source) {
+            this.source = source;
+            if (source.hasNext())
+                currentRow = source.peek().getKey().getRow();
+        }
+
+        @Override
+        public boolean hasNext() {
+            if (disabled)
+                throw new IllegalStateException("SingleRowIter no longer valid");
+            return currentRow!=null;
+        }
+
+        @Override
+        public Entry<Key, Value> next() {
+            if (disabled)
+                throw new IllegalStateException("SingleRowIter no longer valid");
+            return _next();
+        }
+
+        private Entry<Key,Value> _next() {
+            if (currentRow==null)
+                throw new NoSuchElementException();
+            count++;
+            Entry<Key, Value> kv = source.next();
+            if (!source.hasNext() || !source.peek().getKey().getRow().equals(currentRow)) {
+                currentRow = null;
+            }
+            return kv;
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException();
+        }
+
+        /**
+         * Get a count of entries read from the row (only equals the number of 
+         * entries in the row when the row has been read fully).
+         */
+        public long getCount() {
+            return count;
+        }
+
+        /**
+         * Consume the rest of the row.  Disables the iterator from future use.
+         */
+        public void consume() {
+            disabled = true;
+            while (currentRow!=null)
+                _next();
+        }
+    }
+
+    private final PeekingIterator<Entry<Key, Value>> iter;
+    private long count = 0;
+    private SingleRowIter lastIter = null;
+
+    /**
      * Create an iterator from an (ordered) sequence of KeyValue pairs. 
      * @param iterator
      */
     public RowIterator(Iterator<Entry<Key, Value>> iterator) {
-        this.impl = iterator;
-        if (iterator.hasNext())
-            nextRow = iterator.next();
+        this.iter = new PeekingIterator<Entry<Key,Value>>(iterator);
     }
 
     /**
-     * Create an iterator from a Scanner.
-     * @param scanner
+     * Create an iterator from an Iterable.
+     * @param iterable
      */
-    public RowIterator(Scanner scanner) {
-        this(scanner.iterator());
+    public RowIterator(Iterable<Entry<Key, Value>> iterable) {
+        this(iterable.iterator());
     }
 
     /**
      * Returns true if there is at least one more row to get.
+     * 
+     * If the last row hasn't been fully read, this method will read through 
+     * the end of the last row so it can determine if the underlying iterator 
+     * has a next row.  The last row is disabled from future use.
      */
     @Override
     public boolean hasNext() {
-        return nextRow != null;
+        if (lastIter!=null) {
+            lastIter.consume();
+            count += lastIter.getCount();
+            lastIter = null;
+        }
+        return iter.hasNext();
     }
 
     /**
      * Fetch the next row.
      */
     @Override
-    public List<Entry<Key, Value>> next() {
-        final ArrayList<Entry<Key, Value>> result = new ArrayList<Entry<Key, Value>>();
-        if (nextRow == null)
+    public Iterator<Entry<Key, Value>> next() {
+        if (!hasNext())
             throw new NoSuchElementException();
-        
-        final Text row = nextRow.getKey().getRow();
-        result.add(nextRow);
-        nextRow = null;
-        while (impl.hasNext()) {
-            nextRow = impl.next();
-            if (nextRow.getKey().getRow().compareTo(row) == 0)
-                result.add(nextRow);
-            else
-                break;
-            nextRow = null;
-        }
-        return result;
+        return lastIter = new SingleRowIter(iter);
     }
 
     /**
@@ -77,4 +135,11 @@ public class RowIterator implements Iter
     public void remove() {
         throw new UnsupportedOperationException();
     }
+
+    /**
+     * Get a count of the total number of entries in all rows read so far.
+     */
+    public long getKVCount() {
+        return count;
+    }
 }

Modified: incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java?rev=1185680&r1=1185679&r2=1185680&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java (original)
+++ incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java Tue Oct 18 14:24:05 2011
@@ -1,25 +1,24 @@
 package org.apache.accumulo.core.client.mapreduce;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Map.Entry;
 
 import org.apache.accumulo.core.client.RowIterator;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.util.PeekingIterator;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
 
-public class AccumuloRowInputFormat extends InputFormatBase<Text, List<Entry<Key, Value>>> {
+public class AccumuloRowInputFormat extends InputFormatBase<Text, PeekingIterator<Entry<Key, Value>>> {
 	@Override
-	public RecordReader<Text, List<Entry<Key, Value>>> createRecordReader(
+	public RecordReader<Text, PeekingIterator<Entry<Key, Value>>> createRecordReader(
 			InputSplit split, TaskAttemptContext context) throws IOException,
 			InterruptedException {
-		return new RecordReaderBase<Text, List<Entry<Key, Value>>>() {
+		return new RecordReaderBase<Text, PeekingIterator<Entry<Key, Value>>>() {
 			RowIterator rowIterator;
 			
 			@Override
@@ -28,17 +27,17 @@ public class AccumuloRowInputFormat exte
 				super.initialize(inSplit, attempt);
 				rowIterator = new RowIterator(scannerIterator);
 				currentK = new Text();
-				currentV = new ArrayList<Entry<Key,Value>>();
+				currentV = null;
 			}
 			
 			@Override
 			public boolean nextKeyValue() throws IOException,
 					InterruptedException {
 				if (!rowIterator.hasNext()) return false;
-				currentV = rowIterator.next();
-				numKeysRead += currentV.size();
-				currentK = new Text(currentV.get(0).getKey().getRow());
-				currentKey = currentV.get(currentV.size()-1).getKey();
+				currentV = new PeekingIterator<Entry<Key, Value>>(rowIterator.next());
+				numKeysRead = rowIterator.getKVCount();
+				currentKey = currentV.peek().getKey();
+				currentK = new Text(currentKey.getRow());
 				return true;
 			}			
 		};

Modified: incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java?rev=1185680&r1=1185679&r2=1185680&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java (original)
+++ incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java Tue Oct 18 14:24:05 2011
@@ -571,7 +571,7 @@ public abstract class InputFormatBase<K,
 	
 	
 	protected abstract static class RecordReaderBase<K,V> extends RecordReader<K,V> {
-		protected int numKeysRead;
+		protected long numKeysRead;
 		protected Iterator<Entry<Key, Value>> scannerIterator;
 		private boolean scannerRegexEnabled = false;
 		protected RangeInputSplit split;

Copied: incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/util/PeekingIterator.java (from r1183357, incubator/accumulo/trunk/src/examples/src/main/java/org/apache/accumulo/examples/filedata/PeekingIterator.java)
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/util/PeekingIterator.java?p2=incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/util/PeekingIterator.java&p1=incubator/accumulo/trunk/src/examples/src/main/java/org/apache/accumulo/examples/filedata/PeekingIterator.java&r1=1183357&r2=1185680&rev=1185680&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/examples/src/main/java/org/apache/accumulo/examples/filedata/PeekingIterator.java (original)
+++ incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/util/PeekingIterator.java Tue Oct 18 14:24:05 2011
@@ -1,4 +1,4 @@
-package org.apache.accumulo.examples.filedata;
+package org.apache.accumulo.core.util;
 
 import java.util.Iterator;
 

Modified: incubator/accumulo/trunk/src/core/src/test/java/org/apache/accumulo/core/client/RowIteratorTest.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/core/src/test/java/org/apache/accumulo/core/client/RowIteratorTest.java?rev=1185680&r1=1185679&r2=1185680&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/core/src/test/java/org/apache/accumulo/core/client/RowIteratorTest.java (original)
+++ incubator/accumulo/trunk/src/core/src/test/java/org/apache/accumulo/core/client/RowIteratorTest.java Tue Oct 18 14:24:05 2011
@@ -1,5 +1,10 @@
 package org.apache.accumulo.core.client;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
@@ -8,11 +13,9 @@ import java.util.Map.Entry;
 import java.util.NoSuchElementException;
 import java.util.TreeMap;
 
-import org.apache.accumulo.core.client.RowIterator;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
 import org.junit.Test;
-import static org.junit.Assert.*;
 
 
 
@@ -33,7 +36,10 @@ public class RowIteratorTest {
         List<List<Entry<Key, Value>>> result = new ArrayList<List<Entry<Key, Value>>>();
         RowIterator riter = new RowIterator(iter);
         while (riter.hasNext()) {
-            result.add(riter.next());
+        	Iterator<Entry<Key,Value>> row = riter.next();
+        	List<Entry<Key,Value>> rlist = new ArrayList<Entry<Key,Value>>();
+        	while (row.hasNext()) rlist.add(row.next());
+            result.add(rlist);
         }
         return result; 
     }
@@ -64,14 +70,48 @@ public class RowIteratorTest {
         
         i = new RowIterator(makeIterator("a b c d", "a 1 2 3"));
         assertTrue(i.hasNext());
-        i.next();
+        Iterator<Entry<Key, Value>> row = i.next();
+        assertTrue(row.hasNext());
+        row.next();
+        assertTrue(row.hasNext());
+        row.next();
+        assertFalse(row.hasNext());
+        try {
+            row.next();
+            fail();
+        } catch (NoSuchElementException ex) {
+        }
+        assertEquals(0,i.getKVCount());
         assertFalse(i.hasNext());
+        assertEquals(2,i.getKVCount());
         try {
             i.next();
             fail();
         } catch (NoSuchElementException ex) {
         }
     }
-    
 
+    @Test
+    public void testUnreadRow() {
+        RowIterator i = new RowIterator(makeIterator("a b c d", "a 1 2 3", "b 1 2 3"));
+        assertTrue(i.hasNext());
+        Iterator<Entry<Key,Value>> firstRow = i.next();
+        assertEquals(0,i.getKVCount());
+        assertTrue(i.hasNext());
+        assertEquals(2,i.getKVCount());
+        Iterator<Entry<Key,Value>> nextRow = i.next();
+        assertEquals(2,i.getKVCount());
+        assertFalse(i.hasNext());
+        assertEquals(3,i.getKVCount());
+        try {
+            firstRow.hasNext();
+            fail();
+        } catch (IllegalStateException e) {
+        }
+        try {
+            nextRow.next();
+            fail();
+        } catch (IllegalStateException e) {
+        }
+    }
 }

Modified: incubator/accumulo/trunk/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormatTest.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormatTest.java?rev=1185680&r1=1185679&r2=1185680&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormatTest.java (original)
+++ incubator/accumulo/trunk/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormatTest.java Tue Oct 18 14:24:05 2011
@@ -5,6 +5,7 @@ import static org.junit.Assert.assertFal
 import static org.junit.Assert.assertTrue;
 
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map.Entry;
 
@@ -19,6 +20,7 @@ import org.apache.accumulo.core.data.Mut
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.core.util.PeekingIterator;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -53,6 +55,16 @@ public class AccumuloRowInputFormatTest 
 		}
 	}
 	
+	public static void checkLists(List<Entry<Key,Value>> a, Iterator<Entry<Key,Value>> b) {
+		int i = 0;
+		while (b.hasNext()) {
+			Entry<Key,Value> e = b.next();
+			assertEquals(a.get(i).getKey(),e.getKey());
+			assertEquals(a.get(i).getValue(),e.getValue());
+			i++;
+		}
+	}
+	
 	public static void insertList(BatchWriter bw, List<Entry<Key,Value>> list) throws Exception {
 		for (Entry<Key,Value> e : list) {
 			Key k = e.getKey();
@@ -80,7 +92,7 @@ public class AccumuloRowInputFormatTest 
 		AccumuloRowInputFormat crif = new AccumuloRowInputFormat();
 		RangeInputSplit ris = new RangeInputSplit();
 		TaskAttemptContext tac = new TaskAttemptContext(job.getConfiguration(),new TaskAttemptID());
-		RecordReader<Text, List<Entry<Key, Value>>> rr = crif.createRecordReader(ris, tac);
+		RecordReader<Text, PeekingIterator<Entry<Key, Value>>> rr = crif.createRecordReader(ris, tac);
 		rr.initialize(ris, tac);
 		
 		assertTrue(rr.nextKeyValue());

Modified: incubator/accumulo/trunk/src/examples/src/main/java/org/apache/accumulo/examples/filedata/ChunkInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/src/main/java/org/apache/accumulo/examples/filedata/ChunkInputFormat.java?rev=1185680&r1=1185679&r2=1185680&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/examples/src/main/java/org/apache/accumulo/examples/filedata/ChunkInputFormat.java (original)
+++ incubator/accumulo/trunk/src/examples/src/main/java/org/apache/accumulo/examples/filedata/ChunkInputFormat.java Tue Oct 18 14:24:05 2011
@@ -9,6 +9,7 @@ import java.util.Map.Entry;
 import org.apache.accumulo.core.client.mapreduce.InputFormatBase;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.util.PeekingIterator;
 import org.apache.accumulo.core.util.format.DefaultFormatter;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;

Modified: incubator/accumulo/trunk/src/examples/src/main/java/org/apache/accumulo/examples/filedata/ChunkInputStream.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/src/main/java/org/apache/accumulo/examples/filedata/ChunkInputStream.java?rev=1185680&r1=1185679&r2=1185680&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/examples/src/main/java/org/apache/accumulo/examples/filedata/ChunkInputStream.java (original)
+++ incubator/accumulo/trunk/src/examples/src/main/java/org/apache/accumulo/examples/filedata/ChunkInputStream.java Tue Oct 18 14:24:05 2011
@@ -9,6 +9,7 @@ import java.util.TreeSet;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.PartialKey;
 import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.util.PeekingIterator;
 import org.apache.hadoop.io.Text;
 import org.apache.log4j.Logger;
 

Modified: incubator/accumulo/trunk/src/examples/src/main/java/org/apache/accumulo/examples/filedata/FileDataQuery.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/src/main/java/org/apache/accumulo/examples/filedata/FileDataQuery.java?rev=1185680&r1=1185679&r2=1185680&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/examples/src/main/java/org/apache/accumulo/examples/filedata/FileDataQuery.java (original)
+++ incubator/accumulo/trunk/src/examples/src/main/java/org/apache/accumulo/examples/filedata/FileDataQuery.java Tue Oct 18 14:24:05 2011
@@ -15,6 +15,7 @@ import org.apache.accumulo.core.data.Key
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.PeekingIterator;
 
 
 public class FileDataQuery {

Modified: incubator/accumulo/trunk/src/examples/src/test/java/org/apache/accumulo/examples/filedata/ChunkInputStreamTest.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/src/test/java/org/apache/accumulo/examples/filedata/ChunkInputStreamTest.java?rev=1185680&r1=1185679&r2=1185680&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/examples/src/test/java/org/apache/accumulo/examples/filedata/ChunkInputStreamTest.java (original)
+++ incubator/accumulo/trunk/src/examples/src/test/java/org/apache/accumulo/examples/filedata/ChunkInputStreamTest.java Tue Oct 18 14:24:05 2011
@@ -21,9 +21,9 @@ import org.apache.accumulo.core.data.Mut
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.core.util.PeekingIterator;
 import org.apache.accumulo.examples.filedata.ChunkInputStream;
 import org.apache.accumulo.examples.filedata.FileDataIngest;
-import org.apache.accumulo.examples.filedata.PeekingIterator;
 import org.apache.hadoop.io.Text;
 import org.apache.log4j.Logger;
 

Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/master/Master.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/master/Master.java?rev=1185680&r1=1185679&r2=1185680&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/master/Master.java (original)
+++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/master/Master.java Tue Oct 18 14:24:05 2011
@@ -626,14 +626,16 @@ public class Master implements LiveTServ
 					Text ert = ByteBufferUtil.toText(endRow);
 					
 					while(ri.hasNext()){
-						List<Entry<Key, Value>> row = ri.next();
+						Iterator<Entry<Key, Value>> row = ri.next();
 						long tabletFlushID = -1;
 						int logs = 0;
 						boolean online = false;
 						
 						TServerInstance server = null;
 						
-						for (Entry<Key, Value> entry : row) {
+						Entry<Key, Value> entry = null;
+						while (row.hasNext()) {
+							entry = row.next();
 							Key key = entry.getKey();
 
 							if(Constants.METADATA_FLUSH_COLUMN.equals(key.getColumnFamily(), key.getColumnQualifier())){
@@ -659,7 +661,7 @@ public class Master implements LiveTServ
 						
 						tabletCount++;
 						
-						Text tabletEndRow = new KeyExtent(row.get(0).getKey().getRow(), (Text)null).getEndRow();
+						Text tabletEndRow = new KeyExtent(entry.getKey().getRow(), (Text)null).getEndRow();
 						if(tabletEndRow == null || (ert != null && tabletEndRow.compareTo(ert) >=0))
 							break;
 					}

Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/CompactRange.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/CompactRange.java?rev=1185680&r1=1185679&r2=1185680&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/CompactRange.java (original)
+++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/CompactRange.java Tue Oct 18 14:24:05 2011
@@ -1,7 +1,7 @@
 package org.apache.accumulo.server.master.tableOps;
 
 import java.util.Collections;
-import java.util.List;
+import java.util.Iterator;
 import java.util.Map.Entry;
 
 import org.apache.accumulo.core.Constants;
@@ -22,8 +22,8 @@ import org.apache.accumulo.core.master.s
 import org.apache.accumulo.core.util.ColumnFQ;
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.fate.Repo;
-import org.apache.accumulo.server.master.Master;
 import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
+import org.apache.accumulo.server.master.Master;
 import org.apache.accumulo.server.master.state.TServerInstance;
 import org.apache.accumulo.server.security.SecurityConstants;
 import org.apache.accumulo.server.util.MapCounter;
@@ -78,12 +78,14 @@ class CompactionDriver extends MasterRep
 		int tabletCount = 0;
 
 		while(ri.hasNext()){
-			List<Entry<Key,Value>> row = ri.next();
+			Iterator<Entry<Key,Value>> row = ri.next();
 			long tabletCompactID = -1;
 			
 			TServerInstance server = null;
 
-			for (Entry<Key, Value> entry : row) {
+			Entry<Key, Value> entry = null;
+			while (row.hasNext()) {
+				entry = row.next();
 				Key key = entry.getKey();
 
 				if(Constants.METADATA_COMPACT_COLUMN.equals(key.getColumnFamily(), key.getColumnQualifier()))
@@ -101,7 +103,7 @@ class CompactionDriver extends MasterRep
 
 			tabletCount++;
 			
-			Text tabletEndRow = new KeyExtent(row.get(0).getKey().getRow(), (Text)null).getEndRow();
+			Text tabletEndRow = new KeyExtent(entry.getKey().getRow(), (Text)null).getEndRow();
 			if(tabletEndRow == null || (endRow != null && tabletEndRow.compareTo(new Text(endRow)) >=0))
 				break;
 		}

Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/concurrent/IsolatedScan.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/concurrent/IsolatedScan.java?rev=1185680&r1=1185679&r2=1185680&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/concurrent/IsolatedScan.java (original)
+++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/concurrent/IsolatedScan.java Tue Oct 18 14:24:05 2011
@@ -14,6 +14,7 @@ import org.apache.accumulo.core.client.T
 import org.apache.accumulo.core.client.TableOfflineException;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.util.PeekingIterator;
 import org.apache.accumulo.server.test.randomwalk.State;
 import org.apache.accumulo.server.test.randomwalk.Test;
 
@@ -36,10 +37,14 @@ public class IsolatedScan extends Test {
 			RowIterator iter = new RowIterator(new IsolatedScanner(conn.createScanner(tableName, Constants.NO_AUTHS)));
 			
 			while(iter.hasNext()){
-				List<Entry<Key, Value>> row = iter.next();
-				for(int i=1; i < row.size(); i++)
-					if(!row.get(0).getValue().equals(row.get(i).getValue()))
-						throw new Exception("values not equal "+row.get(0)+" "+row.get(i));
+				PeekingIterator<Entry<Key, Value>> row = new PeekingIterator<Entry<Key,Value>>(iter.next());
+				Entry<Key,Value> kv = null;
+				if (row.hasNext()) kv = row.peek();
+				while (row.hasNext()) {
+					Entry<Key, Value> currentKV = row.next();
+					if (!kv.getValue().equals(currentKV.getValue()))
+						throw new Exception("values not equal "+kv+" "+currentKV);
+				}
 			}
 			log.debug("Isolated scan "+tableName);
 		} catch (TableDeletedException e){