Posted to commits@phoenix.apache.org by ya...@apache.org on 2021/01/11 06:43:27 UTC
[phoenix] branch 4.16 updated: PHOENIX-6211 Paged scan filters
This is an automated email from the ASF dual-hosted git repository.
yanxinyi pushed a commit to branch 4.16
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.16 by this push:
new 9dd24ff PHOENIX-6211 Paged scan filters
9dd24ff is described below
commit 9dd24ffd5c4258ee284cbf690b58b018bc3199de
Author: Kadir Ozdemir <ko...@salesforce.com>
AuthorDate: Sun Dec 6 18:48:45 2020 -0800
PHOENIX-6211 Paged scan filters
---
.../apache/phoenix/end2end/SpillableGroupByIT.java | 2 +-
.../java/org/apache/phoenix/end2end/ViewTTLIT.java | 285 +++++++++++----------
.../phoenix/coprocessor/BaseRegionScanner.java | 6 +
.../coprocessor/BaseScannerRegionObserver.java | 22 +-
.../coprocessor/GlobalIndexRegionScanner.java | 47 +++-
.../GroupedAggregateRegionObserver.java | 50 ++--
.../phoenix/coprocessor/HashJoinRegionScanner.java | 29 ++-
.../coprocessor/IndexRebuildRegionScanner.java | 4 +
.../coprocessor/IndexRepairRegionScanner.java | 4 +
.../phoenix/coprocessor/IndexerRegionScanner.java | 6 +
.../phoenix/coprocessor/PagedRegionScanner.java | 104 ++++++++
.../coprocessor/PhoenixTTLRegionObserver.java | 62 +++--
.../UngroupedAggregateRegionObserver.java | 28 +-
.../UngroupedAggregateRegionScanner.java | 29 +--
.../org/apache/phoenix/filter/DelegateFilter.java | 11 +
.../org/apache/phoenix/filter/PagedFilter.java | 267 +++++++++++++++++++
.../apache/phoenix/index/GlobalIndexChecker.java | 67 +++--
.../iterate/NonAggregateRegionScannerFactory.java | 20 +-
.../phoenix/iterate/OffsetResultIterator.java | 16 +-
.../phoenix/iterate/OrderedResultIterator.java | 52 +++-
.../phoenix/iterate/RegionScannerFactory.java | 29 ++-
.../iterate/RegionScannerResultIterator.java | 7 +
.../phoenix/iterate/ScanningResultIterator.java | 2 +-
.../phoenix/iterate/TableResultIterator.java | 14 +-
.../org/apache/phoenix/query/QueryServices.java | 5 +-
.../apache/phoenix/query/QueryServicesOptions.java | 2 -
.../tuple/EncodedColumnQualiferCellsList.java | 7 +
.../apache/phoenix/schema/tuple/ResultTuple.java | 1 -
.../java/org/apache/phoenix/util/ScanUtil.java | 91 ++++++-
.../java/org/apache/phoenix/query/BaseTest.java | 7 +-
30 files changed, 990 insertions(+), 286 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpillableGroupByIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpillableGroupByIT.java
index 1fd8a65..eb379a6 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpillableGroupByIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpillableGroupByIT.java
@@ -82,7 +82,7 @@ public class SpillableGroupByIT extends BaseOwnClusterIT {
props.put(QueryServices.STATS_COLLECTION_ENABLED, Boolean.toString(false));
props.put(QueryServices.EXPLAIN_CHUNK_COUNT_ATTRIB, Boolean.TRUE.toString());
props.put(QueryServices.EXPLAIN_ROW_COUNT_ATTRIB, Boolean.TRUE.toString());
- props.put(QueryServices.UNGROUPED_AGGREGATE_PAGE_SIZE_IN_MS, Long.toString(1000));
+ props.put(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, Long.toString(60000));
// Must update config before starting server
setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLIT.java
index c9fb506..1fd6395 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLIT.java
@@ -54,10 +54,12 @@ import org.apache.phoenix.query.PhoenixTestBuilder.SchemaBuilder.TableOptions;
import org.apache.phoenix.query.PhoenixTestBuilder.SchemaBuilder.TenantViewIndexOptions;
import org.apache.phoenix.query.PhoenixTestBuilder.SchemaBuilder.TenantViewOptions;
import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.LogUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.SchemaUtil;
@@ -760,7 +762,6 @@ public class ViewTTLIT extends ParallelStatsDisabledIT {
GlobalViewIndexOptions
globalViewIndexOptions =
SchemaBuilder.GlobalViewIndexOptions.withDefaults();
- globalViewIndexOptions.setLocal(false);
TenantViewOptions tenantViewOptions = new TenantViewOptions();
tenantViewOptions.setTenantViewColumns(asList("ZID", "COL7", "COL8", "COL9"));
@@ -776,65 +777,69 @@ public class ViewTTLIT extends ParallelStatsDisabledIT {
testCaseWhenAllCFMatchAndAllDefault
.setTenantViewCFs(Lists.newArrayList((String) null, null, null, null));
- // Define the test schema.
- final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
- schemaBuilder.withTableOptions(tableOptions).withGlobalViewOptions(globalViewOptions)
- .withGlobalViewIndexOptions(globalViewIndexOptions)
- .withTenantViewOptions(tenantViewOptions)
- .withOtherOptions(testCaseWhenAllCFMatchAndAllDefault).build();
+ for (boolean isGlobalIndexLocal : Lists.newArrayList(true, false)) {
+ globalViewIndexOptions.setLocal(isGlobalIndexLocal);
- // Define the test data.
- final List<String> outerCol4s = Lists.newArrayList();
- DataSupplier dataSupplier = new DataSupplier() {
- String col4ForWhereClause;
+ // Define the test schema.
+ final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
+ schemaBuilder.withTableOptions(tableOptions).withGlobalViewOptions(globalViewOptions)
+ .withGlobalViewIndexOptions(globalViewIndexOptions)
+ .withTenantViewOptions(tenantViewOptions)
+ .withOtherOptions(testCaseWhenAllCFMatchAndAllDefault).build();
- @Override public List<Object> getValues(int rowIndex) {
- Random rnd = new Random();
- String id = String.format(ID_FMT, rowIndex);
- String zid = String.format(ZID_FMT, rowIndex);
- String col4 = String.format(COL4_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+ // Define the test data.
+ final List<String> outerCol4s = Lists.newArrayList();
+ DataSupplier dataSupplier = new DataSupplier() {
+ String col4ForWhereClause;
- // Store the col4 data to be used later in a where clause
- outerCol4s.add(col4);
- String col5 = String.format(COL5_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
- String col6 = String.format(COL6_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
- String col7 = String.format(COL7_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
- String col8 = String.format(COL8_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
- String col9 = String.format(COL9_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+ @Override public List<Object> getValues(int rowIndex) {
+ Random rnd = new Random();
+ String id = String.format(ID_FMT, rowIndex);
+ String zid = String.format(ZID_FMT, rowIndex);
+ String col4 = String.format(COL4_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
- return Lists
- .newArrayList(new Object[] { id, zid, col4, col5, col6, col7, col8, col9 });
- }
- };
+ // Store the col4 data to be used later in a where clause
+ outerCol4s.add(col4);
+ String col5 = String.format(COL5_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+ String col6 = String.format(COL6_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+ String col7 = String.format(COL7_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+ String col8 = String.format(COL8_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+ String col9 = String.format(COL9_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
- // Create a test data reader/writer for the above schema.
- DataWriter dataWriter = new BasicDataWriter();
- DataReader dataReader = new BasicDataReader();
+ return Lists
+ .newArrayList(new Object[] { id, zid, col4, col5, col6, col7, col8, col9 });
+ }
+ };
- List<String> columns =
- Lists.newArrayList("ID", "ZID", "COL4", "COL5", "COL6", "COL7", "COL8", "COL9");
- List<String> rowKeyColumns = Lists.newArrayList("COL6");
- String tenantConnectUrl =
- getUrl() + ';' + TENANT_ID_ATTRIB + '=' + schemaBuilder.getDataOptions().getTenantId();
- try (Connection writeConnection = DriverManager.getConnection(tenantConnectUrl)) {
- writeConnection.setAutoCommit(true);
- dataWriter.setConnection(writeConnection);
- dataWriter.setDataSupplier(dataSupplier);
- dataWriter.setUpsertColumns(columns);
- dataWriter.setRowKeyColumns(rowKeyColumns);
- dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+ // Create a test data reader/writer for the above schema.
+ DataWriter dataWriter = new BasicDataWriter();
+ DataReader dataReader = new BasicDataReader();
- // Upsert data for validation
- upsertData(dataWriter, DEFAULT_NUM_ROWS);
+ List<String> columns =
+ Lists.newArrayList("ID", "ZID", "COL4", "COL5", "COL6", "COL7", "COL8", "COL9");
+ List<String> rowKeyColumns = Lists.newArrayList("COL6");
+ String tenantConnectUrl =
+ getUrl() + ';' + TENANT_ID_ATTRIB + '=' + schemaBuilder.getDataOptions().getTenantId();
+ try (Connection writeConnection = DriverManager.getConnection(tenantConnectUrl)) {
+ writeConnection.setAutoCommit(true);
+ dataWriter.setConnection(writeConnection);
+ dataWriter.setDataSupplier(dataSupplier);
+ dataWriter.setUpsertColumns(columns);
+ dataWriter.setRowKeyColumns(rowKeyColumns);
+ dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName());
- dataReader.setValidationColumns(rowKeyColumns);
- dataReader.setRowKeyColumns(rowKeyColumns);
- dataReader.setDML(String.format("SELECT col6 from %s where col4 = '%s'",
- schemaBuilder.getEntityTenantViewName(), outerCol4s.get(1)));
- dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+ // Upsert data for validation
+ upsertData(dataWriter, DEFAULT_NUM_ROWS);
- // Validate data before and after ttl expiration.
- validateExpiredRowsAreNotReturnedUsingCounts(phoenixTTL, dataReader, schemaBuilder);
+ dataReader.setValidationColumns(rowKeyColumns);
+ dataReader.setRowKeyColumns(rowKeyColumns);
+ dataReader.setDML(String.format("SELECT col6 from %s where col4 = '%s'",
+ schemaBuilder.getEntityTenantViewName(), outerCol4s.get(1)));
+ dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+ // Validate data before and after ttl expiration.
+ validateExpiredRowsAreNotReturnedUsingCounts(phoenixTTL, dataReader, schemaBuilder);
+ }
}
}
@@ -858,7 +863,6 @@ public class ViewTTLIT extends ParallelStatsDisabledIT {
GlobalViewIndexOptions globalViewIndexOptions =
SchemaBuilder.GlobalViewIndexOptions.withDefaults();
- globalViewIndexOptions.setLocal(false);
TenantViewOptions tenantViewOptions = new TenantViewOptions();
tenantViewOptions.setTenantViewColumns(asList("ZID", "COL7", "COL8", "COL9"));
@@ -874,101 +878,105 @@ public class ViewTTLIT extends ParallelStatsDisabledIT {
testCaseWhenAllCFMatchAndAllDefault
.setTenantViewCFs(Lists.newArrayList((String) null, null, null, null));
- // Define the test schema.
- final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
- schemaBuilder.withTableOptions(tableOptions).withGlobalViewOptions(globalViewOptions)
- .withGlobalViewIndexOptions(globalViewIndexOptions)
- .withTenantViewOptions(tenantViewOptions)
- .withOtherOptions(testCaseWhenAllCFMatchAndAllDefault).build();
-
- // Define the test data.
- final List<String> outerCol4s = Lists.newArrayList();
- DataSupplier dataSupplier = new DataSupplier() {
-
- @Override public List<Object> getValues(int rowIndex) {
- Random rnd = new Random();
- String id = String.format(ID_FMT, rowIndex);
- String zid = String.format(ZID_FMT, rowIndex);
- String col4 = String.format(COL4_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
-
- // Store the col4 data to be used later in a where clause
- outerCol4s.add(col4);
- String col5 = String.format(COL5_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
- String col6 = String.format(COL6_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
- String col7 = String.format(COL7_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
- String col8 = String.format(COL8_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
- String col9 = String.format(COL9_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
-
- return Lists
- .newArrayList(new Object[] { id, zid, col4, col5, col6, col7, col8, col9 });
- }
- };
-
- // Create a test data reader/writer for the above schema.
- DataWriter dataWriter = new BasicDataWriter();
- DataReader dataReader = new BasicDataReader();
-
- List<String> columns =
- Lists.newArrayList("ID", "ZID", "COL4", "COL5", "COL6", "COL7", "COL8", "COL9");
- List<String> nonCoveredColumns =
- Lists.newArrayList("ID", "ZID", "COL5", "COL7", "COL8", "COL9");
- List<String> rowKeyColumns = Lists.newArrayList("COL6");
- String tenantConnectUrl =
- getUrl() + ';' + TENANT_ID_ATTRIB + '=' + schemaBuilder.getDataOptions().getTenantId();
- try (Connection writeConnection = DriverManager.getConnection(tenantConnectUrl)) {
- writeConnection.setAutoCommit(true);
- dataWriter.setConnection(writeConnection);
- dataWriter.setDataSupplier(dataSupplier);
- dataWriter.setUpsertColumns(columns);
- dataWriter.setRowKeyColumns(rowKeyColumns);
- dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName());
-
- // Upsert data for validation
- upsertData(dataWriter, DEFAULT_NUM_ROWS);
-
- dataReader.setValidationColumns(rowKeyColumns);
- dataReader.setRowKeyColumns(rowKeyColumns);
- dataReader.setDML(String.format("SELECT col6 from %s where col4 = '%s'",
- schemaBuilder.getEntityTenantViewName(), outerCol4s.get(1)));
- dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName());
-
- // Validate data before and after ttl expiration.
- validateExpiredRowsAreNotReturnedUsingCounts(phoenixTTL, dataReader, schemaBuilder);
+ for (boolean isGlobalIndexLocal : Lists.newArrayList(true, false)) {
+ globalViewIndexOptions.setLocal(isGlobalIndexLocal);
- // Now update the above data but not modifying the covered columns.
- // Ensure/validate that empty columns for the index are still updated.
+ // Define the test schema.
+ final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
+ schemaBuilder.withTableOptions(tableOptions).withGlobalViewOptions(globalViewOptions)
+ .withGlobalViewIndexOptions(globalViewIndexOptions)
+ .withTenantViewOptions(tenantViewOptions)
+ .withOtherOptions(testCaseWhenAllCFMatchAndAllDefault).build();
- // Data supplier where covered and included (col4 and col6) columns are not updated.
- DataSupplier dataSupplierForNonCoveredColumns = new DataSupplier() {
+ // Define the test data.
+ final List<String> outerCol4s = Lists.newArrayList();
+ DataSupplier dataSupplier = new DataSupplier() {
@Override public List<Object> getValues(int rowIndex) {
Random rnd = new Random();
String id = String.format(ID_FMT, rowIndex);
String zid = String.format(ZID_FMT, rowIndex);
+ String col4 = String.format(COL4_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+
+ // Store the col4 data to be used later in a where clause
+ outerCol4s.add(col4);
String col5 = String.format(COL5_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+ String col6 = String.format(COL6_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
String col7 = String.format(COL7_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
String col8 = String.format(COL8_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
String col9 = String.format(COL9_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
- return Lists.newArrayList(new Object[] { id, zid, col5, col7, col8, col9 });
+ return Lists
+ .newArrayList(new Object[] { id, zid, col4, col5, col6, col7, col8, col9 });
}
};
- // Upsert data for validation with non covered columns
- dataWriter.setDataSupplier(dataSupplierForNonCoveredColumns);
- dataWriter.setUpsertColumns(nonCoveredColumns);
- upsertData(dataWriter, DEFAULT_NUM_ROWS);
+ // Create a test data reader/writer for the above schema.
+ DataWriter dataWriter = new BasicDataWriter();
+ DataReader dataReader = new BasicDataReader();
- List<String> rowKeyColumns1 = Lists.newArrayList("ID", "COL6");
- dataReader.setValidationColumns(rowKeyColumns1);
- dataReader.setRowKeyColumns(rowKeyColumns1);
- dataReader.setDML(String.format("SELECT id, col6 from %s where col4 = '%s'",
- schemaBuilder.getEntityTenantViewName(), outerCol4s.get(1)));
+ List<String> columns =
+ Lists.newArrayList("ID", "ZID", "COL4", "COL5", "COL6", "COL7", "COL8", "COL9");
+ List<String> nonCoveredColumns =
+ Lists.newArrayList("ID", "ZID", "COL5", "COL7", "COL8", "COL9");
+ List<String> rowKeyColumns = Lists.newArrayList("COL6");
+ String tenantConnectUrl =
+ getUrl() + ';' + TENANT_ID_ATTRIB + '=' + schemaBuilder.getDataOptions().getTenantId();
+ try (Connection writeConnection = DriverManager.getConnection(tenantConnectUrl)) {
+ writeConnection.setAutoCommit(true);
+ dataWriter.setConnection(writeConnection);
+ dataWriter.setDataSupplier(dataSupplier);
+ dataWriter.setUpsertColumns(columns);
+ dataWriter.setRowKeyColumns(rowKeyColumns);
+ dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName());
- // Validate data before and after ttl expiration.
- validateExpiredRowsAreNotReturnedUsingCounts(phoenixTTL, dataReader, schemaBuilder);
+ // Upsert data for validation
+ upsertData(dataWriter, DEFAULT_NUM_ROWS);
+ dataReader.setValidationColumns(rowKeyColumns);
+ dataReader.setRowKeyColumns(rowKeyColumns);
+ dataReader.setDML(String.format("SELECT col6 from %s where col4 = '%s'",
+ schemaBuilder.getEntityTenantViewName(), outerCol4s.get(1)));
+ dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+ // Validate data before and after ttl expiration.
+ validateExpiredRowsAreNotReturnedUsingCounts(phoenixTTL, dataReader, schemaBuilder);
+
+ // Now update the above data but not modifying the covered columns.
+ // Ensure/validate that empty columns for the index are still updated.
+
+ // Data supplier where covered and included (col4 and col6) columns are not updated.
+ DataSupplier dataSupplierForNonCoveredColumns = new DataSupplier() {
+
+ @Override public List<Object> getValues(int rowIndex) {
+ Random rnd = new Random();
+ String id = String.format(ID_FMT, rowIndex);
+ String zid = String.format(ZID_FMT, rowIndex);
+ String col5 = String.format(COL5_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+ String col7 = String.format(COL7_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+ String col8 = String.format(COL8_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+ String col9 = String.format(COL9_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+
+ return Lists.newArrayList(new Object[] { id, zid, col5, col7, col8, col9 });
+ }
+ };
+
+ // Upsert data for validation with non covered columns
+ dataWriter.setDataSupplier(dataSupplierForNonCoveredColumns);
+ dataWriter.setUpsertColumns(nonCoveredColumns);
+ upsertData(dataWriter, DEFAULT_NUM_ROWS);
+
+ List<String> rowKeyColumns1 = Lists.newArrayList("ID", "COL6");
+ dataReader.setValidationColumns(rowKeyColumns1);
+ dataReader.setRowKeyColumns(rowKeyColumns1);
+ dataReader.setDML(String.format("SELECT id, col6 from %s where col4 = '%s'",
+ schemaBuilder.getEntityTenantViewName(), outerCol4s.get(1)));
+
+ // Validate data before and after ttl expiration.
+ validateExpiredRowsAreNotReturnedUsingCounts(phoenixTTL, dataReader, schemaBuilder);
+ }
}
+
}
@@ -2370,6 +2378,7 @@ public class ViewTTLIT extends ParallelStatsDisabledIT {
Properties props = new Properties();
long scnTimestamp = EnvironmentEdgeManager.currentTimeMillis();
props.setProperty("CurrentSCN", Long.toString(scnTimestamp));
+ props.setProperty(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, String.valueOf(true));
try (Connection readConnection = DriverManager.getConnection(tenantConnectUrl, props)) {
dataReader.setConnection(readConnection);
@@ -2487,7 +2496,7 @@ public class ViewTTLIT extends ParallelStatsDisabledIT {
final Statement statement = deleteConnection.createStatement()) {
deleteConnection.setAutoCommit(true);
- final String deleteIfExpiredStatement = String.format("select * from %s", viewName);
+ final String deleteIfExpiredStatement = String.format("select /*+NO_INDEX*/ count(*) from %s", viewName);
Preconditions.checkNotNull(deleteIfExpiredStatement);
final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
@@ -2506,6 +2515,7 @@ public class ViewTTLIT extends ParallelStatsDisabledIT {
scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME, emptyColumnName);
scan.setAttribute(BaseScannerRegionObserver.DELETE_PHOENIX_TTL_EXPIRED, PDataType.TRUE_BYTES);
scan.setAttribute(BaseScannerRegionObserver.PHOENIX_TTL, Bytes.toBytes(Long.valueOf(table.getPhoenixTTL())));
+
PhoenixResultSet
rs = pstmt.newResultSet(queryPlan.iterator(), queryPlan.getProjector(), queryPlan.getContext());
while (rs.next());
@@ -2527,7 +2537,7 @@ public class ViewTTLIT extends ParallelStatsDisabledIT {
final Statement statement = deleteConnection.createStatement()) {
deleteConnection.setAutoCommit(true);
- final String deleteIfExpiredStatement = String.format("select * from %s", indexName);
+ final String deleteIfExpiredStatement = String.format("select count(*) from %s", indexName);
Preconditions.checkNotNull(deleteIfExpiredStatement);
final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
@@ -2597,4 +2607,19 @@ public class ViewTTLIT extends ParallelStatsDisabledIT {
return testCases;
}
+
+ private void runValidations(long phoenixTTL,
+ com.google.common.collect.Table<String, String, Object> table,
+ DataReader dataReader, SchemaBuilder schemaBuilder)
+ throws Exception {
+
+ // Insert the rows for the first time and validate them.
+ validateExpiredRowsAreNotReturnedUsingData(phoenixTTL, table,
+ dataReader, schemaBuilder);
+
+ // Update the above rows and validate the same.
+ validateExpiredRowsAreNotReturnedUsingData(phoenixTTL, table,
+ dataReader, schemaBuilder);
+
+ }
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java
index 945c1c4..5c54854 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java
@@ -22,6 +22,8 @@ import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
@@ -58,4 +60,8 @@ public abstract class BaseRegionScanner extends DelegateRegionScanner {
public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException {
throw new IOException("NextRaw with scannerContext should not be called in Phoenix environment");
}
+
+ public RegionScanner getNewRegionScanner(Scan scan) throws IOException {
+ return ((BaseRegionScanner)delegate).getNewRegionScanner(scan);
+ }
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index e028e41..772d1c0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.htrace.Span;
import org.apache.htrace.Trace;
import org.apache.phoenix.execute.TupleProjector;
+import org.apache.phoenix.filter.PagedFilter;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.iterate.NonAggregateRegionScannerFactory;
@@ -62,6 +63,8 @@ import org.apache.phoenix.util.TransactionUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.phoenix.util.ScanUtil.getPageSizeMsForFilter;
+
abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
private static final Logger LOG = LoggerFactory.getLogger(BaseScannerRegionObserver.class);
@@ -86,10 +89,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
public static final String INDEX_REBUILD_PAGING = "_IndexRebuildPaging";
// The number of index rows to be rebuild in one RPC call
public static final String INDEX_REBUILD_PAGE_ROWS = "_IndexRebuildPageRows";
- public static final String SERVER_PAGING = "_ServerPaging";
- // The number of rows to be scanned in one RPC call
- public static final String AGGREGATE_PAGE_SIZE_IN_MS = "_AggregatePageSizeInMs";
-
+ public static final String SERVER_PAGE_SIZE_MS = "_ServerPageSizeMs";
// Index verification type done by the index tool
public static final String INDEX_REBUILD_VERIFY_TYPE = "_IndexRebuildVerifyType";
public static final String INDEX_RETRY_VERIFY = "_IndexRetryVerify";
@@ -190,7 +190,6 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
return this.getClass().getName();
}
-
private static void throwIfScanOutOfRegion(Scan scan, Region region) throws DoNotRetryIOException {
boolean isLocalIndex = ScanUtil.isLocalIndex(scan);
byte[] lowerInclusiveScanKey = scan.getStartRow();
@@ -254,6 +253,12 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
// last possible moment. You need to swap the start/stop and make the
// start exclusive and the stop inclusive.
ScanUtil.setupReverseScan(scan);
+ if (!(scan.getFilter() instanceof PagedFilter)) {
+ byte[] pageSizeMsBytes = scan.getAttribute(BaseScannerRegionObserver.SERVER_PAGE_SIZE_MS);
+ if (pageSizeMsBytes != null) {
+ scan.setFilter(new PagedFilter(scan.getFilter(), getPageSizeMsForFilter(scan)));
+ }
+ }
}
return s;
}
@@ -355,11 +360,14 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
public final RegionScanner postScannerOpen(
final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan,
final RegionScanner s) throws IOException {
- try {
+ try {
if (!isRegionObserverFor(scan)) {
return s;
}
- return new RegionScannerHolder(c, scan, s);
+ // Make sure PagedRegionScanner wraps only the lowest-level region scanner, i.e., the HBase region scanner. We
+ // assume here that every Phoenix region scanner extends BaseRegionScanner
+ return new RegionScannerHolder(c, scan, s instanceof BaseRegionScanner ? s :
+ new PagedRegionScanner(c.getEnvironment().getRegion(), s, scan));
} catch (Throwable t) {
// If the exception is NotServingRegionException then throw it as
// StaleRegionBoundaryCacheException to handle it by phoenix client otherwise hbase
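
A minimal sketch (not part of the commit) of opting a scan into server paging, assuming only the SERVER_PAGE_SIZE_MS attribute defined above; the class and method names are hypothetical:

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;

    final class ServerPagingSketch {
        // Tag the scan with the "_ServerPageSizeMs" attribute read above;
        // the observer then wraps the scan's filter in a PagedFilter.
        static Scan withPageBudget(Scan scan, long pageSizeMs) {
            scan.setAttribute(BaseScannerRegionObserver.SERVER_PAGE_SIZE_MS,
                    Bytes.toBytes(pageSizeMs));
            return scan;
        }
    }

SpillableGroupByIT above achieves the equivalent cluster-wide by setting QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS before starting the test driver.
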
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
index cc2772d..540851a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.Region;
@@ -43,6 +44,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.compile.ScanRanges;
import org.apache.phoenix.filter.AllVersionsIndexRebuildFilter;
+import org.apache.phoenix.filter.PagedFilter;
import org.apache.phoenix.filter.SkipScanFilter;
import org.apache.phoenix.hbase.index.ValueGetter;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
@@ -95,6 +97,7 @@ import static org.apache.phoenix.mapreduce.index.IndexVerificationOutputReposito
import static org.apache.phoenix.query.QueryServices.INDEX_REBUILD_PAGE_SIZE_IN_ROWS;
import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB;
import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB;
+import static org.apache.phoenix.util.ScanUtil.isDummy;
/**
* This is an abstract region scanner which is used to scan index or data table rows locally. From the data table rows,
@@ -1391,6 +1394,35 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner {
return indexMutations.size();
}
+ static boolean adjustScanFilter(Scan scan) {
+ // For rebuilds we use count(*) as the query for regular tables, which ends up setting a FirstKeyOnlyFilter on the
+ // scan. That filter doesn't return all columns and skips to the next row as soon as it finds one column.
+ // For rebuilds we need all columns and all versions.
+
+ Filter filter = scan.getFilter();
+ if (filter instanceof PagedFilter) {
+ PagedFilter pageFilter = (PagedFilter) filter;
+ Filter delegateFilter = pageFilter.getDelegateFilter();
+ if (delegateFilter == null || delegateFilter instanceof FirstKeyOnlyFilter) {
+ scan.setFilter(null);
+ return true;
+ }
+ // At this point delegateFilter is non-null and not a FirstKeyOnlyFilter.
+ // Override it so that we get all versions.
+ pageFilter.setDelegateFilter(new AllVersionsIndexRebuildFilter(delegateFilter));
+ } else if (filter instanceof FirstKeyOnlyFilter) {
+ scan.setFilter(null);
+ return true;
+ } else if (filter != null) {
+ // Override the filter so that we get all versions
+ scan.setFilter(new AllVersionsIndexRebuildFilter(filter));
+ }
+ return false;
+ }
+
protected RegionScanner getLocalScanner() throws IOException {
// override the filter to skip scan and open new scanner
// when lower bound of timerange is passed or newStartKey was populated
@@ -1405,20 +1437,12 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner {
for (byte[] family : scan.getFamilyMap().keySet()) {
incrScan.addFamily(family);
}
- // For rebuilds we use count (*) as query for regular tables which ends up setting the FKOF on scan
- // This filter doesn't give us all columns and skips to the next row as soon as it finds 1 col
- // For rebuilds we need all columns and all versions
- if (scan.getFilter() instanceof FirstKeyOnlyFilter) {
- incrScan.setFilter(null);
- } else if (scan.getFilter() != null) {
- // Override the filter so that we get all versions
- incrScan.setFilter(new AllVersionsIndexRebuildFilter(scan.getFilter()));
- }
+ adjustScanFilter(incrScan);
if(nextStartKey != null) {
incrScan.setStartRow(nextStartKey);
}
List<KeyRange> keys = new ArrayList<>();
- try(RegionScanner scanner = region.getScanner(incrScan)) {
+ try(RegionScanner scanner = new PagedRegionScanner(region, region.getScanner(incrScan), incrScan)) {
List<Cell> row = new ArrayList<>();
int rowCount = 0;
// collect row keys that have been modified in the given time-range
@@ -1427,6 +1451,9 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner {
ungroupedAggregateRegionObserver.checkForRegionClosing();
hasMoreIncr = scanner.nextRaw(row);
if (!row.isEmpty()) {
+ if (isDummy(row)) {
+ continue;
+ }
keys.add(PVarbinary.INSTANCE.getKeyRange(CellUtil.cloneRow(row.get(0))));
rowCount++;
}
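
The try-with-resources loop above illustrates the contract every consumer of a paged scanner now follows: a dummy row means the page budget was spent, not end-of-data. A condensed sketch of that consumer pattern, assuming only the ScanUtil.isDummy helper imported above (class and method names are hypothetical):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.regionserver.RegionScanner;
    import static org.apache.phoenix.util.ScanUtil.isDummy;

    final class PagedScanConsumerSketch {
        static int countRows(RegionScanner scanner) throws IOException {
            List<Cell> row = new ArrayList<>();
            int rowCount = 0;
            boolean hasMore;
            do {
                hasMore = scanner.nextRaw(row);
                // A dummy row only marks the end of a server-side page;
                // skip it and keep calling nextRaw() until hasMore is false.
                if (!row.isEmpty() && !isDummy(row)) {
                    rowCount++; // real row; per-row processing would go here
                }
                row.clear();
            } while (hasMore);
            return rowCount;
        }
    }
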
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
index cf2a094..5b54601 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
@@ -22,10 +22,11 @@ import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
import static org.apache.phoenix.query.QueryServices.GROUPBY_ESTIMATED_DISTINCT_VALUES_ATTRIB;
import static org.apache.phoenix.query.QueryServices.GROUPBY_SPILLABLE_ATTRIB;
-import static org.apache.phoenix.query.QueryServices.GROUPED_AGGREGATE_PAGE_SIZE_IN_MS;
import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_GROUPBY_ESTIMATED_DISTINCT_VALUES;
import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_GROUPBY_SPILLABLE;
import static org.apache.phoenix.util.ScanUtil.getDummyResult;
+import static org.apache.phoenix.util.ScanUtil.getPageSizeMsForRegionScanner;
+import static org.apache.phoenix.util.ScanUtil.isDummy;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -64,7 +65,6 @@ import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.join.HashJoinInfo;
import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
import org.apache.phoenix.query.QueryConstants;
-import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.tuple.EncodedColumnQualiferCellsList;
@@ -163,7 +163,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
if (j != null) {
innerScanner =
- new HashJoinRegionScanner(innerScanner, p, j, ScanUtil.getTenantId(scan),
+ new HashJoinRegionScanner(innerScanner, scan, p, j, ScanUtil.getTenantId(scan),
c.getEnvironment(), useQualifierAsIndex, useNewValueColumnQualifier);
}
@@ -172,22 +172,12 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
if (limitBytes != null) {
limit = PInteger.INSTANCE.getCodec().decodeInt(limitBytes, 0, SortOrder.getDefault());
}
- long pageSizeInMs = Long.MAX_VALUE;
- if (scan.getAttribute(BaseScannerRegionObserver.SERVER_PAGING) != null) {
- byte[] pageSizeFromScan =
- scan.getAttribute(BaseScannerRegionObserver.AGGREGATE_PAGE_SIZE_IN_MS);
- if (pageSizeFromScan != null) {
- pageSizeInMs = Bytes.toLong(pageSizeFromScan);
- } else {
- pageSizeInMs = c.getEnvironment().getConfiguration().getLong(GROUPED_AGGREGATE_PAGE_SIZE_IN_MS,
- QueryServicesOptions.DEFAULT_GROUPED_AGGREGATE_PAGE_SIZE_IN_MS);
- }
- }
+ long pageSizeMs = getPageSizeMsForRegionScanner(scan);
if (keyOrdered) { // Optimize by taking advantage that the rows are
// already in the required group by key order
- return new OrderedGroupByRegionScanner(c, scan, innerScanner, expressions, aggregators, limit, pageSizeInMs);
+ return new OrderedGroupByRegionScanner(c, scan, innerScanner, expressions, aggregators, limit, pageSizeMs);
} else { // Otherwise, collect them all up in an in-memory map
- return new UnorderedGroupByRegionScanner(c, scan, innerScanner, expressions, aggregators, limit, pageSizeInMs);
+ return new UnorderedGroupByRegionScanner(c, scan, innerScanner, expressions, aggregators, limit, pageSizeMs);
}
}
}
@@ -410,14 +400,14 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
private final ServerAggregators aggregators;
private final long limit;
private final List<Expression> expressions;
- private final long pageSizeInMs;
+ private final long pageSizeMs;
private RegionScanner regionScanner = null;
private final boolean spillableEnabled;
private final GroupByCache groupByCache;
private UnorderedGroupByRegionScanner(final ObserverContext<RegionCoprocessorEnvironment> c,
final Scan scan, final RegionScanner scanner, final List<Expression> expressions,
- final ServerAggregators aggregators, final long limit, final long pageSizeInMs) {
+ final ServerAggregators aggregators, final long limit, final long pageSizeMs) {
super(scanner);
this.region = c.getEnvironment().getRegion();
minMaxQualifiers = EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan);
@@ -425,7 +415,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan);
this.aggregators = aggregators;
this.limit = limit;
- this.pageSizeInMs = pageSizeInMs;
+ this.pageSizeMs = pageSizeMs;
this.expressions = expressions;
RegionCoprocessorEnvironment env = c.getEnvironment();
Configuration conf = env.getConfiguration();
@@ -477,6 +467,10 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
// ones returned
hasMore = delegate.nextRaw(results);
if (!results.isEmpty()) {
+ if (isDummy(results)) {
+ getDummyResult(resultsToReturn);
+ return true;
+ }
result.setKeyValues(results);
ImmutableBytesPtr key =
TupleUtil.getConcatenatedValue(result, expressions);
@@ -485,8 +479,8 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
aggregators.aggregate(rowAggregators, result);
}
now = EnvironmentEdgeManager.currentTimeMillis();
- } while (hasMore && groupByCache.size() < limit && (now - startTime) < pageSizeInMs);
- if (hasMore && groupByCache.size() < limit && (now - startTime) >= pageSizeInMs) {
+ } while (hasMore && groupByCache.size() < limit && (now - startTime) < pageSizeMs);
+ if (hasMore && groupByCache.size() < limit && (now - startTime) >= pageSizeMs) {
// Return a dummy result as we have processed a page worth of rows
// but we are not ready to aggregate
getDummyResult(resultsToReturn);
@@ -529,13 +523,13 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
private final long limit;
private Aggregator[] rowAggregators;
private final List<Expression> expressions;
- private final long pageSizeInMs;
+ private final long pageSizeMs;
private long rowCount = 0;
private ImmutableBytesPtr currentKey = null;
private OrderedGroupByRegionScanner(final ObserverContext<RegionCoprocessorEnvironment> c,
final Scan scan, final RegionScanner scanner, final List<Expression> expressions,
- final ServerAggregators aggregators, final long limit, final long pageSizeInMs) {
+ final ServerAggregators aggregators, final long limit, final long pageSizeMs) {
super(scanner);
this.scan = scan;
this.region = c.getEnvironment().getRegion();
@@ -545,7 +539,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
this.aggregators = aggregators;
rowAggregators = aggregators.getAggregators();
this.limit = limit;
- this.pageSizeInMs = pageSizeInMs;
+ this.pageSizeMs = pageSizeMs;
this.expressions = expressions;
}
@@ -579,6 +573,10 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
// ones returned
hasMore = delegate.nextRaw(kvs);
if (!kvs.isEmpty()) {
+ if (isDummy(kvs)) {
+ getDummyResult(results);
+ return true;
+ }
result.setKeyValues(kvs);
key = TupleUtil.getConcatenatedValue(result, expressions);
aggBoundary = currentKey != null && currentKey.compareTo(key) != 0;
@@ -598,12 +596,12 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
// Do rowCount + 1 b/c we don't have to wait for a complete
// row in the case of a DISTINCT with a LIMIT
now = EnvironmentEdgeManager.currentTimeMillis();
- } while (hasMore && !aggBoundary && !atLimit && (now - startTime) < pageSizeInMs);
+ } while (hasMore && !aggBoundary && !atLimit && (now - startTime) < pageSizeMs);
}
} finally {
if (acquiredLock) region.closeRegionOperation();
}
- if (hasMore && !aggBoundary && !atLimit && (now - startTime) >= pageSizeInMs) {
+ if (hasMore && !aggBoundary && !atLimit && (now - startTime) >= pageSizeMs) {
// Return a dummy result as we have processed a page worth of rows
// but we are not ready to aggregate
getDummyResult(results);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
index 6676a1c..108f767 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
@@ -26,9 +26,11 @@ import java.util.Queue;
import java.util.Set;
import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
@@ -52,9 +54,14 @@ import org.apache.phoenix.schema.tuple.PositionBasedResultTuple;
import org.apache.phoenix.schema.tuple.ResultTuple;
import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.ServerUtil;
import org.apache.phoenix.util.TupleUtil;
+import static org.apache.phoenix.util.ScanUtil.getDummyResult;
+import static org.apache.phoenix.util.ScanUtil.getPageSizeMsForRegionScanner;
+import static org.apache.phoenix.util.ScanUtil.isDummy;
+
public class HashJoinRegionScanner implements RegionScanner {
private final RegionScanner scanner;
@@ -72,20 +79,21 @@ public class HashJoinRegionScanner implements RegionScanner {
private final boolean useQualifierAsListIndex;
private final boolean useNewValueColumnQualifier;
private final boolean addArrayCell;
+ private final long pageSizeMs;
@SuppressWarnings("unchecked")
- public HashJoinRegionScanner(RegionScanner scanner, TupleProjector projector,
+ public HashJoinRegionScanner(RegionScanner scanner, Scan scan, TupleProjector projector,
HashJoinInfo joinInfo, ImmutableBytesPtr tenantId,
RegionCoprocessorEnvironment env, boolean useQualifierAsIndex,
boolean useNewValueColumnQualifier)
throws IOException {
- this(env, scanner, null, null, projector, joinInfo,
+ this(env, scanner, scan, null, null, projector, joinInfo,
tenantId, useQualifierAsIndex, useNewValueColumnQualifier);
}
@SuppressWarnings("unchecked")
- public HashJoinRegionScanner(RegionCoprocessorEnvironment env, RegionScanner scanner,
+ public HashJoinRegionScanner(RegionCoprocessorEnvironment env, RegionScanner scanner, Scan scan,
final Set<KeyValueColumnExpression> arrayKVRefs,
final Expression[] arrayFuncRefs, TupleProjector projector,
HashJoinInfo joinInfo, ImmutableBytesPtr tenantId,
@@ -137,6 +145,7 @@ public class HashJoinRegionScanner implements RegionScanner {
this.useNewValueColumnQualifier = useNewValueColumnQualifier;
this.addArrayCell = (arrayFuncRefs != null && arrayFuncRefs.length > 0 &&
arrayKVRefs != null && arrayKVRefs.size() > 0);
+ this.pageSizeMs = getPageSizeMsForRegionScanner(scan);
}
private void processResults(List<Cell> result, boolean hasBatchLimit) throws IOException {
@@ -288,9 +297,23 @@ public class HashJoinRegionScanner implements RegionScanner {
@Override
public boolean nextRaw(List<Cell> result) throws IOException {
try {
+ long startTime = EnvironmentEdgeManager.currentTimeMillis();
while (shouldAdvance()) {
hasMore = scanner.nextRaw(result);
+ if (isDummy(result)) {
+ return true;
+ }
+ if (result.isEmpty()) {
+ return hasMore;
+ }
+ Cell cell = result.get(0);
processResults(result, false);
+ if (EnvironmentEdgeManager.currentTimeMillis() - startTime >= pageSizeMs) {
+ byte[] rowKey = CellUtil.cloneRow(cell);
+ result.clear();
+ getDummyResult(rowKey, result);
+ return true;
+ }
result.clear();
}
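
This hunk is the producer side of the same contract: when the page budget is spent, the wrapper replaces the current row with a dummy keyed at the same row key and returns true, so the client resumes from exactly this position. A minimal sketch of that step, assuming only the ScanUtil.getDummyResult helper imported above (the class name is hypothetical):

    import java.util.List;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;
    import static org.apache.phoenix.util.ScanUtil.getDummyResult;

    final class PageYieldSketch {
        // Swap the current row for a dummy carrying the same row key.
        static boolean yieldPage(List<Cell> result) {
            byte[] rowKey = CellUtil.cloneRow(result.get(0));
            result.clear();
            getDummyResult(rowKey, result);
            return true; // "has more": the caller must keep iterating
        }
    }
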
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
index fa1ca40..f51f07f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
@@ -21,6 +21,7 @@ import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
import static org.apache.phoenix.query.QueryConstants.UNGROUPED_AGG_ROW_KEY;
+import static org.apache.phoenix.util.ScanUtil.isDummy;
import java.io.IOException;
import java.util.ArrayList;
@@ -313,6 +314,9 @@ public class IndexRebuildRegionScanner extends GlobalIndexRegionScanner {
hasMore = localScanner.nextRaw(row);
if (!row.isEmpty()) {
lastCell = row.get(0); // lastCell is any cell from the last visited row
+ if (isDummy(row)) {
+ break;
+ }
Put put = null;
Delete del = null;
for (Cell cell : row) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java
index 5e69925..2a67852 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java
@@ -22,6 +22,7 @@ import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
import static org.apache.phoenix.query.QueryConstants.UNGROUPED_AGG_ROW_KEY;
+import static org.apache.phoenix.util.ScanUtil.isDummy;
import java.io.IOException;
import java.util.ArrayList;
@@ -385,6 +386,9 @@ public class IndexRepairRegionScanner extends GlobalIndexRegionScanner {
hasMore = localScanner.nextRaw(row);
if (!row.isEmpty()) {
lastCell = row.get(0); // lastCell is any cell from the last visited row
+ if (isDummy(row)) {
+ break;
+ }
indexMutationCount += populateIndexMutationFromIndexRow(row, indexMutationMap);
rowCount++;
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java
index 7f61be3..85a12b1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java
@@ -23,13 +23,16 @@ import static org.apache.phoenix.query.QueryConstants.EMPTY_COLUMN_VALUE_BYTES;
import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
import static org.apache.phoenix.query.QueryConstants.UNGROUPED_AGG_ROW_KEY;
+import static org.apache.phoenix.util.ScanUtil.isDummy;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@@ -365,6 +368,9 @@ public class IndexerRegionScanner extends GlobalIndexRegionScanner {
hasMore = innerScanner.nextRaw(row);
if (!row.isEmpty()) {
lastCell = row.get(0); // lastCell is any cell from the last visited row
+ if (isDummy(row)) {
+ break;
+ }
Put put = null;
Delete del = null;
for (Cell cell : row) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PagedRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PagedRegionScanner.java
new file mode 100644
index 0000000..471839b
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PagedRegionScanner.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.phoenix.filter.PagedFilter;
+
+import static org.apache.phoenix.compat.hbase.CompatUtil.setStartRow;
+import static org.apache.phoenix.util.ScanUtil.getDummyResult;
+import static org.apache.phoenix.util.ScanUtil.getPhoenixPagedFilter;
+
+/**
+ * PagedRegionScanner works with PagedFilter to ensure that the time between two rows returned by the HBase region
+ * scanner does not exceed the page size in ms configured on PagedFilter. When the page size is reached (because
+ * too many cells/rows have to be filtered out), PagedFilter stops the HBase region scanner and sets its state to
+ * STOPPED. In that case, the HBase region scanner's next() returns false and PagedFilter#isStopped() returns true.
+ * PagedRegionScanner is responsible for detecting that PagedFilter has stopped the scanner, closing the current
+ * HBase region scanner, starting a new one to resume the scan operation, and returning a dummy result that signals
+ * the Phoenix client to resume the scan by skipping the dummy result and calling ResultScanner#next() again.
+ */
+public class PagedRegionScanner extends BaseRegionScanner {
+ protected Region region;
+ protected Scan scan;
+ protected PagedFilter pageFilter;
+ public PagedRegionScanner(Region region, RegionScanner scanner, Scan scan) {
+ super(scanner);
+ this.region = region;
+ this.scan = scan;
+ pageFilter = getPhoenixPagedFilter(scan);
+ if (pageFilter != null) {
+ pageFilter.init();
+ }
+ }
+
+ private boolean next(List<Cell> results, boolean raw) throws IOException {
+ try {
+ boolean hasMore = raw ? delegate.nextRaw(results) : delegate.next(results);
+ if (pageFilter == null) {
+ return hasMore;
+ }
+ if (!hasMore) {
+ // There are no more rows from the HBase region scanner. We need to check whether PagedFilter
+ // has stopped the region scanner
+ if (pageFilter.isStopped()) {
+ // Close the current region scanner, start a new one and return a dummy result
+ delegate.close();
+ byte[] rowKey = pageFilter.getRowKeyAtStop();
+ setStartRow(scan, rowKey, true);
+ delegate = region.getScanner(scan);
+ if (results.isEmpty()) {
+ getDummyResult(rowKey, results);
+ }
+ pageFilter.init();
+ return true;
+ }
+ return false;
+ } else {
+ // We got a row from the HBase scanner within the configured time (i.e., the page size). We need to
+ // start a new page on the next next() call.
+ pageFilter.resetStartTime();
+ return true;
+ }
+ } catch (Exception e) {
+ if (pageFilter != null) {
+ pageFilter.init();
+ }
+ throw e;
+ }
+ }
+
+ @Override
+ public boolean next(List<Cell> results) throws IOException {
+ return next(results, false);
+ }
+
+ @Override
+ public boolean nextRaw(List<Cell> results) throws IOException {
+ return next(results, true);
+ }
+
+ @Override
+ public RegionScanner getNewRegionScanner(Scan scan) throws IOException {
+ return new PagedRegionScanner(region, region.getScanner(scan), scan);
+ }
+}
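
Wiring the new scanner into a coprocessor is a one-liner, as the GlobalIndexRegionScanner hunk above shows; a sketch with a hypothetical class name (when the scan carries no PagedFilter, PagedRegionScanner is effectively a pass-through):

    import java.io.IOException;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.regionserver.Region;
    import org.apache.hadoop.hbase.regionserver.RegionScanner;
    import org.apache.phoenix.coprocessor.PagedRegionScanner;

    final class PagedScannerWiringSketch {
        // Wrap the raw region scanner; PagedRegionScanner reopens it
        // transparently whenever PagedFilter stops it mid-page.
        static RegionScanner openPaged(Region region, Scan scan) throws IOException {
            return new PagedRegionScanner(region, region.getScanner(scan), scan);
        }
    }
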
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTTLRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTTLRegionObserver.java
index f6c1b96..d873286 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTTLRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTTLRegionObserver.java
@@ -18,6 +18,7 @@
package org.apache.phoenix.coprocessor;
import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
@@ -27,6 +28,7 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
@@ -41,35 +43,35 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.sql.SQLException;
import java.util.Iterator;
import java.util.List;
-import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME;
-import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME;
+import static org.apache.phoenix.util.ScanUtil.getDummyResult;
+import static org.apache.phoenix.util.ScanUtil.getPageSizeMsForRegionScanner;
+import static org.apache.phoenix.util.ScanUtil.isDummy;
/**
* Coprocessor that checks whether the row is expired based on the TTL spec.
*/
-public class PhoenixTTLRegionObserver extends BaseRegionObserver {
+public class PhoenixTTLRegionObserver extends BaseScannerRegionObserver {
private static final Logger LOG = LoggerFactory.getLogger(PhoenixTTLRegionObserver.class);
private MetricsPhoenixTTLSource metricSource;
- @Override public void start(CoprocessorEnvironment e) throws IOException {
- super.start(e);
+ @Override
+ public void start(CoprocessorEnvironment e) throws IOException {
metricSource = MetricsPhoenixCoprocessorSourceFactory.getInstance().getPhoenixTTLSource();
}
- @Override public void stop(CoprocessorEnvironment e) throws IOException {
- super.stop(e);
+ @Override
+ protected boolean isRegionObserverFor(Scan scan) {
+ return ScanUtil.isMaskTTLExpiredRows(scan) || ScanUtil.isDeleteTTLExpiredRows(scan);
}
@Override
- public RegionScanner postScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan,
- RegionScanner s) throws IOException {
-
- if (!ScanUtil.isMaskTTLExpiredRows(scan) && !ScanUtil.isDeleteTTLExpiredRows(scan)) {
- return s;
- } else if (ScanUtil.isMaskTTLExpiredRows(scan) && ScanUtil.isDeleteTTLExpiredRows(scan)) {
+ protected RegionScanner doPostScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan,
+ final RegionScanner s) throws IOException, SQLException {
+ if (ScanUtil.isMaskTTLExpiredRows(scan) && ScanUtil.isDeleteTTLExpiredRows(scan)) {
throw new IOException("Both mask and delete expired rows property cannot be set");
} else if (ScanUtil.isMaskTTLExpiredRows(scan)) {
metricSource.incrementMaskExpiredRequestCount();
@@ -99,10 +101,11 @@ public class PhoenixTTLRegionObserver extends BaseRegionObserver {
/**
* A region scanner that checks the TTL expiration of rows
*/
- private static class PhoenixTTLRegionScanner implements RegionScanner {
+ private static class PhoenixTTLRegionScanner extends BaseRegionScanner {
private static final String MASK_PHOENIX_TTL_EXPIRED_REQUEST_ID_ATTR =
"MASK_PHOENIX_TTL_EXPIRED_REQUEST_ID";
+ private final RegionCoprocessorEnvironment env;
private final RegionScanner scanner;
private final Scan scan;
private final byte[] emptyCF;
@@ -119,9 +122,12 @@ public class PhoenixTTLRegionObserver extends BaseRegionObserver {
private long numRowsScanned;
private long numRowsDeleted;
private boolean reported = false;
+ private long pageSizeMs;
public PhoenixTTLRegionScanner(RegionCoprocessorEnvironment env, Scan scan,
RegionScanner scanner) throws IOException {
+ super(scanner);
+ this.env = env;
this.scan = scan;
this.scanner = scanner;
byte[] requestIdBytes = scan.getAttribute(MASK_PHOENIX_TTL_EXPIRED_REQUEST_ID_ATTR);
@@ -145,6 +151,7 @@ public class PhoenixTTLRegionObserver extends BaseRegionObserver {
now = maxTimestamp != HConstants.LATEST_TIMESTAMP ?
maxTimestamp :
EnvironmentEdgeManager.currentTimeMillis();
+ pageSizeMs = getPageSizeMsForRegionScanner(scan);
}
@Override public int getBatch() {
@@ -186,10 +193,6 @@ public class PhoenixTTLRegionObserver extends BaseRegionObserver {
return scanner.getRegionInfo();
}
- @Override public boolean isFilterDone() throws IOException {
- return scanner.isFilterDone();
- }
-
@Override public boolean reseek(byte[] row) throws IOException {
return scanner.reseek(row);
}
@@ -204,17 +207,30 @@ public class PhoenixTTLRegionObserver extends BaseRegionObserver {
private boolean doNext(List<Cell> result, boolean raw) throws IOException {
try {
+ long startTime = EnvironmentEdgeManager.currentTimeMillis();
boolean hasMore;
do {
hasMore = raw ? scanner.nextRaw(result) : scanner.next(result);
if (result.isEmpty()) {
break;
}
+ if (isDummy(result)) {
+ return true;
+ }
+
+ /*
+ Note that MaskIfExpiredRequest and DeleteIfExpiredRequest cannot both be set at the same time.
+ Case MaskIfExpiredRequest: if the row has not expired, return it.
+ */
numRowsScanned++;
if (maskIfExpired && checkRowNotExpired(result)) {
break;
}
+                            /**
+                             Case DeleteIfExpiredRequest: if the row was deleted, return
+                             so that it counts towards the aggregate deleted count.
+                             */
if (deleteIfExpired && deleteRowIfExpired(result)) {
numRowsDeleted++;
break;
@@ -226,6 +242,12 @@ public class PhoenixTTLRegionObserver extends BaseRegionObserver {
if (maskIfExpired) {
numRowsExpired++;
}
+ if (hasMore && (EnvironmentEdgeManager.currentTimeMillis() - startTime) >= pageSizeMs) {
+ byte[] rowKey = CellUtil.cloneRow(result.get(0));
+ result.clear();
+ getDummyResult(rowKey, result);
+ return true;
+ }
result.clear();
} while (hasMore);
return hasMore;
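
A minimal sketch of the dummy-result paging convention used in doNext above, assuming only what this patch shows elsewhere: ScanUtil.isDummy treats a single cell with an empty column family and qualifier as a dummy. The KeyValue construction below is illustrative, not the actual ScanUtil.getDummyResult implementation.

import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;

final class DummyResultSketch {
    // Build a one-cell "page boundary" marker carrying the row key to resume from.
    static void fillDummy(byte[] rowKey, List<Cell> out) {
        out.add(new KeyValue(rowKey, HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY,
                HConstants.LATEST_TIMESTAMP, HConstants.EMPTY_BYTE_ARRAY));
    }

    // Mirrors ScanUtil.isDummy(List<Cell>): exactly one cell, empty family and qualifier.
    static boolean isDummy(List<Cell> result) {
        if (result.size() != 1) {
            return false;
        }
        Cell c = result.get(0);
        return c.getFamilyLength() == 0 && c.getQualifierLength() == 0;
    }
}
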
@@ -303,5 +325,9 @@ public class PhoenixTTLRegionObserver extends BaseRegionObserver {
return true;
}
+ @Override
+ public RegionScanner getNewRegionScanner(Scan scan) throws IOException {
+ return new PhoenixTTLRegionScanner(env, scan, ((BaseRegionScanner)delegate).getNewRegionScanner(scan));
+ }
}
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index ed4ea74..cdec974 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -17,6 +17,7 @@
*/
package org.apache.phoenix.coprocessor;
+import static org.apache.phoenix.coprocessor.GlobalIndexRegionScanner.adjustScanFilter;
import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
@@ -440,7 +441,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
}
if (j != null) {
- theScanner = new HashJoinRegionScanner(theScanner, p, j, ScanUtil.getTenantId(scan), env, useQualifierAsIndex, useNewValueColumnQualifier);
+ theScanner = new HashJoinRegionScanner(theScanner, scan, p, j, ScanUtil.getTenantId(scan), env, useQualifierAsIndex, useNewValueColumnQualifier);
}
return new UngroupedAggregateRegionScanner(c, theScanner,region, scan, env, this);
}
@@ -624,7 +625,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
}
}
- private RegionScanner rebuildIndices(final RegionScanner innerScanner, final Region region, final Scan scan,
+ private RegionScanner rebuildIndices(RegionScanner innerScanner, final Region region, final Scan scan,
final RegionCoprocessorEnvironment env) throws IOException {
boolean oldCoproc = region.getTableDesc().hasCoprocessor(Indexer.class.getCanonicalName());
byte[] valueBytes = scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_VERIFY_TYPE);
@@ -633,29 +634,28 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
if (oldCoproc && verifyType == IndexTool.IndexVerifyType.ONLY) {
return new IndexerRegionScanner(innerScanner, region, scan, env, this);
}
+ RegionScanner scanner;
if (!scan.isRaw()) {
Scan rawScan = new Scan(scan);
rawScan.setRaw(true);
rawScan.setMaxVersions();
rawScan.getFamilyMap().clear();
- // For rebuilds we use count (*) as query for regular tables which ends up setting the FKOF on scan
- // This filter doesn't give us all columns and skips to the next row as soon as it finds 1 col
- // For rebuilds we need all columns and all versions
- if (scan.getFilter() instanceof FirstKeyOnlyFilter) {
- rawScan.setFilter(null);
- } else if (scan.getFilter() != null) {
- // Override the filter so that we get all versions
- rawScan.setFilter(new AllVersionsIndexRebuildFilter(scan.getFilter()));
- }
+ adjustScanFilter(rawScan);
rawScan.setCacheBlocks(false);
for (byte[] family : scan.getFamilyMap().keySet()) {
rawScan.addFamily(family);
}
+ scanner = ((BaseRegionScanner)innerScanner).getNewRegionScanner(rawScan);
innerScanner.close();
- RegionScanner scanner = region.getScanner(rawScan);
- return getRegionScanner(scanner, region, scan, env, oldCoproc);
+ } else {
+ if (adjustScanFilter(scan)) {
+ scanner = ((BaseRegionScanner) innerScanner).getNewRegionScanner(scan);
+ innerScanner.close();
+ } else {
+ scanner = innerScanner;
+ }
}
- return getRegionScanner(innerScanner, region, scan, env, oldCoproc);
+ return getRegionScanner(scanner, region, scan, env, oldCoproc);
}
private RegionScanner collectStats(final RegionScanner innerScanner, StatisticsCollector stats,
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
index a96303a..fcee172 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
@@ -32,9 +32,10 @@ import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB;
import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB;
import static org.apache.phoenix.query.QueryServices.SOURCE_OPERATION_ATTRIB;
-import static org.apache.phoenix.query.QueryServices.UNGROUPED_AGGREGATE_PAGE_SIZE_IN_MS;
import static org.apache.phoenix.schema.PTableImpl.getColumnsToClone;
import static org.apache.phoenix.util.WALAnnotationUtil.annotateMutation;
+import static org.apache.phoenix.util.ScanUtil.getPageSizeMsForRegionScanner;
+import static org.apache.phoenix.util.ScanUtil.isDummy;
import java.io.IOException;
import java.sql.SQLException;
@@ -118,7 +119,7 @@ public class UngroupedAggregateRegionScanner extends BaseRegionScanner {
private static final Logger LOGGER = LoggerFactory.getLogger(UngroupedAggregateRegionScanner.class);
- private long pageSizeInMs = Long.MAX_VALUE;
+ private long pageSizeMs;
private int maxBatchSize = 0;
private final Scan scan;
private final RegionScanner innerScanner;
@@ -169,17 +170,7 @@ public class UngroupedAggregateRegionScanner extends BaseRegionScanner {
this.ungroupedAggregateRegionObserver = ungroupedAggregateRegionObserver;
this.innerScanner = innerScanner;
Configuration conf = env.getConfiguration();
- if (scan.getAttribute(BaseScannerRegionObserver.SERVER_PAGING) != null) {
- byte[] pageSizeFromScan =
- scan.getAttribute(BaseScannerRegionObserver.AGGREGATE_PAGE_SIZE_IN_MS);
- if (pageSizeFromScan != null) {
- pageSizeInMs = Bytes.toLong(pageSizeFromScan);
- } else {
- pageSizeInMs =
- conf.getLong(UNGROUPED_AGGREGATE_PAGE_SIZE_IN_MS,
- QueryServicesOptions.DEFAULT_UNGROUPED_AGGREGATE_PAGE_SIZE_IN_MS);
- }
- }
+ pageSizeMs = getPageSizeMsForRegionScanner(scan);
ts = scan.getTimeRange().getMax();
boolean localIndexScan = ScanUtil.isLocalIndex(scan);
@@ -566,6 +557,13 @@ public class UngroupedAggregateRegionScanner extends BaseRegionScanner {
// since this is an indication of whether or not there are more values after the
// ones returned
hasMore = innerScanner.nextRaw(results);
+ if (isDummy(results)) {
+ if (!hasAny) {
+ resultsToReturn.addAll(results);
+ return true;
+ }
+ break;
+ }
if (!results.isEmpty()) {
lastCell = results.get(0);
result.setKeyValues(results);
@@ -606,8 +604,7 @@ public class UngroupedAggregateRegionScanner extends BaseRegionScanner {
aggregators.aggregate(rowAggregators, result);
hasAny = true;
}
- } while (hasMore && (EnvironmentEdgeManager.currentTimeMillis() - startTime) < pageSizeInMs);
-
+ } while (hasMore && (EnvironmentEdgeManager.currentTimeMillis() - startTime) < pageSizeMs);
if (!mutations.isEmpty()) {
annotateAndCommit(mutations);
}
@@ -621,7 +618,7 @@ public class UngroupedAggregateRegionScanner extends BaseRegionScanner {
} catch (DataExceedsCapacityException e) {
throw new DoNotRetryIOException(e.getMessage(), e);
} catch (Throwable e) {
- LOGGER.error("Exception in UngroupedAggreagteRegionScanner for region "
+ LOGGER.error("Exception in UngroupedAggregateRegionScanner for region "
+ region.getRegionInfo().getRegionNameAsString(), e);
throw e;
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/DelegateFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/DelegateFilter.java
index 2402e62..0f7a190 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/filter/DelegateFilter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/DelegateFilter.java
@@ -97,4 +97,15 @@ public class DelegateFilter extends FilterBase {
public byte[] toByteArray() throws IOException {
return delegate.toByteArray();
}
+
+ @Override
+ public void setReversed(boolean reversed) {
+ delegate.setReversed(reversed);
+ }
+
+ @Override
+ public boolean isReversed() {
+ return delegate.isReversed();
+ }
+
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/PagedFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/PagedFilter.java
new file mode 100644
index 0000000..e46cfa0
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/PagedFilter.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.filter;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.io.Writable;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+
+/**
+ * This filter overrides the behavior of the delegate so that scanning stops, and the row key reached is recorded, once pageSizeMs has elapsed.
+ */
+public class PagedFilter extends FilterBase implements Writable {
+ private enum State {
+ INITIAL, STARTED, TIME_TO_STOP, STOPPED
+ }
+ State state;
+ private long pageSizeMs;
+ private long startTime;
+ private byte[] rowKeyAtStop;
+ private Filter delegate = null;
+
+ public PagedFilter() {
+ init();
+ }
+
+ public PagedFilter(Filter delegate, long pageSizeMs) {
+ init();
+ this.delegate = delegate;
+ this.pageSizeMs = pageSizeMs;
+ }
+
+ public Filter getDelegateFilter() {
+ return delegate;
+ }
+
+    public void setDelegateFilter(Filter delegate) {
+ this.delegate = delegate;
+ }
+
+ public byte[] getRowKeyAtStop() {
+ if (rowKeyAtStop != null) {
+ return Arrays.copyOf(rowKeyAtStop, rowKeyAtStop.length);
+ }
+ return null;
+ }
+
+ public boolean isStopped() {
+ return state == State.STOPPED;
+ }
+
+ public void init() {
+ state = State.INITIAL;
+ rowKeyAtStop = null;
+ }
+
+ public void resetStartTime() {
+ if (state == State.STARTED) {
+ init();
+ }
+ }
+
+ @Override
+ public void reset() throws IOException {
+ if (state == State.INITIAL) {
+ startTime = EnvironmentEdgeManager.currentTimeMillis();
+ state = State.STARTED;
+ } else if (state == State.STARTED && EnvironmentEdgeManager.currentTimeMillis() - startTime >= pageSizeMs) {
+ state = State.TIME_TO_STOP;
+ }
+ if (delegate != null) {
+ delegate.reset();
+ return;
+ }
+ super.reset();
+ }
+
+ @Override
+ public Cell getNextCellHint(Cell currentKV) throws IOException {
+ if (delegate != null) {
+ return delegate.getNextCellHint(currentKV);
+ }
+ return super.getNextCellHint(currentKV);
+ }
+
+    @Override
+    public boolean filterRowKey(byte[] buffer, int offset, int length) throws IOException {
+ if (state == State.TIME_TO_STOP) {
+ if (rowKeyAtStop == null) {
+ rowKeyAtStop = new byte[length];
+ Bytes.putBytes(rowKeyAtStop, 0, buffer, offset, length);
+ }
+ return true;
+ }
+ if (delegate != null) {
+ return delegate.filterRowKey(buffer, offset, length);
+ }
+ return super.filterRowKey(buffer, offset, length);
+ }
+
+ @Override
+ public boolean filterAllRemaining() throws IOException {
+ if (state == State.TIME_TO_STOP && rowKeyAtStop != null) {
+ state = State.STOPPED;
+ return true;
+ }
+ if (delegate != null) {
+ return delegate.filterAllRemaining();
+ }
+ return super.filterAllRemaining();
+ }
+
+ @Override
+ public boolean hasFilterRow() {
+ return true;
+ }
+
+ @Override
+ public boolean filterRow() throws IOException {
+ if (state == State.TIME_TO_STOP) {
+ return true;
+ }
+ if (delegate != null) {
+ return delegate.filterRow();
+ }
+ return super.filterRow();
+ }
+
+ @Override
+ public Cell transformCell(Cell v) throws IOException {
+ if (delegate != null) {
+ return delegate.transformCell(v);
+ }
+ return super.transformCell(v);
+ }
+
+ @Override
+ public void filterRowCells(List<Cell> kvs) throws IOException {
+ if (delegate != null) {
+ delegate.filterRowCells(kvs);
+ return;
+ }
+ super.filterRowCells(kvs);
+ }
+
+ @Override
+ public void setReversed(boolean reversed) {
+ if (delegate != null) {
+ delegate.setReversed(reversed);
+ }
+ super.setReversed(reversed);
+ }
+
+ @Override
+ public boolean isReversed() {
+ if (delegate != null) {
+ return delegate.isReversed();
+ }
+ return super.isReversed();
+ }
+
+ @Override
+ public boolean isFamilyEssential(byte[] name) throws IOException {
+ if (delegate != null) {
+ return delegate.isFamilyEssential(name);
+ }
+ return super.isFamilyEssential(name);
+ }
+
+ @Override
+ public ReturnCode filterKeyValue(Cell v) throws IOException {
+ if (delegate != null) {
+ return delegate.filterKeyValue(v);
+ }
+ return ReturnCode.INCLUDE;
+ }
+
+ public static PagedFilter parseFrom(final byte [] pbBytes) throws DeserializationException {
+ try {
+ return (PagedFilter) Writables.getWritable(pbBytes, new PagedFilter());
+ } catch (IOException e) {
+ throw new DeserializationException(e);
+ }
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeLong(pageSizeMs);
+ if (delegate != null) {
+ out.writeUTF(delegate.getClass().getName());
+ byte[] b = delegate.toByteArray();
+ out.writeInt(b.length);
+ out.write(b);
+ } else {
+ out.writeUTF("");
+ }
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ pageSizeMs = in.readLong();
+ String className = in.readUTF();
+ if (className.length() == 0) {
+ return;
+ }
+ Class cls = null;
+ try {
+ cls = Class.forName(className);
+ } catch (ClassNotFoundException e) {
+ e.printStackTrace();
+ throw new DoNotRetryIOException(e);
+ }
+
+ Method m = null;
+ try {
+ m = cls.getDeclaredMethod("parseFrom", byte[].class);
+ } catch (NoSuchMethodException e) {
+ e.printStackTrace();
+ throw new DoNotRetryIOException(e);
+ }
+ int length = in.readInt();
+ byte[] b = new byte[length];
+ in.readFully(b);
+ try {
+ delegate = (Filter) m.invoke(null, b);
+ } catch (IllegalAccessException e) {
+ e.printStackTrace();
+ throw new DoNotRetryIOException(e);
+ } catch (InvocationTargetException e) {
+ e.printStackTrace();
+ throw new DoNotRetryIOException(e);
+ }
+ }
+
+ @Override
+ public byte[] toByteArray() throws IOException {
+ return Writables.getBytes(this);
+ }
+}
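
A sketch of how PagedFilter could be wired onto a scan and used to resume where the time budget ran out. The client-side resume flow below is an assumption for illustration; in this patch PagedFilter is installed and consumed by the server-side scanners via ScanUtil.

import java.io.IOException;

import org.apache.hadoop.hbase.client.Scan;
import org.apache.phoenix.filter.PagedFilter;

public class PagedFilterUsageSketch {
    // Wrap whatever filter the scan already carries and bound scanning by pageSizeMs.
    static Scan page(Scan scan, long pageSizeMs) {
        scan.setFilter(new PagedFilter(scan.getFilter(), pageSizeMs));
        return scan;
    }

    // If the filter stopped the scan, restart at the recorded row key so the
    // first unprocessed row is picked up by the next page; null means done.
    static Scan resumeIfStopped(Scan scan) throws IOException {
        PagedFilter pf = (PagedFilter) scan.getFilter();
        if (!pf.isStopped()) {
            return null;
        }
        Scan next = new Scan(scan);
        next.setStartRow(pf.getRowKeyAtStop()); // inclusive restart
        return next;
    }
}
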
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
index 12592c0..7720ae6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
@@ -26,8 +26,12 @@ import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.PHYSICAL_
import static org.apache.phoenix.hbase.index.IndexRegionObserver.VERIFIED_BYTES;
import static org.apache.phoenix.index.IndexMaintainer.getIndexMaintainer;
import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
+import static org.apache.phoenix.util.ScanUtil.getDummyResult;
+import static org.apache.phoenix.util.ScanUtil.getPageSizeMsForRegionScanner;
+import static org.apache.phoenix.util.ScanUtil.isDummy;
import java.io.IOException;
+import java.sql.SQLException;
import java.util.Iterator;
import java.util.List;
@@ -43,7 +47,6 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.filter.Filter;
@@ -54,7 +57,9 @@ import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.coprocessor.BaseRegionScanner;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.filter.PagedFilter;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
import org.apache.phoenix.hbase.index.metrics.GlobalIndexCheckerSource;
import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSourceFactory;
@@ -94,7 +99,7 @@ import org.slf4j.LoggerFactory;
* the verified version that is masked by the unverified version(s).
*
*/
-public class GlobalIndexChecker extends BaseRegionObserver {
+public class GlobalIndexChecker extends BaseScannerRegionObserver {
private static final Logger LOG = LoggerFactory.getLogger(GlobalIndexChecker.class);
private HTableFactory hTableFactory;
private GlobalIndexCheckerSource metricsSource;
@@ -119,13 +124,11 @@ public class GlobalIndexChecker extends BaseRegionObserver {
* An instance of this class is created for each scanner on an index
* and used to verify individual rows and rebuild them if they are not valid
*/
- private class GlobalIndexScanner implements RegionScanner {
+ private class GlobalIndexScanner extends BaseRegionScanner {
private RegionScanner scanner;
- private RegionScanner deleteRowScanner;
private long ageThreshold;
private Scan scan;
private Scan indexScan;
- private Scan deleteRowScan;
private Scan singleRowIndexScan;
private Scan buildIndexScan = null;
private Table dataHTable = null;
@@ -143,11 +146,13 @@ public class GlobalIndexChecker extends BaseRegionObserver {
private boolean restartScanDueToPageFilterRemoval = false;
private boolean hasMore;
private String indexName;
+ private long pageSizeMs;
public GlobalIndexScanner(RegionCoprocessorEnvironment env,
Scan scan,
RegionScanner scanner,
GlobalIndexCheckerSource metricsSource) throws IOException {
+ super(scanner);
this.env = env;
this.scan = scan;
this.scanner = scanner;
@@ -171,6 +176,7 @@ public class GlobalIndexChecker extends BaseRegionObserver {
"repairIndexRows: IndexMaintainer is not included in scan attributes for " +
region.getRegionInfo().getTable().getNameAsString());
}
+ pageSizeMs = getPageSizeMsForRegionScanner(scan);
}
@Override
@@ -185,6 +191,7 @@ public class GlobalIndexChecker extends BaseRegionObserver {
public boolean next(List<Cell> result, boolean raw) throws IOException {
try {
+ long startTime = EnvironmentEdgeManager.currentTimeMillis();
do {
if (raw) {
hasMore = scanner.nextRaw(result);
@@ -194,9 +201,19 @@ public class GlobalIndexChecker extends BaseRegionObserver {
if (result.isEmpty()) {
break;
}
+ if (isDummy(result)) {
+ return true;
+ }
+ Cell cell = result.get(0);
if (verifyRowAndRepairIfNecessary(result)) {
break;
}
+ if (hasMore && (EnvironmentEdgeManager.currentTimeMillis() - startTime) >= pageSizeMs) {
+ byte[] rowKey = CellUtil.cloneRow(cell);
+ result.clear();
+ getDummyResult(rowKey, result);
+ return true;
+ }
// skip this row as it is invalid
// if there is no more row, then result will be an empty list
} while (hasMore);
@@ -245,11 +262,6 @@ public class GlobalIndexChecker extends BaseRegionObserver {
}
@Override
- public boolean isFilterDone() throws IOException {
- return scanner.isFilterDone();
- }
-
- @Override
public boolean reseek(byte[] row) throws IOException {
return scanner.reseek(row);
}
@@ -297,6 +309,12 @@ public class GlobalIndexChecker extends BaseRegionObserver {
private PageFilter removePageFilter(Scan scan) {
Filter filter = scan.getFilter();
if (filter != null) {
+ if (filter instanceof PagedFilter) {
+ filter = ((PagedFilter) filter).getDelegateFilter();
+ if (filter == null) {
+ return null;
+ }
+ }
if (filter instanceof PageFilter) {
scan.setFilter(null);
return (PageFilter) filter;
@@ -316,7 +334,6 @@ public class GlobalIndexChecker extends BaseRegionObserver {
}
buildIndexScan = new Scan();
indexScan = new Scan(scan);
- deleteRowScan = new Scan();
singleRowIndexScan = new Scan(scan);
byte[] dataTableName = scan.getAttribute(PHYSICAL_DATA_TABLE_NAME);
dataHTable = hTableFactory.getTable(new ImmutableBytesPtr(dataTableName));
@@ -361,7 +378,7 @@ public class GlobalIndexChecker extends BaseRegionObserver {
if (restartScanDueToPageFilterRemoval) {
scanner.close();
setStartRow(indexScan, indexRowKey, false);
- scanner = region.getScanner(indexScan);
+ scanner = ((BaseRegionScanner)delegate).getNewRegionScanner(indexScan);
hasMore = true;
// Set restartScanDueToPageFilterRemoval to false as we do not restart the scan unnecessarily next time
restartScanDueToPageFilterRemoval = false;
@@ -379,7 +396,7 @@ public class GlobalIndexChecker extends BaseRegionObserver {
deleteRowIfAgedEnough(indexRowKey, ts, false);
// Open a new scanner starting from the row after the current row
setStartRow(indexScan, indexRowKey, false);
- scanner = region.getScanner(indexScan);
+ scanner = ((BaseRegionScanner)delegate).getNewRegionScanner(indexScan);
hasMore = true;
// Skip this unverified row (i.e., do not return it to the client). Just returning an empty row is
// sufficient to do that
@@ -389,12 +406,15 @@ public class GlobalIndexChecker extends BaseRegionObserver {
// code == RebuildReturnCode.INDEX_ROW_EXISTS.getValue()
// Open a new scanner starting from the current row
setStartRow(indexScan, indexRowKey, true);
- scanner = region.getScanner(indexScan);
+ scanner = ((BaseRegionScanner)delegate).getNewRegionScanner(indexScan);
hasMore = scanner.next(row);
if (row.isEmpty()) {
// This means the index row has been deleted before opening the new scanner.
return;
}
+ if (isDummy(row)) {
+ return;
+ }
// Check if the index row still exist after rebuild
if (Bytes.compareTo(row.get(0).getRowArray(), row.get(0).getRowOffset(), row.get(0).getRowLength(),
indexRowKey, 0, indexRowKey.length) != 0) {
@@ -406,7 +426,7 @@ public class GlobalIndexChecker extends BaseRegionObserver {
// The row is "unverified". Rewind the scanner and let the row be scanned again
// so that it can be repaired
scanner.close();
- scanner = region.getScanner(indexScan);
+                scanner = ((BaseRegionScanner)delegate).getNewRegionScanner(indexScan);
hasMore = true;
row.clear();
return;
@@ -430,7 +450,7 @@ public class GlobalIndexChecker extends BaseRegionObserver {
// can be 1. In that case, we will get only one (i.e., the most recent) version instead of all versions
setSingleRow(singleRowIndexScan, indexRowKey);
singleRowIndexScan.setTimeRange(minTimestamp, ts);
- RegionScanner singleRowScanner = region.getScanner(singleRowIndexScan);
+ RegionScanner singleRowScanner = ((BaseRegionScanner)delegate).getNewRegionScanner(singleRowIndexScan);
row.clear();
singleRowScanner.next(row);
singleRowScanner.close();
@@ -442,6 +462,9 @@ public class GlobalIndexChecker extends BaseRegionObserver {
// possibly by compaction
return;
}
+ if (isDummy(row)) {
+ return;
+ }
if (verifyRowAndRemoveEmptyColumn(row)) {
// The index row status is "verified". This row is good to return to the client. We are done here.
return;
@@ -587,11 +610,13 @@ public class GlobalIndexChecker extends BaseRegionObserver {
}
@Override
- public RegionScanner postScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
- Scan scan, RegionScanner s) throws IOException {
- if (scan.getAttribute(CHECK_VERIFY_COLUMN) == null) {
- return s;
- }
+ protected boolean isRegionObserverFor(Scan scan) {
+ return scan.getAttribute(CHECK_VERIFY_COLUMN) != null;
+ }
+
+ @Override
+ protected RegionScanner doPostScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan,
+ final RegionScanner s) throws IOException, SQLException {
return new GlobalIndexScanner(c.getEnvironment(), scan, s, metricsSource);
}
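
Both GlobalIndexChecker and PhoenixTTLRegionObserver now follow the same BaseScannerRegionObserver template: isRegionObserverFor says which scans the observer applies to, and doPostScannerOpen wraps the scanner. A sketch of the pattern with a hypothetical observer and scan attribute, assuming these two hooks (whose signatures appear in this diff) are all a subclass must supply:

import java.io.IOException;
import java.sql.SQLException;

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;

// MyMarkerObserver and MY_SCAN_ATTRIBUTE are hypothetical illustration names.
public class MyMarkerObserver extends BaseScannerRegionObserver {
    static final String MY_SCAN_ATTRIBUTE = "MY_SCAN_ATTRIBUTE";

    @Override
    protected boolean isRegionObserverFor(Scan scan) {
        // Only scans tagged with the attribute are wrapped; others pass through.
        return scan.getAttribute(MY_SCAN_ATTRIBUTE) != null;
    }

    @Override
    protected RegionScanner doPostScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
            Scan scan, RegionScanner s) throws IOException, SQLException {
        // A real observer returns its own RegionScanner wrapper here.
        return s;
    }
}
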
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
index 26114dd..d54a785 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
@@ -19,6 +19,9 @@
package org.apache.phoenix.iterate;
import static org.apache.phoenix.util.EncodedColumnsUtil.getMinMaxQualifiersFromScan;
+import static org.apache.phoenix.util.ScanUtil.getDummyResult;
+import static org.apache.phoenix.util.ScanUtil.getPageSizeMsForRegionScanner;
+import static org.apache.phoenix.util.ScanUtil.isDummy;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
@@ -157,13 +160,14 @@ public class NonAggregateRegionScannerFactory extends RegionScannerFactory {
final ImmutableBytesPtr tenantId = ScanUtil.getTenantId(scan);
if (j != null) {
- innerScanner = new HashJoinRegionScanner(env, innerScanner, arrayKVRefs, arrayFuncRefs,
+ innerScanner = new HashJoinRegionScanner(env, innerScanner, scan, arrayKVRefs, arrayFuncRefs,
p, j, tenantId, useQualifierAsIndex,
useNewValueColumnQualifier);
}
if (scanOffset != null) {
innerScanner = getOffsetScanner(innerScanner, new OffsetResultIterator(
- new RegionScannerResultIterator(innerScanner, getMinMaxQualifiersFromScan(scan), encodingScheme), scanOffset),
+ new RegionScannerResultIterator(innerScanner, getMinMaxQualifiersFromScan(scan), encodingScheme),
+ scanOffset, getPageSizeMsForRegionScanner(scan)),
scan.getAttribute(QueryConstants.LAST_SCAN) != null);
}
boolean spoolingEnabled =
@@ -219,7 +223,7 @@ public class NonAggregateRegionScannerFactory extends RegionScannerFactory {
PTable.QualifierEncodingScheme encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan);
ResultIterator inner = new RegionScannerResultIterator(s, EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan), encodingScheme);
return new OrderedResultIterator(inner, orderByExpressions, spoolingEnabled,
- thresholdBytes, limit >= 0 ? limit : null, null, estimatedRowSize);
+ thresholdBytes, limit >= 0 ? limit : null, null, estimatedRowSize, getPageSizeMsForRegionScanner(scan));
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
@@ -373,11 +377,13 @@ public class NonAggregateRegionScannerFactory extends RegionScannerFactory {
if (isFilterDone()) {
return false;
}
-
- for (int i = 0; i < tuple.size(); i++) {
- results.add(tuple.getValue(i));
+ if (isDummy(tuple)) {
+ getDummyResult(results);
+ } else {
+ for (int i = 0; i < tuple.size(); i++) {
+ results.add(tuple.getValue(i));
+ }
}
-
tuple = iterator.next();
return !isFilterDone();
} catch (Throwable t) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OffsetResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OffsetResultIterator.java
index 5c5a6d3..3eecfc8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OffsetResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OffsetResultIterator.java
@@ -23,6 +23,10 @@ import java.util.List;
import org.apache.phoenix.compile.ExplainPlanAttributes
.ExplainPlanAttributesBuilder;
import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+
+import static org.apache.phoenix.util.ScanUtil.getDummyResult;
+import static org.apache.phoenix.util.ScanUtil.getDummyTuple;
/**
* Iterates through tuples up to a limit
@@ -32,17 +36,27 @@ import org.apache.phoenix.schema.tuple.Tuple;
public class OffsetResultIterator extends DelegateResultIterator {
private int rowCount;
private int offset;
+ private long pageSizeMs = Long.MAX_VALUE;
public OffsetResultIterator(ResultIterator delegate, Integer offset) {
super(delegate);
this.offset = offset == null ? -1 : offset;
}
+ public OffsetResultIterator(ResultIterator delegate, Integer offset, long pageSizeMs) {
+ this(delegate, offset);
+ this.pageSizeMs = pageSizeMs;
+ }
@Override
public Tuple next() throws SQLException {
+ long startTime = EnvironmentEdgeManager.currentTimeMillis();
while (rowCount < offset) {
- if (super.next() == null) { return null; }
+ Tuple tuple = super.next();
+ if (tuple == null) { return null; }
rowCount++;
+ if (EnvironmentEdgeManager.currentTimeMillis() - startTime >= pageSizeMs) {
+ return getDummyTuple(tuple);
+ }
}
return super.next();
}
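
The loop above keeps rowCount in a field so that a page boundary (dummy tuple) does not reset the number of rows already skipped. A stand-alone sketch of that resumable-offset pattern, using plain Java types and hypothetical names in place of Phoenix's Tuple and iterators:

import java.util.Iterator;

final class ResumableOffsetSketch<T> {
    private final Iterator<T> source;
    private final int offset;
    private final long budgetMs;
    private int skipped = 0;              // survives across next() calls

    ResumableOffsetSketch(Iterator<T> source, int offset, long budgetMs) {
        this.source = source;
        this.offset = offset;
        this.budgetMs = budgetMs;
    }

    // Returns the next element past the offset; null while skipping means
    // "page boundary, call again" (the dummy-tuple role in the real code).
    T next() {
        long start = System.currentTimeMillis();
        while (skipped < offset && source.hasNext()) {
            source.next();
            skipped++;
            if (System.currentTimeMillis() - start >= budgetMs) {
                return null;              // resume skipping on the next call
            }
        }
        return source.hasNext() ? source.next() : null;
    }
}
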
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
index a433759..13d75ea 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
@@ -19,13 +19,14 @@ package org.apache.phoenix.iterate;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkPositionIndex;
+import static org.apache.phoenix.util.ScanUtil.getDummyTuple;
+import static org.apache.phoenix.util.ScanUtil.isDummy;
import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
-import java.util.Queue;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
@@ -36,9 +37,9 @@ import org.apache.phoenix.compile.ExplainPlanAttributes
import org.apache.phoenix.execute.DescVarLengthFastByteComparisons;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.OrderByExpression;
-import org.apache.phoenix.iterate.OrderedResultIterator.ResultEntry;
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.ServerUtil;
import org.apache.phoenix.util.SizedUtil;
@@ -145,7 +146,10 @@ public class OrderedResultIterator implements PeekingResultIterator {
private final long estimatedByteSize;
private PeekingResultIterator resultIterator;
+ private boolean resultIteratorReady = false;
+ private Tuple dummyTuple = null;
private long byteSize;
+ private long pageSizeMs;
protected ResultIterator getDelegate() {
return delegate;
@@ -153,7 +157,7 @@ public class OrderedResultIterator implements PeekingResultIterator {
public OrderedResultIterator(ResultIterator delegate, List<OrderByExpression> orderByExpressions,
boolean spoolingEnabled, long thresholdBytes, Integer limit, Integer offset) {
- this(delegate, orderByExpressions, spoolingEnabled, thresholdBytes, limit, offset, 0);
+ this(delegate, orderByExpressions, spoolingEnabled, thresholdBytes, limit, offset, 0, Long.MAX_VALUE);
}
public OrderedResultIterator(ResultIterator delegate, List<OrderByExpression> orderByExpressions,
@@ -162,8 +166,14 @@ public class OrderedResultIterator implements PeekingResultIterator {
}
public OrderedResultIterator(ResultIterator delegate,
+ List<OrderByExpression> orderByExpressions, boolean spoolingEnabled,
+ long thresholdBytes, Integer limit, Integer offset, int estimatedRowSize) {
+ this(delegate, orderByExpressions, spoolingEnabled, thresholdBytes, limit, offset, estimatedRowSize, Long.MAX_VALUE);
+ }
+
+ public OrderedResultIterator(ResultIterator delegate,
List<OrderByExpression> orderByExpressions, boolean spoolingEnabled,
- long thresholdBytes, Integer limit, Integer offset, int estimatedRowSize) {
+ long thresholdBytes, Integer limit, Integer offset, int estimatedRowSize, long pageSizeMs) {
checkArgument(!orderByExpressions.isEmpty());
this.delegate = delegate;
this.orderByExpressions = orderByExpressions;
@@ -187,6 +197,7 @@ public class OrderedResultIterator implements PeekingResultIterator {
assert(limit == null || Long.MAX_VALUE / estimatedEntrySize >= limit + this.offset);
this.estimatedByteSize = limit == null ? 0 : (limit + this.offset) * estimatedEntrySize;
+ this.pageSizeMs = pageSizeMs;
}
public Integer getLimit() {
@@ -244,11 +255,17 @@ public class OrderedResultIterator implements PeekingResultIterator {
@Override
public Tuple next() throws SQLException {
- return getResultIterator().next();
+ getResultIterator();
+ if (!resultIteratorReady) {
+ return dummyTuple;
+ }
+ return resultIterator.next();
}
private PeekingResultIterator getResultIterator() throws SQLException {
- if (resultIterator != null) {
+ if (resultIteratorReady) {
+            // The results have already been ordered, so the result iterator is
+            // ready to iterate over them
return resultIterator;
}
@@ -256,11 +273,17 @@ public class OrderedResultIterator implements PeekingResultIterator {
List<Expression> expressions = Lists.newArrayList(Collections2.transform(orderByExpressions, TO_EXPRESSION));
final Comparator<ResultEntry> comparator = buildComparator(orderByExpressions);
try{
- final SizeAwareQueue<ResultEntry> queueEntries =
- PhoenixQueues.newResultEntrySortedQueue(comparator, limit, spoolingEnabled,
- thresholdBytes);
- resultIterator = new RecordPeekingResultIterator(queueEntries);
+ if (resultIterator == null) {
+ resultIterator = new RecordPeekingResultIterator(PhoenixQueues.newResultEntrySortedQueue(comparator,
+ limit, spoolingEnabled, thresholdBytes));
+ }
+ final SizeAwareQueue<ResultEntry> queueEntries = ((RecordPeekingResultIterator)resultIterator).getQueueEntries();
+ long startTime = EnvironmentEdgeManager.currentTimeMillis();
for (Tuple result = delegate.next(); result != null; result = delegate.next()) {
+ if (isDummy(result)) {
+ dummyTuple = result;
+ return resultIterator;
+ }
int pos = 0;
ImmutableBytesWritable[] sortKeys = new ImmutableBytesWritable[numSortKeys];
for (Expression expression : expressions) {
@@ -270,7 +293,12 @@ public class OrderedResultIterator implements PeekingResultIterator {
sortKeys[pos++] = evaluated && sortKey.getLength() > 0 ? sortKey : null;
}
queueEntries.add(new ResultEntry(sortKeys, result));
+ if (EnvironmentEdgeManager.currentTimeMillis() - startTime >= pageSizeMs) {
+ dummyTuple = getDummyTuple(result);
+ return resultIterator;
+ }
}
+ resultIteratorReady = true;
this.byteSize = queueEntries.getByteSize();
} catch (IOException e) {
ServerUtil.createIOException(e.getMessage(), e);
@@ -336,6 +364,10 @@ public class OrderedResultIterator implements PeekingResultIterator {
this.queueEntries = queueEntries;
}
+ public SizeAwareQueue<ResultEntry> getQueueEntries() {
+ return queueEntries;
+ }
+
@Override
public Tuple next() throws SQLException {
ResultEntry entry = queueEntries.poll();
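
The net effect of the changes above is a resumable sort: the sorted queue now lives behind getQueueEntries() and survives across next() calls, so hitting the page budget pauses the fill loop instead of discarding partial work. A self-contained sketch of that pattern with hypothetical names and plain Java types, where null from next() during the fill phase plays the role of the dummy tuple:

import java.util.PriorityQueue;
import java.util.Queue;
import java.util.function.Supplier;

final class ResumableSorter<T extends Comparable<T>> {
    private final Queue<T> buffer = new PriorityQueue<>(); // survives across calls
    private final Supplier<T> source;                      // returns null when exhausted
    private final long budgetMs;
    private boolean ready = false;

    ResumableSorter(Supplier<T> source, long budgetMs) {
        this.source = source;
        this.budgetMs = budgetMs;
    }

    // Before ready: null means "page boundary, call again"; after ready:
    // elements drain in sorted order and null means no more rows.
    T next() {
        if (!ready) {
            long start = System.currentTimeMillis();
            for (T t = source.get(); t != null; t = source.get()) {
                buffer.add(t);
                if (System.currentTimeMillis() - start >= budgetMs) {
                    return null;          // pause; buffered work is kept
                }
            }
            ready = true;                 // input exhausted; start draining
        }
        return buffer.poll();
    }
}
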
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
index c9d471b..5b587df 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
@@ -20,10 +20,16 @@ package org.apache.phoenix.iterate;
import static org.apache.phoenix.coprocessor.ScanRegionObserver.WILDCARD_SCAN_INCLUDES_DYNAMIC_COLUMNS;
import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
+import static org.apache.phoenix.util.ScanUtil.getDummyResult;
+import static org.apache.phoenix.util.ScanUtil.getPageSizeMsForRegionScanner;
+import static org.apache.phoenix.util.ScanUtil.isDummy;
import com.google.common.collect.ImmutableList;
import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
@@ -54,6 +60,7 @@ import org.apache.phoenix.schema.tuple.ResultTuple;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.transaction.PhoenixTransactionContext;
import org.apache.phoenix.util.EncodedColumnsUtil;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.ServerUtil;
@@ -118,6 +125,7 @@ public abstract class RegionScannerFactory {
private HRegionInfo regionInfo = env.getRegionInfo();
private byte[] actualStartKey = getActualStartKey();
private boolean useNewValueColumnQualifier = EncodedColumnsUtil.useNewValueColumnQualifier(scan);
+ final long pageSizeMs = getPageSizeMsForRegionScanner(scan);
// Get the actual scan start row of local index. This will be used to compare the row
// key of the results less than scan start row when there are references.
@@ -129,7 +137,11 @@ public abstract class RegionScannerFactory {
@Override
public boolean next(List<Cell> results) throws IOException {
try {
- return s.next(results);
+ boolean next = s.next(results);
+ if (isDummy(results)) {
+ return true;
+ }
+ return next;
} catch (Throwable t) {
ServerUtil.throwIOException(getRegion().getRegionInfo().getRegionNameAsString(), t);
return false; // impossible
@@ -170,6 +182,9 @@ public abstract class RegionScannerFactory {
public boolean nextRaw(List<Cell> result) throws IOException {
try {
boolean next = s.nextRaw(result);
+ if (isDummy(result)) {
+ return true;
+ }
Cell arrayElementCell = null;
if (result.size() == 0) {
return next;
@@ -182,7 +197,7 @@ public abstract class RegionScannerFactory {
if(actualStartKey!=null) {
next = scanTillScanStartRow(s, arrayKVRefs, arrayFuncRefs, result,
null, arrayElementCell);
- if (result.isEmpty()) {
+ if (result.isEmpty() || isDummy(result)) {
return next;
}
}
@@ -290,8 +305,15 @@ public abstract class RegionScannerFactory {
ScannerContext scannerContext, Cell arrayElementCell) throws IOException {
boolean next = true;
Cell firstCell = result.get(0);
+ long startTime = EnvironmentEdgeManager.currentTimeMillis();
while (Bytes.compareTo(firstCell.getRowArray(), firstCell.getRowOffset(),
firstCell.getRowLength(), actualStartKey, 0, actualStartKey.length) < 0) {
+ if (EnvironmentEdgeManager.currentTimeMillis() - startTime >= pageSizeMs) {
+ byte[] rowKey = CellUtil.cloneRow(result.get(0));
+ result.clear();
+ getDummyResult(rowKey, result);
+ return true;
+ }
result.clear();
if(scannerContext == null) {
next = s.nextRaw(result);
@@ -301,6 +323,9 @@ public abstract class RegionScannerFactory {
if (result.isEmpty()) {
return next;
}
+ if (isDummy(result)) {
+ return true;
+ }
if (arrayFuncRefs != null && arrayFuncRefs.length > 0 && arrayKVRefs.size() > 0) {
int arrayElementCellPosition = replaceArrayIndexElement(arrayKVRefs, arrayFuncRefs, result);
arrayElementCell = result.get(arrayElementCellPosition);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
index a5a40e2..caf91c4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
@@ -23,16 +23,20 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
import org.apache.phoenix.schema.tuple.EncodedColumnQualiferCellsList;
import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
import org.apache.phoenix.schema.tuple.PositionBasedMultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.ResultTuple;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.util.EncodedColumnsUtil;
import org.apache.phoenix.util.ServerUtil;
+import static org.apache.phoenix.util.ScanUtil.isDummy;
+
public class RegionScannerResultIterator extends BaseResultIterator {
private final RegionScanner scanner;
@@ -63,6 +67,9 @@ public class RegionScannerResultIterator extends BaseResultIterator {
if (!hasMore && results.isEmpty()) {
return null;
}
+ if (isDummy(results)) {
+ return new ResultTuple(Result.create(results));
+ }
// We instantiate a new tuple because in all cases currently we hang on to it
// (i.e. to compute and hold onto the TopN).
Tuple tuple = useQualifierAsIndex ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
index 4cafb88..46cbf96 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
@@ -160,7 +160,7 @@ public class ScanningResultIterator implements ResultIterator {
public Tuple next() throws SQLException {
try {
Result result = scanner.next();
- while (result != null && isDummy(result)) {
+ while (result != null && (result.isEmpty() || isDummy(result))) {
result = scanner.next();
}
if (result == null) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
index 2ae9223..da5edbd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
@@ -26,7 +26,6 @@ import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.NO
import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.RENEWED;
import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.THRESHOLD_NOT_REACHED;
import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.UNINITIALIZED;
-import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
import java.io.IOException;
import java.sql.SQLException;
@@ -37,10 +36,10 @@ import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.concurrent.GuardedBy;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.AbstractClientScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
@@ -55,6 +54,7 @@ import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.join.HashCacheClient;
import org.apache.phoenix.monitoring.ScanMetricsHolder;
import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.util.ByteUtil;
@@ -139,7 +139,15 @@ public class TableResultIterator implements ResultIterator {
.getInt(QueryConstants.HASH_JOIN_CACHE_RETRIES, QueryConstants.DEFAULT_HASH_JOIN_CACHE_RETRIES);
ScanUtil.setScanAttributesForIndexReadRepair(scan, table, plan.getContext().getConnection());
ScanUtil.setScanAttributesForPhoenixTTL(scan, table, plan.getContext().getConnection());
- scan.setAttribute(BaseScannerRegionObserver.SERVER_PAGING, TRUE_BYTES);
+ long pageSizeMs = plan.getContext().getConnection().getQueryServices().getProps()
+ .getInt(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, -1);
+ if (pageSizeMs == -1) {
+            // Use half of the HBase RPC timeout value as the server page size to make sure that the HBase
+ // region server will be able to send a heartbeat message to the client before the client times out
+ pageSizeMs = (long) (plan.getContext().getConnection().getQueryServices().getProps()
+ .getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT) * 0.5);
+ }
+        scan.setAttribute(BaseScannerRegionObserver.SERVER_PAGE_SIZE_MS, Bytes.toBytes(pageSizeMs));
}
@Override
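
Since the page size is now read from connection properties, clients can tune it per connection. A sketch, where the 5000 ms value and the localhost quorum are placeholders:

import java.sql.Connection;
import java.sql.DriverManager;
import java.util.Properties;

public class PageSizeConfigSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS; 5000 ms is illustrative only
        props.setProperty("phoenix.server.page.size.ms", "5000");
        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost", props)) {
            // scans issued through this connection carry SERVER_PAGE_SIZE_MS = 5000
        }
    }
}
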
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index dab3a96..faade11 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -330,11 +330,10 @@ public interface QueryServices extends SQLCloseable {
public static final String LONG_VIEW_INDEX_ENABLED_ATTRIB = "phoenix.index.longViewIndex.enabled";
// The number of index rows to be rebuild in one RPC call
public static final String INDEX_REBUILD_PAGE_SIZE_IN_ROWS = "phoenix.index.rebuild_page_size_in_rows";
- // The number of rows to be scanned in one RPC call
- public static final String UNGROUPED_AGGREGATE_PAGE_SIZE_IN_MS = "phoenix.ungrouped.aggregate_page_size_in_ms";
- public static final String GROUPED_AGGREGATE_PAGE_SIZE_IN_MS = "phoenix.grouped.aggregate_page_size_in_ms";
// Flag indicating that server side masking of ttl expired rows is enabled.
public static final String PHOENIX_TTL_SERVER_SIDE_MASKING_ENABLED = "phoenix.ttl.server_side.masking.enabled";
+ // The time limit on the amount of work to be done in one RPC call
+ public static final String PHOENIX_SERVER_PAGE_SIZE_MS = "phoenix.server.page.size.ms";
// Before 4.15 when we created a view we included the parent table column metadata in the view
// metadata. After PHOENIX-3534 we allow SYSTEM.CATALOG to split and no longer store the parent
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 289e3fc..9c6ff29 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -342,8 +342,6 @@ public class QueryServicesOptions {
public static final long DEFAULT_GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS = 7*24*60*60*1000; /* 7 days */
public static final boolean DEFAULT_INDEX_REGION_OBSERVER_ENABLED = true;
public static final long DEFAULT_INDEX_REBUILD_PAGE_SIZE_IN_ROWS = 32*1024;
- public static final long DEFAULT_UNGROUPED_AGGREGATE_PAGE_SIZE_IN_MS = 1000; // 1 second
- public static final long DEFAULT_GROUPED_AGGREGATE_PAGE_SIZE_IN_MS = 1000;
public static final boolean DEFAULT_ALLOW_SPLITTABLE_SYSTEM_CATALOG_ROLLBACK = false;
public static final boolean DEFAULT_PROPERTY_POLICY_PROVIDER_ENABLED = true;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/EncodedColumnQualiferCellsList.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/EncodedColumnQualiferCellsList.java
index db3647d..54e97a3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/EncodedColumnQualiferCellsList.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/EncodedColumnQualiferCellsList.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.schema.tuple;
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.phoenix.query.QueryConstants.ENCODED_CQ_COUNTER_INITIAL_VALUE;
import static org.apache.phoenix.query.QueryConstants.ENCODED_EMPTY_COLUMN_NAME;
+import static org.apache.phoenix.util.ScanUtil.isDummy;
import java.util.Collection;
import java.util.ConcurrentModificationException;
@@ -137,6 +138,12 @@ public class EncodedColumnQualiferCellsList implements List<Cell> {
if (e == null) {
throw new NullPointerException();
}
+ if (isDummy(e)) {
+ array[0] = e;
+ firstNonNullElementIdx = 0;
+ numNonNullElements = 1;
+ return true;
+ }
int columnQualifier =
encodingScheme.decode(e.getQualifierArray(), e.getQualifierOffset(),
e.getQualifierLength());
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ResultTuple.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ResultTuple.java
index 3774837..cd17527 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ResultTuple.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ResultTuple.java
@@ -35,7 +35,6 @@ import org.apache.phoenix.util.KeyValueUtil;
public class ResultTuple extends BaseTuple {
private final Result result;
public static final ResultTuple EMPTY_TUPLE = new ResultTuple(Result.create(Collections.<Cell>emptyList()));
-
public ResultTuple(Result result) {
this.result = result;
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
index 28e58a0..a9c53e3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
@@ -49,6 +49,7 @@ import org.apache.phoenix.filter.ColumnProjectionFilter;
import org.apache.phoenix.filter.DistinctPrefixFilter;
import org.apache.phoenix.filter.EncodedQualifiersColumnProjectionFilter;
import org.apache.phoenix.filter.MultiEncodedCQKeyValueComparisonFilter;
+import org.apache.phoenix.filter.PagedFilter;
import org.apache.phoenix.filter.SkipScanFilter;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.hbase.index.util.VersionUtil;
@@ -70,6 +71,8 @@ import org.apache.phoenix.schema.RowKeySchema;
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.TableNotFoundException;
import org.apache.phoenix.schema.ValueSchema.Field;
+import org.apache.phoenix.schema.tuple.ResultTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.schema.types.PVarbinary;
import org.slf4j.Logger;
@@ -676,7 +679,7 @@ public class ScanUtil {
// Start/stop row must be swapped if scan is being done in reverse
public static void setupReverseScan(Scan scan) {
- if (isReversed(scan)) {
+ if (isReversed(scan) && !scan.isReversed()) {
byte[] newStartRow = getReversedRow(scan.getStartRow());
byte[] newStopRow = getReversedRow(scan.getStopRow());
scan.setStartRow(newStopRow);
@@ -761,6 +764,12 @@ public class ScanUtil {
if (filter == null) {
return;
}
+ if (filter instanceof PagedFilter) {
+ filter = ((PagedFilter) filter).getDelegateFilter();
+ if (filter == null) {
+ return;
+ }
+ }
if (filter instanceof FilterList) {
FilterList filterList = (FilterList)filter;
for (Filter childFilter : filterList.getFilters()) {
@@ -1206,6 +1215,15 @@ public class ScanUtil {
if (!ScanUtil.isDeleteTTLExpiredRows(scan)) {
scan.setAttribute(BaseScannerRegionObserver.MASK_PHOENIX_TTL_EXPIRED, PDataType.TRUE_BYTES);
}
+ if (ScanUtil.isLocalIndex(scan)) {
+ byte[] actualStartRow = scan.getAttribute(SCAN_ACTUAL_START_ROW) != null ?
+ scan.getAttribute(SCAN_ACTUAL_START_ROW) :
+ HConstants.EMPTY_BYTE_ARRAY;
+ ScanUtil.setLocalIndexAttributes(scan, 0,
+ actualStartRow,
+ HConstants.EMPTY_BYTE_ARRAY,
+ scan.getStartRow(), scan.getStopRow());
+ }
addEmptyColumnToScan(scan, emptyColumnFamilyName, emptyColumnName);
}
}
@@ -1222,22 +1240,85 @@ public class ScanUtil {
getDummyResult(EMPTY_BYTE_ARRAY, result);
}
+ public static Tuple getDummyTuple(byte[] rowKey) {
+ List<Cell> result = new ArrayList<Cell>(1);
+ getDummyResult(rowKey, result);
+ return new ResultTuple(Result.create(result));
+ }
+
+ public static Tuple getDummyTuple() {
+ List<Cell> result = new ArrayList<Cell>(1);
+ getDummyResult(result);
+ return new ResultTuple(Result.create(result));
+ }
+
+ public static Tuple getDummyTuple(Tuple tuple) {
+ ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+ tuple.getKey(ptr);
+ return getDummyTuple(ptr.copyBytes());
+ }
+
+ public static boolean isDummy(Cell cell) {
+ return CellUtil.matchingColumn(cell, EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY);
+ }
+
public static boolean isDummy(Result result) {
- // Check if the result is a dummy result
if (result.rawCells().length != 1) {
return false;
}
Cell cell = result.rawCells()[0];
- return CellUtil.matchingColumn(cell, EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY);
+ return isDummy(cell);
}
public static boolean isDummy(List<Cell> result) {
- // Check if the result is a dummy result
if (result.size() != 1) {
return false;
}
Cell cell = result.get(0);
- return CellUtil.matchingColumn(cell, EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY);
+ return isDummy(cell);
+ }
+
+ public static boolean isDummy(Tuple tuple) {
+ if (tuple instanceof ResultTuple) {
+            return isDummy(((ResultTuple) tuple).getResult());
+ }
+ return false;
+ }
+
+ public static PagedFilter getPhoenixPagedFilter(Scan scan) {
+ Filter filter = scan.getFilter();
+        if (filter instanceof PagedFilter) {
+            return (PagedFilter) filter;
+ }
+ return null;
+ }
+
+    /**
+     * The server page size expressed in ms is the maximum time we want the Phoenix server code to spend
+     * on each iteration of ResultScanner. Each ResultScanner#next() call can be translated into one or more
+     * HBase RegionScanner#next() calls by a Phoenix RegionScanner object in a loop. To ensure that the total
+     * time spent by the Phoenix server code does not exceed the configured page size value, SERVER_PAGE_SIZE_MS,
+     * the loop time in a Phoenix region scanner is limited to 0.6 * SERVER_PAGE_SIZE_MS, and the time for
+     * each HBase RegionScanner#next() call, which is controlled by PagedFilter, is limited to 0.3 * SERVER_PAGE_SIZE_MS.
+     */
+ private static long getPageSizeMs(Scan scan) {
+ long pageSizeMs = Long.MAX_VALUE;
+ byte[] pageSizeMsBytes = scan.getAttribute(BaseScannerRegionObserver.SERVER_PAGE_SIZE_MS);
+ if (pageSizeMsBytes != null) {
+ pageSizeMs = Bytes.toLong(pageSizeMsBytes);
+ }
+ return pageSizeMs;
+ }
+
+ public static long getPageSizeMsForRegionScanner(Scan scan) {
+ return (long) (getPageSizeMs(scan) * 0.6);
+ }
+
+ public static long getPageSizeMsForFilter(Scan scan) {
+ return (long) (getPageSizeMs(scan) * 0.3);
}
/**
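
A quick arithmetic check of the 0.6/0.3 split described above, assuming SERVER_PAGE_SIZE_MS is configured to 10000 ms:

public class PageBudgetSketch {
    public static void main(String[] args) {
        long serverPageSizeMs = 10_000L;                              // SERVER_PAGE_SIZE_MS
        long regionScannerBudgetMs = (long) (serverPageSizeMs * 0.6); // 6000 ms for the scanner loop
        long filterBudgetMs = (long) (serverPageSizeMs * 0.3);        // 3000 ms per RegionScanner#next()
        // Worst case: the loop runs up to its 6000 ms budget and the final
        // filter-bounded next() adds up to 3000 ms more, staying under 10000 ms.
        System.out.println(regionScannerBudgetMs + " + " + filterBudgetMs + " <= " + serverPageSizeMs);
    }
}
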
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index 05773fb..f326f9b 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -638,11 +638,8 @@ public abstract class BaseTest {
// This results in processing one row at a time in each next operation of the aggregate region
// scanner, i.e., one row pages. In other words, 0ms page allows only one row to be processed
// within one page; 0ms page is equivalent to one-row page
- if (conf.getLong(QueryServices.UNGROUPED_AGGREGATE_PAGE_SIZE_IN_MS, 0) == 0) {
- conf.setLong(QueryServices.UNGROUPED_AGGREGATE_PAGE_SIZE_IN_MS, 0);
- }
- if (conf.getLong(QueryServices.GROUPED_AGGREGATE_PAGE_SIZE_IN_MS, 0) == 0) {
- conf.setLong(QueryServices.GROUPED_AGGREGATE_PAGE_SIZE_IN_MS, 0);
+ if (conf.getLong(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, 0) == 0) {
+ conf.setLong(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, 0);
}
return conf;
}