You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/01/27 20:23:27 UTC
[26/51] [partial] Initial commit
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/iterate/DistinctAggregatingResultIterator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/iterate/DistinctAggregatingResultIterator.java b/src/main/java/org/apache/phoenix/iterate/DistinctAggregatingResultIterator.java
new file mode 100644
index 0000000..f7af26c
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/iterate/DistinctAggregatingResultIterator.java
@@ -0,0 +1,163 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.sql.SQLException;
+import java.util.*;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Sets;
+import org.apache.phoenix.compile.ColumnProjector;
+import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+/**
+ * Result scanner that dedups the incoming tuples to make them distinct.
+ * <p>
+ * Note that the results are held in memory
+ *
+ * @author jtaylor
+ * @since 1.2
+ */
+public class DistinctAggregatingResultIterator implements AggregatingResultIterator {
+ private final AggregatingResultIterator delegate;
+ private final RowProjector rowProjector;
+ private Iterator<ResultEntry> resultIterator;
+ private final ImmutableBytesWritable ptr1 = new ImmutableBytesWritable();
+ private final ImmutableBytesWritable ptr2 = new ImmutableBytesWritable();
+
+ private class ResultEntry {
+ private final int hashCode;
+ private final Tuple result;
+
+ ResultEntry(Tuple result) {
+ final int prime = 31;
+ this.result = result;
+ int hashCode = 0;
+ for (ColumnProjector column : rowProjector.getColumnProjectors()) {
+ Expression e = column.getExpression();
+ if (e.evaluate(this.result, ptr1)) {
+ hashCode = prime * hashCode + ptr1.hashCode();
+ }
+ }
+ this.hashCode = hashCode;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == this) {
+ return true;
+ }
+ if (o == null) {
+ return false;
+ }
+ if (o.getClass() != this.getClass()) {
+ return false;
+ }
+ ResultEntry that = (ResultEntry) o;
+ for (ColumnProjector column : rowProjector.getColumnProjectors()) {
+ Expression e = column.getExpression();
+ boolean isNull1 = !e.evaluate(this.result, ptr1);
+ boolean isNull2 = !e.evaluate(that.result, ptr2);
+ if (isNull1 && isNull2) {
+ return true;
+ }
+ if (isNull1 || isNull2) {
+ return false;
+ }
+ if (ptr1.compareTo(ptr2) != 0) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return hashCode;
+ }
+
+ Tuple getResult() {
+ return result;
+ }
+ }
+
+ protected ResultIterator getDelegate() {
+ return delegate;
+ }
+
+ public DistinctAggregatingResultIterator(AggregatingResultIterator delegate,
+ RowProjector rowProjector) {
+ this.delegate = delegate;
+ this.rowProjector = rowProjector;
+ }
+
+ @Override
+ public Tuple next() throws SQLException {
+ Iterator<ResultEntry> iterator = getResultIterator();
+ if (iterator.hasNext()) {
+ ResultEntry entry = iterator.next();
+ Tuple tuple = entry.getResult();
+ aggregate(tuple);
+ return tuple;
+ }
+ resultIterator = Iterators.emptyIterator();
+ return null;
+ }
+
+ private Iterator<ResultEntry> getResultIterator() throws SQLException {
+ if (resultIterator != null) {
+ return resultIterator;
+ }
+
+ Set<ResultEntry> entries = Sets.<ResultEntry>newHashSet(); // TODO: size?
+ try {
+ for (Tuple result = delegate.next(); result != null; result = delegate.next()) {
+ ResultEntry entry = new ResultEntry(result);
+ entries.add(entry);
+ }
+ } finally {
+ delegate.close();
+ }
+
+ resultIterator = entries.iterator();
+ return resultIterator;
+ }
+
+ @Override
+ public void close() {
+ resultIterator = Iterators.emptyIterator();
+ }
+
+
+ @Override
+ public void explain(List<String> planSteps) {
+ delegate.explain(planSteps);
+ planSteps.add("CLIENT DISTINCT ON " + rowProjector.toString());
+ }
+
+ @Override
+ public void aggregate(Tuple result) {
+ delegate.aggregate(result);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/iterate/ExplainTable.java b/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
new file mode 100644
index 0000000..f32b377
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
@@ -0,0 +1,271 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.text.Format;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
+import org.apache.hadoop.hbase.filter.PageFilter;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import com.google.common.collect.Iterators;
+import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
+import org.apache.phoenix.compile.ScanRanges;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.query.KeyRange.Bound;
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.RowKeySchema;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.util.StringUtil;
+
+
+public abstract class ExplainTable {
+ private static final List<KeyRange> EVERYTHING = Collections.singletonList(KeyRange.EVERYTHING_RANGE);
+ protected final StatementContext context;
+ protected final TableRef tableRef;
+ protected final GroupBy groupBy;
+
+ public ExplainTable(StatementContext context, TableRef table) {
+ this(context,table,GroupBy.EMPTY_GROUP_BY);
+ }
+
+ public ExplainTable(StatementContext context, TableRef table, GroupBy groupBy) {
+ this.context = context;
+ this.tableRef = table;
+ this.groupBy = groupBy;
+ }
+
+ private boolean explainSkipScan(StringBuilder buf) {
+ ScanRanges scanRanges = context.getScanRanges();
+ if (scanRanges.useSkipScanFilter()) {
+ buf.append("SKIP SCAN ");
+ int count = 1;
+ boolean hasRanges = false;
+ for (List<KeyRange> ranges : scanRanges.getRanges()) {
+ count *= ranges.size();
+ for (KeyRange range : ranges) {
+ hasRanges |= !range.isSingleKey();
+ }
+ }
+ buf.append("ON ");
+ buf.append(count);
+ buf.append(hasRanges ? " RANGE" : " KEY");
+ buf.append(count > 1 ? "S " : " ");
+ return true;
+ } else {
+ buf.append("RANGE SCAN ");
+ }
+ return false;
+ }
+
+ protected void explain(String prefix, List<String> planSteps) {
+ StringBuilder buf = new StringBuilder(prefix);
+ ScanRanges scanRanges = context.getScanRanges();
+ boolean hasSkipScanFilter = false;
+ if (scanRanges.isEverything()) {
+ buf.append("FULL SCAN ");
+ } else {
+ hasSkipScanFilter = explainSkipScan(buf);
+ }
+ buf.append("OVER " + tableRef.getTable().getName().getString());
+ appendKeyRanges(buf);
+ planSteps.add(buf.toString());
+
+ Scan scan = context.getScan();
+ Filter filter = scan.getFilter();
+ PageFilter pageFilter = null;
+ if (filter != null) {
+ int offset = 0;
+ boolean hasFirstKeyOnlyFilter = false;
+ String filterDesc = "";
+ if (hasSkipScanFilter) {
+ if (filter instanceof FilterList) {
+ List<Filter> filterList = ((FilterList) filter).getFilters();
+ if (filterList.get(0) instanceof FirstKeyOnlyFilter) {
+ hasFirstKeyOnlyFilter = true;
+ offset = 1;
+ }
+ if (filterList.size() > offset+1) {
+ filterDesc = filterList.get(offset+1).toString();
+ if (filterList.size() > offset+2) {
+ pageFilter = (PageFilter) filterList.get(offset+2);
+ }
+ }
+ }
+ } else if (filter instanceof FilterList) {
+ List<Filter> filterList = ((FilterList) filter).getFilters();
+ if (filterList.get(0) instanceof FirstKeyOnlyFilter) {
+ hasFirstKeyOnlyFilter = true;
+ offset = 1;
+ }
+ if (filterList.size() > offset) {
+ filterDesc = filterList.get(offset).toString();
+ if (filterList.size() > offset+1) {
+ pageFilter = (PageFilter) filterList.get(offset+1);
+ }
+ }
+ } else {
+ if (filter instanceof FirstKeyOnlyFilter) {
+ hasFirstKeyOnlyFilter = true;
+ } else {
+ filterDesc = filter.toString();
+ }
+ }
+ if (filterDesc.length() > 0) {
+ planSteps.add(" SERVER FILTER BY " + (hasFirstKeyOnlyFilter ? "FIRST KEY ONLY AND " : "") + filterDesc);
+ } else if (hasFirstKeyOnlyFilter) {
+ planSteps.add(" SERVER FILTER BY FIRST KEY ONLY");
+ }
+ if (pageFilter != null) {
+ planSteps.add(" SERVER " + pageFilter.getPageSize() + " ROW LIMIT");
+ }
+ }
+ groupBy.explain(planSteps);
+ }
+
+ private void appendPKColumnValue(StringBuilder buf, byte[] range, Boolean isNull, int slotIndex) {
+ if (Boolean.TRUE.equals(isNull)) {
+ buf.append("null");
+ return;
+ }
+ if (Boolean.FALSE.equals(isNull)) {
+ buf.append("not null");
+ return;
+ }
+ if (range.length == 0) {
+ buf.append('*');
+ return;
+ }
+ ScanRanges scanRanges = context.getScanRanges();
+ PDataType type = scanRanges.getSchema().getField(slotIndex).getDataType();
+ ColumnModifier modifier = tableRef.getTable().getPKColumns().get(slotIndex).getColumnModifier();
+ if (modifier != null) {
+ buf.append('~');
+ range = modifier.apply(range, 0, new byte[range.length], 0, range.length);
+ }
+ Format formatter = context.getConnection().getFormatter(type);
+ buf.append(type.toStringLiteral(range, formatter));
+ }
+
+ private static class RowKeyValueIterator implements Iterator<byte[]> {
+ private final RowKeySchema schema;
+ private ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+ private int position = 0;
+ private final int maxOffset;
+ private byte[] nextValue;
+
+ public RowKeyValueIterator(RowKeySchema schema, byte[] rowKey) {
+ this.schema = schema;
+ this.maxOffset = schema.iterator(rowKey, ptr);
+ iterate();
+ }
+
+ private void iterate() {
+ if (schema.next(ptr, position++, maxOffset) == null) {
+ nextValue = null;
+ } else {
+ nextValue = ptr.copyBytes();
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ return nextValue != null;
+ }
+
+ @Override
+ public byte[] next() {
+ if (nextValue == null) {
+ throw new NoSuchElementException();
+ }
+ byte[] value = nextValue;
+ iterate();
+ return value;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ }
+
+ private void appendScanRow(StringBuilder buf, Bound bound) {
+ ScanRanges scanRanges = context.getScanRanges();
+ KeyRange minMaxRange = context.getMinMaxRange();
+ Iterator<byte[]> minMaxIterator = Iterators.emptyIterator();
+ if (minMaxRange != null) {
+ RowKeySchema schema = tableRef.getTable().getRowKeySchema();
+ if (!minMaxRange.isUnbound(bound)) {
+ minMaxIterator = new RowKeyValueIterator(schema, minMaxRange.getRange(bound));
+ }
+ }
+ int nRanges = scanRanges.getRanges().size();
+ for (int i = 0, minPos = 0; minPos < nRanges || minMaxIterator.hasNext(); i++) {
+ List<KeyRange> ranges = minPos >= nRanges ? EVERYTHING : scanRanges.getRanges().get(minPos++);
+ KeyRange range = bound == Bound.LOWER ? ranges.get(0) : ranges.get(ranges.size()-1);
+ byte[] b = range.getRange(bound);
+ Boolean isNull = KeyRange.IS_NULL_RANGE == range ? Boolean.TRUE : KeyRange.IS_NOT_NULL_RANGE == range ? Boolean.FALSE : null;
+ if (minMaxIterator.hasNext()) {
+ byte[] bMinMax = minMaxIterator.next();
+ int cmp = Bytes.compareTo(bMinMax, b) * (bound == Bound.LOWER ? 1 : -1);
+ if (cmp > 0) {
+ minPos = nRanges;
+ b = bMinMax;
+ isNull = null;
+ } else if (cmp < 0) {
+ minMaxIterator = Iterators.emptyIterator();
+ }
+ }
+ appendPKColumnValue(buf, b, isNull, i);
+ buf.append(',');
+ }
+ }
+
+ private void appendKeyRanges(StringBuilder buf) {
+ ScanRanges scanRanges = context.getScanRanges();
+ KeyRange minMaxRange = context.getMinMaxRange();
+ if (minMaxRange == null && (scanRanges == ScanRanges.EVERYTHING || scanRanges == ScanRanges.NOTHING)) {
+ return;
+ }
+ buf.append(" [");
+ StringBuilder buf1 = new StringBuilder();
+ appendScanRow(buf1, Bound.LOWER);
+ buf.append(buf1);
+ buf.setCharAt(buf.length()-1, ']');
+ StringBuilder buf2 = new StringBuilder();
+ appendScanRow(buf2, Bound.UPPER);
+ if (!StringUtil.equals(buf1, buf2)) {
+ buf.append( " - [");
+ buf.append(buf2);
+ }
+ buf.setCharAt(buf.length()-1, ']');
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/iterate/FilterAggregatingResultIterator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/iterate/FilterAggregatingResultIterator.java b/src/main/java/org/apache/phoenix/iterate/FilterAggregatingResultIterator.java
new file mode 100644
index 0000000..d687480
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/iterate/FilterAggregatingResultIterator.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+
+/**
+ *
+ * Post aggregation filter for HAVING clause. Due to the way we cache aggregation values
+ * we cannot have a look ahead for this Iterator, because the expressions in the SELECT
+ * clause would return values for the peeked row instead of the current row. If we only
+ * use the Result argument in {@link org.apache.phoenix.expression.Expression}
+ * instead of our cached value in Aggregators, we could have a look ahead.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class FilterAggregatingResultIterator implements AggregatingResultIterator {
+ private final AggregatingResultIterator delegate;
+ private final Expression expression;
+ private final ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+
+ public FilterAggregatingResultIterator(AggregatingResultIterator delegate, Expression expression) {
+ this.delegate = delegate;
+ this.expression = expression;
+ if (expression.getDataType() != PDataType.BOOLEAN) {
+ throw new IllegalArgumentException("FilterResultIterator requires a boolean expression, but got " + expression);
+ }
+ }
+
+ @Override
+ public Tuple next() throws SQLException {
+ Tuple next;
+ do {
+ next = delegate.next();
+ } while (next != null && expression.evaluate(next, ptr) && Boolean.FALSE.equals(expression.getDataType().toObject(ptr)));
+ return next;
+ }
+
+ @Override
+ public void close() throws SQLException {
+ delegate.close();
+ }
+
+ @Override
+ public void aggregate(Tuple result) {
+ delegate.aggregate(result);
+ }
+
+ @Override
+ public void explain(List<String> planSteps) {
+ delegate.explain(planSteps);
+ planSteps.add("CLIENT FILTER BY " + expression.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/iterate/FilterResultIterator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/iterate/FilterResultIterator.java b/src/main/java/org/apache/phoenix/iterate/FilterResultIterator.java
new file mode 100644
index 0000000..5518643
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/iterate/FilterResultIterator.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+
+/**
+ *
+ * Result scanner that filters out rows based on the results of a boolean
+ * expression (i.e. filters out if {@link org.apache.phoenix.expression.Expression#evaluate(Tuple, ImmutableBytesWritable)}
+ * returns false or the ptr contains a FALSE value}). May not be used where
+ * the delegate provided is an {@link org.apache.phoenix.iterate.AggregatingResultIterator}.
+ * For these, the {@link org.apache.phoenix.iterate.FilterAggregatingResultIterator} should be used.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class FilterResultIterator extends LookAheadResultIterator {
+ private final ResultIterator delegate;
+ private final Expression expression;
+ private final ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+
+ public FilterResultIterator(ResultIterator delegate, Expression expression) {
+ if (delegate instanceof AggregatingResultIterator) {
+ throw new IllegalArgumentException("FilterResultScanner may not be used with an aggregate delegate. Use phoenix.iterate.FilterAggregateResultScanner instead");
+ }
+ this.delegate = delegate;
+ this.expression = expression;
+ if (expression.getDataType() != PDataType.BOOLEAN) {
+ throw new IllegalArgumentException("FilterResultIterator requires a boolean expression, but got " + expression);
+ }
+ }
+
+ @Override
+ protected Tuple advance() throws SQLException {
+ Tuple next;
+ do {
+ next = delegate.next();
+ } while (next != null && expression.evaluate(next, ptr) && Boolean.FALSE.equals(expression.getDataType().toObject(ptr)));
+ return next;
+ }
+
+ @Override
+ public void close() throws SQLException {
+ delegate.close();
+ }
+
+ @Override
+ public void explain(List<String> planSteps) {
+ delegate.explain(planSteps);
+ planSteps.add("CLIENT FILTER BY " + expression.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/iterate/GroupedAggregatingResultIterator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/iterate/GroupedAggregatingResultIterator.java b/src/main/java/org/apache/phoenix/iterate/GroupedAggregatingResultIterator.java
new file mode 100644
index 0000000..7985699
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/iterate/GroupedAggregatingResultIterator.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import static org.apache.phoenix.query.QueryConstants.*;
+
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.expression.aggregator.Aggregator;
+import org.apache.phoenix.expression.aggregator.Aggregators;
+import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.TupleUtil;
+
+
+
+/**
+ *
+ * Result scanner that aggregates the row count value for rows with duplicate keys.
+ * The rows from the backing result iterator must be in key sorted order. For example,
+ * given the following input:
+ * a 1
+ * a 2
+ * b 1
+ * b 3
+ * c 1
+ * the following will be output:
+ * a 3
+ * b 4
+ * c 1
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class GroupedAggregatingResultIterator implements AggregatingResultIterator {
+ private final ImmutableBytesWritable tempPtr = new ImmutableBytesWritable();
+ private final PeekingResultIterator resultIterator;
+ protected final Aggregators aggregators;
+
+ public GroupedAggregatingResultIterator( PeekingResultIterator resultIterator, Aggregators aggregators) {
+ if (resultIterator == null) throw new NullPointerException();
+ if (aggregators == null) throw new NullPointerException();
+ this.resultIterator = resultIterator;
+ this.aggregators = aggregators;
+ }
+
+ @Override
+ public Tuple next() throws SQLException {
+ Tuple result = resultIterator.next();
+ if (result == null) {
+ return null;
+ }
+ Aggregator[] rowAggregators = aggregators.getAggregators();
+ aggregators.reset(rowAggregators);
+ while (true) {
+ aggregators.aggregate(rowAggregators, result);
+ Tuple nextResult = resultIterator.peek();
+ if (nextResult == null || !TupleUtil.equals(result, nextResult, tempPtr)) {
+ break;
+ }
+ result = resultIterator.next();
+ }
+
+ byte[] value = aggregators.toBytes(rowAggregators);
+ result.getKey(tempPtr);
+ return new SingleKeyValueTuple(KeyValueUtil.newKeyValue(tempPtr, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length));
+ }
+
+ @Override
+ public void close() throws SQLException {
+ resultIterator.close();
+ }
+
+ @Override
+ public void aggregate(Tuple result) {
+ Aggregator[] rowAggregators = aggregators.getAggregators();
+ aggregators.reset(rowAggregators);
+ aggregators.aggregate(rowAggregators, result);
+ }
+
+ @Override
+ public void explain(List<String> planSteps) {
+ resultIterator.explain(planSteps);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/iterate/LimitingResultIterator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/iterate/LimitingResultIterator.java b/src/main/java/org/apache/phoenix/iterate/LimitingResultIterator.java
new file mode 100644
index 0000000..ecea92d
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/iterate/LimitingResultIterator.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.phoenix.schema.tuple.Tuple;
+
+/**
+ *
+ * Iterates through tuples up to a limit
+ *
+ * @author jtaylor
+ * @since 1.2
+ */
+public class LimitingResultIterator extends DelegateResultIterator {
+ private int rowCount;
+ private final int limit;
+
+ public LimitingResultIterator(ResultIterator delegate, int limit) {
+ super(delegate);
+ this.limit = limit;
+ }
+
+ @Override
+ public Tuple next() throws SQLException {
+ if (rowCount++ >= limit) {
+ return null;
+ }
+ return super.next();
+ }
+
+ @Override
+ public void explain(List<String> planSteps) {
+ super.explain(planSteps);
+ planSteps.add("CLIENT " + limit + " ROW LIMIT");
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java b/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java
new file mode 100644
index 0000000..0e4b78b
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.sql.SQLException;
+
+import org.apache.phoenix.schema.tuple.ResultTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+
+abstract public class LookAheadResultIterator implements PeekingResultIterator {
+ private final static Tuple UNINITIALIZED = new ResultTuple();
+ private Tuple next = UNINITIALIZED;
+
+ abstract protected Tuple advance() throws SQLException;
+
+ private void init() throws SQLException {
+ if (next == UNINITIALIZED) {
+ next = advance();
+ }
+ }
+
+ @Override
+ public Tuple next() throws SQLException {
+ init();
+ Tuple next = this.next;
+ this.next = advance();
+ return next;
+ }
+
+ @Override
+ public Tuple peek() throws SQLException {
+ init();
+ return next;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/iterate/MappedByteBufferSortedQueue.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/iterate/MappedByteBufferSortedQueue.java b/src/main/java/org/apache/phoenix/iterate/MappedByteBufferSortedQueue.java
new file mode 100644
index 0000000..ddacd69
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/iterate/MappedByteBufferSortedQueue.java
@@ -0,0 +1,381 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.io.*;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileChannel.MapMode;
+import java.util.*;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import com.google.common.collect.MinMaxPriorityQueue;
+import org.apache.phoenix.iterate.OrderedResultIterator.ResultEntry;
+import org.apache.phoenix.schema.tuple.ResultTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+public class MappedByteBufferSortedQueue extends AbstractQueue<ResultEntry> {
+ private Comparator<ResultEntry> comparator;
+ private final int limit;
+ private final int thresholdBytes;
+ private List<MappedByteBufferPriorityQueue> queues = new ArrayList<MappedByteBufferPriorityQueue>();
+ private MappedByteBufferPriorityQueue currentQueue = null;
+ private int currentIndex = 0;
+ MinMaxPriorityQueue<IndexedResultEntry> mergedQueue = null;
+
+ public MappedByteBufferSortedQueue(Comparator<ResultEntry> comparator,
+ Integer limit, int thresholdBytes) throws IOException {
+ this.comparator = comparator;
+ this.limit = limit == null ? -1 : limit;
+ this.thresholdBytes = thresholdBytes;
+ this.currentQueue = new MappedByteBufferPriorityQueue(0,
+ this.limit, thresholdBytes, comparator);
+ this.queues.add(currentQueue);
+ }
+
+ @Override
+ public boolean offer(ResultEntry e) {
+ try {
+ boolean isFlush = this.currentQueue.writeResult(e);
+ if (isFlush) {
+ currentIndex++;
+ currentQueue = new MappedByteBufferPriorityQueue(currentIndex,
+ limit, thresholdBytes, comparator);
+ queues.add(currentQueue);
+ }
+ } catch (IOException ioe) {
+ throw new RuntimeException(ioe);
+ }
+
+ return true;
+ }
+
+ @Override
+ public ResultEntry poll() {
+ if (mergedQueue == null) {
+ mergedQueue = MinMaxPriorityQueue.<ResultEntry> orderedBy(
+ comparator).maximumSize(queues.size()).create();
+ for (MappedByteBufferPriorityQueue queue : queues) {
+ try {
+ IndexedResultEntry next = queue.getNextResult();
+ if (next != null) {
+ mergedQueue.add(next);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ if (!mergedQueue.isEmpty()) {
+ IndexedResultEntry re = mergedQueue.pollFirst();
+ if (re != null) {
+ IndexedResultEntry next = null;
+ try {
+ next = queues.get(re.getIndex()).getNextResult();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ if (next != null) {
+ mergedQueue.add(next);
+ }
+ return re;
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public ResultEntry peek() {
+ if (mergedQueue == null) {
+ mergedQueue = MinMaxPriorityQueue.<ResultEntry> orderedBy(
+ comparator).maximumSize(queues.size()).create();
+ for (MappedByteBufferPriorityQueue queue : queues) {
+ try {
+ IndexedResultEntry next = queue.getNextResult();
+ if (next != null) {
+ mergedQueue.add(next);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ if (!mergedQueue.isEmpty()) {
+ IndexedResultEntry re = mergedQueue.peekFirst();
+ if (re != null) {
+ return re;
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public Iterator<ResultEntry> iterator() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int size() {
+ int size = 0;
+ for (MappedByteBufferPriorityQueue queue : queues) {
+ size += queue.size();
+ }
+ return size;
+ }
+
+ public long getByteSize() {
+ return currentQueue.getInMemByteSize();
+ }
+
+ public void close() {
+ if (queues != null) {
+ for (MappedByteBufferPriorityQueue queue : queues) {
+ queue.close();
+ }
+ }
+ }
+
+ private static class IndexedResultEntry extends ResultEntry {
+ private int index;
+
+ public IndexedResultEntry(int index, ResultEntry resultEntry) {
+ super(resultEntry.sortKeys, resultEntry.result);
+ this.index = index;
+ }
+
+ public int getIndex() {
+ return this.index;
+ }
+ }
+
+ private static class MappedByteBufferPriorityQueue {
+ private static final long DEFAULT_MAPPING_SIZE = 1024;
+
+ private final int limit;
+ private final int thresholdBytes;
+ private long totalResultSize = 0;
+ private int maxResultSize = 0;
+ private long mappingSize = 0;
+ private long writeIndex = 0;
+ private long readIndex = 0;
+ private MappedByteBuffer writeBuffer;
+ private MappedByteBuffer readBuffer;
+ private FileChannel fc;
+ private RandomAccessFile af;
+ private File file;
+ private boolean isClosed = false;
+ MinMaxPriorityQueue<ResultEntry> results = null;
+ private boolean flushBuffer = false;
+ private int index;
+ private int flushedCount;
+
+ public MappedByteBufferPriorityQueue(int index, int limit, int thresholdBytes,
+ Comparator<ResultEntry> comparator) throws IOException {
+ this.index = index;
+ this.limit = limit;
+ this.thresholdBytes = thresholdBytes;
+ results = limit < 0 ?
+ MinMaxPriorityQueue.<ResultEntry> orderedBy(comparator).create()
+ : MinMaxPriorityQueue.<ResultEntry> orderedBy(comparator).maximumSize(limit).create();
+ }
+
+ public int size() {
+ if (flushBuffer)
+ return flushedCount;
+ return results.size();
+ }
+
+ public long getInMemByteSize() {
+ if (flushBuffer)
+ return 0;
+ return totalResultSize;
+ }
+
+ private List<KeyValue> toKeyValues(ResultEntry entry) {
+ Tuple result = entry.getResult();
+ int size = result.size();
+ List<KeyValue> kvs = new ArrayList<KeyValue>(size);
+ for (int i = 0; i < size; i++) {
+ kvs.add(result.getValue(i));
+ }
+ return kvs;
+ }
+
+ private int sizeof(List<KeyValue> kvs) {
+ int size = Bytes.SIZEOF_INT; // totalLen
+
+ for (KeyValue kv : kvs) {
+ size += kv.getLength();
+ size += Bytes.SIZEOF_INT; // kv.getLength
+ }
+
+ return size;
+ }
+
+ private int sizeof(ImmutableBytesWritable[] sortKeys) {
+ int size = Bytes.SIZEOF_INT;
+ if (sortKeys != null) {
+ for (ImmutableBytesWritable sortKey : sortKeys) {
+ if (sortKey != null) {
+ size += sortKey.getLength();
+ }
+ size += Bytes.SIZEOF_INT;
+ }
+ }
+ return size;
+ }
+
+ public boolean writeResult(ResultEntry entry) throws IOException {
+ if (flushBuffer)
+ throw new IOException("Results already flushed");
+
+ int sortKeySize = sizeof(entry.sortKeys);
+ int resultSize = sizeof(toKeyValues(entry)) + sortKeySize;
+ boolean added = results.add(entry);
+ if (added) {
+ maxResultSize = Math.max(maxResultSize, resultSize);
+ totalResultSize = limit < 0 ? (totalResultSize + resultSize) : maxResultSize * results.size();
+ if (totalResultSize >= thresholdBytes) {
+ this.file = File.createTempFile(UUID.randomUUID().toString(), null);
+ this.af = new RandomAccessFile(file, "rw");
+ this.fc = af.getChannel();
+ mappingSize = Math.min(Math.max(maxResultSize, DEFAULT_MAPPING_SIZE), totalResultSize);
+ writeBuffer = fc.map(MapMode.READ_WRITE, writeIndex, mappingSize);
+
+ for (int i = 0; i < results.size(); i++) {
+ int totalLen = 0;
+ ResultEntry re = results.pollFirst();
+ List<KeyValue> keyValues = toKeyValues(re);
+ for (KeyValue kv : keyValues) {
+ totalLen += (kv.getLength() + Bytes.SIZEOF_INT);
+ }
+ writeBuffer.putInt(totalLen);
+ for (KeyValue kv : keyValues) {
+ writeBuffer.putInt(kv.getLength());
+ writeBuffer.put(kv.getBuffer(), kv.getOffset(), kv
+ .getLength());
+ }
+ ImmutableBytesWritable[] sortKeys = re.sortKeys;
+ writeBuffer.putInt(sortKeys.length);
+ for (ImmutableBytesWritable sortKey : sortKeys) {
+ if (sortKey != null) {
+ writeBuffer.putInt(sortKey.getLength());
+ writeBuffer.put(sortKey.get(), sortKey.getOffset(),
+ sortKey.getLength());
+ } else {
+ writeBuffer.putInt(0);
+ }
+ }
+ // buffer close to exhausted, re-map.
+ if (mappingSize - writeBuffer.position() < maxResultSize) {
+ writeIndex += writeBuffer.position();
+ writeBuffer = fc.map(MapMode.READ_WRITE, writeIndex, mappingSize);
+ }
+ }
+ writeBuffer.putInt(-1); // end
+ flushedCount = results.size();
+ results.clear();
+ flushBuffer = true;
+ }
+ }
+ return flushBuffer;
+ }
+
+ public IndexedResultEntry getNextResult() throws IOException {
+ if (isClosed)
+ return null;
+
+ if (!flushBuffer) {
+ ResultEntry re = results.poll();
+ if (re == null) {
+ reachedEnd();
+ return null;
+ }
+ return new IndexedResultEntry(index, re);
+ }
+
+ if (readBuffer == null) {
+ readBuffer = this.fc.map(MapMode.READ_ONLY, readIndex, mappingSize);
+ }
+
+ int length = readBuffer.getInt();
+ if (length < 0) {
+ reachedEnd();
+ return null;
+ }
+
+ byte[] rb = new byte[length];
+ readBuffer.get(rb);
+ Result result = new Result(new ImmutableBytesWritable(rb));
+ ResultTuple rt = new ResultTuple(result);
+ int sortKeySize = readBuffer.getInt();
+ ImmutableBytesWritable[] sortKeys = new ImmutableBytesWritable[sortKeySize];
+ for (int i = 0; i < sortKeySize; i++) {
+ int contentLength = readBuffer.getInt();
+ if (contentLength > 0) {
+ byte[] sortKeyContent = new byte[contentLength];
+ readBuffer.get(sortKeyContent);
+ sortKeys[i] = new ImmutableBytesWritable(sortKeyContent);
+ } else {
+ sortKeys[i] = null;
+ }
+ }
+ // buffer close to exhausted, re-map.
+ if (mappingSize - readBuffer.position() < maxResultSize) {
+ readIndex += readBuffer.position();
+ readBuffer = fc.map(MapMode.READ_ONLY, readIndex, mappingSize);
+ }
+
+ return new IndexedResultEntry(index, new ResultEntry(sortKeys, rt));
+ }
+
+ private void reachedEnd() {
+ this.isClosed = true;
+ if (this.fc != null) {
+ try {
+ this.fc.close();
+ } catch (IOException ignored) {
+ }
+ this.fc = null;
+ }
+ if (this.af != null) {
+ try {
+ this.af.close();
+ } catch (IOException ignored) {
+ }
+ this.af = null;
+ }
+ if (this.file != null) {
+ file.delete();
+ file = null;
+ }
+ }
+
+ public void close() {
+ if (!isClosed) {
+ this.reachedEnd();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/iterate/MaterializedResultIterator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/iterate/MaterializedResultIterator.java b/src/main/java/org/apache/phoenix/iterate/MaterializedResultIterator.java
new file mode 100644
index 0000000..c509b2c
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/iterate/MaterializedResultIterator.java
@@ -0,0 +1,112 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.sql.SQLException;
+import java.util.*;
+
+import org.apache.phoenix.schema.tuple.Tuple;
+
+
+/**
+ *
+ * Fully materialized result iterator backed by the result list provided.
+ * No copy is made of the backing results collection.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class MaterializedResultIterator implements PeekingResultIterator {
+ private final PeekingCollectionIterator iterator;
+
+ public MaterializedResultIterator(Collection<Tuple> results) {
+ iterator = new PeekingCollectionIterator(results);
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public Tuple next() throws SQLException {
+ return iterator.nextOrNull();
+ }
+
+ @Override
+ public Tuple peek() throws SQLException {
+ return iterator.peek();
+ }
+
+ private static class PeekingCollectionIterator implements Iterator<Tuple> {
+ private final Iterator<Tuple> iterator;
+ private Tuple current;
+
+ private PeekingCollectionIterator(Collection<Tuple> results) {
+ iterator = results.iterator();
+ advance();
+ }
+
+ private Tuple advance() {
+ if (iterator.hasNext()) {
+ current = iterator.next();
+ } else {
+ current = null;
+ }
+ return current;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return current != null;
+ }
+
+ @Override
+ public Tuple next() {
+ Tuple next = nextOrNull();
+ if (next == null) {
+ throw new NoSuchElementException();
+ }
+ return next;
+ }
+
+ public Tuple nextOrNull() {
+ if (current == null) {
+ return null;
+ }
+ Tuple next = current;
+ advance();
+ return next;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ public Tuple peek() {
+ return current;
+ }
+
+ }
+
+ @Override
+ public void explain(List<String> planSteps) {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/iterate/MergeSortResultIterator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/iterate/MergeSortResultIterator.java b/src/main/java/org/apache/phoenix/iterate/MergeSortResultIterator.java
new file mode 100644
index 0000000..58b7e40
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/iterate/MergeSortResultIterator.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.SQLCloseables;
+
+
+/**
+ *
+ * Base class for a ResultIterator that does a merge sort on the list of iterators
+ * provided.
+ *
+ * @author jtaylor
+ * @since 1.2
+ */
+public abstract class MergeSortResultIterator implements PeekingResultIterator {
+ protected final ResultIterators resultIterators;
+ protected final ImmutableBytesWritable tempPtr = new ImmutableBytesWritable();
+ private List<PeekingResultIterator> iterators;
+
+ public MergeSortResultIterator(ResultIterators iterators) {
+ this.resultIterators = iterators;
+ }
+
+ private List<PeekingResultIterator> getIterators() throws SQLException {
+ if (iterators == null) {
+ iterators = resultIterators.getIterators();
+ }
+ return iterators;
+ }
+
+ @Override
+ public void close() throws SQLException {
+ if (iterators != null) {
+ SQLCloseables.closeAll(iterators);
+ }
+ }
+
+ abstract protected int compare(Tuple t1, Tuple t2);
+
+ private PeekingResultIterator minIterator() throws SQLException {
+ List<PeekingResultIterator> iterators = getIterators();
+ Tuple minResult = null;
+ PeekingResultIterator minIterator = EMPTY_ITERATOR;
+ for (int i = iterators.size()-1; i >= 0; i--) {
+ PeekingResultIterator iterator = iterators.get(i);
+ Tuple r = iterator.peek();
+ if (r != null) {
+ if (minResult == null || compare(r, minResult) < 0) {
+ minResult = r;
+ minIterator = iterator;
+ }
+ continue;
+ }
+ iterator.close();
+ iterators.remove(i);
+ }
+ return minIterator;
+ }
+
+ @Override
+ public Tuple peek() throws SQLException {
+ PeekingResultIterator iterator = minIterator();
+ return iterator.peek();
+ }
+
+ @Override
+ public Tuple next() throws SQLException {
+ PeekingResultIterator iterator = minIterator();
+ return iterator.next();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/iterate/MergeSortRowKeyResultIterator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/iterate/MergeSortRowKeyResultIterator.java b/src/main/java/org/apache/phoenix/iterate/MergeSortRowKeyResultIterator.java
new file mode 100644
index 0000000..49cb35a
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/iterate/MergeSortRowKeyResultIterator.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.util.List;
+
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.TupleUtil;
+
+
+/**
+ *
+ * ResultIterator that does a merge sort on the list of iterators provided,
+ * returning the rows in row key ascending order. The iterators provided
+ * must be in row key ascending order.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class MergeSortRowKeyResultIterator extends MergeSortResultIterator {
+ private final int keyOffset;
+
+ public MergeSortRowKeyResultIterator(ResultIterators iterators) {
+ this(iterators, 0);
+ }
+
+ public MergeSortRowKeyResultIterator(ResultIterators iterators, int keyOffset) {
+ super(iterators);
+ this.keyOffset = keyOffset;
+ }
+
+ @Override
+ protected int compare(Tuple t1, Tuple t2) {
+ return TupleUtil.compare(t1, t2, tempPtr, keyOffset);
+ }
+
+ @Override
+ public void explain(List<String> planSteps) {
+ resultIterators.explain(planSteps);
+ planSteps.add("CLIENT MERGE SORT");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/iterate/MergeSortTopNResultIterator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/iterate/MergeSortTopNResultIterator.java b/src/main/java/org/apache/phoenix/iterate/MergeSortTopNResultIterator.java
new file mode 100644
index 0000000..77d1c62
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/iterate/MergeSortTopNResultIterator.java
@@ -0,0 +1,100 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.OrderByExpression;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+/**
+ *
+ * ResultIterator that does a merge sort on the list of iterators provided,
+ * returning the rows ordered by the OrderByExpression. The input
+ * iterators must be ordered by the OrderByExpression.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class MergeSortTopNResultIterator extends MergeSortResultIterator {
+
+ private final int limit;
+ private int count = 0;
+ private final List<OrderByExpression> orderByColumns;
+ private final ImmutableBytesWritable ptr1 = new ImmutableBytesWritable();
+ private final ImmutableBytesWritable ptr2 = new ImmutableBytesWritable();
+
+ public MergeSortTopNResultIterator(ResultIterators iterators, Integer limit, List<OrderByExpression> orderByColumns) {
+ super(iterators);
+ this.limit = limit == null ? -1 : limit;
+ this.orderByColumns = orderByColumns;
+ }
+
+ @Override
+ protected int compare(Tuple t1, Tuple t2) {
+ for (int i = 0; i < orderByColumns.size(); i++) {
+ OrderByExpression order = orderByColumns.get(i);
+ Expression orderExpr = order.getExpression();
+ boolean isNull1 = !orderExpr.evaluate(t1, ptr1) || ptr1.getLength() == 0;
+ boolean isNull2 = !orderExpr.evaluate(t2, ptr2) || ptr2.getLength() == 0;
+ if (isNull1 && isNull2) {
+ continue;
+ } else if (isNull1) {
+ return order.isNullsLast() ? 1 : -1;
+ } else if (isNull2) {
+ return order.isNullsLast() ? -1 : 1;
+ }
+ int cmp = ptr1.compareTo(ptr2);
+ if (cmp == 0) {
+ continue;
+ }
+ return order.isAscending() ? cmp : -cmp;
+ }
+ return 0;
+ }
+
+ @Override
+ public Tuple peek() throws SQLException {
+ if (limit >= 0 && count >= limit) {
+ return null;
+ }
+ return super.peek();
+ }
+
+ @Override
+ public Tuple next() throws SQLException {
+ if (limit >= 0 && count++ >= limit) {
+ return null;
+ }
+ return super.next();
+ }
+
+
+ @Override
+ public void explain(List<String> planSteps) {
+ resultIterators.explain(planSteps);
+ planSteps.add(" SERVER TOP " + limit + " ROW" + (limit == 1 ? "" : "S") + " SORTED BY " + orderByColumns.toString());
+ planSteps.add("CLIENT MERGE SORT");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/iterate/OrderedAggregatingResultIterator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/iterate/OrderedAggregatingResultIterator.java b/src/main/java/org/apache/phoenix/iterate/OrderedAggregatingResultIterator.java
new file mode 100644
index 0000000..2b7011a
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/iterate/OrderedAggregatingResultIterator.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.phoenix.expression.OrderByExpression;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+
+/**
+ * Result scanner that sorts aggregated rows by columns specified in the ORDER BY clause.
+ * <p>
+ * Note that currently the sort is entirely done in memory.
+ *
+ * @author syyang
+ * @since 0.1
+ */
+public class OrderedAggregatingResultIterator extends OrderedResultIterator implements AggregatingResultIterator {
+
+ public OrderedAggregatingResultIterator(AggregatingResultIterator delegate,
+ List<OrderByExpression> orderByExpressions,
+ int thresholdBytes, Integer limit) throws SQLException {
+ super (delegate, orderByExpressions, thresholdBytes, limit);
+ }
+
+ @Override
+ protected AggregatingResultIterator getDelegate() {
+ return (AggregatingResultIterator)super.getDelegate();
+ }
+
+ @Override
+ public Tuple next() throws SQLException {
+ Tuple tuple = super.next();
+ if (tuple != null) {
+ aggregate(tuple);
+ }
+ return tuple;
+ }
+
+ @Override
+ public void aggregate(Tuple result) {
+ getDelegate().aggregate(result);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java b/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
new file mode 100644
index 0000000..288c465
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
@@ -0,0 +1,254 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkPositionIndex;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Ordering;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.OrderByExpression;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.SizedUtil;
+
+/**
+ * Result scanner that sorts aggregated rows by columns specified in the ORDER BY clause.
+ * <p>
+ * Note that currently the sort is entirely done in memory.
+ *
+ * @author syyang, jtaylor
+ * @since 0.1
+ */
+public class OrderedResultIterator implements PeekingResultIterator {
+
+ /** A container that holds pointers to a {@link Result} and its sort keys. */
+ protected static class ResultEntry {
+ protected final ImmutableBytesWritable[] sortKeys;
+ protected final Tuple result;
+
+ ResultEntry(ImmutableBytesWritable[] sortKeys, Tuple result) {
+ this.sortKeys = sortKeys;
+ this.result = result;
+ }
+
+ ImmutableBytesWritable getSortKey(int index) {
+ checkPositionIndex(index, sortKeys.length);
+ return sortKeys[index];
+ }
+
+ Tuple getResult() {
+ return result;
+ }
+ }
+
+ /** A function that returns Nth key for a given {@link ResultEntry}. */
+ private static class NthKey implements Function<ResultEntry, ImmutableBytesWritable> {
+ private final int index;
+
+ NthKey(int index) {
+ this.index = index;
+ }
+ @Override
+ public ImmutableBytesWritable apply(ResultEntry entry) {
+ return entry.getSortKey(index);
+ }
+ }
+
+ /** Returns the expression of a given {@link OrderByExpression}. */
+ private static final Function<OrderByExpression, Expression> TO_EXPRESSION = new Function<OrderByExpression, Expression>() {
+ @Override
+ public Expression apply(OrderByExpression column) {
+ return column.getExpression();
+ }
+ };
+
+ private final int thresholdBytes;
+ private final Integer limit;
+ private final ResultIterator delegate;
+ private final List<OrderByExpression> orderByExpressions;
+ private final long estimatedByteSize;
+
+ private PeekingResultIterator resultIterator;
+ private long byteSize;
+
+ protected ResultIterator getDelegate() {
+ return delegate;
+ }
+
+ public OrderedResultIterator(ResultIterator delegate,
+ List<OrderByExpression> orderByExpressions,
+ int thresholdBytes, Integer limit) {
+ this(delegate, orderByExpressions, thresholdBytes, limit, 0);
+ }
+
+ public OrderedResultIterator(ResultIterator delegate,
+ List<OrderByExpression> orderByExpressions, int thresholdBytes) throws SQLException {
+ this(delegate, orderByExpressions, thresholdBytes, null);
+ }
+
+ public OrderedResultIterator(ResultIterator delegate, List<OrderByExpression> orderByExpressions,
+ int thresholdBytes, Integer limit, int estimatedRowSize) {
+ checkArgument(!orderByExpressions.isEmpty());
+ this.delegate = delegate;
+ this.orderByExpressions = orderByExpressions;
+ this.thresholdBytes = thresholdBytes;
+ this.limit = limit;
+ long estimatedEntrySize =
+ // ResultEntry
+ SizedUtil.OBJECT_SIZE +
+ // ImmutableBytesWritable[]
+ SizedUtil.ARRAY_SIZE + orderByExpressions.size() * SizedUtil.IMMUTABLE_BYTES_WRITABLE_SIZE +
+ // Tuple
+ SizedUtil.OBJECT_SIZE + estimatedRowSize;
+
+ // Make sure we don't overflow Long, though this is really unlikely to happen.
+ assert(limit == null || Long.MAX_VALUE / estimatedEntrySize >= limit);
+
+ this.estimatedByteSize = limit == null ? 0 : limit * estimatedEntrySize;
+ }
+
+ public Integer getLimit() {
+ return limit;
+ }
+
+ public long getEstimatedByteSize() {
+ return estimatedByteSize;
+ }
+
+ public long getByteSize() {
+ return byteSize;
+ }
+ /**
+ * Builds a comparator from the list of columns in ORDER BY clause.
+ * @param orderByExpressions the columns in ORDER BY clause.
+ * @return the comparator built from the list of columns in ORDER BY clause.
+ */
+ // ImmutableBytesWritable.Comparator doesn't implement generics
+ @SuppressWarnings("unchecked")
+ private static Comparator<ResultEntry> buildComparator(List<OrderByExpression> orderByExpressions) {
+ Ordering<ResultEntry> ordering = null;
+ int pos = 0;
+ for (OrderByExpression col : orderByExpressions) {
+ Ordering<ImmutableBytesWritable> o = Ordering.from(new ImmutableBytesWritable.Comparator());
+ if(!col.isAscending()) o = o.reverse();
+ o = col.isNullsLast() ? o.nullsLast() : o.nullsFirst();
+ Ordering<ResultEntry> entryOrdering = o.onResultOf(new NthKey(pos++));
+ ordering = ordering == null ? entryOrdering : ordering.compound(entryOrdering);
+ }
+ return ordering;
+ }
+
+ @Override
+ public Tuple next() throws SQLException {
+ return getResultIterator().next();
+ }
+
+ private PeekingResultIterator getResultIterator() throws SQLException {
+ if (resultIterator != null) {
+ return resultIterator;
+ }
+
+ final int numSortKeys = orderByExpressions.size();
+ List<Expression> expressions = Lists.newArrayList(Collections2.transform(orderByExpressions, TO_EXPRESSION));
+ final Comparator<ResultEntry> comparator = buildComparator(orderByExpressions);
+ try{
+ final MappedByteBufferSortedQueue queueEntries = new MappedByteBufferSortedQueue(comparator, limit, thresholdBytes);
+ resultIterator = new PeekingResultIterator() {
+ int count = 0;
+ @Override
+ public Tuple next() throws SQLException {
+ ResultEntry entry = queueEntries.poll();
+ if (entry == null || (limit != null && ++count > limit)) {
+ resultIterator = PeekingResultIterator.EMPTY_ITERATOR;
+ return null;
+ }
+ return entry.getResult();
+ }
+
+ @Override
+ public Tuple peek() throws SQLException {
+ if (limit != null && count > limit) {
+ return null;
+ }
+ ResultEntry entry = queueEntries.peek();
+ if (entry == null) {
+ return null;
+ }
+ return entry.getResult();
+ }
+
+ @Override
+ public void explain(List<String> planSteps) {
+ }
+
+ @Override
+ public void close() throws SQLException {
+ queueEntries.close();
+ }
+ };
+ for (Tuple result = delegate.next(); result != null; result = delegate.next()) {
+ int pos = 0;
+ ImmutableBytesWritable[] sortKeys = new ImmutableBytesWritable[numSortKeys];
+ for (Expression expression : expressions) {
+ final ImmutableBytesWritable sortKey = new ImmutableBytesWritable();
+ boolean evaluated = expression.evaluate(result, sortKey);
+ // set the sort key that failed to get evaluated with null
+ sortKeys[pos++] = evaluated && sortKey.getLength() > 0 ? sortKey : null;
+ }
+ queueEntries.add(new ResultEntry(sortKeys, result));
+ }
+ this.byteSize = queueEntries.getByteSize();
+ } catch (IOException e) {
+ throw new SQLException("", e);
+ } finally {
+ delegate.close();
+ }
+
+ return resultIterator;
+ }
+
+ @Override
+ public Tuple peek() throws SQLException {
+ return getResultIterator().peek();
+ }
+
+ @Override
+ public void close() {
+ resultIterator = PeekingResultIterator.EMPTY_ITERATOR;
+ }
+
+
+ @Override
+ public void explain(List<String> planSteps) {
+ delegate.explain(planSteps);
+ planSteps.add("CLIENT" + (limit == null ? "" : " TOP " + limit + " ROW" + (limit == 1 ? "" : "S")) + " SORTED BY " + orderByExpressions.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/iterate/ParallelIteratorRegionSplitter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/iterate/ParallelIteratorRegionSplitter.java b/src/main/java/org/apache/phoenix/iterate/ParallelIteratorRegionSplitter.java
new file mode 100644
index 0000000..a14b1e8
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/iterate/ParallelIteratorRegionSplitter.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.phoenix.query.KeyRange;
+
+
+/**
+ * Interface for strategies determining how to split regions in ParallelIterators.
+ *
+ * @author zhuang
+ */
+public interface ParallelIteratorRegionSplitter {
+
+ public List<KeyRange> getSplits() throws SQLException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/iterate/ParallelIteratorRegionSplitterFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/iterate/ParallelIteratorRegionSplitterFactory.java b/src/main/java/org/apache/phoenix/iterate/ParallelIteratorRegionSplitterFactory.java
new file mode 100644
index 0000000..88efc44
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/iterate/ParallelIteratorRegionSplitterFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.sql.SQLException;
+
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.parse.HintNode;
+import org.apache.phoenix.schema.TableRef;
+
+
+/**
+ * Factory class for the Region Splitter used by the project.
+ */
+public class ParallelIteratorRegionSplitterFactory {
+
+ public static ParallelIteratorRegionSplitter getSplitter(StatementContext context, TableRef table, HintNode hintNode) throws SQLException {
+ if (context.getScanRanges().useSkipScanFilter()) {
+ return SkipRangeParallelIteratorRegionSplitter.getInstance(context, table, hintNode);
+ }
+ return DefaultParallelIteratorRegionSplitter.getInstance(context, table, hintNode);
+ }
+}