Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/01/27 20:23:38 UTC

[37/51] [partial] Initial commit

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java b/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
new file mode 100644
index 0000000..896e5f5
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
@@ -0,0 +1,385 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.WritableUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.phoenix.cache.GlobalCache;
+import org.apache.phoenix.cache.TenantCache;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.ExpressionType;
+import org.apache.phoenix.expression.aggregator.Aggregator;
+import org.apache.phoenix.expression.aggregator.ServerAggregators;
+import org.apache.phoenix.join.HashJoinInfo;
+import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.ScanUtil;
+import org.apache.phoenix.util.SizedUtil;
+import org.apache.phoenix.util.TupleUtil;
+
+
+/**
+ * Region observer that aggregates grouped rows (i.e. a SQL query with a GROUP BY clause)
+ * 
+ * @author jtaylor
+ * @since 0.1
+ */
+public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
+    private static final Logger logger = LoggerFactory.getLogger(GroupedAggregateRegionObserver.class);
+    
+    public static final String AGGREGATORS = "Aggs";
+    public static final String UNORDERED_GROUP_BY_EXPRESSIONS = "UnorderedGroupByExpressions";
+    public static final String KEY_ORDERED_GROUP_BY_EXPRESSIONS = "OrderedGroupByExpressions";
+
+    public static final String ESTIMATED_DISTINCT_VALUES = "EstDistinctValues";
+    public static final int DEFAULT_ESTIMATED_DISTINCT_VALUES = 10000;
+    public static final int MIN_DISTINCT_VALUES = 100;
+    
+
+    /**
+     * Replaces the RegionScanner s with a RegionScanner that groups by the key formed by the list of expressions from the scan
+     * and returns the aggregated rows of each group.  For example, given the following original rows in the RegionScanner:
+     * KEY    COL1
+     * row1   a
+     * row2   b
+     * row3   a
+     * row4   a
+     * 
+     * the following rows will be returned for COUNT(*):
+     * KEY    COUNT
+     * a      3
+     * b      1
+     *
+     * The client is required to do a sort and a final aggregation, since multiple rows with the same key may be returned from different regions.
+     */
+    @Override
+    protected RegionScanner doPostScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan, RegionScanner s) throws IOException {
+        boolean keyOrdered = false;
+        byte[] expressionBytes = scan.getAttribute(UNORDERED_GROUP_BY_EXPRESSIONS);
+
+        if (expressionBytes == null) {
+            expressionBytes = scan.getAttribute(KEY_ORDERED_GROUP_BY_EXPRESSIONS);
+            if (expressionBytes == null) {
+                return s;
+            }
+            keyOrdered = true;
+        }
+        List<Expression> expressions = deserializeGroupByExpressions(expressionBytes);
+        
+        ServerAggregators aggregators = ServerAggregators.deserialize(
+                scan.getAttribute(GroupedAggregateRegionObserver.AGGREGATORS), c.getEnvironment().getConfiguration());
+
+        final ScanProjector p = ScanProjector.deserializeProjectorFromScan(scan);
+        final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan);        
+        RegionScanner innerScanner = s;
+        if (p != null || j != null) {
+            innerScanner = new HashJoinRegionScanner(s, p, j, ScanUtil.getTenantId(scan), c.getEnvironment());
+        }
+        
+        if (keyOrdered) { // Optimize by taking advantage that the rows are already in the required group by key order
+            return scanOrdered(c, scan, innerScanner, expressions, aggregators);
+        } else { // Otherwise, collect them all in an in-memory map
+            return scanUnordered(c, scan, innerScanner, expressions, aggregators);
+        }
+    }
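+
+    // For illustration only: a client enables this path by serializing the
+    // GROUP BY expressions and the aggregators into the scan before submitting
+    // it. A minimal sketch, where the expression list is assumed to come from
+    // the query compiler (hypothetical here):
+    //
+    //   Scan scan = new Scan();
+    //   List<Expression> groupBy = ...; // produced by the query compiler
+    //   GroupedAggregateRegionObserver.serializeIntoScan(scan,
+    //       GroupedAggregateRegionObserver.UNORDERED_GROUP_BY_EXPRESSIONS, groupBy);
+    //   scan.setAttribute(GroupedAggregateRegionObserver.AGGREGATORS, aggBytes);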
+
+    private static int sizeOfUnorderedGroupByMap(int nRows, int valueSize) {
+        return SizedUtil.sizeOfMap(nRows, SizedUtil.IMMUTABLE_BYTES_WRITABLE_SIZE, valueSize);
+    }
+
+    public static void serializeIntoScan(Scan scan, String attribName, List<Expression> groupByExpressions) {
+        ByteArrayOutputStream stream = new ByteArrayOutputStream(Math.max(1, groupByExpressions.size() * 10));
+        try {
+            if (groupByExpressions.isEmpty()) { // FIXME ?
+                stream.write(QueryConstants.TRUE);
+            } else {
+                DataOutputStream output = new DataOutputStream(stream);
+                for (Expression expression : groupByExpressions) {
+                    WritableUtils.writeVInt(output, ExpressionType.valueOf(expression).ordinal());
+                    expression.write(output);
+                }
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(e); // Impossible
+        } finally {
+            try {
+                stream.close();
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+        scan.setAttribute(attribName, stream.toByteArray());
+
+    }
+
+    private List<Expression> deserializeGroupByExpressions(byte[] expressionBytes) throws IOException {
+        List<Expression> expressions = new ArrayList<Expression>(3);
+        ByteArrayInputStream stream = new ByteArrayInputStream(expressionBytes);
+        try {
+            DataInputStream input = new DataInputStream(stream);
+            while (true) {
+                try {
+                    int expressionOrdinal = WritableUtils.readVInt(input);
+                    Expression expression = ExpressionType.values()[expressionOrdinal].newInstance();
+                    expression.readFields(input);
+                    expressions.add(expression);
+                } catch (EOFException e) {
+                    break;
+                }
+            }
+        } finally {
+            stream.close();
+        }
+        return expressions;
+    }
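+
+    // Wire-format note: serializeIntoScan writes, per expression, a vint (the
+    // ExpressionType ordinal) followed by the expression's own fields, and
+    // deserializeGroupByExpressions replays that stream until EOF. A round-trip
+    // sketch, assuming some already-built Expression expr:
+    //
+    //   ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+    //   DataOutputStream out = new DataOutputStream(bytes);
+    //   WritableUtils.writeVInt(out, ExpressionType.valueOf(expr).ordinal());
+    //   expr.write(out);
+    //   // ...on the read side: readVInt, newInstance, readFields, as above.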
+    
+    /**
+     * Used for an aggregate query in which the key order does not necessarily match the group by key order. In this case,
+     * we must collect all distinct groups within a region into a map, aggregating as we go.
+     */
+    private RegionScanner scanUnordered(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan, final RegionScanner s, List<Expression> expressions, ServerAggregators aggregators) throws IOException {
+        
+        if (logger.isDebugEnabled()) {
+            logger.debug("Grouped aggregation over unordered rows with scan " + scan + ", group by " + expressions + ", aggregators " + aggregators);
+        }
+        int estDistVals = DEFAULT_ESTIMATED_DISTINCT_VALUES;
+        byte[] estDistValsBytes = scan.getAttribute(ESTIMATED_DISTINCT_VALUES);
+        if (estDistValsBytes != null) {
+            estDistVals = Math.max(MIN_DISTINCT_VALUES, (int)(Bytes.toInt(estDistValsBytes) * 1.5f));  // Allocate 1.5x the estimate, but at least MIN_DISTINCT_VALUES
+        }
+        
+        TenantCache tenantCache = GlobalCache.getTenantCache(c.getEnvironment(), ScanUtil.getTenantId(scan));
+        int estSize = sizeOfUnorderedGroupByMap(estDistVals, aggregators.getSize());
+        final MemoryChunk chunk = tenantCache.getMemoryManager().allocate(estSize);
+        boolean success = false;
+        try {
+            // TODO: spool map to disk if map becomes too big
+            boolean hasMore;
+            int estValueSize = aggregators.getSize();
+            MultiKeyValueTuple result = new MultiKeyValueTuple();
+            Map<ImmutableBytesWritable, Aggregator[]> aggregateMap = new HashMap<ImmutableBytesWritable, Aggregator[]>(estDistVals);
+            HRegion region = c.getEnvironment().getRegion();
+            MultiVersionConsistencyControl.setThreadReadPoint(s.getMvccReadPoint());
+            region.startRegionOperation();
+            try {
+                do {
+                    List<KeyValue> results = new ArrayList<KeyValue>();
+                    // Results may be returned even when s.nextRaw returns false, since the
+                    // return value only indicates whether there are more values after the
+                    // ones just returned
+                    hasMore = s.nextRaw(results, null);
+                    if (!results.isEmpty()) {
+                        result.setKeyValues(results);
+                        ImmutableBytesWritable key = TupleUtil.getConcatenatedValue(result, expressions);
+                        Aggregator[] rowAggregators = aggregateMap.get(key);
+                        if (rowAggregators == null) {
+                            // If no aggregators exist yet for this distinct key, create a new set (we need one per distinct group key)
+                            if (logger.isDebugEnabled()) {
+                                logger.debug("Adding new aggregate bucket for row key " + Bytes.toStringBinary(key.get(),key.getOffset(),key.getLength()));
+                            }
+                            rowAggregators = aggregators.newAggregators(c.getEnvironment().getConfiguration());
+                            aggregateMap.put(key, rowAggregators);
+                        }
+                        // Aggregate values here
+                        aggregators.aggregate(rowAggregators, result);
+                        if (logger.isDebugEnabled()) {
+                            logger.debug("Row passed filters: " + results + ", aggregated values: " + Arrays.asList(rowAggregators));
+                        }
+                            
+                        if (aggregateMap.size() > estDistVals) { // increase allocation
+                            estDistVals *= 1.5f;
+                            estSize = sizeOfUnorderedGroupByMap(estDistVals, estValueSize);
+                            chunk.resize(estSize);
+                        }
+                    }
+                } while (hasMore);
+            } finally {
+                region.closeRegionOperation();
+            }
+    
+            // Compute final allocation
+            estSize = sizeOfUnorderedGroupByMap(aggregateMap.size(), estValueSize);
+            chunk.resize(estSize);
+            
+            // TODO: spool list to disk if too big and free memory?
+            final List<KeyValue> aggResults = new ArrayList<KeyValue>(aggregateMap.size());
+            for (Map.Entry<ImmutableBytesWritable, Aggregator[]> entry : aggregateMap.entrySet()) {
+                ImmutableBytesWritable key = entry.getKey();
+                Aggregator[] rowAggregators = entry.getValue();
+                // Generate byte array of Aggregators and set as value of row
+                byte[] value = aggregators.toBytes(rowAggregators);
+                
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Adding new distinct group: " + Bytes.toStringBinary(key.get(),key.getOffset(), key.getLength()) + 
+                            " with aggregators " + Arrays.asList(rowAggregators).toString() + 
+                            " value = " + Bytes.toStringBinary(value));
+                }
+                KeyValue keyValue = KeyValueUtil.newKeyValue(key.get(),key.getOffset(), key.getLength(),SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length);
+                aggResults.add(keyValue);
+            }
+            // Do not sort here, but sort back on the client instead
+            // The reason is that if the scan ever extends beyond a region (which can happen
+            // if we're basing our parallelization split points on old metadata), we'll get
+            // incorrect query results.
+            RegionScanner scanner = new BaseRegionScanner() {
+                private int index = 0;
+    
+                @Override
+                public HRegionInfo getRegionInfo() {
+                    return s.getRegionInfo();
+                }
+    
+                @Override
+                public void close() throws IOException {
+                    try {
+                        s.close();
+                    } finally {
+                        chunk.close();
+                    }
+                }
+    
+                @Override
+                public boolean next(List<KeyValue> results) throws IOException {
+                    if (index >= aggResults.size()) return false;
+                    results.add(aggResults.get(index));
+                    index++;
+                    return index < aggResults.size();
+                }
+            };
+            success = true;
+            return scanner;
+        } finally {
+            if (!success)
+                chunk.close();
+        }
+    }
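+
+    // The unordered path above is hash aggregation in miniature. The same idea
+    // over plain strings (an illustrative analogue, not Phoenix API) reproduces
+    // the COUNT(*) example from the class javadoc:
+    //
+    //   Map<String, Integer> counts = new HashMap<String, Integer>();
+    //   for (String key : new String[] { "a", "b", "a", "a" }) {
+    //       Integer count = counts.get(key);
+    //       counts.put(key, count == null ? 1 : count + 1);
+    //   }
+    //   // counts => {a=3, b=1}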
+
+    /**
+     * Used for an aggregate query in which the key order matches the group by key order. In this case, we can do the
+     * aggregation as we scan, by detecting when the group by key changes.
+     */
+    private RegionScanner scanOrdered(final ObserverContext<RegionCoprocessorEnvironment> c, Scan scan, final RegionScanner s, final List<Expression> expressions, final ServerAggregators aggregators) {
+        
+        if (logger.isDebugEnabled()) {
+            logger.debug("Grouped aggregation over ordered rows with scan " + scan + ", group by " + expressions + ", aggregators " + aggregators);
+        }
+        return new BaseRegionScanner() {
+            private ImmutableBytesWritable currentKey = null;
+
+            @Override
+            public HRegionInfo getRegionInfo() {
+                return s.getRegionInfo();
+            }
+
+            @Override
+            public void close() throws IOException {
+                s.close();
+            }
+
+            @Override
+            public boolean next(List<KeyValue> results) throws IOException {
+                boolean hasMore;
+                boolean aggBoundary = false;
+                MultiKeyValueTuple result = new MultiKeyValueTuple();
+                ImmutableBytesWritable key = null;
+                Aggregator[] rowAggregators = aggregators.getAggregators();
+                HRegion region = c.getEnvironment().getRegion();
+                MultiVersionConsistencyControl.setThreadReadPoint(s.getMvccReadPoint());
+                region.startRegionOperation();
+                try {
+                    do {
+                        List<KeyValue> kvs = new ArrayList<KeyValue>();
+                        // Results may be returned even when s.nextRaw returns false, since the
+                        // return value only indicates whether there are more values after the
+                        // ones just returned
+                        hasMore = s.nextRaw(kvs, null);
+                        if (!kvs.isEmpty()) {
+                            result.setKeyValues(kvs);
+                            key = TupleUtil.getConcatenatedValue(result, expressions);
+                            aggBoundary = currentKey != null && currentKey.compareTo(key) != 0;
+                            if (!aggBoundary) {
+                                aggregators.aggregate(rowAggregators, result);
+                                if (logger.isDebugEnabled()) {
+                                    logger.debug("Row passed filters: " + kvs + ", aggregated values: " + Arrays.asList(rowAggregators));
+                                }
+                                currentKey = key;
+                            }
+                        }
+                    } while (hasMore && !aggBoundary);
+                } finally {
+                    region.closeRegionOperation();
+                }
+                
+                if (currentKey != null) {
+                    byte[] value = aggregators.toBytes(rowAggregators);
+                    KeyValue keyValue = KeyValueUtil.newKeyValue(currentKey.get(),currentKey.getOffset(), currentKey.getLength(),SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length);
+                    results.add(keyValue);
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("Adding new aggregate row: " + keyValue + ",for current key " + Bytes.toStringBinary(currentKey.get(),currentKey.getOffset(), currentKey.getLength()) + ", aggregated values: " + Arrays.asList(rowAggregators));
+                    }
+                    // If we're at an aggregation boundary, reset the aggregators and
+                    // aggregate with the current result (which is not a part of the returned result).
+                    if (aggBoundary) {
+                        aggregators.reset(rowAggregators);
+                        aggregators.aggregate(rowAggregators, result);
+                        currentKey = key;
+                    }
+                }
+                // Continue if there are more
+                if (hasMore || aggBoundary) {
+                    return true;
+                }
+                currentKey = null;
+                return false;
+            }
+        };
+    }
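+
+    // The ordered path needs no map: rows arrive already grouped, so a single
+    // running set of aggregators is emitted each time the key changes. The same
+    // idea on a pre-sorted array (an illustrative analogue, not Phoenix API):
+    //
+    //   String[] sorted = { "a", "a", "a", "b" };
+    //   String current = null;
+    //   int count = 0;
+    //   for (String key : sorted) {
+    //       if (current != null && !current.equals(key)) {
+    //           System.out.println(current + "\t" + count); // boundary: emit group
+    //           count = 0;
+    //       }
+    //       current = key;
+    //       count++;
+    //   }
+    //   if (current != null) System.out.println(current + "\t" + count); // => a 3, b 1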
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java b/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
new file mode 100644
index 0000000..51dcf43
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
@@ -0,0 +1,235 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+
+import org.apache.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.cache.GlobalCache;
+import org.apache.phoenix.cache.HashCache;
+import org.apache.phoenix.cache.TenantCache;
+import org.apache.phoenix.join.HashJoinInfo;
+import org.apache.phoenix.parse.JoinTableNode.JoinType;
+import org.apache.phoenix.schema.tuple.ResultTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.TupleUtil;
+
+public class HashJoinRegionScanner implements RegionScanner {
+    
+    private final RegionScanner scanner;
+    private final ScanProjector projector;
+    private final HashJoinInfo joinInfo;
+    private Queue<List<KeyValue>> resultQueue;
+    private boolean hasMore;
+    private TenantCache cache;
+    
+    public HashJoinRegionScanner(RegionScanner scanner, ScanProjector projector, HashJoinInfo joinInfo, ImmutableBytesWritable tenantId, RegionCoprocessorEnvironment env) throws IOException {
+        this.scanner = scanner;
+        this.projector = projector;
+        this.joinInfo = joinInfo;
+        this.resultQueue = new LinkedList<List<KeyValue>>();
+        this.hasMore = true;
+        if (joinInfo != null) {
+            if (tenantId == null)
+                throw new IOException("Could not find tenant id for hash cache.");
+            for (JoinType type : joinInfo.getJoinTypes()) {
+                if (type == JoinType.Right)
+                    throw new IOException("The hashed table should not be LHS.");
+            }
+            this.cache = GlobalCache.getTenantCache(env, tenantId);
+        }
+    }
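+
+    // Note on the constructor checks above: the scanned table is the probe side
+    // and the cached tables are the build side of the join. A probe-side scanner
+    // can produce inner and left-outer results but not the unmatched rows of a
+    // cached table, so RIGHT joins are rejected up front; the tenant id is
+    // required to locate the server-side hash cache.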
+    
+    @SuppressWarnings("unchecked")
+    private void processResults(List<KeyValue> result, boolean hasLimit) throws IOException {
+        if (result.isEmpty())
+            return;
+        
+        if (projector != null) {
+            List<KeyValue> kvs = new ArrayList<KeyValue>(result.size());
+            for (KeyValue kv : result) {
+                kvs.add(projector.getProjectedKeyValue(kv));
+            }
+            if (joinInfo != null) {
+                result = kvs;
+            } else {
+                resultQueue.offer(kvs);               
+            }
+        }
+        
+        if (joinInfo != null) {
+            if (hasLimit)
+                throw new UnsupportedOperationException("Cannot support join operations in scans with limit");
+            
+            int count = joinInfo.getJoinIds().length;
+            List<Tuple>[] tuples = new List[count];
+            Tuple tuple = new ResultTuple(new Result(result));
+            boolean cont = true;
+            for (int i = 0; i < count; i++) {
+                ImmutableBytesPtr key = TupleUtil.getConcatenatedValue(tuple, joinInfo.getJoinExpressions()[i]);
+                HashCache hashCache = (HashCache)cache.getServerCache(joinInfo.getJoinIds()[i]);
+                tuples[i] = hashCache.get(key);
+                JoinType type = joinInfo.getJoinTypes()[i];
+                if (type == JoinType.Inner && (tuples[i] == null || tuples[i].isEmpty())) {
+                    cont = false;
+                    break;
+                }
+            }
+            if (cont) {
+                resultQueue.offer(result);
+                for (int i = 0; i < count; i++) {
+                    if (tuples[i] == null || tuples[i].isEmpty())
+                        continue;
+                    int j = resultQueue.size();
+                    while (j-- > 0) {
+                        List<KeyValue> lhs = resultQueue.poll();
+                        for (Tuple t : tuples[i]) {
+                            List<KeyValue> rhs = ((ResultTuple) t).getResult().list();
+                            List<KeyValue> joined = new ArrayList<KeyValue>(lhs.size() + rhs.size());
+                            joined.addAll(lhs);
+                            joined.addAll(rhs); // we don't replace the row key here; any later reference to row key fields must specify the column family as well
+                            resultQueue.offer(joined);
+                        }
+                    }
+                }
+            }
+        }
+    }
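+
+    // processResults is the probe side of a hash join: each scanned row looks up
+    // its matching cached tuples, and the queue is expanded once per join id. A
+    // simplified analogue with strings (illustrative only):
+    //
+    //   Queue<String> queue = new LinkedList<String>();
+    //   queue.offer("L1"); // probe-side row
+    //   List<String> matches = Arrays.asList("R1", "R2"); // cached build side
+    //   int n = queue.size();
+    //   while (n-- > 0) {
+    //       String lhs = queue.poll();
+    //       for (String rhs : matches) {
+    //           queue.offer(lhs + "+" + rhs);
+    //       }
+    //   }
+    //   // queue => [L1+R1, L1+R2]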
+    
+    private boolean shouldAdvance() {
+        if (!resultQueue.isEmpty())
+            return false;
+        
+        return hasMore;
+    }
+    
+    private boolean nextInQueue(List<KeyValue> results) {
+        if (resultQueue.isEmpty())
+            return false;
+        
+        results.addAll(resultQueue.poll());
+        return resultQueue.isEmpty() ? hasMore : true;
+    }
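+
+    // All next()/nextRaw() overrides below follow the same protocol: drain the
+    // wrapped scanner through processResults until a joined row is queued (or
+    // the scanner is exhausted), then hand out one queued row per call. In
+    // outline:
+    //
+    //   while (resultQueue.isEmpty() && hasMore) {
+    //       hasMore = scanner.next(buffer);      // or nextRaw, with limit/metric
+    //       processResults(buffer, hasLimit);
+    //   }
+    //   return nextInQueue(result);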
+
+    @Override
+    public long getMvccReadPoint() {
+        return scanner.getMvccReadPoint();
+    }
+
+    @Override
+    public HRegionInfo getRegionInfo() {
+        return scanner.getRegionInfo();
+    }
+
+    @Override
+    public boolean isFilterDone() {
+        return scanner.isFilterDone() && resultQueue.isEmpty();
+    }
+
+    @Override
+    public boolean nextRaw(List<KeyValue> result, String metric) throws IOException {
+        while (shouldAdvance()) {
+            List<KeyValue> tempResult = new ArrayList<KeyValue>();
+            hasMore = scanner.nextRaw(tempResult, metric);
+            processResults(tempResult, false);
+        }
+        
+        return nextInQueue(result);
+    }
+
+    @Override
+    public boolean nextRaw(List<KeyValue> result, int limit, String metric)
+            throws IOException {
+        while (shouldAdvance()) {
+            List<KeyValue> tempResult = new ArrayList<KeyValue>();
+            hasMore = scanner.nextRaw(tempResult, limit, metric);
+            processResults(tempResult, true);
+        }
+        
+        return nextInQueue(result);
+    }
+
+    @Override
+    public boolean reseek(byte[] row) throws IOException {
+        return scanner.reseek(row);
+    }
+
+    @Override
+    public void close() throws IOException {
+        scanner.close();
+    }
+
+    @Override
+    public boolean next(List<KeyValue> result) throws IOException {
+        while (shouldAdvance()) {
+            List<KeyValue> tempResult = new ArrayList<KeyValue>();
+            hasMore = scanner.next(tempResult);
+            processResults(tempResult, false);
+        }
+        
+        return nextInQueue(result);
+    }
+
+    @Override
+    public boolean next(List<KeyValue> result, String metric) throws IOException {
+        while (shouldAdvance()) {
+            List<KeyValue> tempResult = new ArrayList<KeyValue>();
+            hasMore = scanner.next(tempResult, metric);
+            processResults(tempResult, false);
+        }
+        
+        return nextInQueue(result);
+    }
+
+    @Override
+    public boolean next(List<KeyValue> result, int limit) throws IOException {
+        while (shouldAdvance()) {
+            List<KeyValue> tempResult = new ArrayList<KeyValue>();
+            hasMore = scanner.next(tempResult, limit);
+            processResults(tempResult, true);
+        }
+        
+        return nextInQueue(result);
+    }
+
+    @Override
+    public boolean next(List<KeyValue> result, int limit, String metric)
+            throws IOException {
+        while (shouldAdvance()) {
+            List<KeyValue> tempResult = new ArrayList<KeyValue>();
+            hasMore = scanner.next(tempResult, limit, metric);
+            processResults(tempResult, true);
+        }
+        
+        return nextInQueue(result);
+    }
+
+}