You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/10/06 17:56:58 UTC

[4/7] git commit: PHOENIX-1315 Optimize query for Pig loader

PHOENIX-1315 Optimize query for Pig loader

Conflicts:
	phoenix-core/src/it/java/org/apache/phoenix/end2end/ReverseScanIT.java
	phoenix-core/src/it/java/org/apache/phoenix/hbase/index/balancer/IndexLoadBalancerIT.java
	phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java
	phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/ca478a72
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/ca478a72
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/ca478a72

Branch: refs/heads/3.0
Commit: ca478a720ee2fe49edd80fa2af7ed819d27876f7
Parents: 29361c6
Author: James Taylor <jt...@salesforce.com>
Authored: Sun Oct 5 09:53:14 2014 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Mon Oct 6 01:35:24 2014 -0700

----------------------------------------------------------------------
 .../phoenix/end2end/EvaluationOfORIT.java       |  9 ++--
 ...ipRangeParallelIteratorRegionSplitterIT.java |  5 ++
 .../org/apache/phoenix/compile/QueryPlan.java   |  3 ++
 .../apache/phoenix/execute/AggregatePlan.java   |  6 +++
 .../phoenix/execute/DegenerateQueryPlan.java    | 12 ++++-
 .../apache/phoenix/execute/HashJoinPlan.java    |  5 ++
 .../org/apache/phoenix/execute/ScanPlan.java    |  8 +++
 .../phoenix/iterate/ConcatResultIterator.java   | 34 +++++++++++++
 .../iterate/LookAheadResultIterator.java        | 21 ++++++++
 .../phoenix/iterate/ParallelIterators.java      | 39 ++++----------
 .../apache/phoenix/jdbc/PhoenixStatement.java   |  6 +++
 .../phoenix/pig/PhoenixHBaseLoaderIT.java       | 24 ++++-----
 .../phoenix/pig/hadoop/PhoenixInputFormat.java  | 13 ++---
 .../phoenix/pig/hadoop/PhoenixInputSplit.java   | 53 ++++++++++++++------
 .../phoenix/pig/hadoop/PhoenixRecordReader.java | 25 +++++----
 15 files changed, 184 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/ca478a72/phoenix-core/src/it/java/org/apache/phoenix/end2end/EvaluationOfORIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/EvaluationOfORIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/EvaluationOfORIT.java
index 052ff43..0e59542 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/EvaluationOfORIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/EvaluationOfORIT.java
@@ -28,21 +28,22 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Properties;
 
+import org.apache.phoenix.util.PropertiesUtil;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-@Category(BaseHBaseManagedTimeIT.class)
+@Category(HBaseManagedTimeTest.class)
 public class EvaluationOfORIT extends BaseHBaseManagedTimeIT{
 		
 	@Test
 	public void testPKOrNotPKInOREvaluation() throws SQLException {
-	    Properties props = new Properties(TEST_PROPERTIES);
+	    Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
 	    Connection conn = DriverManager.getConnection(getUrl(), props);	    
 	    conn.setAutoCommit(false);
 	    
-            String create = "CREATE TABLE DIE ( ID INTEGER NOT NULL PRIMARY KEY,NAME VARCHAR(50) NOT NULL)";
+            String create = "CREATE TABLE DIE ( ID INTEGER NOT NULL PRIMARY KEY,NAME VARCHAR(50))";
             PreparedStatement createStmt = conn.prepareStatement(create);
-            createStmt.executeUpdate();
+            createStmt.execute();
             PreparedStatement stmt = conn.prepareStatement(
                     "upsert into " +
                     "DIE VALUES (?, ?)");

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ca478a72/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipRangeParallelIteratorRegionSplitterIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipRangeParallelIteratorRegionSplitterIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipRangeParallelIteratorRegionSplitterIT.java
index 3d057ae..18d7910 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipRangeParallelIteratorRegionSplitterIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipRangeParallelIteratorRegionSplitterIT.java
@@ -432,6 +432,11 @@ public class SkipRangeParallelIteratorRegionSplitterIT extends BaseClientManaged
             public boolean isRowKeyOrdered() {
                 return true;
             }
+
+            @Override
+            public List<List<Scan>> getScans() {
+                return null;
+            }
             
         }, null, new SpoolingResultIterator.SpoolingResultIteratorFactory(context.getConnection().getQueryServices()));
         List<KeyRange> keyRanges = parallelIterators.getSplits();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ca478a72/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
