You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ra...@apache.org on 2016/05/04 13:21:00 UTC
[3/3] phoenix git commit: PHOENIX-2628 Ensure split when iterating
through results handled correctly(Rajeshbabu)
PHOENIX-2628 Ensure split when iterating through results handled correctly(Rajeshbabu)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/d700c1f0
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/d700c1f0
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/d700c1f0
Branch: refs/heads/master
Commit: d700c1f032a0f5d119c669100648caf040233ebe
Parents: 99713a6
Author: Rajeshbabu Chintaguntla <ra...@apache.org>
Authored: Wed May 4 18:55:49 2016 +0530
Committer: Rajeshbabu Chintaguntla <ra...@apache.org>
Committed: Wed May 4 18:55:49 2016 +0530
----------------------------------------------------------------------
.../org/apache/phoenix/end2end/BaseViewIT.java | 5 +
.../phoenix/end2end/index/LocalIndexIT.java | 51 +--
.../phoenix/end2end/index/MutableIndexIT.java | 239 ++++++++++-
.../DelayedTableResultIteratorFactory.java | 9 +-
.../iterate/MockParallelIteratorFactory.java | 3 +-
.../regionserver/IndexHalfStoreFileReader.java | 412 +------------------
.../IndexHalfStoreFileReaderGenerator.java | 123 +++---
.../hbase/regionserver/LocalIndexSplitter.java | 37 --
.../LocalIndexStoreFileScanner.java | 254 ++++++++++++
.../phoenix/compile/ListJarsQueryPlan.java | 4 +
.../MutatingParallelIteratorFactory.java | 2 +-
.../org/apache/phoenix/compile/QueryPlan.java | 2 +
.../org/apache/phoenix/compile/ScanRanges.java | 11 +-
.../apache/phoenix/compile/TraceQueryPlan.java | 5 +
.../coprocessor/BaseScannerRegionObserver.java | 94 ++++-
.../GroupedAggregateRegionObserver.java | 16 +-
.../UngroupedAggregateRegionObserver.java | 8 +-
.../apache/phoenix/execute/AggregatePlan.java | 21 +-
.../apache/phoenix/execute/BaseQueryPlan.java | 20 +-
.../phoenix/execute/ClientAggregatePlan.java | 6 +
.../apache/phoenix/execute/ClientScanPlan.java | 7 +-
.../apache/phoenix/execute/CorrelatePlan.java | 8 +-
.../phoenix/execute/DegenerateQueryPlan.java | 2 +-
.../apache/phoenix/execute/HashJoinPlan.java | 8 +-
.../execute/LiteralResultIterationPlan.java | 2 +-
.../org/apache/phoenix/execute/ScanPlan.java | 9 +-
.../phoenix/execute/SortMergeJoinPlan.java | 7 +-
.../phoenix/execute/TupleProjectionPlan.java | 8 +-
.../org/apache/phoenix/execute/UnionPlan.java | 6 +-
.../apache/phoenix/execute/UnnestArrayPlan.java | 8 +-
.../phoenix/iterate/BaseResultIterators.java | 112 +++--
.../phoenix/iterate/ChunkedResultIterator.java | 50 +--
.../DefaultTableResultIteratorFactory.java | 5 +-
.../iterate/ParallelIteratorFactory.java | 5 +-
.../phoenix/iterate/ParallelIterators.java | 14 +-
.../apache/phoenix/iterate/SerialIterators.java | 10 +-
.../phoenix/iterate/SpoolingResultIterator.java | 3 +-
.../phoenix/iterate/TableResultIterator.java | 59 ++-
.../iterate/TableResultIteratorFactory.java | 3 +-
.../apache/phoenix/jdbc/PhoenixStatement.java | 5 +
.../phoenix/mapreduce/PhoenixRecordReader.java | 5 +-
.../java/org/apache/phoenix/util/ScanUtil.java | 64 +++
.../java/org/apache/phoenix/query/BaseTest.java | 2 +-
.../query/ParallelIteratorsSplitTest.java | 9 +-
.../hive/mapreduce/PhoenixRecordReader.java | 5 +-
45 files changed, 1078 insertions(+), 660 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseViewIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseViewIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseViewIT.java
index 7e7175f..65f1f93 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseViewIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseViewIT.java
@@ -33,6 +33,7 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
@@ -45,6 +46,7 @@ import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.TestUtil;
import org.junit.BeforeClass;
@@ -98,6 +100,9 @@ public abstract class BaseViewIT extends BaseOwnClusterHBaseManagedTimeIT {
if (saltBuckets == null) {
try (Connection conn = DriverManager.getConnection(getUrl())) {
HTableInterface htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(tableName));
+ if(ScanUtil.isLocalIndex(scan)) {
+ ScanUtil.setLocalIndexAttributes(scan, 0, HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, scan.getStartRow(), scan.getStopRow());
+ }
ResultScanner scanner = htable.getScanner(scan);
Result result = scanner.next();
// Confirm index has rows
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
index 2d79f36..f7edea7 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
@@ -790,15 +790,27 @@ public class LocalIndexIT extends BaseHBaseManagedTimeIT {
admin.getConnection(), indexTable, false);
}
assertEquals(4 + i, regionsOfIndexTable.size());
+ String[] tIdColumnValues = new String[26];
+ String[] v1ColumnValues = new String[26];
+ int[] k1ColumnValue = new int[26];
String query = "SELECT t_id,k1,v1 FROM " + tableName;
rs = conn1.createStatement().executeQuery(query);
Thread.sleep(1000);
for (int j = 0; j < 26; j++) {
assertTrue(rs.next());
- assertEquals(strings[25-j], rs.getString("t_id"));
- assertEquals(25-j, rs.getInt("k1"));
- assertEquals(strings[j], rs.getString("V1"));
+ tIdColumnValues[j] = rs.getString("t_id");
+ k1ColumnValue[j] = rs.getInt("k1");
+ v1ColumnValues[j] = rs.getString("V1");
}
+ Arrays.sort(tIdColumnValues);
+ Arrays.sort(v1ColumnValues);
+ Arrays.sort(k1ColumnValue);
+ assertTrue(Arrays.equals(strings, tIdColumnValues));
+ assertTrue(Arrays.equals(strings, v1ColumnValues));
+ for(int m=0;m<26;m++) {
+ assertEquals(m, k1ColumnValue[m]);
+ }
+
rs = conn1.createStatement().executeQuery("EXPLAIN " + query);
assertEquals(
"CLIENT PARALLEL " + (4 + i) + "-WAY RANGE SCAN OVER "
@@ -817,11 +829,20 @@ public class LocalIndexIT extends BaseHBaseManagedTimeIT {
+ "CLIENT MERGE SORT", QueryUtil.getExplainPlan(rs));
rs = conn1.createStatement().executeQuery(query);
Thread.sleep(1000);
+ int[] k3ColumnValue = new int[26];
for (int j = 0; j < 26; j++) {
assertTrue(rs.next());
- assertEquals(strings[j], rs.getString("t_id"));
- assertEquals(j, rs.getInt("k1"));
- assertEquals(j+2, rs.getInt("k3"));
+ tIdColumnValues[j] = rs.getString("t_id");
+ k1ColumnValue[j] = rs.getInt("k1");
+ k3ColumnValue[j] = rs.getInt("k3");
+ }
+ Arrays.sort(tIdColumnValues);
+ Arrays.sort(k1ColumnValue);
+ Arrays.sort(k3ColumnValue);
+ assertTrue(Arrays.equals(strings, tIdColumnValues));
+ for(int m=0;m<26;m++) {
+ assertEquals(m, k1ColumnValue[m]);
+ assertEquals(m+2, k3ColumnValue[m]);
}
}
} finally {
@@ -1013,24 +1034,6 @@ public class LocalIndexIT extends BaseHBaseManagedTimeIT {
assertEquals(5, regionsOfIndexTable.size());
boolean success = latch1.await(WAIT_TIME_SECONDS, TimeUnit.SECONDS);
assertTrue("Timed out waiting for MockedLocalIndexSplitter.preSplitAfterPONR to complete", success);
- // Verify the metadata for index is correct.
- rs = conn1.getMetaData().getTables(null, StringUtil.escapeLike(schemaName), indexName,
- new String[] { PTableType.INDEX.toString() });
- assertTrue(rs.next());
- assertEquals(indexName, rs.getString(3));
- assertEquals(PIndexState.INACTIVE.toString(), rs.getString("INDEX_STATE"));
- assertFalse(rs.next());
- rs = conn1.getMetaData().getTables(null, StringUtil.escapeLike(schemaName), indexName+"_2",
- new String[] { PTableType.INDEX.toString() });
- assertTrue(rs.next());
- assertEquals(indexName+"_2", rs.getString(3));
- assertEquals(PIndexState.INACTIVE.toString(), rs.getString("INDEX_STATE"));
- assertFalse(rs.next());
-
- String query = "SELECT t_id,k1,v1 FROM " + tableName+"2";
- rs = conn1.createStatement().executeQuery("EXPLAIN " + query);
- assertEquals("CLIENT PARALLEL " + 1 + "-WAY FULL SCAN OVER " + tableName+"2",
- QueryUtil.getExplainPlan(rs));
latch2.countDown();
} finally {
conn1.close();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java
index add282e..80f1250 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
@@ -31,14 +32,25 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.Collection;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
+import jline.internal.Log;
+
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
import org.apache.phoenix.end2end.Shadower;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.PTableKey;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ReadOnlyProps;
@@ -86,8 +98,8 @@ public class MutableIndexIT extends BaseHBaseManagedTimeIT {
@Parameters(name="localIndex = {0} , transactional = {1}")
public static Collection<Boolean[]> data() {
- return Arrays.asList(new Boolean[][] {
- { false, false }, { false, true }, { true, false }, { true, true }
+ return Arrays.asList(new Boolean[][] {
+ { false, false }, { false, true }, { true, false }, { true, true }
});
}
@@ -594,4 +606,227 @@ public class MutableIndexIT extends BaseHBaseManagedTimeIT {
}
}
+ @Test
+ public void testSplitDuringIndexScan() throws Exception {
+ testSplitDuringIndexScan(false);
+ }
+
+ @Test
+ public void testSplitDuringIndexReverseScan() throws Exception {
+ testSplitDuringIndexScan(true);
+ }
+
+ private void testSplitDuringIndexScan(boolean isReverse) throws Exception {
+ Properties props = new Properties();
+ props.setProperty(QueryServices.SCAN_CACHE_SIZE_ATTRIB, Integer.toString(2));
+ props.put(QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, Boolean.toString(false));
+ try(Connection conn1 = DriverManager.getConnection(getUrl(), props)){
+ String[] strings = {"a","b","c","d","e","f","g","h","i","j","k","l","m","n","o","p","q","r","s","t","u","v","w","x","y","z"};
+ HBaseAdmin admin = driver.getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES).getAdmin();
+ dropTable(admin, conn1);
+ createTableAndLoadData(conn1, strings, isReverse);
+
+ ResultSet rs = conn1.createStatement().executeQuery("SELECT * FROM " + tableName);
+ assertTrue(rs.next());
+ splitDuringScan(conn1, strings, admin, isReverse);
+ dropTable(admin, conn1);
+ }
+ }
+
+ private void dropTable(HBaseAdmin admin, Connection conn) throws SQLException, IOException {
+ conn.createStatement().execute("DROP TABLE IF EXISTS "+ tableName);
+ if(admin.tableExists(tableName)) {
+ admin.disableTable(TableName.valueOf(tableName));
+ admin.deleteTable(TableName.valueOf(tableName));
+ }
+ if(admin.tableExists(localIndex? MetaDataUtil.getLocalIndexTableName(tableName): indexName)) {
+ admin.disableTable(localIndex? MetaDataUtil.getLocalIndexTableName(tableName): indexName);
+ admin.deleteTable(localIndex? MetaDataUtil.getLocalIndexTableName(tableName): indexName);
+ }
+ }
+
+ private void createTableAndLoadData(Connection conn1, String[] strings, boolean isReverse) throws SQLException {
+ createBaseTable(conn1, tableName, null);
+ for (int i = 0; i < 26; i++) {
+ conn1.createStatement().execute(
+ "UPSERT INTO " + tableName + " values('"+strings[i]+"'," + i + ","
+ + (i + 1) + "," + (i + 2) + ",'" + strings[25 - i] + "')");
+ }
+ conn1.commit();
+ conn1.createStatement().execute(
+ "CREATE " + (localIndex ? "LOCAL" : "")+" INDEX " + indexName + " ON " + tableName + "(v1"+(isReverse?" DESC":"")+") include (k3)");
+ }
+
+ @Test
+ public void testIndexHalfStoreFileReader() throws Exception {
+ Connection conn1 = DriverManager.getConnection(getUrl());
+ HBaseAdmin admin = driver.getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES).getAdmin();
+ try {
+ dropTable(admin, conn1);
+ createBaseTable(conn1, tableName, "('e')");
+ conn1.createStatement().execute("CREATE "+(localIndex?"LOCAL":"")+" INDEX " + indexName + " ON " + tableName + "(v1)" + (localIndex?"":" SPLIT ON ('e')"));
+ conn1.createStatement().execute("UPSERT INTO "+tableName+" values('b',1,2,4,'z')");
+ conn1.createStatement().execute("UPSERT INTO "+tableName+" values('f',1,2,3,'z')");
+ conn1.createStatement().execute("UPSERT INTO "+tableName+" values('j',2,4,2,'a')");
+ conn1.createStatement().execute("UPSERT INTO "+tableName+" values('q',3,1,1,'c')");
+ conn1.commit();
+
+ String query = "SELECT count(*) FROM " + tableName +" where v1<='z'";
+ ResultSet rs = conn1.createStatement().executeQuery(query);
+ assertTrue(rs.next());
+ assertEquals(4, rs.getInt(1));
+
+ TableName table = TableName.valueOf(localIndex?tableName: indexName);
+ TableName indexTable = TableName.valueOf(localIndex?MetaDataUtil.getLocalIndexTableName(tableName): indexName);
+ admin.flush(indexTable);
+ boolean merged = false;
+ // merge regions until 1 left
+ end: while (true) {
+ long numRegions = 0;
+ while (true) {
+ rs = conn1.createStatement().executeQuery(query);
+ assertTrue(rs.next());
+ System.out.println("Number of rows returned:" + rs.getInt(1));
+ assertEquals(4, rs.getInt(1)); //TODO this returns 5 sometimes instead of 4, duplicate results?
+ try {
+ List<HRegionInfo> indexRegions = admin.getTableRegions(indexTable);
+ numRegions = indexRegions.size();
+ if (numRegions==1) {
+ break end;
+ }
+ if(!merged) {
+ List<HRegionInfo> regions =
+ admin.getTableRegions(localIndex ? table : indexTable);
+ System.out.println("Merging: " + regions.size());
+ admin.mergeRegions(regions.get(0).getEncodedNameAsBytes(),
+ regions.get(1).getEncodedNameAsBytes(), false);
+ merged = true;
+ Threads.sleep(10000);
+ }
+ break;
+ } catch (Exception ex) {
+ Log.info(ex);
+ }
+
+ long waitStartTime = System.currentTimeMillis();
+ // wait until merge happened
+ while (System.currentTimeMillis() - waitStartTime < 10000) {
+ List<HRegionInfo> regions = admin.getTableRegions(indexTable);
+ System.out.println("Waiting:" + regions.size());
+ if (regions.size() < numRegions) {
+ break;
+ }
+ Threads.sleep(1000);
+ }
+ }
+ }
+ } finally {
+ dropTable(admin, conn1);
+ }
+ }
+
+ private List<HRegionInfo> mergeRegions(HBaseAdmin admin, List<HRegionInfo> regionsOfUserTable)
+ throws IOException, InterruptedException {
+ for (int i = 2; i > 0; i--) {
+ Threads.sleep(10000);
+ admin.mergeRegions(regionsOfUserTable.get(0).getEncodedNameAsBytes(),
+ regionsOfUserTable.get(1).getEncodedNameAsBytes(), false);
+ regionsOfUserTable =
+ MetaTableAccessor.getTableRegions(getUtility().getZooKeeperWatcher(), admin.getConnection(),
+ TableName.valueOf(localIndex? tableName:indexName), false);
+
+ while (regionsOfUserTable.size() != i) {
+ Thread.sleep(100);
+ regionsOfUserTable = MetaTableAccessor.getTableRegions(getUtility().getZooKeeperWatcher(),
+ admin.getConnection(), TableName.valueOf(localIndex? tableName:indexName), false);
+ }
+ assertEquals(i, regionsOfUserTable.size());
+ if(localIndex) {
+ List<HRegionInfo> regionsOfIndexTable = MetaTableAccessor.getTableRegions(getUtility().getZooKeeperWatcher(),
+ admin.getConnection(), TableName.valueOf(MetaDataUtil.getLocalIndexTableName(tableName)), false);
+ while (regionsOfIndexTable.size() != i) {
+ Thread.sleep(100);
+ regionsOfIndexTable = MetaTableAccessor.getTableRegions(getUtility().getZooKeeperWatcher(),
+ admin.getConnection(), TableName.valueOf(MetaDataUtil.getLocalIndexTableName(tableName)), false);
+ }
+ assertEquals(i, regionsOfIndexTable.size());
+ }
+ }
+ return regionsOfUserTable;
+ }
+
+ private List<HRegionInfo> splitDuringScan(Connection conn1, String[] strings, HBaseAdmin admin, boolean isReverse)
+ throws SQLException, IOException, InterruptedException {
+ ResultSet rs;
+ String query = "SELECT t_id,k1,v1 FROM " + tableName;
+ rs = conn1.createStatement().executeQuery(query);
+ String[] tIdColumnValues = new String[26];
+ String[] v1ColumnValues = new String[26];
+ int[] k1ColumnValue = new int[26];
+ for (int j = 0; j < 5; j++) {
+ assertTrue(rs.next());
+ tIdColumnValues[j] = rs.getString("t_id");
+ k1ColumnValue[j] = rs.getInt("k1");
+ v1ColumnValues[j] = rs.getString("V1");
+ }
+
+ String[] splitKeys = new String[2];
+ splitKeys[0] = strings[4];
+ splitKeys[1] = strings[12];
+
+ int[] splitInts = new int[2];
+ splitInts[0] = 22;
+ splitInts[1] = 4;
+ List<HRegionInfo> regionsOfUserTable = null;
+ for(int i = 0; i <=1; i++) {
+ Threads.sleep(10000);
+ if(localIndex) {
+ admin.split(Bytes.toBytes(tableName),
+ ByteUtil.concat(Bytes.toBytes(splitKeys[i])));
+ } else {
+ admin.split(Bytes.toBytes(indexName), ByteUtil.concat(Bytes.toBytes(splitInts[i])));
+ }
+ Thread.sleep(100);
+ regionsOfUserTable =
+ MetaTableAccessor.getTableRegions(getUtility().getZooKeeperWatcher(),
+ admin.getConnection(), TableName.valueOf(localIndex?tableName:indexName),
+ false);
+
+ while (regionsOfUserTable.size() != (i+2)) {
+ Thread.sleep(100);
+ regionsOfUserTable =
+ MetaTableAccessor.getTableRegions(getUtility().getZooKeeperWatcher(),
+ admin.getConnection(),
+ TableName.valueOf(localIndex?tableName:indexName), false);
+ }
+ assertEquals(i+2, regionsOfUserTable.size());
+ }
+ for (int j = 5; j < 26; j++) {
+ assertTrue(rs.next());
+ tIdColumnValues[j] = rs.getString("t_id");
+ k1ColumnValue[j] = rs.getInt("k1");
+ v1ColumnValues[j] = rs.getString("V1");
+ }
+ Arrays.sort(tIdColumnValues);
+ Arrays.sort(v1ColumnValues);
+ Arrays.sort(k1ColumnValue);
+ assertTrue(Arrays.equals(strings, tIdColumnValues));
+ assertTrue(Arrays.equals(strings, v1ColumnValues));
+ for(int i=0;i<26;i++) {
+ assertEquals(i, k1ColumnValue[i]);
+ }
+ assertFalse(rs.next());
+ return regionsOfUserTable;
+ }
+
+ private void createBaseTable(Connection conn, String tableName, String splits) throws SQLException {
+ String ddl = "CREATE TABLE " + tableName + " (t_id VARCHAR NOT NULL,\n" +
+ "k1 INTEGER NOT NULL,\n" +
+ "k2 INTEGER NOT NULL,\n" +
+ "k3 INTEGER,\n" +
+ "v1 VARCHAR,\n" +
+ "CONSTRAINT pk PRIMARY KEY (t_id, k1, k2))\n"
+ + (tableDDLOptions!=null?tableDDLOptions:"") + (splits != null ? (" split on " + splits) : "");
+ conn.createStatement().execute(ddl);
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/it/java/org/apache/phoenix/iterate/DelayedTableResultIteratorFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/iterate/DelayedTableResultIteratorFactory.java b/phoenix-core/src/it/java/org/apache/phoenix/iterate/DelayedTableResultIteratorFactory.java
index 74deb71..55bed91 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/iterate/DelayedTableResultIteratorFactory.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/iterate/DelayedTableResultIteratorFactory.java
@@ -22,6 +22,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.execute.MutationState;
@@ -39,13 +40,13 @@ public class DelayedTableResultIteratorFactory implements TableResultIteratorFac
@Override
public TableResultIterator newIterator(MutationState mutationState, TableRef tableRef, Scan scan,
- CombinableMetric scanMetrics, long renewLeaseThreshold) throws SQLException {
- return new DelayedTableResultIterator(mutationState, tableRef, scan, scanMetrics, renewLeaseThreshold);
+ CombinableMetric scanMetrics, long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper) throws SQLException {
+ return new DelayedTableResultIterator(mutationState, tableRef, scan, scanMetrics, renewLeaseThreshold, plan, scanGrouper);
}
private class DelayedTableResultIterator extends TableResultIterator {
- public DelayedTableResultIterator (MutationState mutationState, TableRef tableRef, Scan scan, CombinableMetric scanMetrics, long renewLeaseThreshold) throws SQLException {
- super(mutationState, tableRef, scan, scanMetrics, renewLeaseThreshold);
+ public DelayedTableResultIterator (MutationState mutationState, TableRef tableRef, Scan scan, CombinableMetric scanMetrics, long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper) throws SQLException {
+ super(mutationState, scan, scanMetrics, renewLeaseThreshold, plan, scanGrouper);
}
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/it/java/org/apache/phoenix/iterate/MockParallelIteratorFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/iterate/MockParallelIteratorFactory.java b/phoenix-core/src/it/java/org/apache/phoenix/iterate/MockParallelIteratorFactory.java
index d8a08e6..b5c5f0f 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/iterate/MockParallelIteratorFactory.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/iterate/MockParallelIteratorFactory.java
@@ -21,6 +21,7 @@ import java.sql.SQLException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.schema.PTable;
@@ -33,7 +34,7 @@ public class MockParallelIteratorFactory implements ParallelIteratorFactory {
@Override
public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan,
- String physicalTableName) throws SQLException {
+ String physicalTableName, QueryPlan plan) throws SQLException {
return new MockResultIterator(String.valueOf(counter.incrementAndGet()), table);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReader.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReader.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReader.java
index cbc4ed6..d1d12fb 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReader.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReader.java
@@ -18,27 +18,18 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.Map;
-import java.util.Map.Entry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValue.Type;
-import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-import org.apache.hadoop.hbase.io.hfile.HFileScanner;
-import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.index.IndexMaintainer;
/**
@@ -57,7 +48,6 @@ import org.apache.phoenix.index.IndexMaintainer;
*/
public class IndexHalfStoreFileReader extends StoreFile.Reader {
- private static final int ROW_KEY_LENGTH = 2;
private final boolean top;
// This is the key we split around. Its the first possible entry on a row:
// i.e. empty column and a timestamp of LATEST_TIMESTAMP.
@@ -73,36 +63,6 @@ public class IndexHalfStoreFileReader extends StoreFile.Reader {
* @param fs
* @param p
* @param cacheConf
- * @param r
- * @param conf
- * @param indexMaintainers
- * @param viewConstants
- * @param regionInfo
- * @param regionStartKeyInHFile
- * @param splitKey
- * @throws IOException
- */
- public IndexHalfStoreFileReader(final FileSystem fs, final Path p, final CacheConfig cacheConf,
- final Reference r, final Configuration conf,
- final Map<ImmutableBytesWritable, IndexMaintainer> indexMaintainers,
- final byte[][] viewConstants, final HRegionInfo regionInfo,
- final byte[] regionStartKeyInHFile, byte[] splitKey) throws IOException {
- super(fs, p, cacheConf, conf);
- this.splitkey = splitKey == null ? r.getSplitKey() : splitKey;
- // Is it top or bottom half?
- this.top = Reference.isTopFileRegion(r.getFileRegion());
- this.splitRow = CellUtil.cloneRow(KeyValue.createKeyValueFromKey(splitkey));
- this.indexMaintainers = indexMaintainers;
- this.viewConstants = viewConstants;
- this.regionInfo = regionInfo;
- this.regionStartKeyInHFile = regionStartKeyInHFile;
- this.offset = regionStartKeyInHFile.length;
- }
-
- /**
- * @param fs
- * @param p
- * @param cacheConf
* @param in
* @param size
* @param r
@@ -132,371 +92,35 @@ public class IndexHalfStoreFileReader extends StoreFile.Reader {
this.offset = regionStartKeyInHFile.length;
}
- protected boolean isTop() {
- return this.top;
+ public int getOffset() {
+ return offset;
}
- @Override
- public HFileScanner getScanner(final boolean cacheBlocks, final boolean pread,
- final boolean isCompaction) {
- final HFileScanner s = super.getScanner(cacheBlocks, pread, isCompaction);
- return new HFileScanner() {
- final HFileScanner delegate = s;
- public boolean atEnd = false;
-
- @Override
- public ByteBuffer getKey() {
- if (atEnd) {
- return null;
- }
- boolean changeBottomKeys =
- regionInfo.getStartKey().length == 0 && splitRow.length != offset;
- if (!top) {
- // For first region we are prepending empty byte array of length region end key.
- // So if split row length is not equal to region end key length then we need to
- // replace empty bytes of split row length. Because after split end key is the split
- // row.
- if(!changeBottomKeys) return delegate.getKey();
- }
- // If it is top store file replace the StartKey of the Key with SplitKey
- return getChangedKey(delegate.getKeyValue(), changeBottomKeys);
- }
-
- private ByteBuffer getChangedKey(Cell kv, boolean changeBottomKeys) {
- // new KeyValue(row, family, qualifier, timestamp, type, value)
- byte[] newRowkey = getNewRowkeyByRegionStartKeyReplacedWithSplitKey(kv, changeBottomKeys);
- KeyValue newKv =
- new KeyValue(newRowkey, 0, newRowkey.length, kv.getFamilyArray(),
- kv.getFamilyOffset(), kv.getFamilyLength(), kv.getQualifierArray(),
- kv.getQualifierOffset(), kv.getQualifierLength(),
- kv.getTimestamp(), Type.codeToType(kv.getTypeByte()), null, 0, 0);
- ByteBuffer keyBuffer = ByteBuffer.wrap(newKv.getKey());
- return keyBuffer;
- }
-
- private byte[] getNewRowkeyByRegionStartKeyReplacedWithSplitKey(Cell kv, boolean changeBottomKeys) {
- int lenOfRemainingKey = kv.getRowLength() - offset;
- byte[] keyReplacedStartKey = new byte[lenOfRemainingKey + splitRow.length];
- System.arraycopy(changeBottomKeys ? new byte[splitRow.length] : splitRow, 0,
- keyReplacedStartKey, 0, splitRow.length);
- System.arraycopy(kv.getRowArray(), kv.getRowOffset() + offset, keyReplacedStartKey,
- splitRow.length, lenOfRemainingKey);
- return keyReplacedStartKey;
- }
-
- @Override
- public String getKeyString() {
- if (atEnd) {
- return null;
- }
- return Bytes.toStringBinary(getKey());
- }
-
- @Override
- public ByteBuffer getValue() {
- if (atEnd) {
- return null;
- }
- return delegate.getValue();
- }
-
- @Override
- public String getValueString() {
- if (atEnd) {
- return null;
- }
- return Bytes.toStringBinary(getValue());
- }
-
- @Override
- public Cell getKeyValue() {
- if (atEnd) {
- return null;
- }
- Cell kv = delegate.getKeyValue();
- boolean changeBottomKeys =
- regionInfo.getStartKey().length == 0 && splitRow.length != offset;
- if (!top) {
- if(!changeBottomKeys) return kv;
- }
- // If it is a top store file change the StartKey with SplitKey in Key
- // and produce the new value corresponding to the change in key
- byte[] changedKey = getNewRowkeyByRegionStartKeyReplacedWithSplitKey(kv, changeBottomKeys);
- KeyValue changedKv =
- new KeyValue(changedKey, 0, changedKey.length, kv.getFamilyArray(),
- kv.getFamilyOffset(), kv.getFamilyLength(), kv.getQualifierArray(),
- kv.getQualifierOffset(), kv.getQualifierLength(),
- kv.getTimestamp(), Type.codeToType(kv.getTypeByte()),
- kv.getValueArray(), kv.getValueOffset(), kv.getValueLength(),
- kv.getTagsArray(), kv.getTagsOffset(), kv.getTagsLength());
- return changedKv;
- }
-
- @Override
- public boolean next() throws IOException {
- if (atEnd) {
- return false;
- }
- while (true) {
- boolean b = delegate.next();
- if (!b) {
- atEnd = true;
- return b;
- }
- // We need to check whether the current KV pointed by this reader is
- // corresponding to
- // this split or not.
- // In case of top store file if the ActualRowKey >= SplitKey
- // In case of bottom store file if the ActualRowKey < Splitkey
- if (isSatisfiedMidKeyCondition(delegate.getKeyValue())) {
- return true;
- }
- }
- }
-
- @Override
- public boolean seekBefore(byte[] key) throws IOException {
- return seekBefore(key, 0, key.length);
- }
-
- @Override
- public boolean seekBefore(byte[] key, int offset, int length) throws IOException {
-
- if (top) {
- byte[] fk = getFirstKey();
- // This will be null when the file is empty in which we can not seekBefore to
- // any key
- if (fk == null) {
- return false;
- }
- if (getComparator().compare(key, offset, length, fk, 0, fk.length) <= 0) {
- return false;
- }
- KeyValue replacedKey = getKeyPresentInHFiles(key);
- return this.delegate.seekBefore(replacedKey);
- } else {
- // The equals sign isn't strictly necessary just here to be consistent with
- // seekTo
- if (getComparator().compare(key, offset, length, splitkey, 0, splitkey.length) >= 0) {
- return this.delegate.seekBefore(splitkey, 0, splitkey.length);
- }
- }
- return this.delegate.seekBefore(key, offset, length);
- }
-
- @Override
- public boolean seekBefore(Cell cell) throws IOException {
- KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
- return seekBefore(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength());
- }
-
- @Override
- public boolean seekTo() throws IOException {
- boolean b = delegate.seekTo();
- if (!b) {
- atEnd = true;
- return b;
- }
- while (true) {
- // We need to check the first occurrence of satisfying the condition
- // In case of top store file if the ActualRowKey >= SplitKey
- // In case of bottom store file if the ActualRowKey < Splitkey
- if (isSatisfiedMidKeyCondition(delegate.getKeyValue())) {
- return true;
- }
- b = delegate.next();
- if (!b) {
- return b;
- }
- }
- }
-
- @Override
- public int seekTo(byte[] key) throws IOException {
- return seekTo(key, 0, key.length);
- }
-
- @Override
- public int seekTo(byte[] key, int offset, int length) throws IOException {
- if (top) {
- if (getComparator().compare(key, offset, length, splitkey, 0, splitkey.length) < 0) {
- return -1;
- }
- KeyValue replacedKey = getKeyPresentInHFiles(key);
-
- int seekTo =
- delegate.seekTo(replacedKey.getBuffer(), replacedKey.getKeyOffset(),
- replacedKey.getKeyLength());
- return seekTo;
- /*
- * if (seekTo == 0 || seekTo == -1) { return seekTo; } else if (seekTo == 1) {
- * boolean next = this.next(); }
- */
- } else {
- if (getComparator().compare(key, offset, length, splitkey, 0, splitkey.length) >= 0) {
- // we would place the scanner in the second half.
- // it might be an error to return false here ever...
- boolean res = delegate.seekBefore(splitkey, 0, splitkey.length);
- if (!res) {
- throw new IOException(
- "Seeking for a key in bottom of file, but key exists in top of file, failed on seekBefore(midkey)");
- }
- return 1;
- }
- }
- return delegate.seekTo(key, offset, length);
- }
-
- @Override
- public int seekTo(Cell cell) throws IOException {
- KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
- return seekTo(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength());
- }
-
- @Override
- public int reseekTo(byte[] key) throws IOException {
- return reseekTo(key, 0, key.length);
- }
-
- @Override
- public int reseekTo(byte[] key, int offset, int length) throws IOException {
- if (top) {
- if (getComparator().compare(key, offset, length, splitkey, 0, splitkey.length) < 0) {
- return -1;
- }
- KeyValue replacedKey = getKeyPresentInHFiles(key);
- return delegate.reseekTo(replacedKey.getBuffer(), replacedKey.getKeyOffset(),
- replacedKey.getKeyLength());
- } else {
- if (getComparator().compare(key, offset, length, splitkey, 0, splitkey.length) >= 0) {
- // we would place the scanner in the second half.
- // it might be an error to return false here ever...
- boolean res = delegate.seekBefore(splitkey, 0, splitkey.length);
- if (!res) {
- throw new IOException(
- "Seeking for a key in bottom of file, but key exists in top of file, failed on seekBefore(midkey)");
- }
- return 1;
- }
- }
- return delegate.reseekTo(key, offset, length);
- }
-
- @Override
- public int reseekTo(Cell cell) throws IOException {
- KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
- return reseekTo(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength());
- }
-
- @Override
- public org.apache.hadoop.hbase.io.hfile.HFile.Reader getReader() {
- return this.delegate.getReader();
- }
-
- // TODO: Need to change as per IndexHalfStoreFileReader
- @Override
- public boolean isSeeked() {
- return this.delegate.isSeeked();
- }
-
- // Added for compatibility with HBASE-13109
- // Once we drop support for older versions, add an @override annotation here
- // and figure out how to get the next indexed key
- public Cell getNextIndexedKey() {
- return null; // indicate that we cannot use the optimization
- }
- };
+ public byte[][] getViewConstants() {
+ return viewConstants;
}
- private boolean isSatisfiedMidKeyCondition(Cell kv) {
- if (CellUtil.isDelete(kv) && kv.getValueLength() == 0) {
- // In case of a Delete type KV, let it be going to both the daughter regions.
- // No problems in doing so. In the correct daughter region where it belongs to, this delete
- // tomb will really delete a KV. In the other it will just hang around there with no actual
- // kv coming for which this is a delete tomb. :)
- return true;
- }
- ImmutableBytesWritable rowKey =
- new ImmutableBytesWritable(kv.getRowArray(), kv.getRowOffset() + offset,
- kv.getRowLength() - offset);
- Entry<ImmutableBytesWritable, IndexMaintainer> entry = indexMaintainers.entrySet().iterator().next();
- IndexMaintainer indexMaintainer = entry.getValue();
- byte[] viewIndexId = indexMaintainer.getViewIndexIdFromIndexRowKey(rowKey);
- IndexMaintainer actualIndexMaintainer = indexMaintainers.get(new ImmutableBytesWritable(viewIndexId));
- byte[] dataRowKey = actualIndexMaintainer.buildDataRowKey(rowKey, this.viewConstants);
- int compareResult = Bytes.compareTo(dataRowKey, splitRow);
- if (top) {
- if (compareResult >= 0) {
- return true;
- }
- } else {
- if (compareResult < 0) {
- return true;
- }
- }
- return false;
+ public Map<ImmutableBytesWritable, IndexMaintainer> getIndexMaintainers() {
+ return indexMaintainers;
}
- /**
- * In case of top half store, the passed key will be with the start key of the daughter region.
- * But in the actual HFiles, the key will be with the start key of the old parent region. In
- * order to make the real seek in the HFiles, we need to build the old key.
- *
- * The logic here is just replace daughter region start key with parent region start key
- * in the key part.
- *
- * @param key
- *
- */
- private KeyValue getKeyPresentInHFiles(byte[] key) {
- KeyValue keyValue = new KeyValue(key);
- int rowLength = keyValue.getRowLength();
- int rowOffset = keyValue.getRowOffset();
-
- int daughterStartKeyLength =
- regionInfo.getStartKey().length == 0 ? regionInfo.getEndKey().length : regionInfo
- .getStartKey().length;
-
- // This comes incase of deletefamily
- if (top
- && 0 == keyValue.getValueLength()
- && keyValue.getTimestamp() == HConstants.LATEST_TIMESTAMP
- && Bytes.compareTo(keyValue.getRowArray(), keyValue.getRowOffset(),
- keyValue.getRowLength(), splitRow, 0, splitRow.length) == 0
- && CellUtil.isDeleteFamily(keyValue)) {
- KeyValue createFirstDeleteFamilyOnRow =
- KeyValueUtil.createFirstDeleteFamilyOnRow(regionStartKeyInHFile,
- keyValue.getFamily());
- return createFirstDeleteFamilyOnRow;
- }
+ public HRegionInfo getRegionInfo() {
+ return regionInfo;
+ }
- short length = (short) (keyValue.getRowLength() - daughterStartKeyLength + offset);
- byte[] replacedKey =
- new byte[length + key.length - (rowOffset + rowLength) + ROW_KEY_LENGTH];
- System.arraycopy(Bytes.toBytes(length), 0, replacedKey, 0, ROW_KEY_LENGTH);
- System.arraycopy(regionStartKeyInHFile, 0, replacedKey, ROW_KEY_LENGTH, offset);
- System.arraycopy(keyValue.getRowArray(), keyValue.getRowOffset() + daughterStartKeyLength,
- replacedKey, offset + ROW_KEY_LENGTH, keyValue.getRowLength()
- - daughterStartKeyLength);
- System.arraycopy(key, rowOffset + rowLength, replacedKey,
- offset + keyValue.getRowLength() - daughterStartKeyLength
- + ROW_KEY_LENGTH, key.length - (rowOffset + rowLength));
- return KeyValue.createKeyValueFromKey(replacedKey);
+ public byte[] getRegionStartKeyInHFile() {
+ return regionStartKeyInHFile;
}
- @Override
- public byte[] midkey() throws IOException {
- // Returns null to indicate file is not splitable.
- return null;
+ public byte[] getSplitkey() {
+ return splitkey;
}
- @Override
- public byte[] getFirstKey() {
- return super.getFirstKey();
+ public byte[] getSplitRow() {
+ return splitRow;
}
- @Override
- public boolean passesKeyRangeFilter(Scan scan) {
- return true;
+ public boolean isTop() {
+ return top;
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReaderGenerator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReaderGenerator.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReaderGenerator.java
index b48314d..6cf8fa1 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReaderGenerator.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReaderGenerator.java
@@ -19,10 +19,12 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.sql.SQLException;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.NavigableSet;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -51,11 +53,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.parse.AlterIndexStatement;
-import org.apache.phoenix.parse.ParseNodeFactory;
-import org.apache.phoenix.schema.MetaDataClient;
import org.apache.phoenix.schema.PColumn;
-import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTable.IndexType;
import org.apache.phoenix.schema.PTableType;
@@ -67,10 +65,6 @@ import org.apache.phoenix.util.QueryUtil;
public class IndexHalfStoreFileReaderGenerator extends BaseRegionObserver {
- int storeFilesCount = 0;
- int compactedFilesCount = 0;
- private static final ParseNodeFactory FACTORY = new ParseNodeFactory();
-
@Override
public Reader preStoreFileReaderOpen(ObserverContext<RegionCoprocessorEnvironment> ctx,
FileSystem fs, Path p, FSDataInputStreamWrapper in, long size, CacheConfig cacheConf,
@@ -116,7 +110,17 @@ public class IndexHalfStoreFileReaderGenerator extends BaseRegionObserver {
// We need not change any thing in first region data because first region start key
// is equal to merged region start key. So returning same reader.
if (Bytes.compareTo(mergeRegions.getFirst().getStartKey(), splitRow) == 0) {
- return reader;
+ if (mergeRegions.getFirst().getStartKey().length == 0
+ && region.getRegionInfo().getEndKey().length != mergeRegions
+ .getFirst().getEndKey().length) {
+ childRegion = mergeRegions.getFirst();
+ regionStartKeyInHFile =
+ mergeRegions.getFirst().getStartKey().length == 0 ? new byte[mergeRegions
+ .getFirst().getEndKey().length] : mergeRegions.getFirst()
+ .getStartKey();
+ } else {
+ return reader;
+ }
} else {
childRegion = mergeRegions.getSecond();
regionStartKeyInHFile = mergeRegions.getSecond().getStartKey();
@@ -170,58 +174,31 @@ public class IndexHalfStoreFileReaderGenerator extends BaseRegionObserver {
}
return reader;
}
-
+
+ @SuppressWarnings("deprecation")
@Override
public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
Store store, List<? extends KeyValueScanner> scanners, ScanType scanType,
long earliestPutTs, InternalScanner s, CompactionRequest request) throws IOException {
- InternalScanner internalScanner = super.preCompactScannerOpen(c, store, scanners, scanType, earliestPutTs, s, request);
- Collection<StoreFile> files = request.getFiles();
- storeFilesCount = 0;
- compactedFilesCount = 0;
- for(StoreFile file:files) {
- if(!file.isReference()) {
- return internalScanner;
- }
+ if (!scanType.equals(ScanType.COMPACT_DROP_DELETES) || s != null || !store.hasReferences()) {
+ return s;
}
- storeFilesCount = files.size();
- return internalScanner;
- }
-
- @Override
- public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store,
- StoreFile resultFile) throws IOException {
- super.postCompact(e, store, resultFile);
- if(storeFilesCount > 0) compactedFilesCount++;
- if(compactedFilesCount == storeFilesCount) {
- PhoenixConnection conn = null;
- try {
- conn = QueryUtil.getConnectionOnServer(e.getEnvironment().getConfiguration()).unwrap(
- PhoenixConnection.class);
- MetaDataClient client = new MetaDataClient(conn);
- String userTableName = MetaDataUtil.getUserTableName(e.getEnvironment().getRegion().getTableDesc().getNameAsString());
- PTable dataTable = PhoenixRuntime.getTable(conn, userTableName);
- List<PTable> indexes = dataTable.getIndexes();
- for (PTable index : indexes) {
- if (index.getIndexType() == IndexType.LOCAL) {
- AlterIndexStatement indexStatement = FACTORY.alterIndex(FACTORY.namedTable(null,
- org.apache.phoenix.parse.TableName.create(index.getSchemaName().getString(), index.getTableName().getString())),
- dataTable.getTableName().getString(), false, PIndexState.ACTIVE);
- client.alterIndex(indexStatement);
- }
- }
- conn.commit();
- } catch (ClassNotFoundException ex) {
- } catch (SQLException ex) {
- } finally {
- if (conn != null) {
- try {
- conn.close();
- } catch (SQLException ex) {
- }
- }
+ List<StoreFileScanner> newScanners = new ArrayList<StoreFileScanner>(scanners.size());
+ Scan scan = new Scan();
+ scan.setMaxVersions(store.getFamily().getMaxVersions());
+ boolean scanUsePread = c.getEnvironment().getConfiguration().getBoolean("hbase.storescanner.use.pread", scan.isSmall());
+ for(KeyValueScanner scanner: scanners) {
+ Reader reader = ((StoreFileScanner) scanner).getReader();
+ if (reader instanceof IndexHalfStoreFileReader) {
+ newScanners.add(new LocalIndexStoreFileScanner(reader, reader.getScanner(
+ scan.getCacheBlocks(), scanUsePread, false), true, reader.getHFileReader()
+ .hasMVCCInfo(), store.getSmallestReadPoint()));
+ } else {
+ newScanners.add(((StoreFileScanner) scanner));
}
}
+ return new StoreScanner(store, store.getScanInfo(), scan, newScanners,
+ scanType, store.getSmallestReadPoint(), earliestPutTs);
}
private byte[][] getViewConstants(PTable dataTable) {
@@ -254,4 +231,42 @@ public class IndexHalfStoreFileReaderGenerator extends BaseRegionObserver {
}
return viewConstants;
}
+
+ @Override
+ public KeyValueScanner preStoreScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
+ final Store store, final Scan scan, final NavigableSet<byte[]> targetCols,
+ final KeyValueScanner s) throws IOException {
+ if(store.hasReferences()) {
+ long readPt = c.getEnvironment().getRegion().getReadpoint(scan.getIsolationLevel());
+ boolean scanUsePread = c.getEnvironment().getConfiguration().getBoolean("hbase.storescanner.use.pread", scan.isSmall());
+ Collection<StoreFile> storeFiles = store.getStorefiles();
+ List<StoreFile> nonReferenceStoreFiles = new ArrayList<StoreFile>(store.getStorefiles().size());
+ List<StoreFile> referenceStoreFiles = new ArrayList<StoreFile>(store.getStorefiles().size());
+ List<KeyValueScanner> keyValueScanners = new ArrayList<KeyValueScanner>(store.getStorefiles().size()+1);
+ for(StoreFile storeFile : storeFiles) {
+ if (storeFile.isReference()) {
+ referenceStoreFiles.add(storeFile);
+ } else {
+ nonReferenceStoreFiles.add(storeFile);
+ }
+ }
+ List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(nonReferenceStoreFiles, scan.getCacheBlocks(), scanUsePread, readPt);
+ keyValueScanners.addAll(scanners);
+ for(StoreFile sf : referenceStoreFiles) {
+ if(sf.getReader() instanceof IndexHalfStoreFileReader) {
+ keyValueScanners.add(new LocalIndexStoreFileScanner(sf.getReader(), sf.getReader()
+ .getScanner(scan.getCacheBlocks(), scanUsePread, false), true, sf
+ .getReader().getHFileReader().hasMVCCInfo(), readPt));
+ } else {
+ keyValueScanners.add(new StoreFileScanner(sf.getReader(), sf.getReader()
+ .getScanner(scan.getCacheBlocks(), scanUsePread, false), true, sf
+ .getReader().getHFileReader().hasMVCCInfo(), readPt));
+ }
+ }
+ keyValueScanners.addAll(((HStore)store).memstore.getScanners(readPt));
+ if(!scan.isReversed()) return new StoreScanner(scan, store.getScanInfo(), ScanType.USER_SCAN, targetCols, keyValueScanners);
+ return new ReversedStoreScanner(scan, store.getScanInfo(), ScanType.USER_SCAN, targetCols, keyValueScanners);
+ }
+ return s;
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexSplitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexSplitter.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexSplitter.java
index 63cf3ba..ba158a8 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexSplitter.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexSplitter.java
@@ -31,23 +31,14 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.PairOfSameType;
import org.apache.phoenix.hbase.index.util.VersionUtil;
-import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.parse.AlterIndexStatement;
import org.apache.phoenix.parse.ParseNodeFactory;
-import org.apache.phoenix.schema.MetaDataClient;
-import org.apache.phoenix.schema.PIndexState;
-import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.schema.PTable.IndexType;
import org.apache.phoenix.schema.types.PBoolean;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.MetaDataUtil;
-import org.apache.phoenix.util.PhoenixRuntime;
-import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.SchemaUtil;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
-import java.sql.SQLException;
import java.util.List;
public class LocalIndexSplitter extends BaseRegionObserver {
@@ -144,34 +135,6 @@ public class LocalIndexSplitter extends BaseRegionObserver {
throws IOException {
if (st == null || daughterRegions == null) return;
RegionCoprocessorEnvironment environment = ctx.getEnvironment();
- PhoenixConnection conn = null;
- try {
- conn = QueryUtil.getConnectionOnServer(ctx.getEnvironment().getConfiguration()).unwrap(
- PhoenixConnection.class);
- MetaDataClient client = new MetaDataClient(conn);
- String userTableName = ctx.getEnvironment().getRegion().getTableDesc().getNameAsString();
- PTable dataTable = PhoenixRuntime.getTable(conn, userTableName);
- List<PTable> indexes = dataTable.getIndexes();
- for (PTable index : indexes) {
- if (index.getIndexType() == IndexType.LOCAL) {
- AlterIndexStatement indexStatement = FACTORY.alterIndex(FACTORY.namedTable(null,
- org.apache.phoenix.parse.TableName.create(index.getSchemaName().getString(), index.getTableName().getString())),
- dataTable.getTableName().getString(), false, PIndexState.INACTIVE);
- client.alterIndex(indexStatement);
- }
- }
- conn.commit();
- } catch (ClassNotFoundException ex) {
- } catch (SQLException ex) {
- } finally {
- if (conn != null) {
- try {
- conn.close();
- } catch (SQLException ex) {
- }
- }
- }
-
HRegionServer rs = (HRegionServer) environment.getRegionServerServices();
st.stepsAfterPONR(rs, rs, daughterRegions);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexStoreFileScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexStoreFileScanner.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexStoreFileScanner.java
new file mode 100644
index 0000000..a6e5005
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexStoreFileScanner.java
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.regionserver.StoreFile.Reader;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.index.IndexMaintainer;
+
+import static org.apache.hadoop.hbase.KeyValue.ROW_LENGTH_SIZE;
+
+public class LocalIndexStoreFileScanner extends StoreFileScanner{
+
+ private IndexHalfStoreFileReader reader;
+ private boolean changeBottomKeys;
+ public LocalIndexStoreFileScanner(Reader reader, HFileScanner hfs, boolean useMVCC,
+ boolean hasMVCC, long readPt) {
+ super(reader, hfs, useMVCC, hasMVCC, readPt);
+ this.reader = ((IndexHalfStoreFileReader)super.getReader());
+ this.changeBottomKeys =
+ this.reader.getRegionInfo().getStartKey().length == 0
+ && this.reader.getSplitRow().length != this.reader.getOffset();
+ }
+
+ @Override
+ public Cell next() throws IOException {
+ Cell next = super.next();
+ while(next !=null && !isSatisfiedMidKeyCondition(next)) {
+ next = super.next();
+ }
+ while(super.peek() != null && !isSatisfiedMidKeyCondition(super.peek())) {
+ super.next();
+ }
+ if (next!=null && (reader.isTop() || changeBottomKeys)) {
+ next = getChangedKey(next, !reader.isTop() && changeBottomKeys);
+ }
+ return next;
+ }
+
+ @Override
+ public Cell peek() {
+ Cell peek = super.peek();
+ if (peek != null && (reader.isTop() || changeBottomKeys)) {
+ peek = getChangedKey(peek, !reader.isTop() && changeBottomKeys);
+ }
+ return peek;
+ }
+
+ private KeyValue getChangedKey(Cell next, boolean changeBottomKeys) {
+ // If it is a top store file change the StartKey with SplitKey in Key
+ //and produce the new value corresponding to the change in key
+ byte[] changedKey = getNewRowkeyByRegionStartKeyReplacedWithSplitKey(next, changeBottomKeys);
+ KeyValue changedKv =
+ new KeyValue(changedKey, 0, changedKey.length, next.getFamilyArray(),
+ next.getFamilyOffset(), next.getFamilyLength(), next.getQualifierArray(),
+ next.getQualifierOffset(), next.getQualifierLength(),
+ next.getTimestamp(), Type.codeToType(next.getTypeByte()),
+ next.getValueArray(), next.getValueOffset(), next.getValueLength(),
+ next.getTagsArray(), next.getTagsOffset(), next.getTagsLength());
+ return changedKv;
+ }
+
+ @Override
+ public boolean seek(Cell key) throws IOException {
+ return seekOrReseek(key, true);
+ }
+
+ @Override
+ public boolean reseek(Cell key) throws IOException {
+ return seekOrReseek(key, false);
+ }
+
+ @Override
+ public boolean seekToPreviousRow(Cell key) throws IOException {
+ KeyValue kv = KeyValueUtil.ensureKeyValue(key);
+ if (reader.isTop()) {
+ byte[] fk = reader.getFirstKey();
+ // This will be null when the file is empty in which we can not seekBefore to
+ // any key
+ if (fk == null) {
+ return false;
+ }
+ if (getComparator().compare(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength(), fk, 0, fk.length) <= 0) {
+ return super.seekToPreviousRow(key);
+ }
+ KeyValue replacedKey = getKeyPresentInHFiles(kv.getBuffer());
+ boolean seekToPreviousRow = super.seekToPreviousRow(replacedKey);
+ while(super.peek()!=null && !isSatisfiedMidKeyCondition(super.peek())) {
+ seekToPreviousRow = super.seekToPreviousRow(super.peek());
+ }
+ return seekToPreviousRow;
+ } else {
+ // The equals sign isn't strictly necessary just here to be consistent with
+ // seekTo
+ if (getComparator().compare(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength(), reader.getSplitkey(), 0, reader.getSplitkey().length) >= 0) {
+ boolean seekToPreviousRow = super.seekToPreviousRow(kv);
+ while(super.peek()!=null && !isSatisfiedMidKeyCondition(super.peek())) {
+ seekToPreviousRow = super.seekToPreviousRow(super.peek());
+ }
+ return seekToPreviousRow;
+ }
+ }
+ boolean seekToPreviousRow = super.seekToPreviousRow(kv);
+ while(super.peek()!=null && !isSatisfiedMidKeyCondition(super.peek())) {
+ seekToPreviousRow = super.seekToPreviousRow(super.peek());
+ }
+ return seekToPreviousRow;
+ }
+
+ @Override
+ public boolean seekToLastRow() throws IOException {
+ boolean seekToLastRow = super.seekToLastRow();
+ while(super.peek()!=null && !isSatisfiedMidKeyCondition(super.peek())) {
+ seekToLastRow = super.seekToPreviousRow(super.peek());
+ }
+ return seekToLastRow;
+ }
+
+ private boolean isSatisfiedMidKeyCondition(Cell kv) {
+ if (CellUtil.isDelete(kv) && kv.getValueLength() == 0) {
+ // In case of a Delete type KV, let it be going to both the daughter regions.
+ // No problems in doing so. In the correct daughter region where it belongs to, this delete
+ // tomb will really delete a KV. In the other it will just hang around there with no actual
+ // kv coming for which this is a delete tomb. :)
+ return true;
+ }
+ ImmutableBytesWritable rowKey =
+ new ImmutableBytesWritable(kv.getRowArray(), kv.getRowOffset() + reader.getOffset(),
+ kv.getRowLength() - reader.getOffset());
+ Entry<ImmutableBytesWritable, IndexMaintainer> entry = reader.getIndexMaintainers().entrySet().iterator().next();
+ IndexMaintainer indexMaintainer = entry.getValue();
+ byte[] viewIndexId = indexMaintainer.getViewIndexIdFromIndexRowKey(rowKey);
+ IndexMaintainer actualIndexMaintainer = reader.getIndexMaintainers().get(new ImmutableBytesWritable(viewIndexId));
+ byte[] dataRowKey = actualIndexMaintainer.buildDataRowKey(rowKey, reader.getViewConstants());
+ int compareResult = Bytes.compareTo(dataRowKey, reader.getSplitRow());
+ if (reader.isTop()) {
+ if (compareResult >= 0) {
+ return true;
+ }
+ } else {
+ if (compareResult < 0) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * In case of top half store, the passed key will be with the start key of the daughter region.
+ * But in the actual HFiles, the key will be with the start key of the old parent region. In
+ * order to make the real seek in the HFiles, we need to build the old key.
+ *
+ * The logic here is just replace daughter region start key with parent region start key
+ * in the key part.
+ *
+ * @param key
+ *
+ */
+ private KeyValue getKeyPresentInHFiles(byte[] key) {
+ KeyValue keyValue = new KeyValue(key);
+ int rowLength = keyValue.getRowLength();
+ int rowOffset = keyValue.getRowOffset();
+
+ short length = (short) (rowLength - reader.getSplitRow().length + reader.getOffset());
+ byte[] replacedKey =
+ new byte[length + key.length - (rowOffset + rowLength) + ROW_LENGTH_SIZE];
+ System.arraycopy(Bytes.toBytes(length), 0, replacedKey, 0, ROW_LENGTH_SIZE);
+ System.arraycopy(reader.getRegionStartKeyInHFile(), 0, replacedKey, ROW_LENGTH_SIZE, reader.getOffset());
+ System.arraycopy(keyValue.getRowArray(), keyValue.getRowOffset() + reader.getSplitRow().length,
+ replacedKey, reader.getOffset() + ROW_LENGTH_SIZE, rowLength
+ - reader.getSplitRow().length);
+ System.arraycopy(key, rowOffset + rowLength, replacedKey,
+ reader.getOffset() + keyValue.getRowLength() - reader.getSplitRow().length
+ + ROW_LENGTH_SIZE, key.length - (rowOffset + rowLength));
+ return new KeyValue.KeyOnlyKeyValue(replacedKey);
+ }
+
+ /**
+ *
+ * @param kv
+ * @param isSeek pass true for seek, false for reseek.
+ * @return
+ * @throws IOException
+ */
+ public boolean seekOrReseek(Cell cell, boolean isSeek) throws IOException{
+ KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+ KeyValue keyToSeek = kv;
+ if (reader.isTop()) {
+ if(getComparator().compare(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength(), reader.getSplitkey(), 0, reader.getSplitkey().length) < 0){
+ if(!isSeek && realSeekDone()) {
+ return true;
+ }
+ return seekOrReseekToProperKey(isSeek, keyToSeek);
+ }
+ keyToSeek = getKeyPresentInHFiles(kv.getBuffer());
+ return seekOrReseekToProperKey(isSeek, keyToSeek);
+ } else {
+ if (getComparator().compare(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength(), reader.getSplitkey(), 0, reader.getSplitkey().length) >= 0) {
+ close();
+ return false;
+ }
+ if(!isSeek && reader.getRegionInfo().getStartKey().length == 0 && reader.getSplitRow().length > reader.getRegionStartKeyInHFile().length) {
+ keyToSeek = getKeyPresentInHFiles(kv.getBuffer());
+ }
+ }
+ return seekOrReseekToProperKey(isSeek, keyToSeek);
+ }
+
+ private boolean seekOrReseekToProperKey(boolean isSeek, KeyValue kv)
+ throws IOException {
+ boolean seekOrReseek = isSeek ? super.seek(kv) : super.reseek(kv);
+ while (seekOrReseek && super.peek() != null
+ && !isSatisfiedMidKeyCondition(super.peek())) {
+ super.next();
+ seekOrReseek = super.peek() != null;
+ }
+ return seekOrReseek;
+ }
+
+ private byte[] getNewRowkeyByRegionStartKeyReplacedWithSplitKey(Cell kv, boolean changeBottomKeys) {
+ int lenOfRemainingKey = kv.getRowLength() - reader.getOffset();
+ byte[] keyReplacedStartKey = new byte[lenOfRemainingKey + reader.getSplitRow().length];
+ System.arraycopy(changeBottomKeys ? new byte[reader.getSplitRow().length] : reader.getSplitRow(), 0,
+ keyReplacedStartKey, 0, reader.getSplitRow().length);
+ System.arraycopy(kv.getRowArray(), kv.getRowOffset() + reader.getOffset(), keyReplacedStartKey,
+ reader.getSplitRow().length, lenOfRemainingKey);
+ return keyReplacedStartKey;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
index 94736ed..b52e704 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
@@ -117,6 +117,10 @@ public class ListJarsQueryPlan implements QueryPlan {
}
@Override
+ public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan s) throws SQLException {
+ return iterator(scanGrouper);
+ }
+ @Override
public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException {
return new ResultIterator() {
private RemoteIterator<LocatedFileStatus> listFiles = null;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
index ed421a7..8e63fa9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
@@ -53,7 +53,7 @@ public abstract class MutatingParallelIteratorFactory implements ParallelIterato
abstract protected MutationState mutate(StatementContext parentContext, ResultIterator iterator, PhoenixConnection connection) throws SQLException;
@Override
- public PeekingResultIterator newIterator(final StatementContext parentContext, ResultIterator iterator, Scan scan, String tableName) throws SQLException {
+ public PeekingResultIterator newIterator(final StatementContext parentContext, ResultIterator iterator, Scan scan, String tableName, QueryPlan plan) throws SQLException {
final PhoenixConnection clonedConnection = new PhoenixConnection(this.connection);
MutationState state = mutate(parentContext, iterator, clonedConnection);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
index 4dcc134..7722483 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
@@ -49,6 +49,8 @@ public interface QueryPlan extends StatementPlan {
public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException;
+ public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException;
+
public long getEstimatedSize();
// TODO: change once joins are supported
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
index 719970a..5edaead 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
@@ -18,7 +18,6 @@
package org.apache.phoenix.compile;
import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_ACTUAL_START_ROW;
-import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.STARTKEY_OFFSET;
import java.io.IOException;
import java.util.ArrayList;
@@ -204,7 +203,7 @@ public class ScanRanges {
scan.setStopRow(scanRange.getUpperRange());
}
- private static byte[] prefixKey(byte[] key, int keyOffset, byte[] prefixKey, int prefixKeyOffset) {
+ public static byte[] prefixKey(byte[] key, int keyOffset, byte[] prefixKey, int prefixKeyOffset) {
if (key.length > 0) {
byte[] newKey = new byte[key.length + prefixKeyOffset];
int totalKeyOffset = keyOffset + prefixKeyOffset;
@@ -213,7 +212,7 @@ public class ScanRanges {
}
System.arraycopy(key, keyOffset, newKey, totalKeyOffset, key.length - keyOffset);
return newKey;
- }
+ }
return key;
}
@@ -229,7 +228,7 @@ public class ScanRanges {
return temp;
}
- private static byte[] stripPrefix(byte[] key, int keyOffset) {
+ public static byte[] stripPrefix(byte[] key, int keyOffset) {
if (key.length == 0) {
return key;
}
@@ -388,10 +387,6 @@ public class ScanRanges {
newScan.setAttribute(SCAN_ACTUAL_START_ROW, scanStartKey);
newScan.setStartRow(scanStartKey);
newScan.setStopRow(scanStopKey);
- if(keyOffset > 0) {
- newScan.setAttribute(STARTKEY_OFFSET, Bytes.toBytes(keyOffset));
- }
-
return newScan;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
index 54b4eb7..5e0977b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
@@ -115,6 +115,11 @@ public class TraceQueryPlan implements QueryPlan {
}
@Override
+ public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {
+ return iterator(scanGrouper);
+ }
+
+ @Override
public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException {
final PhoenixConnection conn = stmt.getConnection();
if (conn.getTraceScope() == null && !traceStatement.isTraceOn()) {