Posted to commits@phoenix.apache.org by ss...@apache.org on 2017/10/03 07:45:26 UTC
[1/3] phoenix git commit: PHOENIX-3112 Partial row scan not handled correctly
Repository: phoenix
Updated Branches:
refs/heads/4.x-HBase-1.1 edce39e25 -> 8dfc88db1
refs/heads/4.x-HBase-1.2 d231db891 -> e5a9c72e3
refs/heads/master bd21ed3d6 -> aaa41a33d
PHOENIX-3112 Partial row scan not handled correctly
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/8dfc88db
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/8dfc88db
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/8dfc88db
Branch: refs/heads/4.x-HBase-1.1
Commit: 8dfc88db12188ad5d80bf6071a8a2b879e2558f7
Parents: edce39e
Author: Sergey Soldatov <ss...@apache.org>
Authored: Wed Aug 2 16:56:04 2017 -0700
Committer: Sergey Soldatov <ss...@apache.org>
Committed: Tue Oct 3 00:39:06 2017 -0700
----------------------------------------------------------------------
.../PartialResultServerConfigurationIT.java | 148 ++++++++++++++
.../PartialScannerResultsDisabledIT.java | 193 +++++++++++++++++++
.../DataTableLocalIndexRegionScanner.java | 7 +-
.../hbase/regionserver/ScannerContextUtil.java | 41 ++++
.../phoenix/coprocessor/BaseRegionScanner.java | 4 +-
.../coprocessor/BaseScannerRegionObserver.java | 11 +-
.../coprocessor/DelegateRegionScanner.java | 4 +-
.../coprocessor/HashJoinRegionScanner.java | 38 +---
.../phoenix/iterate/RegionScannerFactory.java | 51 +----
.../phoenix/schema/stats/StatisticsScanner.java | 4 +-
10 files changed, 411 insertions(+), 90 deletions(-)
----------------------------------------------------------------------
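At a high level (an editor's summary of the diff below, not text from the commit message): before this patch, Phoenix's delegating region scanners forwarded HBase's ScannerContext straight down to the wrapped scanner. With small size limits the inner scan could then stop between cells, and Phoenix surfaced the resulting partial row as a row with NULL values. The patch routes all iteration through the context-free next()/nextRaw() overloads, which always return whole rows, and reports progress back to the caller's ScannerContext afterwards via the new ScannerContextUtil helper. The old, buggy pass-through pattern looked like this (taken from the DelegateRegionScanner hunk further down):

    @Override
    public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
        // May stop mid-row and hand a partial row back up the chain.
        return delegate.next(result, scannerContext);
    }
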
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8dfc88db/phoenix-core/src/it/java/org/apache/phoenix/end2end/PartialResultServerConfigurationIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PartialResultServerConfigurationIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PartialResultServerConfigurationIT.java
new file mode 100644
index 0000000..1c9ac38
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PartialResultServerConfigurationIT.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDriver;
+import org.apache.phoenix.query.ConnectionQueryServices.Feature;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.junit.AfterClass;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.Properties;
+import java.util.Random;
+import java.util.UUID;
+
+import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD;
+import static org.apache.phoenix.query.BaseTest.generateUniqueName;
+import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
+import static org.apache.phoenix.query.QueryServices.*;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+
+/**
+ * This test is separate from PartialScannerResultsDisabledIT because it requires server-side configuration.
+ */
+@Category(NeedsOwnMiniClusterTest.class)
+public class PartialResultServerConfigurationIT {
+ private static HBaseTestingUtility hbaseTestUtil;
+ private static String zkQuorum;
+ private static String url;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ Configuration conf = HBaseConfiguration.create();
+ hbaseTestUtil = new HBaseTestingUtility(conf);
+ setUpConfigForMiniCluster(conf);
+
+ // Enforce a tiny max result size so that scans stop between cells within a row.
+ conf.setLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, 5);
+ conf.setLong(HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY, 5);
+
+ hbaseTestUtil.startMiniCluster();
+ zkQuorum = "localhost:" + hbaseTestUtil.getZkCluster().getClientPort();
+ url = PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum;
+
+ DriverManager.registerDriver(PhoenixDriver.INSTANCE);
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ try {
+ DriverManager.deregisterDriver(PhoenixDriver.INSTANCE);
+ } finally {
+ hbaseTestUtil.shutdownMiniCluster();
+ }
+ }
+
+ /**
+ * This test creates two tables, each with a single matching record at the end that satisfies the join
+ * condition. If the scanner context were forwarded during the scan, the join would produce a partial
+ * row with NULL values.
+ * @throws Exception
+ */
+ @Test
+ public void testJoinScan() throws Exception {
+ String tableNameR = generateUniqueName();
+ String tableNameL = generateUniqueName();
+
+ int numRecords = 1000;
+ try (Connection conn = DriverManager.getConnection(url)) {
+ conn.createStatement().execute(
+ "CREATE TABLE " + tableNameR + " (PK1 INTEGER NOT NULL PRIMARY KEY, KV1 VARCHAR)");
+ int i = 0;
+ String upsert = "UPSERT INTO " + tableNameR + " VALUES (?, ?)";
+ Random random = new Random();
+ PreparedStatement stmt = conn.prepareStatement(upsert);
+ while (i < numRecords) {
+ stmt.setInt(1, i);
+ stmt.setString(2, UUID.randomUUID().toString());
+ stmt.executeUpdate();
+ i++;
+ }
+ stmt.setInt(1, 9999);
+ stmt.setString(2, UUID.randomUUID().toString());
+ stmt.executeUpdate();
+ conn.commit();
+ conn.createStatement().execute(
+ "CREATE TABLE " + tableNameL + " (PK1 INTEGER NOT NULL PRIMARY KEY, KV1 VARCHAR)");
+ upsert = "UPSERT INTO " + tableNameL + " VALUES (?, ?)";
+ stmt = conn.prepareStatement(upsert);
+ while (i < numRecords * 2) {
+ stmt.setInt(1, random.nextInt());
+ stmt.setString(2, "KV" + random.nextInt());
+ stmt.executeUpdate();
+ i++;
+ }
+ stmt.setInt(1, 9999);
+ stmt.setString(2, UUID.randomUUID().toString());
+ stmt.executeUpdate();
+ conn.commit();
+
+ String sql = "SELECT * FROM " + tableNameR + " A JOIN " + tableNameL + " B ON A.PK1 = B.PK1";
+ Statement s = conn.createStatement();
+ s.setFetchSize(2);
+ ResultSet rs = s.executeQuery(sql);
+ int count = 0;
+ while (rs.next()) {
+ if (rs.getString(2) == null || rs.getString(4) == null)
+ fail("Null value because of partial result from scan");
+ count++;
+ }
+ assertEquals(1, count);
+ }
+ }
+
+}
\ No newline at end of file
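For background, the 5-byte limits configured above force partial rows at the raw HBase level. A minimal sketch of that behavior, assuming the standard HBase 1.x client API (the table handle and method name are illustrative, not part of this commit):

    import java.io.IOException;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.client.Table;

    static void scanWithPartials(Table table) throws IOException {
        Scan scan = new Scan();
        scan.setMaxResultSize(5);          // mirrors the 5-byte limit the test configures
        scan.setAllowPartialResults(true); // opt in to receiving partial Results
        try (ResultScanner scanner = table.getScanner(scan)) {
            for (Result r : scanner) {
                if (r.isPartial()) {
                    // Only some cells of the row arrived in this Result; treating it as a
                    // complete row would read the missing columns as NULL, which is exactly
                    // the symptom the tests above check for.
                }
            }
        }
    }
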
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8dfc88db/phoenix-core/src/it/java/org/apache/phoenix/end2end/PartialScannerResultsDisabledIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PartialScannerResultsDisabledIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PartialScannerResultsDisabledIT.java
new file mode 100644
index 0000000..817b0bd
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PartialScannerResultsDisabledIT.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.util.Properties;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.phoenix.util.IndexScrutiny;
+import org.apache.phoenix.util.SchemaUtil;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PartialScannerResultsDisabledIT extends ParallelStatsDisabledIT {
+ public static final String TEST_TABLE_DDL =
+ "CREATE TABLE IF NOT EXISTS %s\n" + "(\n" + " ORGANIZATION_ID CHAR(15) NOT NULL,\n"
+ + " FEED_ELEMENT_ID CHAR(15) NOT NULL,\n"
+ + " CONTAINER_ID CHAR(15) NOT NULL,\n"
+ + " FEED_TYPE VARCHAR(1) NOT NULL, \n"
+ + " NETWORK_ID CHAR(15) NOT NULL,\n" + " USER_ID CHAR(15) NOT NULL,\n"
+ + " CREATED_TIME TIMESTAMP,\n" + " LAST_UPDATE TIMESTAMP,\n"
+ + " RELEVANCE_SCORE DOUBLE,\n" + " FEED_ITEM_TYPE VARCHAR(1),\n"
+ + " FEED_ELEMENT_TYPE VARCHAR(1),\n"
+ + " FEED_ELEMENT_IS_SYS_GEN BOOLEAN,\n"
+ + " FEED_ELEMENT_STATUS VARCHAR(1),\n"
+ + " FEED_ELEMENT_VISIBILITY VARCHAR(1),\n" + " PARENT_ID CHAR(15),\n"
+ + " CREATED_BY CHAR(15),\n" + " BEST_COMMENT_ID CHAR(15),\n"
+ + " COMMENT_COUNT INTEGER,\n" + " CONSTRAINT PK PRIMARY KEY\n" + " (\n"
+ + " ORGANIZATION_ID,\n" + " FEED_ELEMENT_ID,\n"
+ + " CONTAINER_ID,\n" + " FEED_TYPE,\n" + " NETWORK_ID,\n"
+ + " USER_ID\n" + " )\n" + ") COLUMN_ENCODED_BYTES = 0";
+
+ public static final String INDEX_1_DDL =
+ "CREATE INDEX IF NOT EXISTS %s\n" + "ON %s (\n" + " NETWORK_ID,\n"
+ + " CONTAINER_ID,\n" + " FEED_TYPE,\n" + " USER_ID,\n"
+ + " CREATED_TIME DESC,\n" + " FEED_ELEMENT_ID DESC,\n"
+ + " CREATED_BY\n" + ") "
+ + " INCLUDE (\n" + " FEED_ITEM_TYPE,\n"
+ + " FEED_ELEMENT_TYPE,\n" + " FEED_ELEMENT_IS_SYS_GEN,\n"
+ + " FEED_ELEMENT_STATUS,\n" + " FEED_ELEMENT_VISIBILITY,\n"
+ + " PARENT_ID,\n" + " BEST_COMMENT_ID,\n" + " COMMENT_COUNT\n" + ")";
+
+ private static final String UPSERT_INTO_DATA_TABLE =
+ "UPSERT INTO %s\n" + "(\n" + " ORGANIZATION_ID,\n" + " FEED_ELEMENT_ID,\n"
+ + " CONTAINER_ID,\n" + " FEED_TYPE,\n" + " NETWORK_ID,\n"
+ + " USER_ID,\n" + " CREATED_TIME,\n" + " LAST_UPDATE,\n"
+ + " FEED_ITEM_TYPE,\n" + " FEED_ELEMENT_TYPE,\n"
+ + " FEED_ELEMENT_IS_SYS_GEN,\n" + " FEED_ELEMENT_STATUS,\n"
+ + " FEED_ELEMENT_VISIBILITY,\n" + " PARENT_ID,\n" + " CREATED_BY,\n"
+ + " BEST_COMMENT_ID,\n" + " COMMENT_COUNT\n" + ")"
+ + "VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
+
+ private String dataTableName;
+ private String indexTableName;
+ private String schemaName;
+ private String dataTableFullName;
+ private static String indexTableFullName;
+ private static final Logger logger = LoggerFactory.getLogger(PartialScannerResultsDisabledIT.class);
+ private static Random random = new Random(1);
+ // background writer threads
+ private static Random sourceOfRandomness = new Random(0);
+ private static AtomicInteger upsertIdCounter = new AtomicInteger(1);
+
+ @Before
+ public void setup() throws Exception {
+ // create the tables
+ generateUniqueTableNames();
+ createTestTable(getUrl(), String.format(TEST_TABLE_DDL, dataTableFullName));
+ createTestTable(getUrl(), String.format(INDEX_1_DDL, indexTableName, dataTableFullName));
+ }
+
+ @Test
+ public void testWithEnoughData() throws Exception {
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ // Write enough data to trigger partial scanner results
+ // TODO: less data would likely suffice if the config parameters
+ // that control partial results were lowered.
+ writeSingleBatch(conn, 100, 20, dataTableFullName);
+ logger.info("Running scrutiny");
+ // Scrutinize the index to see if partial results are silently returned.
+ // In that case we'll get a false positive on the scrutiny run.
+ long rowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
+ assertEquals(2000, rowCount);
+ }
+ }
+
+ /**
+ * Simple select query with a fetch size that exceeds the result size. In that case the scan would start
+ * to produce partial result sets which, from the Phoenix perspective, are rows with NULL values.
+ * @throws SQLException
+ */
+ @Test
+ public void partialResultDuringSelect() throws SQLException {
+ String tableName = generateUniqueName();
+ Properties props = new Properties();
+ props.setProperty(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, "5");
+ int numRecords = 10;
+ try (Connection conn = DriverManager.getConnection(url, props)) {
+ conn.createStatement().execute(
+ "CREATE TABLE " + tableName + " (PK1 INTEGER NOT NULL PRIMARY KEY, KV1 VARCHAR)");
+ int i = 0;
+ String upsert = "UPSERT INTO " + tableName + " VALUES (?, ?)";
+ PreparedStatement stmt = conn.prepareStatement(upsert);
+ while (i < numRecords) {
+ stmt.setInt(1, i);
+ stmt.setString(2, UUID.randomUUID().toString());
+ stmt.executeUpdate();
+ i++;
+ }
+ conn.commit();
+
+ String sql = "SELECT * FROM " + tableName;
+ // The fetch size far exceeds the 5-byte max result size, so every scanner call produces partial results.
+ Statement s = conn.createStatement();
+ s.setFetchSize(100);
+ ResultSet rs = s.executeQuery(sql);
+ int count = 0;
+ while (rs.next()) {
+ if (rs.getString(2) == null)
+ fail("Null value because of partial row scan");
+ count++;
+ }
+ assertEquals(numRecords, count);
+ }
+
+ }
+
+ private static String randString(int length, Random random) {
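+ // Note: the supplied Random is ignored here; RandomStringUtils draws from its own internal source.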
+ return RandomStringUtils.randomAlphabetic(length);
+ }
+
+ private void writeSingleBatch(Connection connection, int batchSize, int numBatches, String tableName) throws Exception {
+ for (int j = 0; j < numBatches; j++) {
+ try (PreparedStatement statement =
+ connection.prepareStatement(String.format(UPSERT_INTO_DATA_TABLE, tableName))) {
+ for (int i = 0; i < batchSize; i++) {
+ int index = 0;
+ String id = "" + upsertIdCounter.getAndIncrement();
+ statement.setString(++index, id); // ORGANIZATION_ID
+ statement.setString(++index, id); // FEED_ELEMENT_ID
+ statement.setString(++index, id); // CONTAINER_ID
+ statement.setString(++index, randString(1, sourceOfRandomness)); // FEED_TYPE
+ statement.setString(++index, randString(15, sourceOfRandomness)); // NETWORK_ID
+ statement.setString(++index, randString(15, sourceOfRandomness)); // USER_ID
+ statement.setTimestamp(++index, new Timestamp(System.currentTimeMillis())); // CREATED_TIME
+ statement.setTimestamp(++index, new Timestamp(System.currentTimeMillis())); // LAST_UPDATE
+ statement.setString(++index, randString(1, sourceOfRandomness)); // FEED_ITEM_TYPE
+ statement.setString(++index, randString(1, sourceOfRandomness)); // FEED_ELEMENT_TYPE
+ statement.setBoolean(++index, false); // FEED_ELEMENT_IS_SYS_GEN
+ statement.setString(++index, randString(1, sourceOfRandomness)); // FEED_ELEMENT_STATUS
+ statement.setString(++index, randString(1, sourceOfRandomness)); // FEED_ELEMENT_VISIBILITY
+ statement.setString(++index, randString(15, sourceOfRandomness)); // PARENT_ID
+ statement.setString(++index, randString(15, sourceOfRandomness)); // CREATED_BY
+ statement.setString(++index, randString(15, sourceOfRandomness)); // BEST_COMMENT_ID
+ statement.setInt(++index, random.nextInt()); // COMMENT_COUNT
+ statement.execute();
+ }
+ connection.commit();
+ }
+ }
+ }
+
+ private void generateUniqueTableNames() {
+ schemaName = generateUniqueName();
+ dataTableName = generateUniqueName() + "_DATA";
+ dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+ indexTableName = generateUniqueName() + "_IDX";
+ indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8dfc88db/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/DataTableLocalIndexRegionScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/DataTableLocalIndexRegionScanner.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/DataTableLocalIndexRegionScanner.java
index 4c44e82..64d4ac4 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/DataTableLocalIndexRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/DataTableLocalIndexRegionScanner.java
@@ -85,8 +85,13 @@ public class DataTableLocalIndexRegionScanner extends DelegateRegionScanner {
@Override
public boolean next(List<Cell> outResult, ScannerContext scannerContext) throws IOException {
+ return next(outResult);
+ }
+
+ @Override
+ public boolean next(List<Cell> results) throws IOException {
List<Cell> dataTableResults = new ArrayList<Cell>();
- boolean next = super.next(dataTableResults, scannerContext);
+ boolean next = super.next(dataTableResults);
addMutations(dataTableResults);
if (ServerUtil.readyToCommit(mutationList.size(), mutationList.byteSize(), maxBatchSize, maxBatchSizeBytes)||!next) {
region.batchMutate(mutationList.toArray(new Mutation[mutationList.size()]), HConstants.NO_NONCE,
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8dfc88db/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContextUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContextUtil.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContextUtil.java
new file mode 100644
index 0000000..126e0b1
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContextUtil.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+
+import java.util.List;
+
+/**
+ * {@link ScannerContext} keeps all of its methods package-private. This helper exposes the progress
+ * updates Phoenix scanners need, which is why it lives in the org.apache.hadoop.hbase.regionserver package.
+ */
+public class ScannerContextUtil {
+ public static void incrementSizeProgress(ScannerContext sc, List<Cell> cells) {
+ for (Cell cell : cells) {
+ sc.incrementSizeProgress(CellUtil.estimatedHeapSizeOfWithoutTags(cell));
+ }
+ }
+
+ public static void updateTimeProgress(ScannerContext sc) {
+ sc.updateTimeProgress();
+ }
+}
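The hunks below in BaseScannerRegionObserver and RegionScannerFactory use this helper in the same way; a condensed sketch of the pattern (not the literal committed code):

    // Drive the delegate through the context-free path so it always returns
    // whole rows, then report size/time progress so the RegionServer's own
    // limit checks still advance between (whole) rows.
    @Override
    public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException {
        boolean hasMore = delegate.nextRaw(result);
        ScannerContextUtil.incrementSizeProgress(scannerContext, result);
        ScannerContextUtil.updateTimeProgress(scannerContext);
        return hasMore;
    }
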
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8dfc88db/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java
index b5e9c9f..945c1c4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java
@@ -41,7 +41,7 @@ public abstract class BaseRegionScanner extends DelegateRegionScanner {
@Override
public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
- return next(result);
+ throw new IOException("Next with scannerContext should not be called in Phoenix environment");
}
@Override
@@ -56,6 +56,6 @@ public abstract class BaseRegionScanner extends DelegateRegionScanner {
@Override
public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException {
- return next(result, scannerContext);
+ throw new IOException("NextRaw with scannerContext should not be called in Phoenix environment");
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8dfc88db/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index 8239069..1b95058 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
+import org.apache.hadoop.hbase.regionserver.ScannerContextUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.htrace.Span;
import org.apache.htrace.Trace;
@@ -266,7 +267,10 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
@Override
public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
overrideDelegate();
- return super.next(result, scannerContext);
+ boolean res = super.next(result);
+ ScannerContextUtil.incrementSizeProgress(scannerContext, result);
+ ScannerContextUtil.updateTimeProgress(scannerContext);
+ return res;
}
@Override
@@ -278,7 +282,10 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
@Override
public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException {
overrideDelegate();
- return super.nextRaw(result, scannerContext);
+ boolean res = super.nextRaw(result);
+ ScannerContextUtil.incrementSizeProgress(scannerContext, result);
+ ScannerContextUtil.updateTimeProgress(scannerContext);
+ return res;
}
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8dfc88db/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java
index 0ddabed..95d449a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java
@@ -64,7 +64,7 @@ public class DelegateRegionScanner implements RegionScanner {
@Override
public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
- return delegate.next(result, scannerContext);
+ throw new IOException("Next with scannerContext should not be called in Phoenix environment");
}
@Override
@@ -74,7 +74,7 @@ public class DelegateRegionScanner implements RegionScanner {
@Override
public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException {
- return delegate.nextRaw(result, scannerContext);
+ throw new IOException("NextRaw with scannerContext should not be called in Phoenix environment");
}
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8dfc88db/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
index 5061d94..59f844d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
@@ -276,18 +276,7 @@ public class HashJoinRegionScanner implements RegionScanner {
@Override
public boolean nextRaw(List<Cell> result, ScannerContext scannerContext)
throws IOException {
- try {
- while (shouldAdvance()) {
- hasMore = scanner.nextRaw(result, scannerContext);
- processResults(result, false); // TODO detect if limit used here
- result.clear();
- }
-
- return nextInQueue(result);
- } catch (Throwable t) {
- ServerUtil.throwIOException(env.getRegion().getRegionInfo().getRegionNameAsString(), t);
- return false; // impossible
- }
+ throw new IOException("Next with scannerContext should not be called in Phoenix environment");
}
@Override
@@ -302,33 +291,12 @@ public class HashJoinRegionScanner implements RegionScanner {
@Override
public boolean next(List<Cell> result) throws IOException {
- try {
- while (shouldAdvance()) {
- hasMore = scanner.next(result);
- processResults(result, false);
- result.clear();
- }
-
- return nextInQueue(result);
- } catch (Throwable t) {
- ServerUtil.throwIOException(env.getRegion().getRegionInfo().getRegionNameAsString(), t);
- return false; // impossible
- }
+ throw new IOException("Next should not be used in HashJoin scanner");
}
@Override
public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
- try {
- while (shouldAdvance()) {
- hasMore = scanner.next(result, scannerContext);
- processResults(result, false); // TODO detect if limit used here
- result.clear();
- }
- return nextInQueue(result);
- } catch (Throwable t) {
- ServerUtil.throwIOException(env.getRegion().getRegionInfo().getRegionNameAsString(), t);
- return false; // impossible
- }
+ throw new IOException("Next with scannerContext should not be called in Phoenix environment");
}
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8dfc88db/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
index 898a573..3dcbef9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
+import org.apache.hadoop.hbase.regionserver.ScannerContextUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.execute.TupleProjector;
import org.apache.phoenix.expression.Expression;
@@ -140,12 +141,7 @@ public abstract class RegionScannerFactory {
@Override
public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
- try {
- return s.next(result, scannerContext);
- } catch (Throwable t) {
- ServerUtil.throwIOException(getRegion().getRegionInfo().getRegionNameAsString(), t);
- return false; // impossible
- }
+ throw new IOException("Next with scannerContext should not be called in Phoenix environment");
}
@Override
@@ -221,45 +217,10 @@ public abstract class RegionScannerFactory {
@Override
public boolean nextRaw(List<Cell> result, ScannerContext scannerContext)
throws IOException {
- try {
- boolean next = s.nextRaw(result, scannerContext);
- Cell arrayElementCell = null;
- if (result.size() == 0) {
- return next;
- }
- if (arrayFuncRefs != null && arrayFuncRefs.length > 0 && arrayKVRefs.size() > 0) {
- int arrayElementCellPosition = replaceArrayIndexElement(arrayKVRefs, arrayFuncRefs, result);
- arrayElementCell = result.get(arrayElementCellPosition);
- }
- if ((offset > 0 || ScanUtil.isLocalIndex(scan)) && !ScanUtil.isAnalyzeTable(scan)) {
- if(hasReferences && actualStartKey!=null) {
- next = scanTillScanStartRow(s, arrayKVRefs, arrayFuncRefs, result,
- scannerContext, arrayElementCell);
- if (result.isEmpty()) {
- return next;
- }
- }
- /* In the following, c is only used when data region is null.
- dataRegion will never be null in case of non-coprocessor call,
- therefore no need to refactor
- */
- IndexUtil.wrapResultUsingOffset(env, result, offset, dataColumns,
- tupleProjector, dataRegion, indexMaintainer, viewConstants, ptr);
- }
- if (projector != null) {
- Tuple toProject = useQualifierAsListIndex ? new PositionBasedMultiKeyValueTuple(result) : new ResultTuple(Result.create(result));
- Tuple tuple = projector.projectResults(toProject, useNewValueColumnQualifier);
- result.clear();
- result.add(tuple.getValue(0));
- if(arrayElementCell != null)
- result.add(arrayElementCell);
- }
- // There is a scanattribute set to retrieve the specific array element
- return next;
- } catch (Throwable t) {
- ServerUtil.throwIOException(getRegion().getRegionInfo().getRegionNameAsString(), t);
- return false; // impossible
- }
+ boolean res = next(result);
+ ScannerContextUtil.incrementSizeProgress(scannerContext, result);
+ ScannerContextUtil.updateTimeProgress(scannerContext);
+ return res;
}
/**
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8dfc88db/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
index ab6b3db..2fb6f14 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
@@ -73,9 +73,7 @@ public class StatisticsScanner implements InternalScanner {
@Override
public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
- boolean ret = delegate.next(result, scannerContext);
- updateStats(result);
- return ret;
+ return next(result);
}
/**
[3/3] phoenix git commit: PHOENIX-3112 Partial row scan not handled correctly
Posted by ss...@apache.org.
PHOENIX-3112 Partial row scan not handled correctly
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/aaa41a33
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/aaa41a33
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/aaa41a33
Branch: refs/heads/master
Commit: aaa41a33d025ad6daa832fe8b42fc235e7154648
Parents: bd21ed3
Author: Sergey Soldatov <ss...@apache.org>
Authored: Wed Aug 2 16:56:04 2017 -0700
Committer: Sergey Soldatov <ss...@apache.org>
Committed: Tue Oct 3 00:44:26 2017 -0700
----------------------------------------------------------------------
.../PartialResultServerConfigurationIT.java | 148 ++++++++++++++
.../PartialScannerResultsDisabledIT.java | 193 +++++++++++++++++++
.../DataTableLocalIndexRegionScanner.java | 7 +-
.../hbase/regionserver/ScannerContextUtil.java | 41 ++++
.../phoenix/coprocessor/BaseRegionScanner.java | 4 +-
.../coprocessor/BaseScannerRegionObserver.java | 11 +-
.../coprocessor/DelegateRegionScanner.java | 4 +-
.../coprocessor/HashJoinRegionScanner.java | 38 +---
.../phoenix/iterate/RegionScannerFactory.java | 51 +----
.../phoenix/schema/stats/StatisticsScanner.java | 4 +-
10 files changed, 411 insertions(+), 90 deletions(-)
----------------------------------------------------------------------
throws IOException {
- try {
- boolean next = s.nextRaw(result, scannerContext);
- Cell arrayElementCell = null;
- if (result.size() == 0) {
- return next;
- }
- if (arrayFuncRefs != null && arrayFuncRefs.length > 0 && arrayKVRefs.size() > 0) {
- int arrayElementCellPosition = replaceArrayIndexElement(arrayKVRefs, arrayFuncRefs, result);
- arrayElementCell = result.get(arrayElementCellPosition);
- }
- if ((offset > 0 || ScanUtil.isLocalIndex(scan)) && !ScanUtil.isAnalyzeTable(scan)) {
- if(hasReferences && actualStartKey!=null) {
- next = scanTillScanStartRow(s, arrayKVRefs, arrayFuncRefs, result,
- scannerContext, arrayElementCell);
- if (result.isEmpty()) {
- return next;
- }
- }
- /* In the following, c is only used when data region is null.
- dataRegion will never be null in case of non-coprocessor call,
- therefore no need to refactor
- */
- IndexUtil.wrapResultUsingOffset(env, result, offset, dataColumns,
- tupleProjector, dataRegion, indexMaintainer, viewConstants, ptr);
- }
- if (projector != null) {
- Tuple toProject = useQualifierAsListIndex ? new PositionBasedMultiKeyValueTuple(result) : new ResultTuple(Result.create(result));
- Tuple tuple = projector.projectResults(toProject, useNewValueColumnQualifier);
- result.clear();
- result.add(tuple.getValue(0));
- if(arrayElementCell != null)
- result.add(arrayElementCell);
- }
- // There is a scanattribute set to retrieve the specific array element
- return next;
- } catch (Throwable t) {
- ServerUtil.throwIOException(getRegion().getRegionInfo().getRegionNameAsString(), t);
- return false; // impossible
- }
+ boolean res = next(result);
+ ScannerContextUtil.incrementSizeProgress(scannerContext, result);
+ ScannerContextUtil.updateTimeProgress(scannerContext);
+ return res;
}
/**
http://git-wip-us.apache.org/repos/asf/phoenix/blob/aaa41a33/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
index ab6b3db..2fb6f14 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
@@ -73,9 +73,7 @@ public class StatisticsScanner implements InternalScanner {
@Override
public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
- boolean ret = delegate.next(result, scannerContext);
- updateStats(result);
- return ret;
+ return next(result);
}
/**
[2/3] phoenix git commit: PHOENIX-3112 Partial row scan not handled
correctly
Posted by ss...@apache.org.
PHOENIX-3112 Partial row scan not handled correctly
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/e5a9c72e
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/e5a9c72e
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/e5a9c72e
Branch: refs/heads/4.x-HBase-1.2
Commit: e5a9c72e368378f35fec8df3abdeb77cae3d9f64
Parents: d231db8
Author: Sergey Soldatov <ss...@apache.org>
Authored: Wed Aug 2 16:56:04 2017 -0700
Committer: Sergey Soldatov <ss...@apache.org>
Committed: Tue Oct 3 00:44:01 2017 -0700
----------------------------------------------------------------------
.../PartialResultServerConfigurationIT.java | 148 ++++++++++++++
.../PartialScannerResultsDisabledIT.java | 193 +++++++++++++++++++
.../DataTableLocalIndexRegionScanner.java | 7 +-
.../hbase/regionserver/ScannerContextUtil.java | 41 ++++
.../phoenix/coprocessor/BaseRegionScanner.java | 4 +-
.../coprocessor/BaseScannerRegionObserver.java | 11 +-
.../coprocessor/DelegateRegionScanner.java | 4 +-
.../coprocessor/HashJoinRegionScanner.java | 38 +---
.../phoenix/iterate/RegionScannerFactory.java | 51 +----
.../phoenix/schema/stats/StatisticsScanner.java | 4 +-
10 files changed, 411 insertions(+), 90 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e5a9c72e/phoenix-core/src/it/java/org/apache/phoenix/end2end/PartialResultServerConfigurationIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PartialResultServerConfigurationIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PartialResultServerConfigurationIT.java
new file mode 100644
index 0000000..1c9ac38
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PartialResultServerConfigurationIT.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDriver;
+import org.apache.phoenix.query.ConnectionQueryServices.Feature;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.junit.AfterClass;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.Properties;
+import java.util.Random;
+import java.util.UUID;
+
+import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD;
+import static org.apache.phoenix.query.BaseTest.generateUniqueName;
+import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
+import static org.apache.phoenix.query.QueryServices.*;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+
+/**
+ * This test is separate from {@link PartialScannerResultsDisabledIT} because it requires its own
+ * server-side configuration.
+ */
+@Category(NeedsOwnMiniClusterTest.class)
+public class PartialResultServerConfigurationIT {
+ private static HBaseTestingUtility hbaseTestUtil;
+ private static String zkQuorum;
+ private static String url;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ Configuration conf = HBaseConfiguration.create();
+ hbaseTestUtil = new HBaseTestingUtility(conf);
+ setUpConfigForMiniCluster(conf);
+
+ // Enforce a tiny max result size so that scans stop between the cells of a row.
+ conf.setLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, 5);
+ conf.setLong(HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY, 5);
+
+ hbaseTestUtil.startMiniCluster();
+ zkQuorum = "localhost:" + hbaseTestUtil.getZkCluster().getClientPort();
+ url = PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum;
+
+ DriverManager.registerDriver(PhoenixDriver.INSTANCE);
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ try {
+ DriverManager.deregisterDriver(PhoenixDriver.INSTANCE);
+ } finally {
+ hbaseTestUtil.shutdownMiniCluster();
+ }
+ }
+
+ /**
+ * This test creates two tables, each with a single record at the end that matches the join condition.
+ * If the scanner context were used during the scan, it would produce a partial row with NULL values.
+ * @throws Exception
+ */
+ @Test
+ public void testJoinScan() throws Exception {
+ String tableNameR = generateUniqueName();
+ String tableNameL = generateUniqueName();
+
+ int numRecords = 1000;
+ try (Connection conn = DriverManager.getConnection(url)) {
+ conn.createStatement().execute(
+ "CREATE TABLE " + tableNameR + " (PK1 INTEGER NOT NULL PRIMARY KEY, KV1 VARCHAR)");
+ int i = 0;
+ String upsert = "UPSERT INTO " + tableNameR + " VALUES (?, ?)";
+ Random random = new Random();
+ PreparedStatement stmt = conn.prepareStatement(upsert);
+ while (i < numRecords) {
+ stmt.setInt(1, i);
+ stmt.setString(2, UUID.randomUUID().toString());
+ stmt.executeUpdate();
+ i++;
+ }
+ stmt.setInt(1, 9999);
+ stmt.setString(2, UUID.randomUUID().toString());
+ stmt.executeUpdate();
+ conn.commit();
+ conn.createStatement().execute(
+ "CREATE TABLE " + tableNameL + " (PK1 INTEGER NOT NULL PRIMARY KEY, KV1 VARCHAR)");
+ upsert = "UPSERT INTO " + tableNameL + " VALUES (?, ?)";
+ stmt = conn.prepareStatement(upsert);
+ while (i < numRecords * 2) {
+ stmt.setInt(1, random.nextInt());
+ stmt.setString(2, "KV" + random.nextInt());
+ stmt.executeUpdate();
+ i++;
+ }
+ stmt.setInt(1, 9999);
+ stmt.setString(2, UUID.randomUUID().toString());
+ stmt.executeUpdate();
+ conn.commit();
+
+ String sql = "SELECT * FROM " + tableNameR + " A JOIN " + tableNameL + " B ON A.PK1 = B.PK1";
+ Statement s = conn.createStatement();
+ s.setFetchSize(2);
+ ResultSet rs = s.executeQuery(sql);
+ int count = 0;
+ while (rs.next()) {
+ if (rs.getString(2) == null || rs.getString(4) == null)
+ fail("Null value because of partial result from scan");
+ count++;
+ }
+ assertEquals(1, count);
+ }
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e5a9c72e/phoenix-core/src/it/java/org/apache/phoenix/end2end/PartialScannerResultsDisabledIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PartialScannerResultsDisabledIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PartialScannerResultsDisabledIT.java
new file mode 100644
index 0000000..817b0bd
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PartialScannerResultsDisabledIT.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.util.Properties;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.phoenix.util.IndexScrutiny;
+import org.apache.phoenix.util.SchemaUtil;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PartialScannerResultsDisabledIT extends ParallelStatsDisabledIT {
+ public static final String TEST_TABLE_DDL =
+ "CREATE TABLE IF NOT EXISTS %s\n" + "(\n" + " ORGANIZATION_ID CHAR(15) NOT NULL,\n"
+ + " FEED_ELEMENT_ID CHAR(15) NOT NULL,\n"
+ + " CONTAINER_ID CHAR(15) NOT NULL,\n"
+ + " FEED_TYPE VARCHAR(1) NOT NULL, \n"
+ + " NETWORK_ID CHAR(15) NOT NULL,\n" + " USER_ID CHAR(15) NOT NULL,\n"
+ + " CREATED_TIME TIMESTAMP,\n" + " LAST_UPDATE TIMESTAMP,\n"
+ + " RELEVANCE_SCORE DOUBLE,\n" + " FEED_ITEM_TYPE VARCHAR(1),\n"
+ + " FEED_ELEMENT_TYPE VARCHAR(1),\n"
+ + " FEED_ELEMENT_IS_SYS_GEN BOOLEAN,\n"
+ + " FEED_ELEMENT_STATUS VARCHAR(1),\n"
+ + " FEED_ELEMENT_VISIBILITY VARCHAR(1),\n" + " PARENT_ID CHAR(15),\n"
+ + " CREATED_BY CHAR(15),\n" + " BEST_COMMENT_ID CHAR(15),\n"
+ + " COMMENT_COUNT INTEGER,\n" + " CONSTRAINT PK PRIMARY KEY\n" + " (\n"
+ + " ORGANIZATION_ID,\n" + " FEED_ELEMENT_ID,\n"
+ + " CONTAINER_ID,\n" + " FEED_TYPE,\n" + " NETWORK_ID,\n"
+ + " USER_ID\n" + " )\n" + ") COLUMN_ENCODED_BYTES = 0";
+
+ public static final String INDEX_1_DDL =
+ "CREATE INDEX IF NOT EXISTS %s\n" + "ON %s (\n" + " NETWORK_ID,\n"
+ + " CONTAINER_ID,\n" + " FEED_TYPE,\n" + " USER_ID,\n"
+ + " CREATED_TIME DESC,\n" + " FEED_ELEMENT_ID DESC,\n"
+ + " CREATED_BY\n" + ") "
+ + " INCLUDE (\n" + " FEED_ITEM_TYPE,\n"
+ + " FEED_ELEMENT_TYPE,\n" + " FEED_ELEMENT_IS_SYS_GEN,\n"
+ + " FEED_ELEMENT_STATUS,\n" + " FEED_ELEMENT_VISIBILITY,\n"
+ + " PARENT_ID,\n" + " BEST_COMMENT_ID,\n" + " COMMENT_COUNT\n" + ")";
+
+ private static final String UPSERT_INTO_DATA_TABLE =
+ "UPSERT INTO %s\n" + "(\n" + " ORGANIZATION_ID,\n" + " FEED_ELEMENT_ID,\n"
+ + " CONTAINER_ID,\n" + " FEED_TYPE,\n" + " NETWORK_ID,\n"
+ + " USER_ID,\n" + " CREATED_TIME,\n" + " LAST_UPDATE,\n"
+ + " FEED_ITEM_TYPE,\n" + " FEED_ELEMENT_TYPE,\n"
+ + " FEED_ELEMENT_IS_SYS_GEN,\n" + " FEED_ELEMENT_STATUS,\n"
+ + " FEED_ELEMENT_VISIBILITY,\n" + " PARENT_ID,\n" + " CREATED_BY,\n"
+ + " BEST_COMMENT_ID,\n" + " COMMENT_COUNT\n" + ")"
+ + "VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
+
+ private String dataTableName;
+ private String indexTableName;
+ private String schemaName;
+ private String dataTableFullName;
+ private static String indexTableFullName;
+ private static final Logger logger = LoggerFactory.getLogger(PartialScannerResultsDisabledIT.class);
+ private static Random random = new Random(1);
+ // shared id counter and randomness source used by writeSingleBatch()
+ private static Random sourceOfRandomness = new Random(0);
+ private static AtomicInteger upsertIdCounter = new AtomicInteger(1);
+
+ @Before
+ public void setup() throws Exception {
+ // create the tables
+ generateUniqueTableNames();
+ createTestTable(getUrl(), String.format(TEST_TABLE_DDL, dataTableFullName));
+ createTestTable(getUrl(), String.format(INDEX_1_DDL, indexTableName, dataTableFullName));
+ }
+
+ @Test
+ public void testWithEnoughData() throws Exception {
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ // Write enough data to trigger partial scanner results
+ // TODO: it's likely that less data could be written if whatever
+ // config parameters decide this are lowered.
+ writeSingleBatch(conn, 100, 20, dataTableFullName);
+ logger.info("Running scrutiny");
+ // Scrutinize the index to see if partial results are silently returned.
+ // In that case we'd get a false positive on the scrutiny run.
+ long rowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
+ assertEquals(2000, rowCount);
+ }
+ }
+
+ /**
+ * Simple select query with a fetch size that exceeds the max result size. In that case the scan starts to
+ * produce partial result sets that, from the Phoenix perspective, are rows with NULL values.
+ * @throws SQLException
+ */
+ @Test
+ public void partialResultDuringSelect() throws SQLException {
+ String tableName = generateUniqueName();
+ Properties props = new Properties();
+ props.setProperty(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, "5");
+ int numRecords = 10;
+ try (Connection conn = DriverManager.getConnection(url, props)) {
+ conn.createStatement().execute(
+ "CREATE TABLE " + tableName + " (PK1 INTEGER NOT NULL PRIMARY KEY, KV1 VARCHAR)");
+ int i = 0;
+ String upsert = "UPSERT INTO " + tableName + " VALUES (?, ?)";
+ PreparedStatement stmt = conn.prepareStatement(upsert);
+ while (i < numRecords) {
+ stmt.setInt(1, i);
+ stmt.setString(2, UUID.randomUUID().toString());
+ stmt.executeUpdate();
+ i++;
+ }
+ conn.commit();
+
+ String sql = "SELECT * FROM " + tableName;
+ // With a fetch size larger than the row count, the tiny max result size forces the server to chunk the scan.
+ Statement s = conn.createStatement();
+ s.setFetchSize(100);
+ ResultSet rs = s.executeQuery(sql);
+ int count = 0;
+ while (rs.next()) {
+ if (rs.getString(2) == null)
+ fail("Null value because of partial row scan");
+ count++;
+ }
+ assertEquals(numRecords, count);
+ }
+
+ }
+
+ private static String randString(int length, Random random) {
+ // Note: RandomStringUtils uses its own randomness source; the passed-in Random is currently ignored.
+ return RandomStringUtils.randomAlphabetic(length);
+ }
+
+ private void writeSingleBatch(Connection connection, int batchSize, int numBatches, String tableName) throws Exception {
+ for (int j = 0; j < numBatches; j++) {
+ try (PreparedStatement statement =
+ connection.prepareStatement(String.format(UPSERT_INTO_DATA_TABLE, tableName))) {
+ for (int i = 0; i < batchSize; i++) {
+ int index = 0;
+ String id = "" + upsertIdCounter.getAndIncrement();
+ statement.setString(++index, id); // ORGANIZATION_ID
+ statement.setString(++index, id); // FEED_ELEMENT_ID,\n"
+ statement.setString(++index, id); // CONTAINER_ID,\n"
+ statement.setString(++index, randString(1, sourceOfRandomness)); // FEED_TYPE,\n"
+ statement.setString(++index, randString(15, sourceOfRandomness)); // NETWORK_ID,\n"
+ statement.setString(++index, randString(15, sourceOfRandomness)); // USER_ID,\n"
+ statement.setTimestamp(++index, new Timestamp(System.currentTimeMillis())); // CREATED_TIME,\n"
+ statement.setTimestamp(++index, new Timestamp(System.currentTimeMillis())); // LAST_UPDATE\n"
+ statement.setString(++index, randString(1, sourceOfRandomness)); // FEED_ITEM_TYPE\n"
+ statement.setString(++index, randString(1, sourceOfRandomness)); // FEED_ELEMENT_TYPE\n"
+ statement.setBoolean(++index, false); // FEED_ELEMENT_IS_SYS_GEN\n"
+ statement.setString(++index, randString(1, sourceOfRandomness)); // FEED_ELEMENT_STATUS\n"
+ statement.setString(++index, randString(1, sourceOfRandomness)); // FEED_ELEMENT_VISIBILITY\n"
+ statement.setString(++index, randString(15, sourceOfRandomness)); // PARENT_ID\n"
+ statement.setString(++index, randString(15, sourceOfRandomness)); // CREATED_BY\n"
+ statement.setString(++index, randString(15, sourceOfRandomness)); // BEST_COMMENT_ID\n"
+ statement.setInt(++index, random.nextInt()); // COMMENT_COUNT\n" + ")"
+ statement.execute();
+ }
+ connection.commit();
+ }
+ }
+ }
+
+ private void generateUniqueTableNames() {
+ schemaName = generateUniqueName();
+ dataTableName = generateUniqueName() + "_DATA";
+ dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+ indexTableName = generateUniqueName() + "_IDX";
+ indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e5a9c72e/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/DataTableLocalIndexRegionScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/DataTableLocalIndexRegionScanner.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/DataTableLocalIndexRegionScanner.java
index 4c44e82..64d4ac4 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/DataTableLocalIndexRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/DataTableLocalIndexRegionScanner.java
@@ -85,8 +85,13 @@ public class DataTableLocalIndexRegionScanner extends DelegateRegionScanner {
@Override
public boolean next(List<Cell> outResult, ScannerContext scannerContext) throws IOException {
+ return next(outResult);
+ }
+
+ @Override
+ public boolean next(List<Cell> results) throws IOException {
List<Cell> dataTableResults = new ArrayList<Cell>();
- boolean next = super.next(dataTableResults, scannerContext);
+ boolean next = super.next(dataTableResults);
addMutations(dataTableResults);
if (ServerUtil.readyToCommit(mutationList.size(), mutationList.byteSize(), maxBatchSize, maxBatchSizeBytes)||!next) {
region.batchMutate(mutationList.toArray(new Mutation[mutationList.size()]), HConstants.NO_NONCE,
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e5a9c72e/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContextUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContextUtil.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContextUtil.java
new file mode 100644
index 0000000..126e0b1
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContextUtil.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+
+import java.util.List;
+
+/**
+ * {@link ScannerContext} has all of its methods package-visible. To properly update the context progress for
+ * our scanners we need this helper.
+ */
+public class ScannerContextUtil {
+ public static void incrementSizeProgress(ScannerContext sc, List<Cell> cells) {
+ for (Cell cell : cells) {
+ sc.incrementSizeProgress(CellUtil.estimatedHeapSizeOfWithoutTags(cell));
+ }
+ }
+
+ public static void updateTimeProgress(ScannerContext sc) {
+ sc.updateTimeProgress();
+ }
+}
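
ScannerContextUtil works because Java grants package-private access by package name, not by jar:
the class is compiled into org.apache.hadoop.hbase.regionserver inside phoenix-core, so it may
call ScannerContext's package-visible methods on HBase's behalf. A self-contained sketch of the
same trick follows, with hypothetical com.example names; none of this is HBase code.

    // File: com/example/lib/Counter.java
    package com.example.lib;

    public class Counter {
        private long size;
        void add(long delta) { size += delta; }      // package-private, like ScannerContext's methods
        public long size() { return size; }
    }

    // File: com/example/lib/CounterUtil.java -- shipped in a different jar, but
    // declared in the library's package to gain package-private access
    package com.example.lib;

    public final class CounterUtil {
        private CounterUtil() { }
        public static void add(Counter c, long delta) { c.add(delta); }
    }

    // File: com/example/app/Main.java -- ordinary client code
    package com.example.app;

    import com.example.lib.Counter;
    import com.example.lib.CounterUtil;

    public class Main {
        public static void main(String[] args) {
            Counter c = new Counter();
            CounterUtil.add(c, 42);       // allowed: CounterUtil shares Counter's package
            System.out.println(c.size()); // prints 42
        }
    }

Note that this relies on the library's package not being sealed or modularized, which holds for
the HBase 1.x line these branches target.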
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e5a9c72e/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java
index b5e9c9f..945c1c4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java
@@ -41,7 +41,7 @@ public abstract class BaseRegionScanner extends DelegateRegionScanner {
@Override
public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
- return next(result);
+ throw new IOException("Next with scannerContext should not be called in Phoenix environment");
}
@Override
@@ -56,6 +56,6 @@ public abstract class BaseRegionScanner extends DelegateRegionScanner {
@Override
public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException {
- return next(result, scannerContext);
+ throw new IOException("NextRaw with scannerContext should not be called in Phoenix environment");
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e5a9c72e/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index d0d8694..0022a0b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
+import org.apache.hadoop.hbase.regionserver.ScannerContextUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.htrace.Span;
import org.apache.htrace.Trace;
@@ -266,7 +267,10 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
@Override
public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
overrideDelegate();
- return super.next(result, scannerContext);
+ boolean res = super.next(result);
+ ScannerContextUtil.incrementSizeProgress(scannerContext, result);
+ ScannerContextUtil.updateTimeProgress(scannerContext);
+ return res;
}
@Override
@@ -278,7 +282,10 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
@Override
public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException {
overrideDelegate();
- return super.nextRaw(result, scannerContext);
+ boolean res = super.nextRaw(result);
+ ScannerContextUtil.incrementSizeProgress(scannerContext, result);
+ ScannerContextUtil.updateTimeProgress(scannerContext);
+ return res;
}
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e5a9c72e/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java
index 0ddabed..95d449a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java
@@ -64,7 +64,7 @@ public class DelegateRegionScanner implements RegionScanner {
@Override
public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
- return delegate.next(result, scannerContext);
+ throw new IOException("Next with scannerContext should not be called in Phoenix environment");
}
@Override
@@ -74,7 +74,7 @@ public class DelegateRegionScanner implements RegionScanner {
@Override
public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException {
- return delegate.nextRaw(result, scannerContext);
+ throw new IOException("NextRaw with scannerContext should not be called in Phoenix environment");
}
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e5a9c72e/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
index 5061d94..59f844d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
@@ -276,18 +276,7 @@ public class HashJoinRegionScanner implements RegionScanner {
@Override
public boolean nextRaw(List<Cell> result, ScannerContext scannerContext)
throws IOException {
- try {
- while (shouldAdvance()) {
- hasMore = scanner.nextRaw(result, scannerContext);
- processResults(result, false); // TODO detect if limit used here
- result.clear();
- }
-
- return nextInQueue(result);
- } catch (Throwable t) {
- ServerUtil.throwIOException(env.getRegion().getRegionInfo().getRegionNameAsString(), t);
- return false; // impossible
- }
+ throw new IOException("Next with scannerContext should not be called in Phoenix environment");
}
@Override
@@ -302,33 +291,12 @@ public class HashJoinRegionScanner implements RegionScanner {
@Override
public boolean next(List<Cell> result) throws IOException {
- try {
- while (shouldAdvance()) {
- hasMore = scanner.next(result);
- processResults(result, false);
- result.clear();
- }
-
- return nextInQueue(result);
- } catch (Throwable t) {
- ServerUtil.throwIOException(env.getRegion().getRegionInfo().getRegionNameAsString(), t);
- return false; // impossible
- }
+ throw new IOException("Next should not be used in HashJoin scanner");
}
@Override
public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
- try {
- while (shouldAdvance()) {
- hasMore = scanner.next(result, scannerContext);
- processResults(result, false); // TODO detect if limit used here
- result.clear();
- }
- return nextInQueue(result);
- } catch (Throwable t) {
- ServerUtil.throwIOException(env.getRegion().getRegionInfo().getRegionNameAsString(), t);
- return false; // impossible
- }
+ throw new IOException("Next with scannerContext should not be called in Phoenix environment");
}
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e5a9c72e/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
index 898a573..3dcbef9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
+import org.apache.hadoop.hbase.regionserver.ScannerContextUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.execute.TupleProjector;
import org.apache.phoenix.expression.Expression;
@@ -140,12 +141,7 @@ public abstract class RegionScannerFactory {
@Override
public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
- try {
- return s.next(result, scannerContext);
- } catch (Throwable t) {
- ServerUtil.throwIOException(getRegion().getRegionInfo().getRegionNameAsString(), t);
- return false; // impossible
- }
+ throw new IOException("Next with scannerContext should not be called in Phoenix environment");
}
@Override
@@ -221,45 +217,10 @@ public abstract class RegionScannerFactory {
@Override
public boolean nextRaw(List<Cell> result, ScannerContext scannerContext)
throws IOException {
- try {
- boolean next = s.nextRaw(result, scannerContext);
- Cell arrayElementCell = null;
- if (result.size() == 0) {
- return next;
- }
- if (arrayFuncRefs != null && arrayFuncRefs.length > 0 && arrayKVRefs.size() > 0) {
- int arrayElementCellPosition = replaceArrayIndexElement(arrayKVRefs, arrayFuncRefs, result);
- arrayElementCell = result.get(arrayElementCellPosition);
- }
- if ((offset > 0 || ScanUtil.isLocalIndex(scan)) && !ScanUtil.isAnalyzeTable(scan)) {
- if(hasReferences && actualStartKey!=null) {
- next = scanTillScanStartRow(s, arrayKVRefs, arrayFuncRefs, result,
- scannerContext, arrayElementCell);
- if (result.isEmpty()) {
- return next;
- }
- }
- /* In the following, c is only used when data region is null.
- dataRegion will never be null in case of non-coprocessor call,
- therefore no need to refactor
- */
- IndexUtil.wrapResultUsingOffset(env, result, offset, dataColumns,
- tupleProjector, dataRegion, indexMaintainer, viewConstants, ptr);
- }
- if (projector != null) {
- Tuple toProject = useQualifierAsListIndex ? new PositionBasedMultiKeyValueTuple(result) : new ResultTuple(Result.create(result));
- Tuple tuple = projector.projectResults(toProject, useNewValueColumnQualifier);
- result.clear();
- result.add(tuple.getValue(0));
- if(arrayElementCell != null)
- result.add(arrayElementCell);
- }
- // There is a scanattribute set to retrieve the specific array element
- return next;
- } catch (Throwable t) {
- ServerUtil.throwIOException(getRegion().getRegionInfo().getRegionNameAsString(), t);
- return false; // impossible
- }
+ boolean res = next(result);
+ ScannerContextUtil.incrementSizeProgress(scannerContext, result);
+ ScannerContextUtil.updateTimeProgress(scannerContext);
+ return res;
}
/**
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e5a9c72e/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
index ab6b3db..2fb6f14 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
@@ -73,9 +73,7 @@ public class StatisticsScanner implements InternalScanner {
@Override
public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
- boolean ret = delegate.next(result, scannerContext);
- updateStats(result);
- return ret;
+ return next(result);
}
/**