index 271ba30..a76993c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.compile;
 import java.sql.SQLException;
 import java.util.List;
 
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 import org.apache.phoenix.iterate.ResultIterator;
@@ -61,6 +62,8 @@ public interface QueryPlan extends StatementPlan {
 
     List<KeyRange> getSplits();
 
+    List<List<Scan>> getScans();
+
     FilterableStatement getStatement();
 
     public boolean isDegenerate();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ca478a72/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
index 9f294a1..d7a90fb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
@@ -69,6 +69,7 @@ public class AggregatePlan extends BaseQueryPlan {
     private final Aggregators aggregators;
     private final Expression having;
     private List<KeyRange> splits;
+    private List<List<Scan>> scans;
 
     public AggregatePlan(
             StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector,
@@ -88,6 +89,11 @@ public class AggregatePlan extends BaseQueryPlan {
         return splits;
     }
 
+    @Override
+    public List<List<Scan>> getScans() {
+        return scans;
+    }
+
     private static class OrderingResultIteratorFactory implements ParallelIteratorFactory {
         private final QueryServices services;
         

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ca478a72/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
index 80c4727..70e59b9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
@@ -21,13 +21,16 @@ import java.sql.SQLException;
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
-import org.apache.phoenix.compile.*;
+import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.ScanRanges;
+import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.jdbc.PhoenixParameterMetaData;
 import org.apache.phoenix.parse.FilterableStatement;
-import org.apache.phoenix.query.*;
+import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.schema.TableRef;
 
 public class DegenerateQueryPlan extends BaseQueryPlan {
@@ -43,6 +46,11 @@ public class DegenerateQueryPlan extends BaseQueryPlan {
     }
 
     @Override
+    public List<List<Scan>> getScans() {
+        return Collections.emptyList();
+    }
+
+    @Override
     protected ResultIterator newIterator() throws SQLException {
         return null;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ca478a72/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
index 8d141dd..047b62d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
@@ -468,6 +468,11 @@ public class HashJoinPlan implements QueryPlan {
     public boolean isRowKeyOrdered() {
         return plan.isRowKeyOrdered();
     }
+
+    @Override
+    public List<List<Scan>> getScans() {
+        return plan.getScans();
+    }
 }
 
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ca478a72/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
index dfb8fec..24bf195 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
@@ -21,6 +21,7 @@ package org.apache.phoenix.execute;
 import java.sql.SQLException;
 import java.util.List;
 
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 import org.apache.phoenix.compile.RowProjector;
@@ -58,6 +59,7 @@ import org.apache.phoenix.util.ScanUtil;
  */
 public class ScanPlan extends BaseQueryPlan {
     private List<KeyRange> splits;
+    private List<List<Scan>> scans;
     private boolean allowPageFilter;
 
     public ScanPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, Integer limit, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter) {
@@ -96,6 +98,11 @@ public class ScanPlan extends BaseQueryPlan {
     }
 
     @Override
+    public List<List<Scan>> getScans() {
+        return scans;
+    }
+
+    @Override
     protected ResultIterator newIterator() throws SQLException {
         // Set any scan attributes before creating the scanner, as it will be too late afterwards
         context.getScan().setAttribute(BaseScannerRegionObserver.NON_AGGREGATE_QUERY, QueryConstants.TRUE);
@@ -112,6 +119,7 @@ public class ScanPlan extends BaseQueryPlan {
         boolean isOrdered = !orderBy.getOrderByExpressions().isEmpty();
         ParallelIterators iterators = new ParallelIterators(this, !allowPageFilter || isOrdered ? null : limit, parallelIteratorFactory);
         splits = iterators.getSplits();
+        scans = iterators.getScans();
         if (isOrdered) {
             scanner = new MergeSortTopNResultIterator(iterators, limit, orderBy.getOrderByExpressions());
         } else {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ca478a72/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java
index 21ae7d8..cddf3b3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java
@@ -86,4 +86,38 @@ public class ConcatResultIterator implements PeekingResultIterator {
         return currentIterator().next();
     }
 
+	@Override
+	public String toString() {
+		return "ConcatResultIterator [resultIterators=" + resultIterators
+				+ ", iterators=" + iterators + ", index=" + index + "]";
+	}
+
+    public static PeekingResultIterator newConcatResultIterator(final List<PeekingResultIterator> concatIterators) {
+        if (concatIterators.isEmpty()) {
+            return PeekingResultIterator.EMPTY_ITERATOR;
+        } 
+        
+        if (concatIterators.size() == 1) {
+            return concatIterators.get(0);
+        }
+        return new ConcatResultIterator(new ResultIterators() {
+
+            @Override
+            public List<PeekingResultIterator> getIterators() throws SQLException {
+                return concatIterators;
+            }
+
+            @Override
+            public int size() {
+                return concatIterators.size();
+            }
+
+            @Override
+            public void explain(List<String> planSteps) {
+                // TODO: review what we should for explain plan here
+                concatIterators.get(0).explain(planSteps);
+            }
+            
+        });
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ca478a72/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java
index d823ffd..a7f390f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java
@@ -18,12 +18,33 @@
 package org.apache.phoenix.iterate;
 
 import java.sql.SQLException;
+import java.util.List;
 
 import org.apache.phoenix.schema.tuple.ResultTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
 
 
 abstract public class LookAheadResultIterator implements PeekingResultIterator {
+    public static LookAheadResultIterator wrap(final ResultIterator iterator) {
+        return new LookAheadResultIterator() {
+
+            @Override
+            public void explain(List<String> planSteps) {
+                iterator.explain(planSteps);
+            }
+
+            @Override
+            public void close() throws SQLException {
+                iterator.close();
+            }
+
+            @Override
+            protected Tuple advance() throws SQLException {
+                return iterator.next();
+            }
+        };
+    }
+    
     private final static Tuple UNINITIALIZED = new ResultTuple();
     private Tuple next = UNINITIALIZED;
     

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ca478a72/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
index 81dfbb6..c1c3775 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
@@ -268,6 +268,10 @@ public class ParallelIterators extends ExplainTable implements ResultIterators {
         return splits;
     }
 
+    public List<List<Scan>> getScans() {
+        return scans;
+    }
+
     private static List<byte[]> toBoundaries(List<HRegionLocation> regionLocations) {
         int nBoundaries = regionLocations.size() - 1;
         List<byte[]> ranges = Lists.newArrayListWithExpectedSize(nBoundaries);
@@ -431,36 +435,6 @@ public class ParallelIterators extends ExplainTable implements ResultIterators {
         return parallelScans;
     }
 
-    private static void addConcatResultIterator(List<PeekingResultIterator> iterators, final List<PeekingResultIterator> concatIterators) {
-        if (!concatIterators.isEmpty()) {
-            if (concatIterators.size() == 1) {
-                iterators.add(concatIterators.get(0));
-            } else {
-                // TODO: should ConcatResultIterator have a constructor that takes
-                // a List<PeekingResultIterator>?
-                iterators.add(new ConcatResultIterator(new ResultIterators() {
-    
-                    @Override
-                    public List<PeekingResultIterator> getIterators() throws SQLException {
-                        return concatIterators;
-                    }
-    
-                    @Override
-                    public int size() {
-                        return concatIterators.size();
-                    }
-    
-                    @Override
-                    public void explain(List<String> planSteps) {
-                        // TODO: review what we should for explain plan here
-                        concatIterators.get(0).explain(planSteps);
-                    }
-                    
-                }));
-            }
-        }
-    }
-    
     public static <T> List<T> reverseIfNecessary(List<T> list, boolean reverse) {
         if (!reverse) {
             return list;
@@ -468,6 +442,11 @@ public class ParallelIterators extends ExplainTable implements ResultIterators {
         return Lists.reverse(list);
     }
     
+    private static void addConcatResultIterator(List<PeekingResultIterator> iterators, final List<PeekingResultIterator> concatIterators) {
+        if (!concatIterators.isEmpty()) {
+            iterators.add(ConcatResultIterator.newConcatResultIterator(concatIterators));
+        }
+    }
     /**
      * Executes the scan in parallel across all regions, blocking until all scans are complete.
      * @return the result iterators for the scan of each region

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ca478a72/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index e396b22..519a7db 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -33,6 +33,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
 
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.compile.ColumnProjector;
 import org.apache.phoenix.compile.ColumnResolver;
@@ -413,6 +414,11 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
                 }
 
                 @Override
+                public List<List<Scan>> getScans() {
+                    return Collections.emptyList();
+                }
+
+                @Override
                 public StatementContext getContext() {
                     return plan.getContext();
                 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ca478a72/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseLoaderIT.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseLoaderIT.java b/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseLoaderIT.java
index 6017065..4801d63 100644
--- a/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseLoaderIT.java
+++ b/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseLoaderIT.java
@@ -113,7 +113,7 @@ public class PhoenixHBaseLoaderIT {
         conn.createStatement().execute(ddl);
 
         pigServer.registerQuery(String.format(
-                "A = load 'hbase://table/%s' using org.apache.phoenix.pig.PhoenixHBaseLoader('%s');", TABLE_FULL_NAME,
+                "A = load 'hbase://table/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", TABLE_FULL_NAME,
                 zkQuorum));
         
         final Schema schema = pigServer.dumpSchema("A");
@@ -144,7 +144,7 @@ public class PhoenixHBaseLoaderIT {
         
         final String selectColumns = "ID,NAME";
         pigServer.registerQuery(String.format(
-                "A = load 'hbase://table/%s/%s' using org.apache.phoenix.pig.PhoenixHBaseLoader('%s');",
+                "A = load 'hbase://table/%s/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');",
                 TABLE_FULL_NAME, selectColumns, zkQuorum));
         
         Schema schema = pigServer.dumpSchema("A");
@@ -175,7 +175,7 @@ public class PhoenixHBaseLoaderIT {
         //sql query for LOAD
         final String sqlQuery = "SELECT A_STRING,CF1.A_INTEGER,CF2.A_DOUBLE FROM " + TABLE_FULL_NAME;
         pigServer.registerQuery(String.format(
-                "A = load 'hbase://query/%s' using org.apache.phoenix.pig.PhoenixHBaseLoader('%s');",
+                "A = load 'hbase://query/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');",
                 sqlQuery, zkQuorum));
         
         //assert the schema.
@@ -209,7 +209,7 @@ public class PhoenixHBaseLoaderIT {
         LOG.info(String.format("Generated SQL Query [%s]",sqlQuery));
         
         pigServer.registerQuery(String.format(
-                "raw = load 'hbase://query/%s' using org.apache.phoenix.pig.PhoenixHBaseLoader('%s') AS (a:chararray,b:bigdecimal,c:int,d:double);",
+                "raw = load 'hbase://query/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s') AS (a:chararray,b:bigdecimal,c:int,d:double);",
                 sqlQuery, zkQuorum));
         
         //test the schema.
@@ -252,7 +252,7 @@ public class PhoenixHBaseLoaderIT {
          
         //load data and filter rows whose age is > 25
         pigServer.registerQuery(String.format(
-                "A = load 'hbase://table/%s' using org.apache.phoenix.pig.PhoenixHBaseLoader('%s');", TABLE_FULL_NAME,
+                "A = load 'hbase://table/%s' using "  + PhoenixHBaseLoader.class.getName() + "('%s');", TABLE_FULL_NAME,
                 zkQuorum));
         pigServer.registerQuery("B = FILTER A BY AGE > 25;");
         
@@ -339,7 +339,7 @@ public class PhoenixHBaseLoaderIT {
         final String sqlQuery = String.format(" SELECT FOO, BAZ FROM %s WHERE BAR = -1 " , TABLE_FULL_NAME);
       
         pigServer.registerQuery(String.format(
-                "A = load 'hbase://query/%s' using org.apache.phoenix.pig.PhoenixHBaseLoader('%s');", sqlQuery,
+                "A = load 'hbase://query/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", sqlQuery,
                 zkQuorum));
         
         final Iterator<Tuple> iterator = pigServer.openIterator("A");
@@ -403,7 +403,7 @@ public class PhoenixHBaseLoaderIT {
          //load data and filter rows whose age is > 25
         pigServer.setBatchOn();
         pigServer.registerQuery(String.format(
-                "A = load 'hbase://table/%s' using org.apache.phoenix.pig.PhoenixHBaseLoader('%s');", TABLE_FULL_NAME,
+                "A = load 'hbase://table/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", TABLE_FULL_NAME,
                 zkQuorum));
         
         pigServer.registerQuery("B = GROUP A BY AGE;");
@@ -457,7 +457,7 @@ public class PhoenixHBaseLoaderIT {
          //load data and filter rows whose age is > 25
         pigServer.setBatchOn();
         pigServer.registerQuery(String.format(
-                "A = load 'hbase://table/%s' using org.apache.phoenix.pig.PhoenixHBaseLoader('%s');", TABLE_FULL_NAME,
+                "A = load 'hbase://table/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", TABLE_FULL_NAME,
                 zkQuorum));
         
         pigServer.registerQuery("B = GROUP A BY AGE;");
@@ -470,11 +470,11 @@ public class PhoenixHBaseLoaderIT {
         //validate the data with what is stored.
         final String selectQuery = "SELECT AGE , MIN_SAL ,MAX_SAL FROM " + targetTable + " ORDER BY AGE";
         final ResultSet rs = conn.createStatement().executeQuery(selectQuery);
-        rs.next();
+        assertTrue(rs.next());
         assertEquals(25, rs.getInt("AGE"));
         assertEquals(0, rs.getInt("MIN_SAL"));
         assertEquals(180, rs.getInt("MAX_SAL"));
-        rs.next();
+        assertTrue(rs.next());
         assertEquals(30, rs.getInt("AGE"));
         assertEquals(0, rs.getInt("MIN_SAL"));
         assertEquals(270, rs.getInt("MAX_SAL"));
@@ -512,7 +512,7 @@ public class PhoenixHBaseLoaderIT {
         //sql query load data and filter rows whose age is > 25
         final String sqlQuery = " SELECT NEXT VALUE FOR my_sequence AS my_seq,ID,NAME,AGE FROM " + TABLE_FULL_NAME + " WHERE AGE > 25";
         pigServer.registerQuery(String.format(
-                "A = load 'hbase://query/%s' using org.apache.phoenix.pig.PhoenixHBaseLoader('%s');", sqlQuery,
+                "A = load 'hbase://query/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", sqlQuery,
                 zkQuorum));
         
         
@@ -549,7 +549,7 @@ public class PhoenixHBaseLoaderIT {
         final String sqlQuery = " SELECT UPPER(NAME) AS n FROM " + TABLE_FULL_NAME + " ORDER BY ID" ;
 
         pigServer.registerQuery(String.format(
-                "A = load 'hbase://query/%s' using org.apache.phoenix.pig.PhoenixHBaseLoader('%s');", sqlQuery,
+                "A = load 'hbase://query/%s' using "  + PhoenixHBaseLoader.class.getName() + "('%s');", sqlQuery,
                 zkQuorum));
         
         

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ca478a72/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputFormat.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputFormat.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputFormat.java
index bd1df97..2ef7914 100644
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputFormat.java
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputFormat.java
@@ -28,6 +28,7 @@ import java.util.List;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -86,8 +87,8 @@ public final class PhoenixInputFormat extends InputFormat<NullWritable, PhoenixR
         Preconditions.checkNotNull(qplan);
         Preconditions.checkNotNull(splits);
         final List<InputSplit> psplits = Lists.newArrayListWithExpectedSize(splits.size());
-        for (KeyRange split : qplan.getSplits()) {
-            psplits.add(new PhoenixInputSplit(split));
+        for (List<Scan> scans : qplan.getScans()) {
+            psplits.add(new PhoenixInputSplit(scans));
         }
         return psplits;
     }
@@ -127,10 +128,10 @@ public final class PhoenixInputFormat extends InputFormat<NullWritable, PhoenixR
                 Preconditions.checkNotNull(selectStatement);
                 final Statement statement = connection.createStatement();
                 final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
-                this.queryPlan = pstmt.compileQuery(selectStatement);
-                // FIXME: why is getting the iterator necessary here, as it will
-                // cause the query to run.
-                this.queryPlan.iterator();
+                // Optimize the query plan so that we potentially use secondary indexes
+                this.queryPlan = pstmt.optimizeQuery(selectStatement);
+                // Initialize the query plan so it sets up the parallel scans
+                queryPlan.iterator();
             } catch(Exception exception) {
                 LOG.error(String.format("Failed to get the query plan with error [%s]",exception.getMessage()));
                 throw new RuntimeException(exception);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ca478a72/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputSplit.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputSplit.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputSplit.java
index 43d69b3..7414d67 100644
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputSplit.java
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputSplit.java
@@ -22,12 +22,16 @@ package org.apache.phoenix.pig.hadoop;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.List;
 
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.phoenix.query.KeyRange;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 
 /**
  * 
@@ -36,6 +40,7 @@ import com.google.common.base.Preconditions;
  */
 public class PhoenixInputSplit extends InputSplit implements Writable {
 
+    private List<Scan> scans;
     private KeyRange keyRange;
    
     /**
@@ -48,21 +53,44 @@ public class PhoenixInputSplit extends InputSplit implements Writable {
     * 
     * @param keyRange
     */
-    public PhoenixInputSplit(final KeyRange keyRange) {
-        Preconditions.checkNotNull(keyRange);
-        this.keyRange = keyRange;
+    public PhoenixInputSplit(final List<Scan> scans) {
+        Preconditions.checkNotNull(scans);
+        Preconditions.checkState(!scans.isEmpty());
+        this.scans = scans;
+        init();
+    }
+    
+    public List<Scan> getScans() {
+        return scans;
+    }
+    
+    public KeyRange getKeyRange() {
+        return keyRange;
+    }
+    
+    private void init() {
+        this.keyRange = KeyRange.getKeyRange(scans.get(0).getStartRow(), scans.get(scans.size()-1).getStopRow());
     }
     
     @Override
     public void readFields(DataInput input) throws IOException {
-        this.keyRange = new KeyRange ();
-        this.keyRange.readFields(input);
+        int count = WritableUtils.readVInt(input);
+        scans = Lists.newArrayListWithExpectedSize(count);
+        for (int i = 0; i < count; i++) {
+            Scan scan = new Scan();
+            scan.readFields(input);
+            scans.add(scan);
+        }
+        init();
     }
     
     @Override
     public void write(DataOutput output) throws IOException {
-        Preconditions.checkNotNull(keyRange);
-        keyRange.write(output);
+        Preconditions.checkNotNull(scans);
+        WritableUtils.writeVInt(output, scans.size());
+        for (Scan scan : scans) {
+            scan.write(output);
+        }
     }
 
     @Override
@@ -75,23 +103,18 @@ public class PhoenixInputSplit extends InputSplit implements Writable {
         return new String[]{};
     }
 
-    /**
-     * @return Returns the keyRange.
-     */
-    public KeyRange getKeyRange() {
-        return keyRange;
-    }
-
     @Override
     public int hashCode() {
         final int prime = 31;
         int result = 1;
-        result = prime * result + ((keyRange == null) ? 0 : keyRange.hashCode());
+        result = prime * result + keyRange.hashCode();
         return result;
     }
 
     @Override
     public boolean equals(Object obj) {
+        // TODO: review: it's a reasonable check to use the keyRange,
+        // but it's not perfect. Do we need an equals impl?
         if (this == obj) { return true; }
         if (obj == null) { return false; }
         if (!(obj instanceof PhoenixInputSplit)) { return false; }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ca478a72/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java
index a445ce6..2bff620 100644
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java
@@ -21,6 +21,7 @@ package org.apache.phoenix.pig.hadoop;
 
 import java.io.IOException;
 import java.sql.SQLException;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -30,16 +31,18 @@ import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.iterate.ConcatResultIterator;
+import org.apache.phoenix.iterate.LookAheadResultIterator;
+import org.apache.phoenix.iterate.PeekingResultIterator;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.iterate.SequenceResultIterator;
 import org.apache.phoenix.iterate.TableResultIterator;
 import org.apache.phoenix.jdbc.PhoenixResultSet;
 import org.apache.phoenix.pig.PhoenixPigConfiguration;
-import org.apache.phoenix.query.KeyRange;
-import org.apache.phoenix.util.ScanUtil;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
 
 /**
  * RecordReader that process the scan and returns PhoenixRecord
@@ -94,17 +97,19 @@ public final class PhoenixRecordReader extends RecordReader<NullWritable,Phoenix
     @Override
     public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
         final PhoenixInputSplit pSplit = (PhoenixInputSplit)split;
-        final KeyRange keyRange = pSplit.getKeyRange();
-        final Scan splitScan = queryPlan.getContext().getScan();
-        final Scan scan = new Scan(splitScan);
-        ScanUtil.intersectScanRange(scan, keyRange.getLowerRange(), keyRange.getUpperRange(), queryPlan.getContext().getScanRanges().useSkipScanFilter());
+        final List<Scan> scans = pSplit.getScans();
         try {
-            TableResultIterator tableResultIterator = new TableResultIterator(queryPlan.getContext(), queryPlan.getTableRef(),scan);
+            List<PeekingResultIterator> iterators = Lists.newArrayListWithExpectedSize(scans.size());
+            for (Scan scan : scans) {
+                final TableResultIterator tableResultIterator = new TableResultIterator(queryPlan.getContext(), queryPlan.getTableRef(),scan);
+                PeekingResultIterator peekingResultIterator = LookAheadResultIterator.wrap(tableResultIterator);
+                iterators.add(peekingResultIterator);
+            }
+            ResultIterator iterator = ConcatResultIterator.newConcatResultIterator(iterators);
             if(queryPlan.getContext().getSequenceManager().getSequenceCount() > 0) {
-                this.resultIterator = new SequenceResultIterator(tableResultIterator, queryPlan.getContext().getSequenceManager());
-            } else {
-                this.resultIterator = tableResultIterator;
+                iterator = new SequenceResultIterator(iterator, queryPlan.getContext().getSequenceManager());
             }
+            this.resultIterator = iterator;
             this.resultSet = new PhoenixResultSet(this.resultIterator, queryPlan.getProjector(),queryPlan.getContext().getStatement());
         } catch (SQLException e) {
             LOG.error(String.format(" Error [%s] initializing PhoenixRecordReader. ",e.getMessage()));