Posted to commits@phoenix.apache.org by tk...@apache.org on 2023/11/30 18:54:00 UTC
(phoenix) branch 5.1 updated: Backport PHOENIX-7024 and PHOENIX-6909 to 5.1 (#1746)
This is an automated email from the ASF dual-hosted git repository.
tkhurana pushed a commit to branch 5.1
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/5.1 by this push:
new 0120f95489 Backport PHOENIX-7024 and PHOENIX-6909 to 5.1 (#1746)
0120f95489 is described below
commit 0120f954891741a3473e8e6cd720ff7042e91ec7
Author: tkhurana <kh...@gmail.com>
AuthorDate: Thu Nov 30 10:53:54 2023 -0800
Backport PHOENIX-7024 and PHOENIX-6909 to 5.1 (#1746)
* PHOENIX-7024 Fix issues in Server Paging
* PHOENIX-6909 Paged rows counter metric
---
.../org/apache/phoenix/end2end/BaseOrderByIT.java | 20 +-
.../org/apache/phoenix/end2end/ServerPagingIT.java | 369 +++++++++++++++++++++
.../phoenix/iterate/PhoenixQueryTimeoutIT.java | 14 +-
.../phoenix/coprocessor/PagedRegionScanner.java | 19 +-
.../UncoveredGlobalIndexRegionScanner.java | 3 +
.../coprocessor/UncoveredIndexRegionScanner.java | 30 +-
.../UncoveredLocalIndexRegionScanner.java | 3 +
.../org/apache/phoenix/filter/PagedFilter.java | 84 ++---
.../phoenix/iterate/OffsetResultIterator.java | 19 +-
.../phoenix/iterate/ScanningResultIterator.java | 14 +-
.../phoenix/monitoring/GlobalClientMetrics.java | 4 +-
.../org/apache/phoenix/monitoring/MetricType.java | 4 +-
.../phoenix/monitoring/ScanMetricsHolder.java | 8 +
.../java/org/apache/phoenix/util/ScanUtil.java | 12 +-
14 files changed, 515 insertions(+), 88 deletions(-)
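For context, a minimal client-side sketch (not part of this patch) of how the two changes are exercised: QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS controls when the server returns a dummy row to start a new page (the ITs below set it to 0 to page on every row), and the new PAGED_ROWS_COUNTER metric counts those dummy rows, readable per request via PhoenixRuntime.getRequestReadMetricInfo when COLLECT_REQUEST_LEVEL_METRICS is enabled. The JDBC URL and table name below are placeholders.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.phoenix.monitoring.MetricType;
    import org.apache.phoenix.query.QueryServices;
    import org.apache.phoenix.util.PhoenixRuntime;

    public class PagedRowsMetricExample {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // 0 ms page size means the server pages after (almost) every row;
            // the ITs below use the same setting to force the paging code paths
            props.setProperty(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, "0");
            props.setProperty(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, "true");
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost", props);
                 ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM MY_TABLE")) {
                while (rs.next()) {
                    // dummy rows are absorbed by the client-side iterators; only real rows show up here
                }
                Map<String, Map<MetricType, Long>> metrics = PhoenixRuntime.getRequestReadMetricInfo(rs);
                Long paged = metrics.get("MY_TABLE").get(MetricType.PAGED_ROWS_COUNTER);
                System.out.println("dummy rows returned due to paging: " + paged);
            }
        }
    }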
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseOrderByIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseOrderByIT.java
index a0ef9bdb72..dcd7463887 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseOrderByIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseOrderByIT.java
@@ -44,16 +44,25 @@ import java.util.Properties;
import org.apache.phoenix.compile.ExplainPlan;
import org.apache.phoenix.compile.ExplainPlanAttributes;
import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
+import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.QueryBuilder;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.TestUtil;
+import org.junit.Before;
import org.junit.Test;
public abstract class BaseOrderByIT extends ParallelStatsDisabledIT {
+ Properties props;
+ @Before
+ public void setup() {
+ props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ props.setProperty(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, Long.toString(0));
+ }
+
@Test
public void testMultiOrderByExpr() throws Exception {
String tenantId = getOrganizationId();
@@ -63,7 +72,7 @@ public abstract class BaseOrderByIT extends ParallelStatsDisabledIT {
Lists.newArrayList("ENTITY_ID", "B_STRING"))
.setFullTableName(tableName)
.setOrderByClause("B_STRING, ENTITY_ID");
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
ResultSet rs = executeQuery(conn, queryBuilder);
assertTrue (rs.next());
@@ -98,7 +107,6 @@ public abstract class BaseOrderByIT extends ParallelStatsDisabledIT {
Lists.newArrayList("ENTITY_ID", "B_STRING"))
.setFullTableName(tableName)
.setOrderByClause("B_STRING || ENTITY_ID DESC");
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
ResultSet rs = executeQuery(conn, queryBuilder);
assertTrue (rs.next());
@@ -126,7 +134,6 @@ public abstract class BaseOrderByIT extends ParallelStatsDisabledIT {
@Test
public void testOrderByDifferentColumns() throws Exception {
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
conn.setAutoCommit(false);
String tableName = generateUniqueName();
@@ -201,7 +208,6 @@ public abstract class BaseOrderByIT extends ParallelStatsDisabledIT {
@Test
public void testAggregateOrderBy() throws Exception {
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
Connection conn = DriverManager.getConnection(getUrl(), props);
String tableName = generateUniqueName();
String ddl = "create table " + tableName + " (ID VARCHAR NOT NULL PRIMARY KEY, VAL1 VARCHAR, VAL2 INTEGER)";
@@ -262,7 +268,6 @@ public abstract class BaseOrderByIT extends ParallelStatsDisabledIT {
@Test
public void testAggregateOptimizedOutOrderBy() throws Exception {
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
Connection conn = DriverManager.getConnection(getUrl(), props);
String tableName = generateUniqueName();
String ddl = "create table " + tableName + " (K1 VARCHAR NOT NULL, K2 VARCHAR NOT NULL, VAL1 VARCHAR, VAL2 INTEGER, CONSTRAINT pk PRIMARY KEY(K1,K2))";
@@ -342,7 +347,6 @@ public abstract class BaseOrderByIT extends ParallelStatsDisabledIT {
@Test
public void testNullsLastWithDesc() throws Exception {
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
String tableName=generateUniqueName();
String sql="CREATE TABLE "+tableName+" ( "+
@@ -606,7 +610,6 @@ public abstract class BaseOrderByIT extends ParallelStatsDisabledIT {
}
private void doTestOrderByReverseOptimization(boolean salted,boolean desc1,boolean desc2,boolean desc3) throws Exception {
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
String tableName=generateUniqueName();
String sql="CREATE TABLE "+tableName+" ( "+
@@ -712,7 +715,6 @@ public abstract class BaseOrderByIT extends ParallelStatsDisabledIT {
}
private void doTestOrderByReverseOptimizationWithNullsLast(boolean salted,boolean desc1,boolean desc2,boolean desc3) throws Exception {
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
String tableName=generateUniqueName();
String sql="CREATE TABLE "+tableName+" ( "+
@@ -958,7 +960,6 @@ public abstract class BaseOrderByIT extends ParallelStatsDisabledIT {
@Test
public void testOrderByNullable() throws SQLException {
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
String sql = "CREATE TABLE IF NOT EXISTS us_population (state CHAR(2) NOT NULL," +
"city VARCHAR NOT NULL, population BIGINT CONSTRAINT my_pk PRIMARY KEY" +
@@ -1000,7 +1001,6 @@ public abstract class BaseOrderByIT extends ParallelStatsDisabledIT {
@Test
public void testPhoenix6999() throws Exception {
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
String tableName = "TBL_" + generateUniqueName();
String descTableName = "TBL_" + generateUniqueName();
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerPagingIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerPagingIT.java
new file mode 100644
index 0000000000..d4368ee1d7
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerPagingIT.java
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.apache.phoenix.end2end.index.GlobalIndexCheckerIT.assertExplainPlan;
+import static org.apache.phoenix.end2end.index.GlobalIndexCheckerIT.assertExplainPlanWithLimit;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_PAGED_ROWS_COUNTER;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.monitoring.MetricType;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.types.PDate;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.DateUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class ServerPagingIT extends ParallelStatsDisabledIT {
+
+ @BeforeClass
+ public static synchronized void doSetup() throws Exception {
+ Map<String, String> props = Maps.newHashMapWithExpectedSize(2);
+ props.put(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, Long.toString(0));
+ props.put(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, String.valueOf(true));
+ setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+ }
+
+ private void assertServerPagingMetric(String tableName, ResultSet rs, boolean isPaged) throws SQLException {
+ Map<String, Map<MetricType, Long>> metrics = PhoenixRuntime.getRequestReadMetricInfo(rs);
+ for (Map.Entry<String, Map<MetricType, Long>> entry : metrics.entrySet()) {
+ assertEquals(String.format("Got %s", entry.getKey()), tableName, entry.getKey());
+ Map<MetricType, Long> metricValues = entry.getValue();
+ Long pagedRowsCntr = metricValues.get(MetricType.PAGED_ROWS_COUNTER);
+ assertNotNull(pagedRowsCntr);
+ if (isPaged) {
+ assertTrue(String.format("Got %d", pagedRowsCntr.longValue()), pagedRowsCntr > 0);
+ } else {
+ assertTrue(String.format("Got %d", pagedRowsCntr.longValue()), pagedRowsCntr == 0);
+ }
+ }
+ assertTrue(GLOBAL_PAGED_ROWS_COUNTER.getMetric().getValue() > 0);
+ }
+
+ @Test
+ public void testOrderByNonAggregation() throws Exception {
+ final String tablename = generateUniqueName();
+ final String tenantId = getOrganizationId();
+
+ final Date D1 = DateUtil.parseDate("1970-01-01 00:58:00");
+ final Date D2 = DateUtil.parseDate("1970-01-01 01:02:00");
+ final Date D3 = DateUtil.parseDate("1970-01-01 01:30:00");
+ final Date D4 = DateUtil.parseDate("1970-01-01 01:45:00");
+ final Date D5 = DateUtil.parseDate("1970-01-01 02:00:00");
+ final Date D6 = DateUtil.parseDate("1970-01-01 04:00:00");
+ final String F1 = "A";
+ final String F2 = "B";
+ final String F3 = "C";
+ final String R1 = "R1";
+ final String R2 = "R2";
+ byte[][] splits = new byte[][] {
+ ByteUtil.concat(Bytes.toBytes(tenantId), PDate.INSTANCE.toBytes(D3)),
+ ByteUtil.concat(Bytes.toBytes(tenantId), PDate.INSTANCE.toBytes(D5)),
+ };
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ String ddl = "create table " + tablename +
+ " (organization_id char(15) not null," +
+ " date date not null," +
+ " feature char(1) not null," +
+ " unique_users integer not null,\n" +
+ " transactions bigint,\n" +
+ " region varchar,\n" +
+ " CONSTRAINT pk PRIMARY KEY (organization_id, \"DATE\", feature, unique_users))";
+ StringBuilder buf = new StringBuilder(ddl);
+ if (splits != null) {
+ buf.append(" SPLIT ON (");
+ for (int i = 0; i < splits.length; i++) {
+ buf.append("'").append(Bytes.toString(splits[i])).append("'").append(",");
+ }
+ buf.setCharAt(buf.length()-1, ')');
+ }
+ ddl = buf.toString();
+ conn.createStatement().execute(ddl);
+
+ PreparedStatement stmt = conn.prepareStatement(
+ "upsert into " + tablename +
+ " (" +
+ " ORGANIZATION_ID, " +
+ " \"DATE\", " +
+ " FEATURE, " +
+ " UNIQUE_USERS, " +
+ " TRANSACTIONS, " +
+ " REGION) " +
+ "VALUES (?, ?, ?, ?, ?, ?)");
+ stmt.setString(1, tenantId);
+ stmt.setDate(2, D1);
+ stmt.setString(3, F1);
+ stmt.setInt(4, 10);
+ stmt.setLong(5, 100L);
+ stmt.setString(6, R2);
+ stmt.execute();
+
+ stmt.setString(1, tenantId);
+ stmt.setDate(2, D2);
+ stmt.setString(3, F1);
+ stmt.setInt(4, 20);
+ stmt.setLong(5, 200);
+ stmt.setString(6, null);
+ stmt.execute();
+
+ stmt.setString(1, tenantId);
+ stmt.setDate(2, D3);
+ stmt.setString(3, F1);
+ stmt.setInt(4, 30);
+ stmt.setLong(5, 300);
+ stmt.setString(6, R1);
+ stmt.execute();
+
+ stmt.setString(1, tenantId);
+ stmt.setDate(2, D4);
+ stmt.setString(3, F2);
+ stmt.setInt(4, 40);
+ stmt.setLong(5, 400);
+ stmt.setString(6, R1);
+ stmt.execute();
+
+ stmt.setString(1, tenantId);
+ stmt.setDate(2, D5);
+ stmt.setString(3, F3);
+ stmt.setInt(4, 50);
+ stmt.setLong(5, 500);
+ stmt.setString(6, R2);
+ stmt.execute();
+
+ stmt.setString(1, tenantId);
+ stmt.setDate(2, D6);
+ stmt.setString(3, F1);
+ stmt.setInt(4, 60);
+ stmt.setLong(5, 600);
+ stmt.setString(6, null);
+ stmt.execute();
+ conn.commit();
+ }
+
+ String query = "SELECT \"DATE\", transactions t FROM "+tablename+
+ " WHERE organization_id=? AND unique_users <= 30 ORDER BY t DESC LIMIT 2";
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ PreparedStatement statement = conn.prepareStatement(query)) {
+ statement.setString(1, tenantId);
+ try (ResultSet rs = statement.executeQuery()) {
+ assertTrue(rs.next());
+ assertEquals(D3.getTime(), rs.getDate(1).getTime());
+ assertTrue(rs.next());
+ assertEquals(D2.getTime(), rs.getDate(1).getTime());
+ assertFalse(rs.next());
+ assertServerPagingMetric(tablename, rs, true);
+ }
+ }
+ }
+
+ @Test
+ public void testLimitOffset() throws SQLException {
+ final String tablename = generateUniqueName();
+ final String[] STRINGS = { "a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n",
+ "o", "p", "q", "r", "s", "t", "u", "v", "w", "x", "y", "z" };
+ String ddl = "CREATE TABLE " + tablename + " (t_id VARCHAR NOT NULL,\n" + "k1 INTEGER NOT NULL,\n"
+ + "k2 INTEGER NOT NULL,\n" + "C3.k3 INTEGER,\n" + "C2.v1 VARCHAR,\n"
+ + "CONSTRAINT pk PRIMARY KEY (t_id, k1, k2)) " + "SPLIT ON ('e','i','o')";
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ createTestTable(getUrl(), ddl);
+ for (int i = 0; i < 26; i++) {
+ conn.createStatement().execute("UPSERT INTO " + tablename + " values('" + STRINGS[i] + "'," + i + ","
+ + (i + 1) + "," + (i + 2) + ",'" + STRINGS[25 - i] + "')");
+ }
+ conn.commit();
+ int limit = 10;
+ // Testing 0 as remaining offset after 4 rows in first region, 4 rows in second region
+ int offset = 8;
+ ResultSet rs;
+ rs = conn.createStatement()
+ .executeQuery("SELECT t_id from " + tablename + " order by t_id limit " + limit + " offset " + offset);
+ int i = 0;
+ while (i < limit) {
+ assertTrue(rs.next());
+ assertEquals("Expected string didn't match for i = " + i, STRINGS[offset + i], rs.getString(1));
+ i++;
+ }
+ // no paging when serial offset
+ assertServerPagingMetric(tablename, rs, false);
+
+ // Testing query with offset + filter
+ int filterCond = 10;
+ rs = conn.createStatement().executeQuery(
+ "SELECT t_id from " + tablename + " where k2 > " + filterCond +
+ " order by t_id limit " + limit + " offset " + offset);
+ i = 0;
+ limit = 5;
+ while (i < limit) {
+ assertTrue(rs.next());
+ assertEquals("Expected string didn't match for i = " + i,
+ STRINGS[offset + filterCond + i], rs.getString(1));
+ i++;
+ }
+ // no paging when serial offset
+ assertServerPagingMetric(tablename, rs, false);
+
+ limit = 35;
+ rs = conn.createStatement().executeQuery("SELECT t_id from " + tablename + " union all SELECT t_id from "
+ + tablename + " offset " + offset + " FETCH FIRST " + limit + " rows only");
+ i = 0;
+ while (i++ < STRINGS.length - offset) {
+ assertTrue(rs.next());
+ assertEquals(STRINGS[offset + i - 1], rs.getString(1));
+ }
+ i = 0;
+ while (i++ < limit - STRINGS.length - offset) {
+ assertTrue(rs.next());
+ assertEquals(STRINGS[i - 1], rs.getString(1));
+ }
+ // no paging when serial offset
+ assertServerPagingMetric(tablename, rs, false);
+ limit = 1;
+ offset = 1;
+ rs = conn.createStatement()
+ .executeQuery("SELECT k2 from " + tablename + " order by k2 desc limit " + limit + " offset " + offset);
+ assertTrue(rs.next());
+ assertEquals(25, rs.getInt(1));
+ assertFalse(rs.next());
+ // because of the descending order the offset is implemented on the client,
+ // so this generates a parallel scan and paging happens
+ assertServerPagingMetric(tablename, rs, true);
+ }
+ }
+
+ @Test
+ public void testGroupBy() throws SQLException {
+ final String tablename = generateUniqueName();
+ final String indexName = generateUniqueName();
+
+ String ddl = "CREATE TABLE " + tablename + " (t_id VARCHAR NOT NULL,\n" + "k1 INTEGER NOT NULL,\n"
+ + "k2 INTEGER CONSTRAINT pk PRIMARY KEY (t_id, k1)) ";
+ String indexDDl = "CREATE INDEX IF NOT EXISTS " + indexName + " ON " + tablename + "(k2)";
+
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ createTestTable(getUrl(), ddl);
+ createTestTable(getUrl(), indexDDl);
+ for (int i = 0; i < 8; i++) {
+ conn.createStatement().execute("UPSERT INTO " + tablename + " values('tenant1'," + i + ","
+ + (i + 1) + ")");
+ }
+ conn.commit();
+ ResultSet rs = conn.createStatement().executeQuery(
+ "SELECT count(*) FROM " + tablename + " where t_id = 'tenant1' AND (k2 IN (5,6) or k2 is null) group by k2=6");
+ while (rs.next()) {
+ Assert.assertEquals(1, rs.getInt(1));
+ }
+ Assert.assertFalse(rs.next());
+ assertServerPagingMetric(indexName, rs, true);
+ }
+ }
+
+ @Test
+ public void testUncoveredQuery() throws Exception {
+ String dataTableName = generateUniqueName();
+ populateTable(dataTableName); // with two rows ('a', 'ab', 'abc', 'abcd') and ('b', 'bc', 'bcd', 'bcde')
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ String indexTableName = generateUniqueName();
+ conn.createStatement().execute("CREATE INDEX "
+ + indexTableName + " on " + dataTableName + " (val1) include (val2) ");
+ String selectSql;
+ int limit = 10;
+ // Verify that with the index hint, we will read from the index table even though val3 is not included by the index table
+ selectSql = "SELECT /*+ INDEX(" + dataTableName + " " + indexTableName + ")*/ val3,val2 from "
+ + dataTableName + " WHERE val1 = 'bc' AND (val2 = 'bcd' OR val3 ='bcde') LIMIT " + limit;
+ assertExplainPlanWithLimit(conn, selectSql, dataTableName, indexTableName, limit);
+ ResultSet rs = conn.createStatement().executeQuery(selectSql);
+ assertTrue(rs.next());
+ assertEquals("bcde", rs.getString(1));
+ assertEquals("bcd", rs.getString(2));
+ assertFalse(rs.next());
+ assertServerPagingMetric(indexTableName, rs, true);
+
+ // Add another row and run a group by query where the uncovered index should be used
+ conn.createStatement().execute("upsert into " + dataTableName + " (id, val1, val2, val3) values ('c', 'ab','cde', 'cdef')");
+ conn.commit();
+
+ selectSql = "SELECT /*+ INDEX(" + dataTableName + " " + indexTableName + ")*/ count(val3) from " + dataTableName + " where val1 > '0' GROUP BY val1";
+ // Verify that we will read from the index table
+ assertExplainPlan(conn, selectSql, dataTableName, indexTableName);
+ rs = conn.createStatement().executeQuery(selectSql);
+ assertTrue(rs.next());
+ assertEquals(2, rs.getInt(1));
+ assertTrue(rs.next());
+ assertEquals(1, rs.getInt(1));
+ assertFalse(rs.next());
+
+ selectSql = "SELECT /*+ INDEX(" + dataTableName + " " + indexTableName + ")*/ count(val3) from " + dataTableName + " where val1 > '0'";
+ // Verify that we will read from the index table
+ assertExplainPlan(conn, selectSql, dataTableName, indexTableName);
+ rs = conn.createStatement().executeQuery(selectSql);
+ assertTrue(rs.next());
+ assertEquals(3, rs.getInt(1));
+
+ // Run an order by query where the uncovered index should be used
+ selectSql = "SELECT /*+ INDEX(" + dataTableName + " " + indexTableName + "*/ val3 from " + dataTableName + " where val1 > '0' ORDER BY val1";
+ // Verify that we will read from the index table
+ assertExplainPlan(conn, selectSql, dataTableName, indexTableName);
+ rs = conn.createStatement().executeQuery(selectSql);
+ assertTrue(rs.next());
+ assertEquals("abcd", rs.getString(1));
+ assertTrue(rs.next());
+ assertEquals("cdef", rs.getString(1));
+ assertTrue(rs.next());
+ assertEquals("bcde", rs.getString(1));
+ assertFalse(rs.next());
+ }
+ }
+
+ private void populateTable(String tableName) throws Exception {
+ Connection conn = DriverManager.getConnection(getUrl());
+ conn.createStatement().execute("create table " + tableName +
+ " (id varchar(10) not null primary key, val1 varchar(10), val2 varchar(10)," +
+ " val3 varchar(10))");
+ conn.createStatement().execute("upsert into " + tableName + " values ('a', 'ab', 'abc', 'abcd')");
+ conn.commit();
+ conn.createStatement().execute("upsert into " + tableName + " values ('b', 'bc', 'bcd', 'bcde')");
+ conn.commit();
+ conn.close();
+ }
+}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/iterate/PhoenixQueryTimeoutIT.java b/phoenix-core/src/it/java/org/apache/phoenix/iterate/PhoenixQueryTimeoutIT.java
index b73da96d98..e807e75047 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/iterate/PhoenixQueryTimeoutIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/iterate/PhoenixQueryTimeoutIT.java
@@ -27,6 +27,7 @@ import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.sql.SQLTimeoutException;
import java.util.Properties;
import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
@@ -119,11 +120,16 @@ public class PhoenixQueryTimeoutIT extends ParallelStatsDisabledIT {
fail("Expected query to timeout with a 1 ms timeout");
} catch (SQLException e) {
//OPERATION_TIMED_OUT Exception expected
- if (e.getErrorCode() == IO_EXCEPTION.getErrorCode() && e.getCause() instanceof SQLException) {
- assertEquals(OPERATION_TIMED_OUT.getErrorCode(), ((SQLException) e.getCause()).getErrorCode());
- } else {
- assertEquals(OPERATION_TIMED_OUT.getErrorCode(), e.getErrorCode());
+ Throwable t = e;
+ // SQLTimeoutException can be wrapped inside outer exceptions like PhoenixIOException
+ while (t != null && !(t instanceof SQLTimeoutException)) {
+ t = t.getCause();
}
+ if (t == null) {
+ fail("Expected query to fail with SQLTimeoutException");
+ }
+ assertEquals(OPERATION_TIMED_OUT.getErrorCode(),
+ ((SQLTimeoutException)t).getErrorCode());
} finally {
BaseResultIterators.setForTestingSetTimeoutToMaxToLetQueryPassHere(false);
}
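The change above stops matching on the nested error-code structure and instead walks the cause chain, because the SQLTimeoutException can be wrapped inside outer exceptions such as PhoenixIOException. A minimal standalone sketch of that pattern (class and method names are hypothetical):

    import java.sql.SQLException;
    import java.sql.SQLTimeoutException;

    final class TimeoutUnwrapExample {
        // Hypothetical helper: walk the cause chain to find a wrapped SQLTimeoutException
        static SQLTimeoutException unwrapTimeout(SQLException e) {
            Throwable t = e;
            while (t != null && !(t instanceof SQLTimeoutException)) {
                t = t.getCause();
            }
            return (SQLTimeoutException) t; // null when nothing in the chain timed out
        }
    }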
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PagedRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PagedRegionScanner.java
index 9e4e0d625d..03460af96a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PagedRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PagedRegionScanner.java
@@ -24,7 +24,11 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.filter.PagedFilter;
+import org.apache.phoenix.util.ScanUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import static org.apache.phoenix.util.ScanUtil.getDummyResult;
import static org.apache.phoenix.util.ScanUtil.getPhoenixPagedFilter;
@@ -42,6 +46,9 @@ public class PagedRegionScanner extends BaseRegionScanner {
protected Region region;
protected Scan scan;
protected PagedFilter pageFilter;
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(PagedRegionScanner.class);
+
public PagedRegionScanner(Region region, RegionScanner scanner, Scan scan) {
super(scanner);
this.region = region;
@@ -54,6 +61,9 @@ public class PagedRegionScanner extends BaseRegionScanner {
private boolean next(List<Cell> results, boolean raw) throws IOException {
try {
+ if (pageFilter != null) {
+ pageFilter.init();
+ }
boolean hasMore = raw ? delegate.nextRaw(results) : delegate.next(results);
if (pageFilter == null) {
return hasMore;
@@ -65,19 +75,20 @@ public class PagedRegionScanner extends BaseRegionScanner {
// Close the current region scanner, start a new one and return a dummy result
delegate.close();
byte[] rowKey = pageFilter.getRowKeyAtStop();
- scan.withStartRow(rowKey, true);
+ boolean isInclusive = pageFilter.isNextRowInclusive();
+ scan.withStartRow(rowKey, isInclusive);
delegate = region.getScanner(scan);
if (results.isEmpty()) {
- getDummyResult(rowKey, results);
+ LOGGER.info("Page filter stopped, generating dummy key {} inclusive={}",
+ Bytes.toStringBinary(rowKey), isInclusive);
+ ScanUtil.getDummyResult(rowKey, results);
}
- pageFilter.init();
return true;
}
return false;
} else {
// We got a row from the HBase scanner within the configured time (i.e., the page size). We need to
// start a new page on the next next() call.
- pageFilter.resetStartTime();
return true;
}
} catch (Exception e) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredGlobalIndexRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredGlobalIndexRegionScanner.java
index 5fb1244e91..ddcb6fff7a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredGlobalIndexRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredGlobalIndexRegionScanner.java
@@ -123,6 +123,9 @@ public class UncoveredGlobalIndexRegionScanner extends UncoveredIndexRegionScann
protected void scanDataRows(Collection<byte[]> dataRowKeys, long startTime) throws IOException {
Scan dataScan = prepareDataTableScan(dataRowKeys);
+ if (dataScan == null) {
+ return;
+ }
try (ResultScanner resultScanner = dataHTable.getScanner(dataScan)) {
for (Result result = resultScanner.next(); (result != null);
result = resultScanner.next()) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
index 7cf87dcf00..c6c45eae02 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
@@ -158,17 +158,27 @@ public abstract class UncoveredIndexRegionScanner extends BaseRegionScanner {
protected Scan prepareDataTableScan(Collection<byte[]> dataRowKeys) throws IOException {
List<KeyRange> keys = new ArrayList<>(dataRowKeys.size());
for (byte[] dataRowKey : dataRowKeys) {
- keys.add(PVarbinary.INSTANCE.getKeyRange(dataRowKey, SortOrder.ASC));
+ // If the data table scan was interrupted because of paging we retry the scan
+ // but on retry we should only fetch data table rows which we haven't already
+ // fetched.
+ if (!dataRows.containsKey(new ImmutableBytesPtr(dataRowKey))) {
+ keys.add(PVarbinary.INSTANCE.getKeyRange(dataRowKey, SortOrder.ASC));
+ }
+ }
+ if (!keys.isEmpty()) {
+ ScanRanges scanRanges = ScanRanges.createPointLookup(keys);
+ Scan dataScan = new Scan(dataTableScan);
+ dataScan.setTimeRange(scan.getTimeRange().getMin(), scan.getTimeRange().getMax());
+ scanRanges.initializeScan(dataScan);
+ SkipScanFilter skipScanFilter = scanRanges.getSkipScanFilter();
+ dataScan.setFilter(new SkipScanFilter(skipScanFilter, false));
+ dataScan.setAttribute(BaseScannerRegionObserver.SERVER_PAGE_SIZE_MS,
+ Bytes.toBytes(Long.valueOf(pageSizeMs)));
+ return dataScan;
+ } else {
+ LOGGER.info("All data rows have already been fetched");
+ return null;
}
- ScanRanges scanRanges = ScanRanges.createPointLookup(keys);
- Scan dataScan = new Scan(dataTableScan);
- dataScan.setTimeRange(scan.getTimeRange().getMin(), scan.getTimeRange().getMax());
- scanRanges.initializeScan(dataScan);
- SkipScanFilter skipScanFilter = scanRanges.getSkipScanFilter();
- dataScan.setFilter(new SkipScanFilter(skipScanFilter, false));
- scan.setAttribute(BaseScannerRegionObserver.SERVER_PAGE_SIZE_MS,
- Bytes.toBytes(Long.valueOf(pageSizeMs)));
- return dataScan;
}
protected boolean scanIndexTableRows(List<Cell> result,
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredLocalIndexRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredLocalIndexRegionScanner.java
index 0d6ddc39d0..d7273aa8a2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredLocalIndexRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredLocalIndexRegionScanner.java
@@ -67,6 +67,9 @@ public class UncoveredLocalIndexRegionScanner extends UncoveredIndexRegionScanne
protected void scanDataRows(Collection<byte[]> dataRowKeys, long startTime) throws IOException {
Scan dataScan = prepareDataTableScan(dataRowKeys);
+ if (dataScan == null) {
+ return;
+ }
try (RegionScanner regionScanner = region.getScanner(dataScan)) {
boolean hasMore;
do {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/PagedFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/PagedFilter.java
index 8d785c3ab4..370dbf007d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/filter/PagedFilter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/PagedFilter.java
@@ -46,7 +46,11 @@ public class PagedFilter extends FilterBase implements Writable {
State state;
private long pageSizeMs;
private long startTime;
- private byte[] rowKeyAtStop;
+ // tracks the row which we will visit next. It is not always a full row key and may be
+ // just the row key prefix.
+ private Cell nextHintCell;
+ // tracks the row we last visited
+ private Cell currentCell;
private Filter delegate = null;
public PagedFilter() {
@@ -68,10 +72,19 @@ public class PagedFilter extends FilterBase implements Writable {
}
public byte[] getRowKeyAtStop() {
- if (rowKeyAtStop != null) {
- return Arrays.copyOf(rowKeyAtStop, rowKeyAtStop.length);
+ byte[] rowKeyAtStop = null;
+ if (nextHintCell != null) {
+ // if we have already seeked to the next cell use that when we resume the scan
+ rowKeyAtStop = CellUtil.cloneRow(nextHintCell);
+ } else if (currentCell != null) {
+ rowKeyAtStop = CellUtil.cloneRow(currentCell);
}
- return null;
+ return rowKeyAtStop;
+ }
+
+ public boolean isNextRowInclusive() {
+ // since this can be a key prefix we have to set inclusive to true when resuming the scan
+ return nextHintCell != null;
}
public boolean isStopped() {
@@ -80,21 +93,19 @@ public class PagedFilter extends FilterBase implements Writable {
public void init() {
state = State.INITIAL;
- rowKeyAtStop = null;
- }
-
- public void resetStartTime() {
- if (state == State.STARTED) {
- init();
- }
+ currentCell = null;
+ nextHintCell = null;
}
@Override
public void reset() throws IOException {
- if (state == State.INITIAL) {
- startTime = EnvironmentEdgeManager.currentTimeMillis();
- state = State.STARTED;
- } else if (state == State.STARTED && EnvironmentEdgeManager.currentTimeMillis() - startTime >= pageSizeMs) {
+ long currentTime = EnvironmentEdgeManager.currentTimeMillis();
+ // reset can be called multiple times for the same row, sometimes before we have
+ // scanned even one row. The order in which it is called is not very predictable.
+ // So we need to ensure that we have seen at least one row before we page.
+ // The currentCell != null check ensures that.
+ if (state == State.STARTED && currentCell != null
+ && currentTime - startTime >= pageSizeMs) {
state = State.TIME_TO_STOP;
}
if (delegate != null) {
@@ -107,33 +118,19 @@ public class PagedFilter extends FilterBase implements Writable {
@Override
public Cell getNextCellHint(Cell currentKV) throws IOException {
if (delegate != null) {
- return delegate.getNextCellHint(currentKV);
+ Cell ret = delegate.getNextCellHint(currentKV);
+ // save the hint so that if the filter stops we know where to resume the scan
+ nextHintCell = ret;
+ return ret;
}
return super.getNextCellHint(currentKV);
}
- public boolean filterRowKey(byte[] buffer, int offset, int length) throws IOException {
- if (state == State.TIME_TO_STOP) {
- if (rowKeyAtStop == null) {
- rowKeyAtStop = new byte[length];
- Bytes.putBytes(rowKeyAtStop, 0, buffer, offset, length);
- }
- return true;
- }
- if (delegate != null) {
- return delegate.filterRowKey(buffer, offset, length);
- }
- return super.filterRowKey(buffer, offset, length);
- }
-
@Override
public boolean filterRowKey(Cell cell) throws IOException {
- if (state == State.TIME_TO_STOP) {
- if (rowKeyAtStop == null) {
- rowKeyAtStop = CellUtil.cloneRow(cell);
- }
- return true;
- }
+ currentCell = cell;
+ // now that we have visited the row we need to reset the hint
+ nextHintCell = null;
if (delegate != null) {
return delegate.filterRowKey(cell);
}
@@ -142,10 +139,13 @@ public class PagedFilter extends FilterBase implements Writable {
@Override
public boolean filterAllRemaining() throws IOException {
- if (state == State.TIME_TO_STOP && rowKeyAtStop != null) {
+ if (state == State.TIME_TO_STOP) {
state = State.STOPPED;
return true;
}
+ if (state == State.STOPPED) {
+ return true;
+ }
if (delegate != null) {
return delegate.filterAllRemaining();
}
@@ -153,7 +153,14 @@ public class PagedFilter extends FilterBase implements Writable {
}
@Override
+ /**
+ * This is called once at the beginning of every row.
+ */
public boolean hasFilterRow() {
+ if (state == State.INITIAL) {
+ startTime = EnvironmentEdgeManager.currentTimeMillis();
+ state = State.STARTED;
+ }
return true;
}
@@ -211,6 +218,7 @@ public class PagedFilter extends FilterBase implements Writable {
@Override
public ReturnCode filterKeyValue(Cell v) throws IOException {
+
if (delegate != null) {
return delegate.filterKeyValue(v);
}
@@ -219,6 +227,8 @@ public class PagedFilter extends FilterBase implements Writable {
@Override
public Filter.ReturnCode filterCell(Cell c) throws IOException {
+ currentCell = c;
+ nextHintCell = null;
if (delegate != null) {
return delegate.filterCell(c);
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OffsetResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OffsetResultIterator.java
index 3eecfc82c3..e00a9fbeb4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OffsetResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OffsetResultIterator.java
@@ -20,13 +20,10 @@ package org.apache.phoenix.iterate;
import java.sql.SQLException;
import java.util.List;
-import org.apache.phoenix.compile.ExplainPlanAttributes
- .ExplainPlanAttributesBuilder;
-import org.apache.phoenix.schema.tuple.Tuple;
-import org.apache.phoenix.util.EnvironmentEdgeManager;
+import static org.apache.phoenix.util.ScanUtil.isDummy;
-import static org.apache.phoenix.util.ScanUtil.getDummyResult;
-import static org.apache.phoenix.util.ScanUtil.getDummyTuple;
+import org.apache.phoenix.compile.ExplainPlanAttributes.ExplainPlanAttributesBuilder;
+import org.apache.phoenix.schema.tuple.Tuple;
/**
* Iterates through tuples up to a limit
@@ -49,14 +46,16 @@ public class OffsetResultIterator extends DelegateResultIterator {
}
@Override
public Tuple next() throws SQLException {
- long startTime = EnvironmentEdgeManager.currentTimeMillis();
while (rowCount < offset) {
Tuple tuple = super.next();
if (tuple == null) { return null; }
- rowCount++;
- if (EnvironmentEdgeManager.currentTimeMillis() - startTime >= pageSizeMs) {
- return getDummyTuple(tuple);
+ if (isDummy(tuple)) {
+ // while rowCount < offset absorb the dummy and call next on the underlying scanner
+ continue;
}
+ rowCount++;
+ // no page timeout check at this level because we cannot correctly resume
+ // scans for OFFSET queries until the offset is reached
}
return super.next();
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
index 8189fbcef9..6b9c84e2c0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
@@ -40,6 +40,7 @@ import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COU
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_RPC_CALLS;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_RPC_RETRIES;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_SCANNED_REGIONS;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_PAGED_ROWS_COUNTER;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SCAN_BYTES;
import static org.apache.phoenix.util.ScanUtil.isDummy;
@@ -52,8 +53,7 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
-import org.apache.phoenix.compile.ExplainPlanAttributes
- .ExplainPlanAttributesBuilder;
+import org.apache.phoenix.compile.ExplainPlanAttributes.ExplainPlanAttributesBuilder;
import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
@@ -81,6 +81,8 @@ public class ScanningResultIterator implements ResultIterator {
private final boolean isMapReduceContext;
private final long maxQueryEndTime;
+ private long dummyRowCounter = 0;
+
public ScanningResultIterator(ResultScanner scanner, Scan scan, ScanMetricsHolder scanMetricsHolder, StatementContext context, boolean isMapReduceContext, long maxQueryEndTime) {
this.scanner = scanner;
this.scanMetricsHolder = scanMetricsHolder;
@@ -99,13 +101,13 @@ public class ScanningResultIterator implements ResultIterator {
}
private void changeMetric(CombinableMetric metric, Long value) {
- if(value != null) {
+ if (value != null) {
metric.change(value);
}
}
private void changeMetric(GlobalClientMetrics metric, Long value) {
- if(value != null) {
+ if (value != null) {
metric.update(value);
}
}
@@ -139,6 +141,7 @@ public class ScanningResultIterator implements ResultIterator {
scanMetricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
changeMetric(scanMetricsHolder.getCountOfRowsFiltered(),
scanMetricsMap.get(COUNT_OF_ROWS_FILTERED_KEY_METRIC_NAME));
+ changeMetric(scanMetricsHolder.getCountOfRowsPaged(), dummyRowCounter);
changeMetric(GLOBAL_SCAN_BYTES,
scanMetricsMap.get(BYTES_IN_RESULTS_METRIC_NAME));
@@ -165,6 +168,8 @@ public class ScanningResultIterator implements ResultIterator {
changeMetric(GLOBAL_HBASE_COUNT_ROWS_FILTERED,
scanMetricsMap.get(COUNT_OF_ROWS_FILTERED_KEY_METRIC_NAME));
+ changeMetric(GLOBAL_PAGED_ROWS_COUNTER, dummyRowCounter);
+
scanMetricsUpdated = true;
}
@@ -175,6 +180,7 @@ public class ScanningResultIterator implements ResultIterator {
try {
Result result = scanner.next();
while (result != null && (result.isEmpty() || isDummy(result))) {
+ dummyRowCounter += 1;
long timeOutForScan = maxQueryEndTime - EnvironmentEdgeManager.currentTimeMillis();
if (timeOutForScan < 0) {
throw new SQLExceptionInfo.Builder(OPERATION_TIMED_OUT).setMessage(
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
index ad19e05aa5..4a2dacd688 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
@@ -30,6 +30,7 @@ import static org.apache.phoenix.monitoring.MetricType.NUM_PARALLEL_SCANS;
import static org.apache.phoenix.monitoring.MetricType.OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.OPEN_PHOENIX_CONNECTIONS_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.INDEX_COMMIT_FAILURE_SIZE;
+import static org.apache.phoenix.monitoring.MetricType.PAGED_ROWS_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.QUERY_FAILED_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.QUERY_SERVICES_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.QUERY_TIME;
@@ -109,7 +110,7 @@ public enum GlobalClientMetrics {
GLOBAL_HCONNECTIONS_COUNTER(HCONNECTIONS_COUNTER),
GLOBAL_PHOENIX_CONNECTIONS_THROTTLED_COUNTER(PHOENIX_CONNECTIONS_THROTTLED_COUNTER),
GLOBAL_PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER(PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER),
-
+ GLOBAL_PAGED_ROWS_COUNTER(PAGED_ROWS_COUNTER),
GLOBAL_HBASE_COUNT_RPC_CALLS(COUNT_RPC_CALLS),
GLOBAL_HBASE_COUNT_REMOTE_RPC_CALLS(COUNT_REMOTE_RPC_CALLS),
GLOBAL_HBASE_COUNT_MILLS_BETWEEN_NEXTS(COUNT_MILLS_BETWEEN_NEXTS),
@@ -124,7 +125,6 @@ public enum GlobalClientMetrics {
GLOBAL_CLIENT_METADATA_CACHE_MISS_COUNTER(CLIENT_METADATA_CACHE_MISS_COUNTER),
GLOBAL_CLIENT_METADATA_CACHE_HIT_COUNTER(CLIENT_METADATA_CACHE_HIT_COUNTER);
-
private static final Logger LOGGER = LoggerFactory.getLogger(GlobalClientMetrics.class);
private static final boolean isGlobalMetricsEnabled = QueryServicesOptions.withDefaults().isGlobalMetricsEnabled();
private MetricType metricType;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
index 75907761e7..fae7c0a5fe 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
@@ -67,6 +67,8 @@ public enum MetricType {
PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER("ca","Number of requests for Phoenix connections, whether successful or not.",LogLevel.OFF, PLong.INSTANCE),
CLIENT_METADATA_CACHE_MISS_COUNTER("cmcm", "Number of cache misses for the CQSI cache.", LogLevel.DEBUG, PLong.INSTANCE),
CLIENT_METADATA_CACHE_HIT_COUNTER("cmch", "Number of cache hits for the CQSI cache.", LogLevel.DEBUG, PLong.INSTANCE),
+ PAGED_ROWS_COUNTER("prc", "Number of dummy rows returned to client due to paging.", LogLevel.DEBUG, PLong.INSTANCE),
+
// hbase metrics
COUNT_RPC_CALLS("rp", "Number of RPC calls",LogLevel.DEBUG, PLong.INSTANCE),
COUNT_REMOTE_RPC_CALLS("rr", "Number of remote RPC calls",LogLevel.DEBUG, PLong.INSTANCE),
@@ -118,7 +120,7 @@ public enum MetricType {
public static String getMetricColumnsDetails() {
StringBuilder buffer=new StringBuilder();
- for(MetricType metric:MetricType.values()){
+ for (MetricType metric:MetricType.values()) {
if (metric.logLevel() != LogLevel.OFF) {
buffer.append(metric.columnName());
buffer.append(" ");
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ScanMetricsHolder.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ScanMetricsHolder.java
index dd6603e152..8b3276a437 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ScanMetricsHolder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ScanMetricsHolder.java
@@ -28,6 +28,7 @@ import static org.apache.phoenix.monitoring.MetricType.COUNT_ROWS_SCANNED;
import static org.apache.phoenix.monitoring.MetricType.COUNT_RPC_CALLS;
import static org.apache.phoenix.monitoring.MetricType.COUNT_RPC_RETRIES;
import static org.apache.phoenix.monitoring.MetricType.COUNT_SCANNED_REGIONS;
+import static org.apache.phoenix.monitoring.MetricType.PAGED_ROWS_COUNTER;
import java.io.IOException;
import java.util.Map;
@@ -49,6 +50,8 @@ public class ScanMetricsHolder {
private final CombinableMetric countOfRemoteRPCRetries;
private final CombinableMetric countOfRowsScanned;
private final CombinableMetric countOfRowsFiltered;
+ private final CombinableMetric countOfRowsPaged;
+
private Map<String, Long> scanMetricMap;
private Object scan;
@@ -78,6 +81,7 @@ public class ScanMetricsHolder {
countOfRemoteRPCRetries = readMetrics.allotMetric(COUNT_REMOTE_RPC_RETRIES, tableName);
countOfRowsScanned = readMetrics.allotMetric(COUNT_ROWS_SCANNED, tableName);
countOfRowsFiltered = readMetrics.allotMetric(COUNT_ROWS_FILTERED, tableName);
+ countOfRowsPaged = readMetrics.allotMetric(PAGED_ROWS_COUNTER, tableName);
}
public CombinableMetric getCountOfRemoteRPCcalls() {
@@ -128,6 +132,10 @@ public class ScanMetricsHolder {
return scanMetricMap;
}
+ public CombinableMetric getCountOfRowsPaged() {
+ return countOfRowsPaged;
+ }
+
public void setScanMetricMap(Map<String, Long> scanMetricMap) {
this.scanMetricMap = scanMetricMap;
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
index 98e3a039ca..6122fec9b3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
@@ -484,7 +484,7 @@ public class ScanUtil {
// But if last field of rowKey is variable length and also DESC, the trailing 0xFF
// is not removed when stored in HBASE, so for such case, we should not set
// lastInclusiveUpperSingleKey back to false.
- if(sepByte != QueryConstants.DESC_SEPARATOR_BYTE) {
+ if (sepByte != QueryConstants.DESC_SEPARATOR_BYTE) {
lastInclusiveUpperSingleKey &= (fieldIndex + slotSpan[i]) < schema.getMaxFields()-1;
}
}
@@ -686,10 +686,10 @@ public class ScanUtil {
public static void setupLocalIndexScan(Scan scan) {
byte[] prefix = scan.getStartRow().length == 0 ? new byte[scan.getStopRow().length]: scan.getStartRow();
int prefixLength = scan.getStartRow().length == 0? scan.getStopRow().length: scan.getStartRow().length;
- if(scan.getAttribute(SCAN_START_ROW_SUFFIX)!=null) {
+ if(scan.getAttribute(SCAN_START_ROW_SUFFIX) != null) {
scan.setStartRow(ScanRanges.prefixKey(scan.getAttribute(SCAN_START_ROW_SUFFIX), 0, prefix, prefixLength));
}
- if(scan.getAttribute(SCAN_STOP_ROW_SUFFIX)!=null) {
+ if(scan.getAttribute(SCAN_STOP_ROW_SUFFIX) != null) {
scan.setStopRow(ScanRanges.prefixKey(scan.getAttribute(SCAN_STOP_ROW_SUFFIX), 0, prefix, prefixLength));
}
}
@@ -711,7 +711,7 @@ public class ScanUtil {
* @param newScan
*/
public static void setLocalIndexAttributes(Scan newScan, int keyOffset, byte[] regionStartKey, byte[] regionEndKey, byte[] startRowSuffix, byte[] stopRowSuffix) {
- if(ScanUtil.isLocalIndex(newScan)) {
+ if (ScanUtil.isLocalIndex(newScan)) {
newScan.setAttribute(SCAN_ACTUAL_START_ROW, regionStartKey);
newScan.setStartRow(regionStartKey);
newScan.setStopRow(regionEndKey);
@@ -787,7 +787,7 @@ public class ScanUtil {
public static int getRowKeyPosition(int[] slotSpan, int slotPosition) {
int offset = 0;
- for(int i = 0; i < slotPosition; i++) {
+ for (int i = 0; i < slotPosition; i++) {
offset += slotSpan[i];
}
@@ -1298,7 +1298,7 @@ public class ScanUtil {
public static boolean isDummy(Tuple tuple) {
if (tuple instanceof ResultTuple) {
- isDummy(((ResultTuple) tuple).getResult());
+ return isDummy(((ResultTuple) tuple).getResult());
}
return false;
}