You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by an...@apache.org on 2014/02/24 06:42:56 UTC

[6/8] git commit: PHOENIX-29

PHOENIX-29


Project: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/commit/b75206db
Tree: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/tree/b75206db
Diff: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/diff/b75206db

Branch: refs/heads/master
Commit: b75206db6eac2cc9809feefcc6b4709569101b3b
Parents: 6ba276b
Author: anoopsjohn <an...@gmail.com>
Authored: Mon Feb 24 01:15:43 2014 +0530
Committer: anoopsjohn <an...@gmail.com>
Committed: Mon Feb 24 01:15:43 2014 +0530

----------------------------------------------------------------------
 .../phoenix/iterate/ParallelIterators.java      | 84 ++++++++++++++------
 1 file changed, 58 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/b75206db/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
index 9f19ade..563d97f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
@@ -18,16 +18,9 @@
 package org.apache.phoenix.iterate;
 
 import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableSet;
-import java.util.UUID;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.*;
 
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.client.Scan;
@@ -36,23 +29,16 @@ import org.apache.hadoop.hbase.filter.PageFilter;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
-import org.apache.phoenix.compile.RowProjector;
-import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.compile.*;
+import org.apache.phoenix.filter.ColumnProjectionFilter;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.job.JobManager.JobCallable;
 import org.apache.phoenix.parse.FilterableStatement;
 import org.apache.phoenix.parse.HintNode;
-import org.apache.phoenix.query.ConnectionQueryServices;
-import org.apache.phoenix.query.KeyRange;
-import org.apache.phoenix.query.QueryConstants;
-import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.schema.SaltingUtil;
-import org.apache.phoenix.schema.TableRef;
-import org.apache.phoenix.util.ReadOnlyProps;
-import org.apache.phoenix.util.SQLCloseables;
-import org.apache.phoenix.util.ScanUtil;
-import org.apache.phoenix.util.SchemaUtil;
-import org.apache.phoenix.util.ServerUtil;
+import org.apache.phoenix.query.*;
+import org.apache.phoenix.schema.*;
+import org.apache.phoenix.schema.PTable.ViewType;
+import org.apache.phoenix.util.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -85,7 +71,9 @@ public class ParallelIterators extends ExplainTable implements ResultIterators {
         }
     };
 
-    public ParallelIterators(StatementContext context, TableRef tableRef, FilterableStatement statement, RowProjector projector, GroupBy groupBy, Integer limit, ParallelIteratorFactory iteratorFactory) throws SQLException {
+    public ParallelIterators(StatementContext context, TableRef tableRef, FilterableStatement statement,
+            RowProjector projector, GroupBy groupBy, Integer limit, ParallelIteratorFactory iteratorFactory)
+            throws SQLException {
         super(context, tableRef, groupBy);
         this.splits = getSplits(context, tableRef, statement.getHint());
         this.iteratorFactory = iteratorFactory;
@@ -96,7 +84,10 @@ public class ParallelIterators extends ExplainTable implements ResultIterators {
             // If nothing projected into scan and we only have one column family, just allow everything
             // to be projected and use a FirstKeyOnlyFilter to skip from row to row. This turns out to
             // be quite a bit faster.
-            if (familyMap.isEmpty() && table.getColumnFamilies().size() == 1) {
+            // Where condition columns will also get added into familyMap.
+            // When where conditions are present, we cannot add FirstKeyOnlyFilter at the beginning.
+            if (familyMap.isEmpty() && context.getWhereCoditionColumns().isEmpty()
+                    && table.getColumnFamilies().size() == 1) {
                 // Project the one column family. We must project a column family since it's possible
                 // that there are other non declared column families that we need to ignore.
                 scan.addFamily(table.getColumnFamilies().get(0).getName().getBytes());
@@ -113,6 +104,47 @@ public class ParallelIterators extends ExplainTable implements ResultIterators {
         if (limit != null) {
             ScanUtil.andFilterAtEnd(scan, new PageFilter(limit));
         }
+
+        Map<byte[], NavigableSet<byte[]>> familyMap = scan.getFamilyMap();
+        if (familyMap != null && !familyMap.isEmpty()) {
+            // columnsTracker contains cf -> qualifiers which should get returned.
+            Map<ImmutableBytesPtr, NavigableSet<ImmutableBytesPtr>> columnsTracker = 
+                    new TreeMap<ImmutableBytesPtr, NavigableSet<ImmutableBytesPtr>>();
+            for (Entry<byte[], NavigableSet<byte[]>> entry : familyMap.entrySet()) {
+                ImmutableBytesPtr cf = new ImmutableBytesPtr(entry.getKey());
+                NavigableSet<byte[]> qs = entry.getValue();
+                NavigableSet<ImmutableBytesPtr> cols = null;
+                if (qs != null) {
+                    cols = new TreeSet<ImmutableBytesPtr>();
+                    for (byte[] q : qs) {
+                        cols.add(new ImmutableBytesPtr(q));
+                    }
+                }
+                columnsTracker.put(cf, cols);
+            }
+            // Making sure that where condition CFs are getting scanned at HRS.
+            for (Pair<byte[], byte[]> whereCol : context.getWhereCoditionColumns()) {
+                if (!(familyMap.containsKey(whereCol.getFirst()))) {
+                    scan.addFamily(whereCol.getFirst());
+                }
+            }
+            if (!columnsTracker.isEmpty()) {
+                for (ImmutableBytesPtr f : columnsTracker.keySet()) {
+                    // This addFamily will remove the explicit cols in the scan familyMap and select the entire family.
+                    // We don't want the ExplicitColumnTracker to be used. Instead we have the ColumnProjectionFilter.
+                    scan.addFamily(f.get());
+                }
+                ScanUtil.andFilterAtEnd(scan, new ColumnProjectionFilter(SchemaUtil.getEmptyColumnFamily(table),
+                        columnsTracker));
+            }
+            if (table.getViewType() == ViewType.MAPPED) {
+                // Since we don't have the empty key value in MAPPED tables, we must select all CFs in HRS. But only the
+                // selected column values are returned to the client.
+                for (PColumnFamily family : table.getColumnFamilies()) {
+                    scan.addFamily(family.getName().getBytes());
+                }
+            }
+        }
     }
 
     /**