You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by an...@apache.org on 2014/02/24 06:42:56 UTC
[6/8] git commit: PHOENIX-29
PHOENIX-29
Project: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/commit/b75206db
Tree: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/tree/b75206db
Diff: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/diff/b75206db
Branch: refs/heads/master
Commit: b75206db6eac2cc9809feefcc6b4709569101b3b
Parents: 6ba276b
Author: anoopsjohn <an...@gmail.com>
Authored: Mon Feb 24 01:15:43 2014 +0530
Committer: anoopsjohn <an...@gmail.com>
Committed: Mon Feb 24 01:15:43 2014 +0530
----------------------------------------------------------------------
.../phoenix/iterate/ParallelIterators.java | 84 ++++++++++++++------
1 file changed, 58 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/b75206db/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
index 9f19ade..563d97f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
@@ -18,16 +18,9 @@
package org.apache.phoenix.iterate;
import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableSet;
-import java.util.UUID;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.*;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.client.Scan;
@@ -36,23 +29,16 @@ import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
-import org.apache.phoenix.compile.RowProjector;
-import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.compile.*;
+import org.apache.phoenix.filter.ColumnProjectionFilter;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.job.JobManager.JobCallable;
import org.apache.phoenix.parse.FilterableStatement;
import org.apache.phoenix.parse.HintNode;
-import org.apache.phoenix.query.ConnectionQueryServices;
-import org.apache.phoenix.query.KeyRange;
-import org.apache.phoenix.query.QueryConstants;
-import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.schema.SaltingUtil;
-import org.apache.phoenix.schema.TableRef;
-import org.apache.phoenix.util.ReadOnlyProps;
-import org.apache.phoenix.util.SQLCloseables;
-import org.apache.phoenix.util.ScanUtil;
-import org.apache.phoenix.util.SchemaUtil;
-import org.apache.phoenix.util.ServerUtil;
+import org.apache.phoenix.query.*;
+import org.apache.phoenix.schema.*;
+import org.apache.phoenix.schema.PTable.ViewType;
+import org.apache.phoenix.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -85,7 +71,9 @@ public class ParallelIterators extends ExplainTable implements ResultIterators {
}
};
- public ParallelIterators(StatementContext context, TableRef tableRef, FilterableStatement statement, RowProjector projector, GroupBy groupBy, Integer limit, ParallelIteratorFactory iteratorFactory) throws SQLException {
+ public ParallelIterators(StatementContext context, TableRef tableRef, FilterableStatement statement,
+ RowProjector projector, GroupBy groupBy, Integer limit, ParallelIteratorFactory iteratorFactory)
+ throws SQLException {
super(context, tableRef, groupBy);
this.splits = getSplits(context, tableRef, statement.getHint());
this.iteratorFactory = iteratorFactory;
@@ -96,7 +84,10 @@ public class ParallelIterators extends ExplainTable implements ResultIterators {
// If nothing projected into scan and we only have one column family, just allow everything
// to be projected and use a FirstKeyOnlyFilter to skip from row to row. This turns out to
// be quite a bit faster.
- if (familyMap.isEmpty() && table.getColumnFamilies().size() == 1) {
// Where-condition columns will also get added into familyMap.
// When where conditions are present, we cannot add a FirstKeyOnlyFilter at the beginning.
+ if (familyMap.isEmpty() && context.getWhereCoditionColumns().isEmpty()
+ && table.getColumnFamilies().size() == 1) {
// Project the one column family. We must project a column family since it's possible
// that there are other non declared column families that we need to ignore.
scan.addFamily(table.getColumnFamilies().get(0).getName().getBytes());
@@ -113,6 +104,47 @@ public class ParallelIterators extends ExplainTable implements ResultIterators {
if (limit != null) {
ScanUtil.andFilterAtEnd(scan, new PageFilter(limit));
}
+
+ Map<byte[], NavigableSet<byte[]>> familyMap = scan.getFamilyMap();
+ if (familyMap != null && !familyMap.isEmpty()) {
+ // columnsTracker contains the cf -> qualifiers which should be returned.
+ Map<ImmutableBytesPtr, NavigableSet<ImmutableBytesPtr>> columnsTracker =
+ new TreeMap<ImmutableBytesPtr, NavigableSet<ImmutableBytesPtr>>();
+ for (Entry<byte[], NavigableSet<byte[]>> entry : familyMap.entrySet()) {
+ ImmutableBytesPtr cf = new ImmutableBytesPtr(entry.getKey());
+ NavigableSet<byte[]> qs = entry.getValue();
+ NavigableSet<ImmutableBytesPtr> cols = null;
+ if (qs != null) {
+ cols = new TreeSet<ImmutableBytesPtr>();
+ for (byte[] q : qs) {
+ cols.add(new ImmutableBytesPtr(q));
+ }
+ }
+ columnsTracker.put(cf, cols);
+ }
+ // Make sure that where-condition column families are scanned at the HBase RegionServer (HRS).
+ for (Pair<byte[], byte[]> whereCol : context.getWhereCoditionColumns()) {
+ if (!(familyMap.containsKey(whereCol.getFirst()))) {
+ scan.addFamily(whereCol.getFirst());
+ }
+ }
+ if (!columnsTracker.isEmpty()) {
+ for (ImmutableBytesPtr f : columnsTracker.keySet()) {
+ // This addFamily will remove the explicit columns from the scan's familyMap and select the entire row.
+ // We don't want the ExplicitColumnTracker to be used; instead we rely on the ColumnProjectionFilter.
+ scan.addFamily(f.get());
+ }
+ ScanUtil.andFilterAtEnd(scan, new ColumnProjectionFilter(SchemaUtil.getEmptyColumnFamily(table),
+ columnsTracker));
+ }
+ if (table.getViewType() == ViewType.MAPPED) {
+ // Since we don't have the empty key value in MAPPED tables, we must select all CFs at the HRS. Only the
+ // selected column values are returned back to the client.
+ for (PColumnFamily family : table.getColumnFamilies()) {
+ scan.addFamily(family.getName().getBytes());
+ }
+ }
+ }
}
/**