You are viewing a plain-text version of this content. (The hyperlink to the canonical version was lost in the plain-text conversion.)
Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/10/06 17:56:58 UTC
[4/7] git commit: PHOENIX-1315 Optimize query for Pig loader
PHOENIX-1315 Optimize query for Pig loader
Conflicts:
phoenix-core/src/it/java/org/apache/phoenix/end2end/ReverseScanIT.java
phoenix-core/src/it/java/org/apache/phoenix/hbase/index/balancer/IndexLoadBalancerIT.java
phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java
phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/ca478a72
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/ca478a72
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/ca478a72
Branch: refs/heads/3.0
Commit: ca478a720ee2fe49edd80fa2af7ed819d27876f7
Parents: 29361c6
Author: James Taylor <jt...@salesforce.com>
Authored: Sun Oct 5 09:53:14 2014 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Mon Oct 6 01:35:24 2014 -0700
----------------------------------------------------------------------
.../phoenix/end2end/EvaluationOfORIT.java | 9 ++--
...ipRangeParallelIteratorRegionSplitterIT.java | 5 ++
.../org/apache/phoenix/compile/QueryPlan.java | 3 ++
.../apache/phoenix/execute/AggregatePlan.java | 6 +++
.../phoenix/execute/DegenerateQueryPlan.java | 12 ++++-
.../apache/phoenix/execute/HashJoinPlan.java | 5 ++
.../org/apache/phoenix/execute/ScanPlan.java | 8 +++
.../phoenix/iterate/ConcatResultIterator.java | 34 +++++++++++++
.../iterate/LookAheadResultIterator.java | 21 ++++++++
.../phoenix/iterate/ParallelIterators.java | 39 ++++----------
.../apache/phoenix/jdbc/PhoenixStatement.java | 6 +++
.../phoenix/pig/PhoenixHBaseLoaderIT.java | 24 ++++-----
.../phoenix/pig/hadoop/PhoenixInputFormat.java | 13 ++---
.../phoenix/pig/hadoop/PhoenixInputSplit.java | 53 ++++++++++++++------
.../phoenix/pig/hadoop/PhoenixRecordReader.java | 25 +++++----
15 files changed, 184 insertions(+), 79 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ca478a72/phoenix-core/src/it/java/org/apache/phoenix/end2end/EvaluationOfORIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/EvaluationOfORIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/EvaluationOfORIT.java
index 052ff43..0e59542 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/EvaluationOfORIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/EvaluationOfORIT.java
@@ -28,21 +28,22 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Properties;
+import org.apache.phoenix.util.PropertiesUtil;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-@Category(BaseHBaseManagedTimeIT.class)
+@Category(HBaseManagedTimeTest.class)
public class EvaluationOfORIT extends BaseHBaseManagedTimeIT{
@Test
public void testPKOrNotPKInOREvaluation() throws SQLException {
- Properties props = new Properties(TEST_PROPERTIES);
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
Connection conn = DriverManager.getConnection(getUrl(), props);
conn.setAutoCommit(false);
- String create = "CREATE TABLE DIE ( ID INTEGER NOT NULL PRIMARY KEY,NAME VARCHAR(50) NOT NULL)";
+ String create = "CREATE TABLE DIE ( ID INTEGER NOT NULL PRIMARY KEY,NAME VARCHAR(50))";
PreparedStatement createStmt = conn.prepareStatement(create);
- createStmt.executeUpdate();
+ createStmt.execute();
PreparedStatement stmt = conn.prepareStatement(
"upsert into " +
"DIE VALUES (?, ?)");
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ca478a72/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipRangeParallelIteratorRegionSplitterIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipRangeParallelIteratorRegionSplitterIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipRangeParallelIteratorRegionSplitterIT.java
index 3d057ae..18d7910 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipRangeParallelIteratorRegionSplitterIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipRangeParallelIteratorRegionSplitterIT.java
@@ -432,6 +432,11 @@ public class SkipRangeParallelIteratorRegionSplitterIT extends BaseClientManaged
public boolean isRowKeyOrdered() {
return true;
}
+
+ @Override
+ public List<List<Scan>> getScans() {
+ return null;
+ }
}, null, new SpoolingResultIterator.SpoolingResultIteratorFactory(context.getConnection().getQueryServices()));
List<KeyRange> keyRanges = parallelIterators.getSplits();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ca478a72/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
index 271ba30..a76993c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.compile;
import java.sql.SQLException;
import java.util.List;
+import org.apache.hadoop.hbase.client.Scan;
import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
import org.apache.phoenix.iterate.ResultIterator;
@@ -61,6 +62,8 @@ public interface QueryPlan extends StatementPlan {
List<KeyRange> getSplits();
+ List<List<Scan>> getScans();
+
FilterableStatement getStatement();
public boolean isDegenerate();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ca478a72/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
index 9f294a1..d7a90fb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
@@ -69,6 +69,7 @@ public class AggregatePlan extends BaseQueryPlan {
private final Aggregators aggregators;
private final Expression having;
private List<KeyRange> splits;
+ private List<List<Scan>> scans;
public AggregatePlan(
StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector,
@@ -88,6 +89,11 @@ public class AggregatePlan extends BaseQueryPlan {
return splits;
}
+ @Override
+ public List<List<Scan>> getScans() {
+ return scans;
+ }
+
private static class OrderingResultIteratorFactory implements ParallelIteratorFactory {
private final QueryServices services;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ca478a72/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
index 80c4727..70e59b9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
@@ -21,13 +21,16 @@ import java.sql.SQLException;
import java.util.Collections;
import java.util.List;
+import org.apache.hadoop.hbase.client.Scan;
import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
-import org.apache.phoenix.compile.*;
+import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.ScanRanges;
+import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.jdbc.PhoenixParameterMetaData;
import org.apache.phoenix.parse.FilterableStatement;
-import org.apache.phoenix.query.*;
+import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.schema.TableRef;
public class DegenerateQueryPlan extends BaseQueryPlan {
@@ -43,6 +46,11 @@ public class DegenerateQueryPlan extends BaseQueryPlan {
}
@Override
+ public List<List<Scan>> getScans() {
+ return Collections.emptyList();
+ }
+
+ @Override
protected ResultIterator newIterator() throws SQLException {
return null;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ca478a72/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
index 8d141dd..047b62d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
@@ -468,6 +468,11 @@ public class HashJoinPlan implements QueryPlan {
public boolean isRowKeyOrdered() {
return plan.isRowKeyOrdered();
}
+
+ @Override
+ public List<List<Scan>> getScans() {
+ return plan.getScans();
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ca478a72/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
index dfb8fec..24bf195 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
@@ -21,6 +21,7 @@ package org.apache.phoenix.execute;
import java.sql.SQLException;
import java.util.List;
+import org.apache.hadoop.hbase.client.Scan;
import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
import org.apache.phoenix.compile.RowProjector;
@@ -58,6 +59,7 @@ import org.apache.phoenix.util.ScanUtil;
*/
public class ScanPlan extends BaseQueryPlan {
private List<KeyRange> splits;
+ private List<List<Scan>> scans;
private boolean allowPageFilter;
public ScanPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, Integer limit, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter) {
@@ -96,6 +98,11 @@ public class ScanPlan extends BaseQueryPlan {
}
@Override
+ public List<List<Scan>> getScans() {
+ return scans;
+ }
+
+ @Override
protected ResultIterator newIterator() throws SQLException {
// Set any scan attributes before creating the scanner, as it will be too late afterwards
context.getScan().setAttribute(BaseScannerRegionObserver.NON_AGGREGATE_QUERY, QueryConstants.TRUE);
@@ -112,6 +119,7 @@ public class ScanPlan extends BaseQueryPlan {
boolean isOrdered = !orderBy.getOrderByExpressions().isEmpty();
ParallelIterators iterators = new ParallelIterators(this, !allowPageFilter || isOrdered ? null : limit, parallelIteratorFactory);
splits = iterators.getSplits();
+ scans = iterators.getScans();
if (isOrdered) {
scanner = new MergeSortTopNResultIterator(iterators, limit, orderBy.getOrderByExpressions());
} else {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ca478a72/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java
index 21ae7d8..cddf3b3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java
@@ -86,4 +86,38 @@ public class ConcatResultIterator implements PeekingResultIterator {
return currentIterator().next();
}
+ @Override
+ public String toString() {
+ return "ConcatResultIterator [resultIterators=" + resultIterators
+ + ", iterators=" + iterators + ", index=" + index + "]";
+ }
+
+ public static PeekingResultIterator newConcatResultIterator(final List<PeekingResultIterator> concatIterators) {
+ if (concatIterators.isEmpty()) {
+ return PeekingResultIterator.EMPTY_ITERATOR;
+ }
+
+ if (concatIterators.size() == 1) {
+ return concatIterators.get(0);
+ }
+ return new ConcatResultIterator(new ResultIterators() {
+
+ @Override
+ public List<PeekingResultIterator> getIterators() throws SQLException {
+ return concatIterators;
+ }
+
+ @Override
+ public int size() {
+ return concatIterators.size();
+ }
+
+ @Override
+ public void explain(List<String> planSteps) {
+ // TODO: review what we should for explain plan here
+ concatIterators.get(0).explain(planSteps);
+ }
+
+ });
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ca478a72/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java
index d823ffd..a7f390f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java
@@ -18,12 +18,33 @@
package org.apache.phoenix.iterate;
import java.sql.SQLException;
+import java.util.List;
import org.apache.phoenix.schema.tuple.ResultTuple;
import org.apache.phoenix.schema.tuple.Tuple;
abstract public class LookAheadResultIterator implements PeekingResultIterator {
+ public static LookAheadResultIterator wrap(final ResultIterator iterator) {
+ return new LookAheadResultIterator() {
+
+ @Override
+ public void explain(List<String> planSteps) {
+ iterator.explain(planSteps);
+ }
+
+ @Override
+ public void close() throws SQLException {
+ iterator.close();
+ }
+
+ @Override
+ protected Tuple advance() throws SQLException {
+ return iterator.next();
+ }
+ };
+ }
+
private final static Tuple UNINITIALIZED = new ResultTuple();
private Tuple next = UNINITIALIZED;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ca478a72/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
index 81dfbb6..c1c3775 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
@@ -268,6 +268,10 @@ public class ParallelIterators extends ExplainTable implements ResultIterators {
return splits;
}
+ public List<List<Scan>> getScans() {
+ return scans;
+ }
+
private static List<byte[]> toBoundaries(List<HRegionLocation> regionLocations) {
int nBoundaries = regionLocations.size() - 1;
List<byte[]> ranges = Lists.newArrayListWithExpectedSize(nBoundaries);
@@ -431,36 +435,6 @@ public class ParallelIterators extends ExplainTable implements ResultIterators {
return parallelScans;
}
- private static void addConcatResultIterator(List<PeekingResultIterator> iterators, final List<PeekingResultIterator> concatIterators) {
- if (!concatIterators.isEmpty()) {
- if (concatIterators.size() == 1) {
- iterators.add(concatIterators.get(0));
- } else {
- // TODO: should ConcatResultIterator have a constructor that takes
- // a List<PeekingResultIterator>?
- iterators.add(new ConcatResultIterator(new ResultIterators() {
-
- @Override
- public List<PeekingResultIterator> getIterators() throws SQLException {
- return concatIterators;
- }
-
- @Override
- public int size() {
- return concatIterators.size();
- }
-
- @Override
- public void explain(List<String> planSteps) {
- // TODO: review what we should for explain plan here
- concatIterators.get(0).explain(planSteps);
- }
-
- }));
- }
- }
- }
-
public static <T> List<T> reverseIfNecessary(List<T> list, boolean reverse) {
if (!reverse) {
return list;
@@ -468,6 +442,11 @@ public class ParallelIterators extends ExplainTable implements ResultIterators {
return Lists.reverse(list);
}
+ private static void addConcatResultIterator(List<PeekingResultIterator> iterators, final List<PeekingResultIterator> concatIterators) {
+ if (!concatIterators.isEmpty()) {
+ iterators.add(ConcatResultIterator.newConcatResultIterator(concatIterators));
+ }
+ }
/**
* Executes the scan in parallel across all regions, blocking until all scans are complete.
* @return the result iterators for the scan of each region
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ca478a72/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index e396b22..519a7db 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -33,6 +33,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
+import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.compile.ColumnProjector;
import org.apache.phoenix.compile.ColumnResolver;
@@ -413,6 +414,11 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
}
@Override
+ public List<List<Scan>> getScans() {
+ return Collections.emptyList();
+ }
+
+ @Override
public StatementContext getContext() {
return plan.getContext();
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ca478a72/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseLoaderIT.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseLoaderIT.java b/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseLoaderIT.java
index 6017065..4801d63 100644
--- a/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseLoaderIT.java
+++ b/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseLoaderIT.java
@@ -113,7 +113,7 @@ public class PhoenixHBaseLoaderIT {
conn.createStatement().execute(ddl);
pigServer.registerQuery(String.format(
- "A = load 'hbase://table/%s' using org.apache.phoenix.pig.PhoenixHBaseLoader('%s');", TABLE_FULL_NAME,
+ "A = load 'hbase://table/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", TABLE_FULL_NAME,
zkQuorum));
final Schema schema = pigServer.dumpSchema("A");
@@ -144,7 +144,7 @@ public class PhoenixHBaseLoaderIT {
final String selectColumns = "ID,NAME";
pigServer.registerQuery(String.format(
- "A = load 'hbase://table/%s/%s' using org.apache.phoenix.pig.PhoenixHBaseLoader('%s');",
+ "A = load 'hbase://table/%s/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');",
TABLE_FULL_NAME, selectColumns, zkQuorum));
Schema schema = pigServer.dumpSchema("A");
@@ -175,7 +175,7 @@ public class PhoenixHBaseLoaderIT {
//sql query for LOAD
final String sqlQuery = "SELECT A_STRING,CF1.A_INTEGER,CF2.A_DOUBLE FROM " + TABLE_FULL_NAME;
pigServer.registerQuery(String.format(
- "A = load 'hbase://query/%s' using org.apache.phoenix.pig.PhoenixHBaseLoader('%s');",
+ "A = load 'hbase://query/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');",
sqlQuery, zkQuorum));
//assert the schema.
@@ -209,7 +209,7 @@ public class PhoenixHBaseLoaderIT {
LOG.info(String.format("Generated SQL Query [%s]",sqlQuery));
pigServer.registerQuery(String.format(
- "raw = load 'hbase://query/%s' using org.apache.phoenix.pig.PhoenixHBaseLoader('%s') AS (a:chararray,b:bigdecimal,c:int,d:double);",
+ "raw = load 'hbase://query/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s') AS (a:chararray,b:bigdecimal,c:int,d:double);",
sqlQuery, zkQuorum));
//test the schema.
@@ -252,7 +252,7 @@ public class PhoenixHBaseLoaderIT {
//load data and filter rows whose age is > 25
pigServer.registerQuery(String.format(
- "A = load 'hbase://table/%s' using org.apache.phoenix.pig.PhoenixHBaseLoader('%s');", TABLE_FULL_NAME,
+ "A = load 'hbase://table/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", TABLE_FULL_NAME,
zkQuorum));
pigServer.registerQuery("B = FILTER A BY AGE > 25;");
@@ -339,7 +339,7 @@ public class PhoenixHBaseLoaderIT {
final String sqlQuery = String.format(" SELECT FOO, BAZ FROM %s WHERE BAR = -1 " , TABLE_FULL_NAME);
pigServer.registerQuery(String.format(
- "A = load 'hbase://query/%s' using org.apache.phoenix.pig.PhoenixHBaseLoader('%s');", sqlQuery,
+ "A = load 'hbase://query/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", sqlQuery,
zkQuorum));
final Iterator<Tuple> iterator = pigServer.openIterator("A");
@@ -403,7 +403,7 @@ public class PhoenixHBaseLoaderIT {
//load data and filter rows whose age is > 25
pigServer.setBatchOn();
pigServer.registerQuery(String.format(
- "A = load 'hbase://table/%s' using org.apache.phoenix.pig.PhoenixHBaseLoader('%s');", TABLE_FULL_NAME,
+ "A = load 'hbase://table/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", TABLE_FULL_NAME,
zkQuorum));
pigServer.registerQuery("B = GROUP A BY AGE;");
@@ -457,7 +457,7 @@ public class PhoenixHBaseLoaderIT {
//load data and filter rows whose age is > 25
pigServer.setBatchOn();
pigServer.registerQuery(String.format(
- "A = load 'hbase://table/%s' using org.apache.phoenix.pig.PhoenixHBaseLoader('%s');", TABLE_FULL_NAME,
+ "A = load 'hbase://table/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", TABLE_FULL_NAME,
zkQuorum));
pigServer.registerQuery("B = GROUP A BY AGE;");
@@ -470,11 +470,11 @@ public class PhoenixHBaseLoaderIT {
//validate the data with what is stored.
final String selectQuery = "SELECT AGE , MIN_SAL ,MAX_SAL FROM " + targetTable + " ORDER BY AGE";
final ResultSet rs = conn.createStatement().executeQuery(selectQuery);
- rs.next();
+ assertTrue(rs.next());
assertEquals(25, rs.getInt("AGE"));
assertEquals(0, rs.getInt("MIN_SAL"));
assertEquals(180, rs.getInt("MAX_SAL"));
- rs.next();
+ assertTrue(rs.next());
assertEquals(30, rs.getInt("AGE"));
assertEquals(0, rs.getInt("MIN_SAL"));
assertEquals(270, rs.getInt("MAX_SAL"));
@@ -512,7 +512,7 @@ public class PhoenixHBaseLoaderIT {
//sql query load data and filter rows whose age is > 25
final String sqlQuery = " SELECT NEXT VALUE FOR my_sequence AS my_seq,ID,NAME,AGE FROM " + TABLE_FULL_NAME + " WHERE AGE > 25";
pigServer.registerQuery(String.format(
- "A = load 'hbase://query/%s' using org.apache.phoenix.pig.PhoenixHBaseLoader('%s');", sqlQuery,
+ "A = load 'hbase://query/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", sqlQuery,
zkQuorum));
@@ -549,7 +549,7 @@ public class PhoenixHBaseLoaderIT {
final String sqlQuery = " SELECT UPPER(NAME) AS n FROM " + TABLE_FULL_NAME + " ORDER BY ID" ;
pigServer.registerQuery(String.format(
- "A = load 'hbase://query/%s' using org.apache.phoenix.pig.PhoenixHBaseLoader('%s');", sqlQuery,
+ "A = load 'hbase://query/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", sqlQuery,
zkQuorum));
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ca478a72/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputFormat.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputFormat.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputFormat.java
index bd1df97..2ef7914 100644
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputFormat.java
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputFormat.java
@@ -28,6 +28,7 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -86,8 +87,8 @@ public final class PhoenixInputFormat extends InputFormat<NullWritable, PhoenixR
Preconditions.checkNotNull(qplan);
Preconditions.checkNotNull(splits);
final List<InputSplit> psplits = Lists.newArrayListWithExpectedSize(splits.size());
- for (KeyRange split : qplan.getSplits()) {
- psplits.add(new PhoenixInputSplit(split));
+ for (List<Scan> scans : qplan.getScans()) {
+ psplits.add(new PhoenixInputSplit(scans));
}
return psplits;
}
@@ -127,10 +128,10 @@ public final class PhoenixInputFormat extends InputFormat<NullWritable, PhoenixR
Preconditions.checkNotNull(selectStatement);
final Statement statement = connection.createStatement();
final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
- this.queryPlan = pstmt.compileQuery(selectStatement);
- // FIXME: why is getting the iterator necessary here, as it will
- // cause the query to run.
- this.queryPlan.iterator();
+ // Optimize the query plan so that we potentially use secondary indexes
+ this.queryPlan = pstmt.optimizeQuery(selectStatement);
+ // Initialize the query plan so it sets up the parallel scans
+ queryPlan.iterator();
} catch(Exception exception) {
LOG.error(String.format("Failed to get the query plan with error [%s]",exception.getMessage()));
throw new RuntimeException(exception);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ca478a72/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputSplit.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputSplit.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputSplit.java
index 43d69b3..7414d67 100644
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputSplit.java
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputSplit.java
@@ -22,12 +22,16 @@ package org.apache.phoenix.pig.hadoop;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.List;
+import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.phoenix.query.KeyRange;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
/**
*
@@ -36,6 +40,7 @@ import com.google.common.base.Preconditions;
*/
public class PhoenixInputSplit extends InputSplit implements Writable {
+ private List<Scan> scans;
private KeyRange keyRange;
/**
@@ -48,21 +53,44 @@ public class PhoenixInputSplit extends InputSplit implements Writable {
*
* @param keyRange
*/
- public PhoenixInputSplit(final KeyRange keyRange) {
- Preconditions.checkNotNull(keyRange);
- this.keyRange = keyRange;
+ public PhoenixInputSplit(final List<Scan> scans) {
+ Preconditions.checkNotNull(scans);
+ Preconditions.checkState(!scans.isEmpty());
+ this.scans = scans;
+ init();
+ }
+
+ public List<Scan> getScans() {
+ return scans;
+ }
+
+ public KeyRange getKeyRange() {
+ return keyRange;
+ }
+
+ private void init() {
+ this.keyRange = KeyRange.getKeyRange(scans.get(0).getStartRow(), scans.get(scans.size()-1).getStopRow());
}
@Override
public void readFields(DataInput input) throws IOException {
- this.keyRange = new KeyRange ();
- this.keyRange.readFields(input);
+ int count = WritableUtils.readVInt(input);
+ scans = Lists.newArrayListWithExpectedSize(count);
+ for (int i = 0; i < count; i++) {
+ Scan scan = new Scan();
+ scan.readFields(input);
+ scans.add(scan);
+ }
+ init();
}
@Override
public void write(DataOutput output) throws IOException {
- Preconditions.checkNotNull(keyRange);
- keyRange.write(output);
+ Preconditions.checkNotNull(scans);
+ WritableUtils.writeVInt(output, scans.size());
+ for (Scan scan : scans) {
+ scan.write(output);
+ }
}
@Override
@@ -75,23 +103,18 @@ public class PhoenixInputSplit extends InputSplit implements Writable {
return new String[]{};
}
- /**
- * @return Returns the keyRange.
- */
- public KeyRange getKeyRange() {
- return keyRange;
- }
-
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
- result = prime * result + ((keyRange == null) ? 0 : keyRange.hashCode());
+ result = prime * result + keyRange.hashCode();
return result;
}
@Override
public boolean equals(Object obj) {
+ // TODO: review: it's a reasonable check to use the keyRange,
+ // but it's not perfect. Do we need an equals impl?
if (this == obj) { return true; }
if (obj == null) { return false; }
if (!(obj instanceof PhoenixInputSplit)) { return false; }
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ca478a72/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java
index a445ce6..2bff620 100644
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java
@@ -21,6 +21,7 @@ package org.apache.phoenix.pig.hadoop;
import java.io.IOException;
import java.sql.SQLException;
+import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -30,16 +31,18 @@ import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.iterate.ConcatResultIterator;
+import org.apache.phoenix.iterate.LookAheadResultIterator;
+import org.apache.phoenix.iterate.PeekingResultIterator;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.iterate.SequenceResultIterator;
import org.apache.phoenix.iterate.TableResultIterator;
import org.apache.phoenix.jdbc.PhoenixResultSet;
import org.apache.phoenix.pig.PhoenixPigConfiguration;
-import org.apache.phoenix.query.KeyRange;
-import org.apache.phoenix.util.ScanUtil;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
/**
* RecordReader that process the scan and returns PhoenixRecord
@@ -94,17 +97,19 @@ public final class PhoenixRecordReader extends RecordReader<NullWritable,Phoenix
@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
final PhoenixInputSplit pSplit = (PhoenixInputSplit)split;
- final KeyRange keyRange = pSplit.getKeyRange();
- final Scan splitScan = queryPlan.getContext().getScan();
- final Scan scan = new Scan(splitScan);
- ScanUtil.intersectScanRange(scan, keyRange.getLowerRange(), keyRange.getUpperRange(), queryPlan.getContext().getScanRanges().useSkipScanFilter());
+ final List<Scan> scans = pSplit.getScans();
try {
- TableResultIterator tableResultIterator = new TableResultIterator(queryPlan.getContext(), queryPlan.getTableRef(),scan);
+ List<PeekingResultIterator> iterators = Lists.newArrayListWithExpectedSize(scans.size());
+ for (Scan scan : scans) {
+ final TableResultIterator tableResultIterator = new TableResultIterator(queryPlan.getContext(), queryPlan.getTableRef(),scan);
+ PeekingResultIterator peekingResultIterator = LookAheadResultIterator.wrap(tableResultIterator);
+ iterators.add(peekingResultIterator);
+ }
+ ResultIterator iterator = ConcatResultIterator.newConcatResultIterator(iterators);
if(queryPlan.getContext().getSequenceManager().getSequenceCount() > 0) {
- this.resultIterator = new SequenceResultIterator(tableResultIterator, queryPlan.getContext().getSequenceManager());
- } else {
- this.resultIterator = tableResultIterator;
+ iterator = new SequenceResultIterator(iterator, queryPlan.getContext().getSequenceManager());
}
+ this.resultIterator = iterator;
this.resultSet = new PhoenixResultSet(this.resultIterator, queryPlan.getProjector(),queryPlan.getContext().getStatement());
} catch (SQLException e) {
LOG.error(String.format(" Error [%s] initializing PhoenixRecordReader. ",e.getMessage()));