You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ka...@apache.org on 2019/06/29 20:07:25 UTC
[phoenix] branch master updated: PHOENIX-5373 GlobalIndexChecker should treat the rows created by the previous design as unverified
This is an automated email from the ASF dual-hosted git repository.
kadir pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new daffc17 PHOENIX-5373 GlobalIndexChecker should treat the rows created by the previous design as unverified
daffc17 is described below
commit daffc17f46c4a7425225a676c4e321be43280e30
Author: Kadir <ko...@salesforce.com>
AuthorDate: Tue Jun 25 18:39:21 2019 -0700
PHOENIX-5373 GlobalIndexChecker should treat the rows created by the previous design as unverified
---
.../apache/phoenix/end2end/CsvBulkLoadToolIT.java | 8 +++++++-
.../apache/phoenix/end2end/IndexExtendedIT.java | 12 +++++------
.../org/apache/phoenix/end2end/IndexToolIT.java | 13 +++++++++---
.../phoenix/end2end/RegexBulkLoadToolIT.java | 8 ++++++--
.../org/apache/phoenix/util/IndexScrutinyIT.java | 5 +++--
.../apache/phoenix/index/GlobalIndexChecker.java | 24 +++++++++++-----------
.../phoenix/query/ConnectionQueryServicesImpl.java | 3 +++
.../apache/phoenix/query/QueryServicesOptions.java | 2 +-
8 files changed, 48 insertions(+), 27 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
index 699b469..f91956c 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
@@ -29,14 +29,18 @@ import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
+import java.util.Map;
+import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.mapred.FileAlreadyExistsException;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.CsvBulkLoadTool;
+import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableKey;
import org.apache.phoenix.util.DateUtil;
@@ -53,7 +57,9 @@ public class CsvBulkLoadToolIT extends BaseOwnClusterIT {
@BeforeClass
public static void doSetup() throws Exception {
- setUpTestDriver(ReadOnlyProps.EMPTY_PROPS);
+ Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(1);
+ clientProps.put(QueryServices.INDEX_REGION_OBSERVER_ENABLED_ATTRIB, Boolean.FALSE.toString());
+ setUpTestDriver(ReadOnlyProps.EMPTY_PROPS, new ReadOnlyProps(clientProps.entrySet().iterator()));
zkQuorum = TestUtil.LOCALHOST + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + getUtility().getZkCluster().getClientPort();
conn = DriverManager.getConnection(getUrl());
}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexExtendedIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexExtendedIT.java
index 6af4d78..052a70e 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexExtendedIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexExtendedIT.java
@@ -88,16 +88,16 @@ public class IndexExtendedIT extends BaseTest {
@Parameters(name="mutable = {0} , localIndex = {1}, directApi = {2}, useSnapshot = {3}")
public static Collection<Boolean[]> data() {
- List<Boolean[]> list = Lists.newArrayListWithExpectedSize(16);
+ List<Boolean[]> list = Lists.newArrayListWithExpectedSize(10);
boolean[] Booleans = new boolean[]{false, true};
for (boolean mutable : Booleans ) {
- for (boolean localIndex : Booleans ) {
- for (boolean directApi : Booleans ) {
- for (boolean useSnapshot : Booleans ) {
- list.add(new Boolean[]{ mutable, localIndex, directApi, useSnapshot});
- }
+ for (boolean directApi : Booleans ) {
+ for (boolean useSnapshot : Booleans) {
+ list.add(new Boolean[]{mutable, true, directApi, useSnapshot});
}
}
+ // Due to PHOENIX-5375 and PHOENIX-5376, the useSnapshot and bulk load options are ignored for global indexes
+ list.add(new Boolean[]{ mutable, false, true, false});
}
return list;
}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
index 868ba35..bdfa20f 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
@@ -134,9 +134,16 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
|| !TransactionFactory.getTransactionProvider(
TransactionFactory.Provider.valueOf(transactionProvider))
.isUnsupported(Feature.ALLOW_LOCAL_INDEX)) {
- for (boolean directApi : Booleans) {
- list.add(new Object[] { transactionProvider, mutable, localIndex,
- directApi, false, false});
+ if (localIndex) {
+ for (boolean directApi : Booleans) {
+ list.add(new Object[]{transactionProvider, mutable, localIndex,
+ directApi, false, false});
+ }
+ }
+ else {
+ // Due to PHOENIX-5376, the bulk load option is ignored for global indexes
+ list.add(new Object[]{transactionProvider, mutable, localIndex,
+ true, false, false});
}
}
}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/RegexBulkLoadToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/RegexBulkLoadToolIT.java
index 47b0db7..2b2918a 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/RegexBulkLoadToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/RegexBulkLoadToolIT.java
@@ -40,6 +40,7 @@ import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.TestUtil;
import org.junit.BeforeClass;
+import org.junit.Ignore;
import org.junit.Test;
public class RegexBulkLoadToolIT extends BaseOwnClusterIT {
@@ -171,10 +172,12 @@ public class RegexBulkLoadToolIT extends BaseOwnClusterIT {
rs.close();
stmt.close();
}
-
+ // Due to PHOENIX-5376, the bulk load option is ignored for global indexes
+ @Ignore
@Test
public void testImportWithIndex() throws Exception {
+
Statement stmt = conn.createStatement();
stmt.execute("CREATE TABLE TABLE3 (ID INTEGER NOT NULL PRIMARY KEY, " +
"FIRST_NAME VARCHAR, LAST_NAME VARCHAR)");
@@ -244,7 +247,8 @@ public class RegexBulkLoadToolIT extends BaseOwnClusterIT {
rs.close();
stmt.close();
}
-
+ // Due to PHOENIX-5376, the bulk load option is ignored for global indexes
+ @Ignore
@Test
public void testImportOneIndexTable() throws Exception {
testImportOneIndexTable("TABLE4", false);
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/util/IndexScrutinyIT.java b/phoenix-core/src/it/java/org/apache/phoenix/util/IndexScrutinyIT.java
index e7e27a9..20ec965 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/util/IndexScrutinyIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/util/IndexScrutinyIT.java
@@ -90,14 +90,15 @@ public class IndexScrutinyIT extends ParallelStatsDisabledIT {
conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('b','bb','0')");
conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ccc','1')");
conn.commit();
-
+
+ // Writing index directly will generate unverified rows with no corresponding data rows. These rows will not be visible to the applications
conn.createStatement().executeUpdate("UPSERT INTO " + fullIndexName + " VALUES ('ccc','a','2')");
conn.commit();
try {
IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
fail();
} catch (AssertionError e) {
- assertEquals("Expected equality for V2, but '2'!='1'", e.getMessage());
+ assertEquals("Expected data table row count to match expected:<2> but was:<1>", e.getMessage());
}
}
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
index 2a9f27a..f9056bd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
@@ -21,7 +21,7 @@ import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CHECK_VER
import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME;
import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME;
import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.PHYSICAL_DATA_TABLE_NAME;
-import static org.apache.phoenix.hbase.index.IndexRegionObserver.UNVERIFIED_BYTES;
+import static org.apache.phoenix.hbase.index.IndexRegionObserver.VERIFIED_BYTES;
import static org.apache.phoenix.index.IndexMaintainer.getIndexMaintainer;
import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
@@ -33,7 +33,9 @@ import java.util.Optional;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Mutation;
@@ -52,6 +54,8 @@ import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.hbase.index.table.HTableFactory;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.util.EnvironmentEdgeManager;
@@ -66,12 +70,13 @@ import org.apache.phoenix.util.ServerUtil;
*/
public class GlobalIndexChecker implements RegionCoprocessor, RegionObserver {
private static final Log LOG = LogFactory.getLog(GlobalIndexChecker.class);
+
/**
* Class that verifies a given row of a non-transactional global index.
* An instance of this class is created for each scanner on an index
* and used to verify individual rows and rebuild them if they are not valid
*/
- private static class GlobalIndexScanner implements RegionScanner {
+ private class GlobalIndexScanner implements RegionScanner {
RegionScanner scanner;
private long ageThreshold;
private int repairCount;
@@ -212,8 +217,8 @@ public class GlobalIndexChecker implements RegionCoprocessor, RegionObserver {
indexScan = new Scan(scan);
byte[] dataTableName = scan.getAttribute(PHYSICAL_DATA_TABLE_NAME);
byte[] indexTableName = region.getRegionInfo().getTable().getName();
- dataHTable = ServerUtil.getHTableForCoprocessorScan(env,
- SchemaUtil.getPhysicalTableName(dataTableName, env.getConfiguration()));
+ dataHTable = ServerUtil.ConnectionFactory.getConnection(ServerUtil.ConnectionType.DEFAULT_SERVER_CONNECTION,
+ env).getTable(TableName.valueOf(dataTableName));
if (indexMaintainer == null) {
byte[] md = scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD);
List<IndexMaintainer> maintainers = IndexMaintainer.deserialize(md, true);
@@ -293,8 +298,8 @@ public class GlobalIndexChecker implements RegionCoprocessor, RegionObserver {
LOG.warn("The empty column does not exist in a row in " + region.getRegionInfo().getTable().getNameAsString());
return false;
}
- if (Bytes.compareTo(result.getValue(emptyCF, emptyCQ), 0, UNVERIFIED_BYTES.length,
- UNVERIFIED_BYTES, 0, UNVERIFIED_BYTES.length) == 0) {
+ if (Bytes.compareTo(result.getValue(emptyCF, emptyCQ), 0, VERIFIED_BYTES.length,
+ VERIFIED_BYTES, 0, VERIFIED_BYTES.length) != 0) {
return false;
}
return true;
@@ -310,12 +315,8 @@ public class GlobalIndexChecker implements RegionCoprocessor, RegionObserver {
while (cellIterator.hasNext()) {
cell = cellIterator.next();
if (isEmptyColumn(cell)) {
- // Before PHOENIX-5156, the empty column value was set to 'x'. With PHOENIX-5156, it is now
- // set to VERIFIED (1) and UNVERIFIED (2). In order to skip the index rows that are inserted before PHOENIX-5156
- // we consider anything that is not UNVERIFIED means VERIFIED. IndexTool should be used to
- // rebuild old rows to ensure their correctness after the PHOENIX-5156 upgrade
if (Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
- UNVERIFIED_BYTES, 0, UNVERIFIED_BYTES.length) == 0) {
+ VERIFIED_BYTES, 0, VERIFIED_BYTES.length) != 0) {
return false;
}
// Empty column is not supposed to be returned to the client except it is the only column included
@@ -382,5 +383,4 @@ public class GlobalIndexChecker implements RegionCoprocessor, RegionObserver {
}
return new GlobalIndexScanner(c.getEnvironment(), scan, s);
}
-
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 133737f..9bc517b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -972,6 +972,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
opts.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, PhoenixIndexCodec.class.getName());
Indexer.enableIndexing(builder, PhoenixIndexBuilder.class, opts, priority);
}
+ if (newDesc.hasCoprocessor(IndexRegionObserver.class.getName())) {
+ builder.removeCoprocessor(IndexRegionObserver.class.getName());
+ }
}
}
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 65189fc..6434d22 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -355,7 +355,7 @@ public class QueryServicesOptions {
public static final long DEFAULT_TASK_HANDLING_INITIAL_DELAY_MS = 10*1000; // 10 sec
public static final long DEFAULT_GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS = 10*60*1000; /* 10 min */
- public static final int DEFAULT_GLOBAL_INDEX_REPAIR_COUNT = DEFAULT_MUTATE_BATCH_SIZE;
+ public static final int DEFAULT_GLOBAL_INDEX_REPAIR_COUNT = 1;
public static final boolean DEFAULT_INDEX_REGION_OBSERVER_ENABLED = true;
public static final boolean DEFAULT_ALLOW_SPLITTABLE_SYSTEM_CATALOG_ROLLBACK = false;