You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/01/27 23:15:41 UTC
[25/51] [partial] Initial commit of master branch from github
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
new file mode 100644
index 0000000..3f56bb8
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.index;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+
+import com.google.common.collect.Multimap;
+import org.apache.hadoop.hbase.index.table.HTableInterfaceReference;
+import org.apache.hadoop.hbase.index.write.KillServerOnFailurePolicy;
+import org.apache.phoenix.coprocessor.MetaDataProtocol;
+import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
+import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.util.SchemaUtil;
+
+/**
+ *
+ * Handler called in the event that index updates cannot be written to their
+ * region server. First attempts to disable the index and failing that falls
+ * back to the default behavior of killing the region server.
+ *
+ * TODO: use delegate pattern instead
+ *
+ * @author jtaylor
+ * @since 2.1
+ */
+public class PhoenixIndexFailurePolicy extends KillServerOnFailurePolicy {
+    private static final Log LOG = LogFactory.getLog(PhoenixIndexFailurePolicy.class);
+    private RegionCoprocessorEnvironment env;
+
+    public PhoenixIndexFailurePolicy() {
+    }
+
+    /**
+     * Delegates to the parent policy and remembers the coprocessor environment,
+     * which {@link #handleFailure(Multimap, Exception)} later uses to reach the
+     * Phoenix system table.
+     */
+    @Override
+    public void setup(Stoppable parent, RegionCoprocessorEnvironment env) {
+        super.setup(parent, env);
+        this.env = env;
+    }
+
+    /**
+     * Attempts to mark every index table referenced by the failed writes as
+     * disabled. If any index cannot be disabled (unexpected mutation code or any
+     * throwable while talking to the system table), the default policy of the
+     * superclass is invoked exactly once.
+     *
+     * @param attempted the index updates that could not be written, keyed by index table
+     * @param cause the exception that made the index write fail
+     * @throws IOException always: a {@link DoNotRetryIOException} wrapping {@code cause},
+     *         whose message states whether the indexes were disabled or not
+     */
+    @Override
+    public void handleFailure(Multimap<HTableInterfaceReference, Mutation> attempted, Exception cause) throws IOException {
+        Set<HTableInterfaceReference> refs = attempted.asMap().keySet();
+        StringBuilder buf = new StringBuilder("Disabled index" + (refs.size() > 1 ? "es " : " "));
+        boolean allDisabled = true;
+        try {
+            for (HTableInterfaceReference ref : refs) {
+                String indexTableName = ref.getTableName();
+                MutationCode code = disableIndex(indexTableName);
+                // The original code treats TABLE_ALREADY_EXISTS as the success code of updateIndexState.
+                if (code != MutationCode.TABLE_ALREADY_EXISTS) {
+                    LOG.warn("Attempt to disable index " + indexTableName + " failed with code = " + code + ". Will use default failure policy instead.");
+                    allDisabled = false;
+                    // No point in disabling further indexes: the default policy takes over below.
+                    break;
+                }
+                LOG.info("Successfully disabled index " + indexTableName);
+                buf.append(indexTableName);
+                buf.append(',');
+            }
+        } catch (Throwable t) {
+            // Do not lose the reason why the index could not be disabled.
+            LOG.warn("Unable to disable index after a failed index write. Will use default failure policy instead.", t);
+            allDisabled = false;
+        }
+        if (!allDisabled) {
+            super.handleFailure(attempted, cause);
+            // Reached only if the default policy returns normally; do not claim the indexes were disabled.
+            throw new DoNotRetryIOException("Unable to disable index after an exception while writing updates", cause);
+        }
+        // Drop the trailing ',' (or the trailing space when there were no indexes at all).
+        buf.setLength(buf.length()-1);
+        buf.append(" due to an exception while writing updates");
+        throw new DoNotRetryIOException(buf.toString(), cause);
+    }
+
+    /**
+     * Disables a single index by invoking the updateIndexState method of the
+     * MetaDataProtocol end point coprocessor on the system table.
+     *
+     * @param indexTableName full name of the index table to disable
+     * @return the mutation code reported by the end point
+     * @throws IOException if the system table cannot be obtained or the call fails
+     */
+    private MutationCode disableIndex(String indexTableName) throws IOException {
+        byte[] indexTableKey = SchemaUtil.getTableKeyFromFullName(indexTableName);
+        HTableInterface systemTable = env.getTable(PhoenixDatabaseMetaData.TYPE_TABLE_NAME_BYTES);
+        try {
+            MetaDataProtocol mdProxy = systemTable.coprocessorProxy(MetaDataProtocol.class, indexTableKey);
+            // Mimic the Put that gets generated by the client on an update of the index state
+            Put put = new Put(indexTableKey);
+            put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_STATE_BYTES, PIndexState.DISABLE.getSerializedBytes());
+            List<Mutation> tableMetadata = Collections.<Mutation>singletonList(put);
+            MetaDataMutationResult result = mdProxy.updateIndexState(tableMetadata);
+            return result.getMutationCode();
+        } finally {
+            try {
+                systemTable.close();
+            } catch (IOException e) {
+                // A failure to release the handle must not turn a successful disable into a failure.
+                LOG.warn("Unable to close system table handle after updating state of index " + indexTableName, e);
+            }
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/iterate/AggregatingResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/AggregatingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/AggregatingResultIterator.java
new file mode 100644
index 0000000..237a89a
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/AggregatingResultIterator.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import org.apache.phoenix.schema.tuple.Tuple;
+
+
+/**
+ *
+ * Interface for scanners that either do aggregation
+ * or delegate to scanners that do aggregation.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public interface AggregatingResultIterator extends ResultIterator {
+ /**
+ * Provides a means of re-aggregating a result row. Intended for
+ * scanners that need to look ahead (e.g.
+ * {@link org.apache.phoenix.iterate.OrderedAggregatingResultIterator}).
+ * @param result the row to re-aggregate
+ */
+ void aggregate(Tuple result);
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterator.java
new file mode 100644
index 0000000..ed6c360
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterator.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.sql.SQLException;
+import java.util.List;
+
+/**
+ *
+ * Abstract base class for ResultIterator implementations that
+ * do nothing on close and have no explain plan steps
+ *
+ * @author jtaylor
+ * @since 1.2
+ */
+public abstract class BaseResultIterator implements ResultIterator {
+
+    /**
+     * Does nothing; this base class holds no resources of its own.
+     */
+    @Override
+    public void close() throws SQLException {
+        // intentionally empty
+    }
+
+    /**
+     * Does nothing; this base class contributes no explain plan steps.
+     *
+     * @param planSteps the list of plan steps, left untouched
+     */
+    @Override
+    public void explain(List<String> planSteps) {
+        // intentionally empty
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java
new file mode 100644
index 0000000..08b4639
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.phoenix.schema.tuple.Tuple;
+
+
+/**
+ *
+ * Result iterator that concatenates a list of other iterators.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class ConcatResultIterator implements PeekingResultIterator {
+    private final ResultIterators resultIterators;
+    // Lazily obtained from resultIterators on first use
+    private List<PeekingResultIterator> iterators;
+    // Position of the iterator currently being consumed; everything before it is already closed
+    private int index;
+
+    public ConcatResultIterator(ResultIterators iterators) {
+        this.resultIterators = iterators;
+    }
+
+    /**
+     * Returns the underlying iterators, fetching them from the
+     * {@link ResultIterators} the first time this is called.
+     */
+    private List<PeekingResultIterator> getIterators() throws SQLException {
+        if (iterators == null) {
+            iterators = resultIterators.getIterators();
+        }
+        return iterators;
+    }
+
+    /**
+     * Closes every iterator that has not been closed yet. All remaining
+     * iterators are closed even if one of them fails; the first failure is
+     * rethrown with any later failures chained through
+     * {@link SQLException#setNextException(SQLException)}.
+     * Does nothing if the underlying iterators were never fetched.
+     *
+     * @throws SQLException if closing any of the remaining iterators failed
+     */
+    @Override
+    public void close() throws SQLException {
+        if (iterators == null) {
+            return;
+        }
+        SQLException toThrow = null;
+        for (; index < iterators.size(); index++) {
+            try {
+                iterators.get(index).close();
+            } catch (SQLException e) {
+                if (toThrow == null) {
+                    toThrow = e;
+                } else {
+                    toThrow.setNextException(e);
+                }
+            }
+        }
+        if (toThrow != null) {
+            throw toThrow;
+        }
+    }
+
+
+    @Override
+    public void explain(List<String> planSteps) {
+        resultIterators.explain(planSteps);
+    }
+
+    /**
+     * Returns the first iterator, starting at the current position, that still
+     * has a row to return. Exhausted iterators are closed and skipped along the
+     * way. Returns {@code EMPTY_ITERATOR} once all iterators are exhausted.
+     */
+    private PeekingResultIterator currentIterator() throws SQLException {
+        List<PeekingResultIterator> iterators = getIterators();
+        while (index < iterators.size()) {
+            PeekingResultIterator iterator = iterators.get(index);
+            Tuple r = iterator.peek();
+            if (r != null) {
+                return iterator;
+            }
+            // Advance first so that a failing close() does not cause this
+            // exhausted iterator to be peeked at or closed a second time.
+            index++;
+            iterator.close();
+        }
+        return EMPTY_ITERATOR;
+    }
+
+    @Override
+    public Tuple peek() throws SQLException {
+        return currentIterator().peek();
+    }
+
+    @Override
+    public Tuple next() throws SQLException {
+        return currentIterator().next();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelIteratorRegionSplitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelIteratorRegionSplitter.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelIteratorRegionSplitter.java
new file mode 100644
index 0000000..ab194f6
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelIteratorRegionSplitter.java
@@ -0,0 +1,229 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.parse.HintNode;
+import org.apache.phoenix.parse.HintNode.Hint;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.query.StatsManager;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.util.ReadOnlyProps;
+
+
+/**
+ * Default strategy for splitting regions in ParallelIterator. Refactored from the
+ * original version.
+ *
+ * @author jtaylor
+ * @author zhuang
+ */
+public class DefaultParallelIteratorRegionSplitter implements ParallelIteratorRegionSplitter {
+
+ // Read from QueryServices.TARGET_QUERY_CONCURRENCY_ATTRIB; validated to be >= 1
+ protected final int targetConcurrency;
+ // Read from QueryServices.MAX_QUERY_CONCURRENCY_ATTRIB; validated to be >= targetConcurrency
+ protected final int maxConcurrency;
+ // Upper limit on the number of splits per region; 1 when the NO_INTRA_REGION_PARALLELIZATION hint is present
+ protected final int maxIntraRegionParallelization;
+ protected final StatementContext context;
+ protected final TableRef tableRef;
+
+ /**
+ * Factory method that simply invokes the constructor with the given arguments.
+ */
+ public static DefaultParallelIteratorRegionSplitter getInstance(StatementContext context, TableRef table, HintNode hintNode) {
+ return new DefaultParallelIteratorRegionSplitter(context, table, hintNode);
+ }
+
+ /**
+ * Reads the concurrency settings from the query services properties of the
+ * connection held by the context and validates them with
+ * Preconditions.checkArgument.
+ * @param context the statement context supplying the connection and the scan
+ * @param table the table whose regions will be split
+ * @param hintNode query hints; NO_INTRA_REGION_PARALLELIZATION forces one split per region
+ */
+ protected DefaultParallelIteratorRegionSplitter(StatementContext context, TableRef table, HintNode hintNode) {
+ this.context = context;
+ this.tableRef = table;
+ ReadOnlyProps props = context.getConnection().getQueryServices().getProps();
+ this.targetConcurrency = props.getInt(QueryServices.TARGET_QUERY_CONCURRENCY_ATTRIB,
+ QueryServicesOptions.DEFAULT_TARGET_QUERY_CONCURRENCY);
+ this.maxConcurrency = props.getInt(QueryServices.MAX_QUERY_CONCURRENCY_ATTRIB,
+ QueryServicesOptions.DEFAULT_MAX_QUERY_CONCURRENCY);
+ Preconditions.checkArgument(targetConcurrency >= 1, "Invalid target concurrency: " + targetConcurrency);
+ Preconditions.checkArgument(maxConcurrency >= targetConcurrency , "Invalid max concurrency: " + maxConcurrency);
+ this.maxIntraRegionParallelization = hintNode.hasHint(Hint.NO_INTRA_REGION_PARALLELIZATION) ? 1 : props.getInt(QueryServices.MAX_INTRA_REGION_PARALLELIZATION_ATTRIB,
+ QueryServicesOptions.DEFAULT_MAX_INTRA_REGION_PARALLELIZATION);
+ Preconditions.checkArgument(maxIntraRegionParallelization >= 1 , "Invalid max intra region parallelization: " + maxIntraRegionParallelization);
+ }
+
+ // Get the regions of the physical table that intersect the start/stop row of the scan.
+ protected List<HRegionLocation> getAllRegions() throws SQLException {
+ Scan scan = context.getScan();
+ PTable table = tableRef.getTable();
+ List<HRegionLocation> allTableRegions = context.getConnection().getQueryServices().getAllTableRegions(table.getPhysicalName().getBytes());
+ // If we're not salting, then we've already intersected the minMaxRange with the scan range
+ // so there's nothing to do here.
+ return filterRegions(allTableRegions, scan.getStartRow(), scan.getStopRow());
+ }
+
+ /**
+ * Keeps only the regions that intersect with the key range specified by the startKey and stopKey.
+ * If the key range is KeyRange.EVERYTHING_RANGE, the given list itself is returned.
+ * @param allTableRegions all region infos for a given table
+ * @param startKey the lower bound of key range, passed to KeyRange.getKeyRange as inclusive
+ * @param stopKey the upper bound of key range, passed to KeyRange.getKeyRange as exclusive
+ * @return regions that intersect with the key range given by the startKey and stopKey
+ */
+ // exposed for tests
+ public static List<HRegionLocation> filterRegions(List<HRegionLocation> allTableRegions, byte[] startKey, byte[] stopKey) {
+ Iterable<HRegionLocation> regions;
+ final KeyRange keyRange = KeyRange.getKeyRange(startKey, true, stopKey, false);
+ if (keyRange == KeyRange.EVERYTHING_RANGE) {
+ return allTableRegions;
+ }
+
+ regions = Iterables.filter(allTableRegions, new Predicate<HRegionLocation>() {
+ @Override
+ public boolean apply(HRegionLocation location) {
+ KeyRange regionKeyRange = KeyRange.getKeyRange(location.getRegionInfo().getStartKey(), location.getRegionInfo().getEndKey());
+ return keyRange.intersect(regionKeyRange) != KeyRange.EMPTY_RANGE;
+ }
+ });
+ return Lists.newArrayList(regions);
+ }
+
+ /**
+ * Generates the key ranges to scan in parallel for the given regions. Each
+ * region contributes either its own key range or several sub-ranges of it,
+ * and the ranges are returned interleaved across regions.
+ * @param regions the regions to be scanned
+ * @return the key ranges to scan; empty if there are no regions
+ */
+ protected List<KeyRange> genKeyRanges(List<HRegionLocation> regions) {
+ if (regions.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ StatsManager statsManager = context.getConnection().getQueryServices().getStatsManager();
+ // the splits are computed as follows:
+ //
+ // let's suppose:
+ // t = target concurrency
+ // m = max concurrency
+ // r = the number of regions we need to scan
+ //
+ // if r >= t:
+ // scan using regional boundaries
+ // elif r > t/2:
+ // split each region in s splits such that:
+ // s = max(x) where r * x <= m (i.e. s = m / r)
+ // else:
+ // split each region in s splits such that:
+ // s = max(x) where r * x <= t (i.e. s = t / r)
+ //
+ // In all cases s is then capped at maxIntraRegionParallelization.
+ //
+ // The idea is to align splits with region boundaries. If rows are not evenly
+ // distributed across regions, using this scheme compensates for regions that
+ // have more rows than others, by applying tighter splits and therefore spawning
+ // off more scans over the overloaded regions.
+ int splitsPerRegion = regions.size() >= targetConcurrency ? 1 : (regions.size() > targetConcurrency / 2 ? maxConcurrency : targetConcurrency) / regions.size();
+ splitsPerRegion = Math.min(splitsPerRegion, maxIntraRegionParallelization);
+ // Create a multi-map of HRegionLocation to List<KeyRange> which we'll use to round robin from to ensure
+ // that we keep each region server busy for each query.
+ ListMultimap<HRegionLocation,KeyRange> keyRangesPerRegion = ArrayListMultimap.create(regions.size(),regions.size() * splitsPerRegion);;
+ if (splitsPerRegion == 1) {
+ for (HRegionLocation region : regions) {
+ keyRangesPerRegion.put(region, ParallelIterators.TO_KEY_RANGE.apply(region));
+ }
+ } else {
+ // Maintain a bucket of key ranges for each region location; the ranges are
+ // returned in round-robin order further below.
+ for (HRegionLocation region : regions) {
+ byte[] startKey = region.getRegionInfo().getStartKey();
+ byte[] stopKey = region.getRegionInfo().getEndKey();
+ boolean lowerUnbound = Bytes.compareTo(startKey, HConstants.EMPTY_START_ROW) == 0;
+ boolean upperUnbound = Bytes.compareTo(stopKey, HConstants.EMPTY_END_ROW) == 0;
+ /*
+ * If lower/upper unbound, get the min/max key from the stats manager.
+ * We use this as the boundary to split on, but we still use the empty
+ * byte as the boundary in the actual scan (in case our stats are out
+ * of date). If the stats manager has no min/max key, the region is
+ * scanned as a single range.
+ */
+ if (lowerUnbound) {
+ startKey = statsManager.getMinKey(tableRef);
+ if (startKey == null) {
+ keyRangesPerRegion.put(region,ParallelIterators.TO_KEY_RANGE.apply(region));
+ continue;
+ }
+ }
+ if (upperUnbound) {
+ stopKey = statsManager.getMaxKey(tableRef);
+ if (stopKey == null) {
+ keyRangesPerRegion.put(region,ParallelIterators.TO_KEY_RANGE.apply(region));
+ continue;
+ }
+ }
+
+ byte[][] boundaries = null;
+ // Both startKey and stopKey will be empty the first time
+ if (Bytes.compareTo(startKey, stopKey) >= 0 || (boundaries = Bytes.split(startKey, stopKey, splitsPerRegion - 1)) == null) {
+ // Bytes.split may return null if the key space
+ // between start and end key is too small
+ keyRangesPerRegion.put(region,ParallelIterators.TO_KEY_RANGE.apply(region));
+ } else {
+ // First range: starts at the (possibly unbound) lower end of the region
+ keyRangesPerRegion.put(region,KeyRange.getKeyRange(lowerUnbound ? KeyRange.UNBOUND : boundaries[0], boundaries[1]));
+ if (boundaries.length > 1) {
+ // Middle ranges, each from one boundary to the next
+ for (int i = 1; i < boundaries.length-2; i++) {
+ keyRangesPerRegion.put(region,KeyRange.getKeyRange(boundaries[i], true, boundaries[i+1], false));
+ }
+ // Last range: ends at the (possibly unbound) upper end of the region
+ keyRangesPerRegion.put(region,KeyRange.getKeyRange(boundaries[boundaries.length-2], true, upperUnbound ? KeyRange.UNBOUND : boundaries[boundaries.length-1], false));
+ }
+ }
+ }
+ }
+ List<KeyRange> splits = Lists.newArrayListWithCapacity(regions.size() * splitsPerRegion);
+ // The values of asMap() are cast to List<KeyRange> below; this relies on the
+ // values being Lists, as documented for ListMultimap
+ Collection<Collection<KeyRange>> values = keyRangesPerRegion.asMap().values();
+ List<Collection<KeyRange>> keyRangesList = Lists.newArrayList(values);
+ // Randomize range order to ensure that we don't hit the region servers in the same order every time
+ // thus helping to distribute the load more evenly
+ Collections.shuffle(keyRangesList);
+ // Transpose values in map to get regions in round-robin server order. This ensures that
+ // all servers will be used to process the set of parallel threads available in our executor.
+ // Pass i takes the i-th key range of every region that has one; stop after a pass that adds nothing.
+ int i = 0;
+ boolean done;
+ do {
+ done = true;
+ for (int j = 0; j < keyRangesList.size(); j++) {
+ List<KeyRange> keyRanges = (List<KeyRange>)keyRangesList.get(j);
+ if (i < keyRanges.size()) {
+ splits.add(keyRanges.get(i));
+ done = false;
+ }
+ }
+ i++;
+ } while (!done);
+ return splits;
+ }
+
+ /**
+ * Returns the key ranges generated for the regions that intersect the scan.
+ */
+ @Override
+ public List<KeyRange> getSplits() throws SQLException {
+ return genKeyRanges(getAllRegions());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/iterate/DelegateResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DelegateResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DelegateResultIterator.java
new file mode 100644
index 0000000..25df5ad
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DelegateResultIterator.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.phoenix.schema.tuple.Tuple;
+
+
+public class DelegateResultIterator implements ResultIterator {
+ private final ResultIterator delegate;
+
+ public DelegateResultIterator(ResultIterator delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public void close() throws SQLException {
+ delegate.close();
+ }
+
+ @Override
+ public Tuple next() throws SQLException {
+ return delegate.next();
+ }
+
+ @Override
+ public void explain(List<String> planSteps) {
+ delegate.explain(planSteps);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/iterate/DistinctAggregatingResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DistinctAggregatingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DistinctAggregatingResultIterator.java
new file mode 100644
index 0000000..f7af26c
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DistinctAggregatingResultIterator.java
@@ -0,0 +1,163 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.sql.SQLException;
+import java.util.*;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Sets;
+import org.apache.phoenix.compile.ColumnProjector;
+import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+/**
+ * Result scanner that dedups the incoming tuples to make them distinct.
+ * <p>
+ * Note that the results are held in memory
+ *
+ * @author jtaylor
+ * @since 1.2
+ */
+public class DistinctAggregatingResultIterator implements AggregatingResultIterator {
+    private final AggregatingResultIterator delegate;
+    private final RowProjector rowProjector;
+    // Iterator over the de-duplicated rows; null until the delegate has been drained
+    private Iterator<ResultEntry> resultIterator;
+    // Scratch pointers shared by all ResultEntry instances while evaluating expressions
+    private final ImmutableBytesWritable ptr1 = new ImmutableBytesWritable();
+    private final ImmutableBytesWritable ptr2 = new ImmutableBytesWritable();
+
+    /**
+     * Wrapper around a row that defines equality and hash code in terms of the
+     * values of the projected columns. Deliberately an inner (non-static) class:
+     * it uses the row projector and scratch pointers of the enclosing iterator.
+     */
+    private class ResultEntry {
+        private final int hashCode;
+        private final Tuple result;
+
+        ResultEntry(Tuple result) {
+            final int prime = 31;
+            this.result = result;
+            // Computed once up front; columns that evaluate to null do not contribute.
+            int hash = 0;
+            for (ColumnProjector column : rowProjector.getColumnProjectors()) {
+                Expression e = column.getExpression();
+                if (e.evaluate(this.result, ptr1)) {
+                    hash = prime * hash + ptr1.hashCode();
+                }
+            }
+            this.hashCode = hash;
+        }
+
+        /**
+         * Two entries are equal when, for every projected column, both rows are
+         * null or both rows have the same bytes.
+         */
+        @Override
+        public boolean equals(Object o) {
+            if (o == this) {
+                return true;
+            }
+            if (o == null) {
+                return false;
+            }
+            if (o.getClass() != this.getClass()) {
+                return false;
+            }
+            ResultEntry that = (ResultEntry) o;
+            for (ColumnProjector column : rowProjector.getColumnProjectors()) {
+                Expression e = column.getExpression();
+                boolean isNull1 = !e.evaluate(this.result, ptr1);
+                boolean isNull2 = !e.evaluate(that.result, ptr2);
+                if (isNull1 && isNull2) {
+                    // This column matches; the remaining columns must still be compared.
+                    continue;
+                }
+                if (isNull1 || isNull2) {
+                    return false;
+                }
+                if (ptr1.compareTo(ptr2) != 0) {
+                    return false;
+                }
+            }
+            return true;
+        }
+
+        @Override
+        public int hashCode() {
+            return hashCode;
+        }
+
+        Tuple getResult() {
+            return result;
+        }
+    }
+
+    protected ResultIterator getDelegate() {
+        return delegate;
+    }
+
+    /**
+     * @param delegate the iterator supplying the rows to de-duplicate
+     * @param rowProjector the projected columns on which rows are compared
+     */
+    public DistinctAggregatingResultIterator(AggregatingResultIterator delegate,
+            RowProjector rowProjector) {
+        this.delegate = delegate;
+        this.rowProjector = rowProjector;
+    }
+
+    /**
+     * Returns the next distinct row after re-aggregating it through the
+     * delegate, or null once all distinct rows have been returned. The first
+     * call drains the delegate.
+     */
+    @Override
+    public Tuple next() throws SQLException {
+        Iterator<ResultEntry> iterator = getResultIterator();
+        if (iterator.hasNext()) {
+            ResultEntry entry = iterator.next();
+            Tuple tuple = entry.getResult();
+            aggregate(tuple);
+            return tuple;
+        }
+        // Exhausted: drop the reference to the set of entries.
+        resultIterator = Iterators.emptyIterator();
+        return null;
+    }
+
+    /**
+     * Lazily reads every row from the delegate into a hash set (held in memory),
+     * closes the delegate, and returns an iterator over the distinct entries.
+     */
+    private Iterator<ResultEntry> getResultIterator() throws SQLException {
+        if (resultIterator != null) {
+            return resultIterator;
+        }
+
+        Set<ResultEntry> entries = Sets.<ResultEntry>newHashSet(); // TODO: size?
+        try {
+            for (Tuple result = delegate.next(); result != null; result = delegate.next()) {
+                ResultEntry entry = new ResultEntry(result);
+                entries.add(entry);
+            }
+        } finally {
+            delegate.close();
+        }
+
+        resultIterator = entries.iterator();
+        return resultIterator;
+    }
+
+    /**
+     * Discards any remaining distinct rows. The delegate itself is closed by
+     * the first call to {@link #next()}, once it has been drained.
+     */
+    @Override
+    public void close() {
+        resultIterator = Iterators.emptyIterator();
+    }
+
+
+    @Override
+    public void explain(List<String> planSteps) {
+        delegate.explain(planSteps);
+        planSteps.add("CLIENT DISTINCT ON " + rowProjector.toString());
+    }
+
+    @Override
+    public void aggregate(Tuple result) {
+        delegate.aggregate(result);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
new file mode 100644
index 0000000..f32b377
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
@@ -0,0 +1,271 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.text.Format;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
+import org.apache.hadoop.hbase.filter.PageFilter;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import com.google.common.collect.Iterators;
+import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
+import org.apache.phoenix.compile.ScanRanges;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.query.KeyRange.Bound;
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.RowKeySchema;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.util.StringUtil;
+
+
+public abstract class ExplainTable {
+ private static final List<KeyRange> EVERYTHING = Collections.singletonList(KeyRange.EVERYTHING_RANGE);
+ protected final StatementContext context;
+ protected final TableRef tableRef;
+ protected final GroupBy groupBy;
+
+ /**
+  * Creates an explainer that uses {@code GroupBy.EMPTY_GROUP_BY} as its group-by.
+  *
+  * @param context statement context the plan is read from
+  * @param table   reference to the table being scanned
+  */
+ public ExplainTable(StatementContext context, TableRef table) {
+     this(context, table, GroupBy.EMPTY_GROUP_BY);
+ }
+
+ /**
+  * Creates an explainer for the given statement, table and group-by.
+  *
+  * @param context statement context the plan is read from
+  * @param table   reference to the table being scanned
+  * @param groupBy group-by whose own plan steps are appended by {@code explain}
+  */
+ public ExplainTable(StatementContext context, TableRef table, GroupBy groupBy) {
+     this.groupBy = groupBy;
+     this.tableRef = table;
+     this.context = context;
+ }
+
+ /**
+  * Appends the scan-type fragment of the plan line. When the scan ranges use a skip
+  * scan filter this writes {@code "SKIP SCAN ON <n> KEY(S)|RANGE(S) "}, where
+  * {@code n} is the product of the per-slot range counts; otherwise it writes
+  * {@code "RANGE SCAN "}.
+  *
+  * @param buf buffer the fragment is appended to
+  * @return {@code true} if a skip scan was described, {@code false} otherwise
+  */
+ private boolean explainSkipScan(StringBuilder buf) {
+     ScanRanges scanRanges = context.getScanRanges();
+     if (!scanRanges.useSkipScanFilter()) {
+         buf.append("RANGE SCAN ");
+         return false;
+     }
+
+     buf.append("SKIP SCAN ");
+     int combinations = 1;
+     boolean sawRange = false;
+     for (List<KeyRange> slot : scanRanges.getRanges()) {
+         combinations *= slot.size();
+         for (KeyRange keyRange : slot) {
+             if (!keyRange.isSingleKey()) {
+                 sawRange = true;
+             }
+         }
+     }
+     buf.append("ON ").append(combinations);
+     if (sawRange) {
+         buf.append(" RANGE");
+     } else {
+         buf.append(" KEY");
+     }
+     // Pluralize when more than one key/range is involved.
+     if (combinations > 1) {
+         buf.append("S ");
+     } else {
+         buf.append(" ");
+     }
+     return true;
+ }
+
+ /**
+  * Appends the plan steps describing the scan: the scan line itself (scan type,
+  * table name and key ranges), an optional {@code SERVER FILTER BY} line, an
+  * optional {@code SERVER n ROW LIMIT} line and finally the group-by steps.
+  *
+  * <p>Filters are located by position inside a {@link FilterList}: an optional
+  * leading {@link FirstKeyOnlyFilter}, then (for skip scans only) one slot that is
+  * skipped, then the filter that is described, then an optional {@link PageFilter}.
+  * The trailing filter is only treated as a page filter if it really is one, and an
+  * empty filter list is tolerated.
+  *
+  * @param prefix    text placed in front of the scan line
+  * @param planSteps list that receives the plan description lines
+  */
+ protected void explain(String prefix, List<String> planSteps) {
+     StringBuilder buf = new StringBuilder(prefix);
+     ScanRanges scanRanges = context.getScanRanges();
+     boolean hasSkipScanFilter = false;
+     if (scanRanges.isEverything()) {
+         buf.append("FULL SCAN ");
+     } else {
+         hasSkipScanFilter = explainSkipScan(buf);
+     }
+     buf.append("OVER " + tableRef.getTable().getName().getString());
+     appendKeyRanges(buf);
+     planSteps.add(buf.toString());
+
+     Scan scan = context.getScan();
+     Filter filter = scan.getFilter();
+     if (filter != null) {
+         boolean hasFirstKeyOnlyFilter = false;
+         String filterDesc = "";
+         PageFilter pageFilter = null;
+         if (filter instanceof FilterList) {
+             List<Filter> filterList = ((FilterList) filter).getFilters();
+             int offset = 0;
+             if (!filterList.isEmpty() && filterList.get(0) instanceof FirstKeyOnlyFilter) {
+                 hasFirstKeyOnlyFilter = true;
+                 offset = 1;
+             }
+             // For a skip scan one additional slot is skipped before the described
+             // filter (presumably the skip scan filter itself - TODO confirm).
+             int descIndex = hasSkipScanFilter ? offset + 1 : offset;
+             if (filterList.size() > descIndex) {
+                 filterDesc = filterList.get(descIndex).toString();
+                 if (filterList.size() > descIndex + 1) {
+                     Filter trailing = filterList.get(descIndex + 1);
+                     // Guard the cast: anything other than a PageFilter is ignored
+                     // instead of raising a ClassCastException.
+                     if (trailing instanceof PageFilter) {
+                         pageFilter = (PageFilter) trailing;
+                     }
+                 }
+             }
+         } else if (!hasSkipScanFilter) {
+             // A single, non-list filter is only described for non-skip scans.
+             if (filter instanceof FirstKeyOnlyFilter) {
+                 hasFirstKeyOnlyFilter = true;
+             } else {
+                 filterDesc = filter.toString();
+             }
+         }
+         if (filterDesc.length() > 0) {
+             planSteps.add(" SERVER FILTER BY " + (hasFirstKeyOnlyFilter ? "FIRST KEY ONLY AND " : "") + filterDesc);
+         } else if (hasFirstKeyOnlyFilter) {
+             planSteps.add(" SERVER FILTER BY FIRST KEY ONLY");
+         }
+         if (pageFilter != null) {
+             planSteps.add(" SERVER " + pageFilter.getPageSize() + " ROW LIMIT");
+         }
+     }
+     groupBy.explain(planSteps);
+ }
+
+ /**
+  * Appends a printable form of one primary-key slot value.
+  *
+  * @param buf       buffer to append to
+  * @param range     raw bytes of the slot value; an empty array is rendered as {@code *}
+  * @param isNull    {@code TRUE} renders {@code null}, {@code FALSE} renders
+  *                  {@code not null}, and {@code null} means "render the bytes"
+  * @param slotIndex position of the slot, used to look up its data type and column modifier
+  */
+ private void appendPKColumnValue(StringBuilder buf, byte[] range, Boolean isNull, int slotIndex) {
+     if (isNull != null) {
+         buf.append(isNull.booleanValue() ? "null" : "not null");
+         return;
+     }
+     if (range.length == 0) {
+         buf.append('*');
+         return;
+     }
+     PDataType dataType = context.getScanRanges().getSchema().getField(slotIndex).getDataType();
+     ColumnModifier columnModifier = tableRef.getTable().getPKColumns().get(slotIndex).getColumnModifier();
+     byte[] printable = range;
+     if (columnModifier != null) {
+         // Mark modified columns with '~' and run the bytes through the modifier
+         // into a fresh array before formatting.
+         buf.append('~');
+         printable = columnModifier.apply(range, 0, new byte[range.length], 0, range.length);
+     }
+     Format formatter = context.getConnection().getFormatter(dataType);
+     buf.append(dataType.toStringLiteral(printable, formatter));
+ }
+
+ /**
+  * Iterates over the values of a row key as split up by a {@link RowKeySchema}.
+  * The iterator always holds the next value ahead of time; a {@code null}
+  * look-ahead value marks the end. Removal is not supported.
+  */
+ private static class RowKeyValueIterator implements Iterator<byte[]> {
+     private final RowKeySchema schema;
+     private ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+     private int position = 0;
+     private final int maxOffset;
+     private byte[] nextValue;
+
+     /**
+      * @param schema schema used to walk the row key
+      * @param rowKey row key bytes to iterate over
+      */
+     public RowKeyValueIterator(RowKeySchema schema, byte[] rowKey) {
+         this.schema = schema;
+         this.maxOffset = schema.iterator(rowKey, ptr);
+         iterate();
+     }
+
+     /** Loads the look-ahead value, or clears it when the schema reports no more values. */
+     private void iterate() {
+         boolean exhausted = schema.next(ptr, position++, maxOffset) == null;
+         nextValue = exhausted ? null : ptr.copyBytes();
+     }
+
+     @Override
+     public boolean hasNext() {
+         return nextValue != null;
+     }
+
+     @Override
+     public byte[] next() {
+         byte[] current = nextValue;
+         if (current == null) {
+             throw new NoSuchElementException();
+         }
+         iterate();
+         return current;
+     }
+
+     @Override
+     public void remove() {
+         throw new UnsupportedOperationException();
+     }
+
+ }
+
+ /**
+  * Appends one bound (lower or upper) of the scan as a comma-terminated list of
+  * primary-key slot values. Each slot value comes either from the scan ranges or
+  * from the statement's min/max range, whichever compares as larger for the lower
+  * bound (smaller for the upper bound). Every appended value is followed by a
+  * comma, including the last one.
+  *
+  * @param buf   buffer to append to
+  * @param bound which end of the ranges to render
+  */
+ private void appendScanRow(StringBuilder buf, Bound bound) {
+ ScanRanges scanRanges = context.getScanRanges();
+ KeyRange minMaxRange = context.getMinMaxRange();
+ // Stays empty unless a min/max range exists and is bounded on this side.
+ Iterator<byte[]> minMaxIterator = Iterators.emptyIterator();
+ if (minMaxRange != null) {
+ RowKeySchema schema = tableRef.getTable().getRowKeySchema();
+ if (!minMaxRange.isUnbound(bound)) {
+ minMaxIterator = new RowKeyValueIterator(schema, minMaxRange.getRange(bound));
+ }
+ }
+ int nRanges = scanRanges.getRanges().size();
+ // i is the slot index being rendered; minPos is the next scan-range slot to consume.
+ for (int i = 0, minPos = 0; minPos < nRanges || minMaxIterator.hasNext(); i++) {
+ // Once the scan ranges are used up (or abandoned below), fall back to EVERYTHING.
+ List<KeyRange> ranges = minPos >= nRanges ? EVERYTHING : scanRanges.getRanges().get(minPos++);
+ // The lower bound uses the first range of the slot, the upper bound the last one.
+ KeyRange range = bound == Bound.LOWER ? ranges.get(0) : ranges.get(ranges.size()-1);
+ byte[] b = range.getRange(bound);
+ // Tri-state: TRUE for the IS NULL range, FALSE for the IS NOT NULL range, null otherwise.
+ Boolean isNull = KeyRange.IS_NULL_RANGE == range ? Boolean.TRUE : KeyRange.IS_NOT_NULL_RANGE == range ? Boolean.FALSE : null;
+ if (minMaxIterator.hasNext()) {
+ byte[] bMinMax = minMaxIterator.next();
+ // The comparison sign is flipped for the upper bound.
+ int cmp = Bytes.compareTo(bMinMax, b) * (bound == Bound.LOWER ? 1 : -1);
+ if (cmp > 0) {
+ // The min/max value wins: use it and stop consuming scan-range slots.
+ minPos = nRanges;
+ b = bMinMax;
+ isNull = null;
+ } else if (cmp < 0) {
+ // The scan-range value wins: stop consulting the min/max values.
+ minMaxIterator = Iterators.emptyIterator();
+ }
+ }
+ appendPKColumnValue(buf, b, isNull, i);
+ buf.append(',');
+ }
+ }
+
+ /**
+  * Appends the key range part of the scan line, in the form {@code " [lower]"} or
+  * {@code " [lower] - [upper]"} when the two rendered bounds differ. Nothing is
+  * appended when there is no min/max range and the scan ranges are the
+  * {@code EVERYTHING} or {@code NOTHING} instances.
+  *
+  * @param buf buffer holding the scan line being built
+  */
+ private void appendKeyRanges(StringBuilder buf) {
+     ScanRanges scanRanges = context.getScanRanges();
+     KeyRange minMaxRange = context.getMinMaxRange();
+     boolean trivialRanges = scanRanges == ScanRanges.EVERYTHING || scanRanges == ScanRanges.NOTHING;
+     if (trivialRanges && minMaxRange == null) {
+         return;
+     }
+
+     buf.append(" [");
+     StringBuilder lower = new StringBuilder();
+     appendScanRow(lower, Bound.LOWER);
+     buf.append(lower);
+     // Replace the trailing comma written by appendScanRow with the closing bracket.
+     buf.setCharAt(buf.length() - 1, ']');
+
+     StringBuilder upper = new StringBuilder();
+     appendScanRow(upper, Bound.UPPER);
+     if (!StringUtil.equals(lower, upper)) {
+         buf.append( " - [").append(upper);
+         buf.setCharAt(buf.length() - 1, ']');
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/iterate/FilterAggregatingResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/FilterAggregatingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/FilterAggregatingResultIterator.java
new file mode 100644
index 0000000..d687480
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/FilterAggregatingResultIterator.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+
+/**
+ *
+ * Post aggregation filter for HAVING clause. Due to the way we cache aggregation values
+ * we cannot have a look ahead for this Iterator, because the expressions in the SELECT
+ * clause would return values for the peeked row instead of the current row. If we only
+ * use the Result argument in {@link org.apache.phoenix.expression.Expression}
+ * instead of our cached value in Aggregators, we could have a look ahead.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class FilterAggregatingResultIterator implements AggregatingResultIterator {
+ private final AggregatingResultIterator delegate;
+ private final Expression expression;
+ private final ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+
+ public FilterAggregatingResultIterator(AggregatingResultIterator delegate, Expression expression) {
+ this.delegate = delegate;
+ this.expression = expression;
+ if (expression.getDataType() != PDataType.BOOLEAN) {
+ throw new IllegalArgumentException("FilterResultIterator requires a boolean expression, but got " + expression);
+ }
+ }
+
+ @Override
+ public Tuple next() throws SQLException {
+ Tuple next;
+ do {
+ next = delegate.next();
+ } while (next != null && expression.evaluate(next, ptr) && Boolean.FALSE.equals(expression.getDataType().toObject(ptr)));
+ return next;
+ }
+
+ @Override
+ public void close() throws SQLException {
+ delegate.close();
+ }
+
+ @Override
+ public void aggregate(Tuple result) {
+ delegate.aggregate(result);
+ }
+
+ @Override
+ public void explain(List<String> planSteps) {
+ delegate.explain(planSteps);
+ planSteps.add("CLIENT FILTER BY " + expression.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/iterate/FilterResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/FilterResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/FilterResultIterator.java
new file mode 100644
index 0000000..5518643
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/FilterResultIterator.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+
+/**
+ *
+ * Result scanner that filters out rows based on the results of a boolean
+ * expression (i.e. filters out if {@link org.apache.phoenix.expression.Expression#evaluate(Tuple, ImmutableBytesWritable)}
+ * returns false or the ptr contains a FALSE value}). May not be used where
+ * the delegate provided is an {@link org.apache.phoenix.iterate.AggregatingResultIterator}.
+ * For these, the {@link org.apache.phoenix.iterate.FilterAggregatingResultIterator} should be used.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class FilterResultIterator extends LookAheadResultIterator {
+ private final ResultIterator delegate;
+ private final Expression expression;
+ private final ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+
+ public FilterResultIterator(ResultIterator delegate, Expression expression) {
+ if (delegate instanceof AggregatingResultIterator) {
+ throw new IllegalArgumentException("FilterResultScanner may not be used with an aggregate delegate. Use phoenix.iterate.FilterAggregateResultScanner instead");
+ }
+ this.delegate = delegate;
+ this.expression = expression;
+ if (expression.getDataType() != PDataType.BOOLEAN) {
+ throw new IllegalArgumentException("FilterResultIterator requires a boolean expression, but got " + expression);
+ }
+ }
+
+ @Override
+ protected Tuple advance() throws SQLException {
+ Tuple next;
+ do {
+ next = delegate.next();
+ } while (next != null && expression.evaluate(next, ptr) && Boolean.FALSE.equals(expression.getDataType().toObject(ptr)));
+ return next;
+ }
+
+ @Override
+ public void close() throws SQLException {
+ delegate.close();
+ }
+
+ @Override
+ public void explain(List<String> planSteps) {
+ delegate.explain(planSteps);
+ planSteps.add("CLIENT FILTER BY " + expression.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/iterate/GroupedAggregatingResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/GroupedAggregatingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/GroupedAggregatingResultIterator.java
new file mode 100644
index 0000000..7985699
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/GroupedAggregatingResultIterator.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import static org.apache.phoenix.query.QueryConstants.*;
+
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.expression.aggregator.Aggregator;
+import org.apache.phoenix.expression.aggregator.Aggregators;
+import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.TupleUtil;
+
+
+
+/**
+ *
+ * Result scanner that aggregates the row count value for rows with duplicate keys.
+ * The rows from the backing result iterator must be in key sorted order. For example,
+ * given the following input:
+ * a 1
+ * a 2
+ * b 1
+ * b 3
+ * c 1
+ * the following will be output:
+ * a 3
+ * b 4
+ * c 1
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class GroupedAggregatingResultIterator implements AggregatingResultIterator {
+ private final ImmutableBytesWritable tempPtr = new ImmutableBytesWritable();
+ private final PeekingResultIterator resultIterator;
+ protected final Aggregators aggregators;
+
+ public GroupedAggregatingResultIterator( PeekingResultIterator resultIterator, Aggregators aggregators) {
+ if (resultIterator == null) throw new NullPointerException();
+ if (aggregators == null) throw new NullPointerException();
+ this.resultIterator = resultIterator;
+ this.aggregators = aggregators;
+ }
+
+ @Override
+ public Tuple next() throws SQLException {
+ Tuple result = resultIterator.next();
+ if (result == null) {
+ return null;
+ }
+ Aggregator[] rowAggregators = aggregators.getAggregators();
+ aggregators.reset(rowAggregators);
+ while (true) {
+ aggregators.aggregate(rowAggregators, result);
+ Tuple nextResult = resultIterator.peek();
+ if (nextResult == null || !TupleUtil.equals(result, nextResult, tempPtr)) {
+ break;
+ }
+ result = resultIterator.next();
+ }
+
+ byte[] value = aggregators.toBytes(rowAggregators);
+ result.getKey(tempPtr);
+ return new SingleKeyValueTuple(KeyValueUtil.newKeyValue(tempPtr, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length));
+ }
+
+ @Override
+ public void close() throws SQLException {
+ resultIterator.close();
+ }
+
+ @Override
+ public void aggregate(Tuple result) {
+ Aggregator[] rowAggregators = aggregators.getAggregators();
+ aggregators.reset(rowAggregators);
+ aggregators.aggregate(rowAggregators, result);
+ }
+
+ @Override
+ public void explain(List<String> planSteps) {
+ resultIterator.explain(planSteps);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/iterate/LimitingResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/LimitingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LimitingResultIterator.java
new file mode 100644
index 0000000..ecea92d
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LimitingResultIterator.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.phoenix.schema.tuple.Tuple;
+
+/**
+ *
+ * Iterates through tuples up to a limit
+ *
+ * @author jtaylor
+ * @since 1.2
+ */
+public class LimitingResultIterator extends DelegateResultIterator {
+ private int rowCount;
+ private final int limit;
+
+ public LimitingResultIterator(ResultIterator delegate, int limit) {
+ super(delegate);
+ this.limit = limit;
+ }
+
+ @Override
+ public Tuple next() throws SQLException {
+ if (rowCount++ >= limit) {
+ return null;
+ }
+ return super.next();
+ }
+
+ @Override
+ public void explain(List<String> planSteps) {
+ super.explain(planSteps);
+ planSteps.add("CLIENT " + limit + " ROW LIMIT");
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java
new file mode 100644
index 0000000..0e4b78b
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.sql.SQLException;
+
+import org.apache.phoenix.schema.tuple.ResultTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+
+ /**
+  * Base class that adds {@code peek()} support on top of a subclass-provided
+  * {@link #advance()} by always holding one tuple ahead. A private sentinel tuple
+  * marks the state in which nothing has been fetched yet.
+  */
+ abstract public class LookAheadResultIterator implements PeekingResultIterator {
+     private final static Tuple UNINITIALIZED = new ResultTuple();
+     private Tuple next = UNINITIALIZED;
+
+     /**
+      * Fetches the following tuple from the underlying source.
+      *
+      * @return the following tuple; the value is handed back unchanged by
+      *         {@code next()} and {@code peek()}
+      * @throws SQLException if the underlying source fails
+      */
+     abstract protected Tuple advance() throws SQLException;
+
+     /** Fetches the first tuple if nothing has been fetched so far. */
+     private void init() throws SQLException {
+         if (next != UNINITIALIZED) {
+             return;
+         }
+         next = advance();
+     }
+
+     /**
+      * Returns the held tuple and immediately fetches its successor.
+      */
+     @Override
+     public Tuple next() throws SQLException {
+         init();
+         final Tuple current = this.next;
+         this.next = advance();
+         return current;
+     }
+
+     /**
+      * Returns the held tuple without consuming it.
+      */
+     @Override
+     public Tuple peek() throws SQLException {
+         init();
+         return this.next;
+     }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferSortedQueue.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferSortedQueue.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferSortedQueue.java
new file mode 100644
index 0000000..1606ae6
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferSortedQueue.java
@@ -0,0 +1,389 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileChannel.MapMode;
+import java.util.AbstractQueue;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import com.google.common.collect.MinMaxPriorityQueue;
+import org.apache.phoenix.iterate.OrderedResultIterator.ResultEntry;
+import org.apache.phoenix.schema.tuple.ResultTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+public class MappedByteBufferSortedQueue extends AbstractQueue<ResultEntry> {
+ private Comparator<ResultEntry> comparator;
+ private final int limit;
+ private final int thresholdBytes;
+ private List<MappedByteBufferPriorityQueue> queues = new ArrayList<MappedByteBufferPriorityQueue>();
+ private MappedByteBufferPriorityQueue currentQueue = null;
+ private int currentIndex = 0;
+ MinMaxPriorityQueue<IndexedResultEntry> mergedQueue = null;
+
+ /**
+  * Creates the queue together with its first sub-queue (index 0).
+  *
+  * @param comparator     ordering of the entries
+  * @param limit          maximum number of entries per sub-queue; {@code null} is stored as -1
+  * @param thresholdBytes size threshold handed to every sub-queue
+  * @throws IOException if the first sub-queue cannot be created
+  */
+ public MappedByteBufferSortedQueue(Comparator<ResultEntry> comparator,
+         Integer limit, int thresholdBytes) throws IOException {
+     this.comparator = comparator;
+     if (limit == null) {
+         this.limit = -1;
+     } else {
+         this.limit = limit;
+     }
+     this.thresholdBytes = thresholdBytes;
+     MappedByteBufferPriorityQueue first =
+             new MappedByteBufferPriorityQueue(0, this.limit, thresholdBytes, comparator);
+     this.currentQueue = first;
+     this.queues.add(first);
+ }
+
+ /**
+  * Adds the entry to the current sub-queue. When that sub-queue reports that it
+  * has flushed, a new sub-queue with the next index becomes the current one.
+  *
+  * @param e entry to add
+  * @return always {@code true}
+  * @throws RuntimeException wrapping any {@link IOException} raised while writing
+  */
+ @Override
+ public boolean offer(ResultEntry e) {
+     try {
+         if (this.currentQueue.writeResult(e)) {
+             currentQueue = new MappedByteBufferPriorityQueue(++currentIndex,
+                     limit, thresholdBytes, comparator);
+             queues.add(currentQueue);
+         }
+     } catch (IOException ioe) {
+         throw new RuntimeException(ioe);
+     }
+     return true;
+ }
+
+ /**
+  * Removes and returns the least entry across all sub-queues. The sub-queue that
+  * supplied the returned entry is asked for its next entry, which then takes the
+  * vacated place in the merged queue.
+  *
+  * @return the least entry, or {@code null} if no entries are left
+  * @throws RuntimeException wrapping any {@link IOException} raised while reading
+  */
+ @Override
+ public ResultEntry poll() {
+     ensureMergedQueueInitialized();
+     // pollFirst() yields null on an empty queue, so no separate emptiness check is needed.
+     IndexedResultEntry head = mergedQueue.pollFirst();
+     if (head == null) {
+         return null;
+     }
+     try {
+         IndexedResultEntry replacement = queues.get(head.getIndex()).getNextResult();
+         if (replacement != null) {
+             mergedQueue.add(replacement);
+         }
+     } catch (IOException e) {
+         throw new RuntimeException(e);
+     }
+     return head;
+ }
+
+ /**
+  * Returns, without removing it, the least entry across all sub-queues.
+  *
+  * @return the least entry, or {@code null} if no entries are left
+  * @throws RuntimeException wrapping any {@link IOException} raised while reading
+  */
+ @Override
+ public ResultEntry peek() {
+     ensureMergedQueueInitialized();
+     // peekFirst() yields null on an empty queue.
+     return mergedQueue.peekFirst();
+ }
+
+ /**
+  * Creates the merged queue on first use and seeds it with the first entry of
+  * every sub-queue that has one. Shared by {@code poll()} and {@code peek()}.
+  */
+ private void ensureMergedQueueInitialized() {
+     if (mergedQueue != null) {
+         return;
+     }
+     mergedQueue = MinMaxPriorityQueue.<ResultEntry> orderedBy(
+             comparator).maximumSize(queues.size()).create();
+     for (MappedByteBufferPriorityQueue queue : queues) {
+         try {
+             IndexedResultEntry first = queue.getNextResult();
+             if (first != null) {
+                 mergedQueue.add(first);
+             }
+         } catch (IOException e) {
+             throw new RuntimeException(e);
+         }
+     }
+ }
+
+ // Iteration is not supported by this queue; this method always throws
+ // UnsupportedOperationException.
+ @Override
+ public Iterator<ResultEntry> iterator() {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+  * Sums the sizes reported by all sub-queues.
+  *
+  * @return total of the sub-queue sizes
+  */
+ @Override
+ public int size() {
+     int total = 0;
+     Iterator<MappedByteBufferPriorityQueue> it = queues.iterator();
+     while (it.hasNext()) {
+         total += it.next().size();
+     }
+     return total;
+ }
+
+ /**
+  * Reports the in-memory byte size of the current sub-queue only.
+  *
+  * @return the value of {@code getInMemByteSize()} of the current sub-queue
+  */
+ public long getByteSize() {
+     return this.currentQueue.getInMemByteSize();
+ }
+
+ /**
+  * Closes every sub-queue; does nothing when the list of sub-queues is {@code null}.
+  */
+ public void close() {
+     if (queues == null) {
+         return;
+     }
+     for (MappedByteBufferPriorityQueue subQueue : queues) {
+         subQueue.close();
+     }
+ }
+
+ /**
+  * A {@code ResultEntry} that additionally remembers the index of the sub-queue
+  * it was read from.
+  */
+ private static class IndexedResultEntry extends ResultEntry {
+     private int index;
+
+     /**
+      * @param index       index of the originating sub-queue
+      * @param resultEntry entry whose sort keys and result are reused
+      */
+     public IndexedResultEntry(int index, ResultEntry resultEntry) {
+         super(resultEntry.sortKeys, resultEntry.result);
+         this.index = index;
+     }
+
+     /** @return index of the originating sub-queue */
+     public int getIndex() {
+         return index;
+     }
+ }
+
+ private static class MappedByteBufferPriorityQueue {
+ private static final long DEFAULT_MAPPING_SIZE = 1024;
+
+ private final int limit;
+ private final int thresholdBytes;
+ private long totalResultSize = 0;
+ private int maxResultSize = 0;
+ private long mappingSize = 0;
+ private long writeIndex = 0;
+ private long readIndex = 0;
+ private MappedByteBuffer writeBuffer;
+ private MappedByteBuffer readBuffer;
+ private FileChannel fc;
+ private RandomAccessFile af;
+ private File file;
+ private boolean isClosed = false;
+ MinMaxPriorityQueue<ResultEntry> results = null;
+ private boolean flushBuffer = false;
+ private int index;
+ private int flushedCount;
+
+ /**
+  * Creates an empty in-memory sub-queue.
+  *
+  * @param index          position of this sub-queue in the owning queue's list
+  * @param limit          maximum number of entries kept; a negative value means unbounded
+  * @param thresholdBytes size threshold stored for this sub-queue
+  * @param comparator     ordering of the entries
+  * @throws IOException declared by the signature; nothing visible here raises it
+  */
+ public MappedByteBufferPriorityQueue(int index, int limit, int thresholdBytes,
+         Comparator<ResultEntry> comparator) throws IOException {
+     this.index = index;
+     this.limit = limit;
+     this.thresholdBytes = thresholdBytes;
+     if (limit < 0) {
+         results = MinMaxPriorityQueue.<ResultEntry> orderedBy(comparator).create();
+     } else {
+         results = MinMaxPriorityQueue.<ResultEntry> orderedBy(comparator).maximumSize(limit).create();
+     }
+ }
+
+ /**
+  * @return the recorded flushed count once the entries have been flushed,
+  *         otherwise the number of entries held in memory
+  */
+ public int size() {
+     return flushBuffer ? flushedCount : results.size();
+ }
+
+ /**
+  * @return 0 once the entries have been flushed, otherwise the tracked total
+  *         result size
+  */
+ public long getInMemByteSize() {
+     return flushBuffer ? 0 : totalResultSize;
+ }
+
+ /**
+  * Copies the key values of the entry's tuple into a list, in tuple order.
+  *
+  * @param entry entry whose tuple is read
+  * @return list holding one key value per tuple position
+  */
+ private List<KeyValue> toKeyValues(ResultEntry entry) {
+     Tuple tuple = entry.getResult();
+     int count = tuple.size();
+     List<KeyValue> keyValues = new ArrayList<KeyValue>(count);
+     int pos = 0;
+     while (pos < count) {
+         keyValues.add(tuple.getValue(pos));
+         pos++;
+     }
+     return keyValues;
+ }
+
+ /**
+  * Computes the number of bytes needed for the given key values: one int for the
+  * total length plus, per key value, one int for its length and the bytes themselves.
+  *
+  * @param kvs key values to measure
+  * @return the computed size in bytes
+  */
+ private int sizeof(List<KeyValue> kvs) {
+     int bytes = Bytes.SIZEOF_INT; // totalLen
+     for (KeyValue keyValue : kvs) {
+         // length prefix (kv.getLength) followed by the key value itself
+         bytes += keyValue.getLength() + Bytes.SIZEOF_INT;
+     }
+     return bytes;
+ }
+
+ /**
+  * Computes the number of bytes needed for the given sort keys: one int up front
+  * plus, per sort key, one int and the key's bytes (a {@code null} key contributes
+  * only the int). A {@code null} array yields just the leading int.
+  *
+  * @param sortKeys sort keys to measure; may be {@code null} and may contain {@code null}
+  * @return the computed size in bytes
+  */
+ private int sizeof(ImmutableBytesWritable[] sortKeys) {
+     if (sortKeys == null) {
+         return Bytes.SIZEOF_INT;
+     }
+     int bytes = Bytes.SIZEOF_INT;
+     for (ImmutableBytesWritable key : sortKeys) {
+         bytes += Bytes.SIZEOF_INT;
+         if (key != null) {
+             bytes += key.getLength();
+         }
+     }
+     return bytes;
+ }
+
+ /**
+  * Adds the entry to the in-memory queue and, once the tracked size reaches the
+  * threshold, writes all in-memory entries in queue order to a memory-mapped
+  * temporary file and empties the in-memory queue.
+  *
+  * <p>Layout per entry: total key value length, then (length, bytes) for each key
+  * value, then the number of sort keys, then (length, bytes) for each sort key,
+  * where a {@code null} sort key is written as length 0. The value -1 terminates
+  * the entries.
+  *
+  * @param entry entry to add
+  * @return {@code true} if this call flushed the entries to the file
+  * @throws IOException if the entries were already flushed, or if the file cannot
+  *         be created or mapped
+  */
+ public boolean writeResult(ResultEntry entry) throws IOException {
+     if (flushBuffer) {
+         throw new IOException("Results already flushed");
+     }
+
+     int sortKeySize = sizeof(entry.sortKeys);
+     int resultSize = sizeof(toKeyValues(entry)) + sortKeySize;
+     boolean added = results.add(entry);
+     if (added) {
+         maxResultSize = Math.max(maxResultSize, resultSize);
+         // Widen before multiplying so that a large entry count cannot overflow int.
+         totalResultSize = limit < 0 ? (totalResultSize + resultSize) : (long) maxResultSize * results.size();
+         if (totalResultSize >= thresholdBytes) {
+             this.file = File.createTempFile(UUID.randomUUID().toString(), null);
+             this.af = new RandomAccessFile(file, "rw");
+             this.fc = af.getChannel();
+             mappingSize = Math.min(Math.max(maxResultSize, DEFAULT_MAPPING_SIZE), totalResultSize);
+             writeBuffer = fc.map(MapMode.READ_WRITE, writeIndex, mappingSize);
+
+             int resSize = results.size();
+             for (int i = 0; i < resSize; i++) {
+                 int totalLen = 0;
+                 ResultEntry re = results.pollFirst();
+                 List<KeyValue> keyValues = toKeyValues(re);
+                 for (KeyValue kv : keyValues) {
+                     totalLen += (kv.getLength() + Bytes.SIZEOF_INT);
+                 }
+                 writeBuffer.putInt(totalLen);
+                 for (KeyValue kv : keyValues) {
+                     writeBuffer.putInt(kv.getLength());
+                     writeBuffer.put(kv.getBuffer(), kv.getOffset(), kv
+                             .getLength());
+                 }
+                 // NOTE(review): sizeof() tolerates a null sortKeys array but this
+                 // loop does not - confirm that sortKeys is never null here.
+                 ImmutableBytesWritable[] sortKeys = re.sortKeys;
+                 writeBuffer.putInt(sortKeys.length);
+                 for (ImmutableBytesWritable sortKey : sortKeys) {
+                     if (sortKey != null) {
+                         writeBuffer.putInt(sortKey.getLength());
+                         writeBuffer.put(sortKey.get(), sortKey.getOffset(),
+                                 sortKey.getLength());
+                     } else {
+                         writeBuffer.putInt(0);
+                     }
+                 }
+                 // buffer close to exhausted, re-map.
+                 if (mappingSize - writeBuffer.position() < maxResultSize) {
+                     writeIndex += writeBuffer.position();
+                     writeBuffer = fc.map(MapMode.READ_WRITE, writeIndex, mappingSize);
+                 }
+             }
+             writeBuffer.putInt(-1); // end
+             // The loop above has drained 'results', so results.size() is 0 at this
+             // point; record the number of entries that were actually written.
+             flushedCount = resSize;
+             results.clear();
+             flushBuffer = true;
+         }
+     }
+     return flushBuffer;
+ }
+
+ /**
+  * Returns the next entry, tagged with this queue's index, or {@code null} when the queue
+  * is closed or exhausted. Entries come from the in-memory queue until a spill has
+  * happened, and from the memory-mapped temp file afterwards.
+  *
+  * @throws IOException if mapping a region of the spill file fails
+  */
+ public IndexedResultEntry getNextResult() throws IOException {
+     if (isClosed) {
+         return null;
+     }
+
+     if (!flushBuffer) {
+         // Nothing was spilled: serve straight from the in-memory queue.
+         ResultEntry head = results.poll();
+         if (head != null) {
+             return new IndexedResultEntry(index, head);
+         }
+         reachedEnd();
+         return null;
+     }
+
+     // First read after a spill: map the initial window of the file.
+     if (readBuffer == null) {
+         readBuffer = this.fc.map(MapMode.READ_ONLY, readIndex, mappingSize);
+     }
+
+     int kvSectionLength = readBuffer.getInt();
+     if (kvSectionLength < 0) {
+         // negative length is the end-of-stream marker
+         reachedEnd();
+         return null;
+     }
+
+     byte[] kvSection = new byte[kvSectionLength];
+     readBuffer.get(kvSection);
+     ResultTuple tuple = new ResultTuple(new Result(new ImmutableBytesWritable(kvSection)));
+
+     int sortKeyCount = readBuffer.getInt();
+     ImmutableBytesWritable[] sortKeys = new ImmutableBytesWritable[sortKeyCount];
+     for (int k = 0; k < sortKeyCount; k++) {
+         int keyLength = readBuffer.getInt();
+         if (keyLength <= 0) {
+             // zero length encodes a null key; the slot is already null
+             continue;
+         }
+         byte[] keyBytes = new byte[keyLength];
+         readBuffer.get(keyBytes);
+         sortKeys[k] = new ImmutableBytesWritable(keyBytes);
+     }
+
+     // buffer close to exhausted, re-map.
+     int remaining = mappingSize - readBuffer.position();
+     if (remaining < maxResultSize) {
+         readIndex += readBuffer.position();
+         readBuffer = fc.map(MapMode.READ_ONLY, readIndex, mappingSize);
+     }
+
+     return new IndexedResultEntry(index, new ResultEntry(sortKeys, tuple));
+ }
+
+ /**
+  * Marks the queue closed and releases everything tied to the spill file: the mapped
+  * buffers, the channel, the random access file and the temp file itself. Cleanup is
+  * best effort, so close failures are ignored rather than propagated.
+  */
+ private void reachedEnd() {
+     this.isClosed = true;
+     // A file mapping stays valid until its buffer is garbage collected, so drop the
+     // references; otherwise the mappings outlive the closed channel.
+     this.writeBuffer = null;
+     this.readBuffer = null;
+     if (this.fc != null) {
+         try {
+             this.fc.close();
+         } catch (IOException ignored) {
+             // best-effort cleanup
+         }
+         this.fc = null;
+     }
+     if (this.af != null) {
+         try {
+             this.af.close();
+         } catch (IOException ignored) {
+             // best-effort cleanup
+         }
+         this.af = null;
+     }
+     if (this.file != null) {
+         if (!file.delete() && file.exists()) {
+             // Deletion can fail while the file is still in use; retry at JVM exit
+             // instead of silently leaking the temp file.
+             file.deleteOnExit();
+         }
+         file = null;
+     }
+ }
+
+ /**
+  * Releases the queue's resources unless it has already been closed.
+  */
+ public void close() {
+     if (isClosed) {
+         return;
+     }
+     reachedEnd();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/iterate/MaterializedResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MaterializedResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MaterializedResultIterator.java
new file mode 100644
index 0000000..c509b2c
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MaterializedResultIterator.java
@@ -0,0 +1,112 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.sql.SQLException;
+import java.util.*;
+
+import org.apache.phoenix.schema.tuple.Tuple;
+
+
+/**
+ *
+ * Fully materialized result iterator backed by the result list provided.
+ * No copy is made of the backing results collection.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class MaterializedResultIterator implements PeekingResultIterator {
+ private final PeekingCollectionIterator iterator;
+
+ public MaterializedResultIterator(Collection<Tuple> results) {
+ iterator = new PeekingCollectionIterator(results);
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public Tuple next() throws SQLException {
+ return iterator.nextOrNull();
+ }
+
+ @Override
+ public Tuple peek() throws SQLException {
+ return iterator.peek();
+ }
+
+ private static class PeekingCollectionIterator implements Iterator<Tuple> {
+ private final Iterator<Tuple> iterator;
+ private Tuple current;
+
+ private PeekingCollectionIterator(Collection<Tuple> results) {
+ iterator = results.iterator();
+ advance();
+ }
+
+ private Tuple advance() {
+ if (iterator.hasNext()) {
+ current = iterator.next();
+ } else {
+ current = null;
+ }
+ return current;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return current != null;
+ }
+
+ @Override
+ public Tuple next() {
+ Tuple next = nextOrNull();
+ if (next == null) {
+ throw new NoSuchElementException();
+ }
+ return next;
+ }
+
+ public Tuple nextOrNull() {
+ if (current == null) {
+ return null;
+ }
+ Tuple next = current;
+ advance();
+ return next;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ public Tuple peek() {
+ return current;
+ }
+
+ }
+
+ @Override
+ public void explain(List<String> planSteps) {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/iterate/MergeSortResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MergeSortResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MergeSortResultIterator.java
new file mode 100644
index 0000000..58b7e40
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MergeSortResultIterator.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.SQLCloseables;
+
+
+/**
+ *
+ * Base class for a ResultIterator that does a merge sort on the list of iterators
+ * provided.
+ *
+ * @author jtaylor
+ * @since 1.2
+ */
+public abstract class MergeSortResultIterator implements PeekingResultIterator {
+ protected final ResultIterators resultIterators;
+ protected final ImmutableBytesWritable tempPtr = new ImmutableBytesWritable();
+ private List<PeekingResultIterator> iterators;
+
+ public MergeSortResultIterator(ResultIterators iterators) {
+ this.resultIterators = iterators;
+ }
+
+ private List<PeekingResultIterator> getIterators() throws SQLException {
+ if (iterators == null) {
+ iterators = resultIterators.getIterators();
+ }
+ return iterators;
+ }
+
+ @Override
+ public void close() throws SQLException {
+ if (iterators != null) {
+ SQLCloseables.closeAll(iterators);
+ }
+ }
+
+ abstract protected int compare(Tuple t1, Tuple t2);
+
+ private PeekingResultIterator minIterator() throws SQLException {
+ List<PeekingResultIterator> iterators = getIterators();
+ Tuple minResult = null;
+ PeekingResultIterator minIterator = EMPTY_ITERATOR;
+ for (int i = iterators.size()-1; i >= 0; i--) {
+ PeekingResultIterator iterator = iterators.get(i);
+ Tuple r = iterator.peek();
+ if (r != null) {
+ if (minResult == null || compare(r, minResult) < 0) {
+ minResult = r;
+ minIterator = iterator;
+ }
+ continue;
+ }
+ iterator.close();
+ iterators.remove(i);
+ }
+ return minIterator;
+ }
+
+ @Override
+ public Tuple peek() throws SQLException {
+ PeekingResultIterator iterator = minIterator();
+ return iterator.peek();
+ }
+
+ @Override
+ public Tuple next() throws SQLException {
+ PeekingResultIterator iterator = minIterator();
+ return iterator.next();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/iterate/MergeSortRowKeyResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MergeSortRowKeyResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MergeSortRowKeyResultIterator.java
new file mode 100644
index 0000000..a411bab
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MergeSortRowKeyResultIterator.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.util.List;
+
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.TupleUtil;
+
+
+/**
+ *
+ * ResultIterator that does a merge sort on the list of iterators provided,
+ * returning the rows in row key ascending order. The iterators provided
+ * must be in row key ascending order.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class MergeSortRowKeyResultIterator extends MergeSortResultIterator {
+ private final int keyOffset;
+ private final int factor;
+
+ public MergeSortRowKeyResultIterator(ResultIterators iterators) {
+ this(iterators, 0, false);
+ }
+
+ public MergeSortRowKeyResultIterator(ResultIterators iterators, int keyOffset, boolean isReverse) {
+ super(iterators);
+ this.keyOffset = keyOffset;
+ this.factor = isReverse ? -1 : 1;
+ }
+
+ @Override
+ protected int compare(Tuple t1, Tuple t2) {
+ return factor * TupleUtil.compare(t1, t2, tempPtr, keyOffset);
+ }
+
+ @Override
+ public void explain(List<String> planSteps) {
+ resultIterators.explain(planSteps);
+ planSteps.add("CLIENT MERGE SORT");
+ }
+}
\ No newline at end of file