Posted to commits@phoenix.apache.org by ka...@apache.org on 2019/07/01 21:22:38 UTC
[phoenix] branch 4.14-HBase-1.3 updated: PHOENIX-5211 Consistent
Immutable Global Indexes for Non-Transactional Tables
This is an automated email from the ASF dual-hosted git repository.
kadir pushed a commit to branch 4.14-HBase-1.3
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.14-HBase-1.3 by this push:
new 36782e2 PHOENIX-5211 Consistent Immutable Global Indexes for Non-Transactional Tables
36782e2 is described below
commit 36782e2b8117b48de2279664e479dd17866c2706
Author: Gokcen Iskender <gi...@salesforce.com>
AuthorDate: Tue Jun 11 16:13:25 2019 -0700
PHOENIX-5211 Consistent Immutable Global Indexes for Non-Transactional Tables
---
.../end2end/IndexToolForPartialBuildIT.java | 90 ++--
.../org/apache/phoenix/end2end/IndexToolIT.java | 7 +-
.../phoenix/end2end/index/ImmutableIndexIT.java | 193 +++++++-
.../org/apache/phoenix/execute/MutationState.java | 493 +++++++++++++--------
.../phoenix/hbase/index/IndexRegionObserver.java | 2 +-
.../org/apache/phoenix/index/IndexMaintainer.java | 1 +
.../phoenix/query/ConnectionQueryServicesImpl.java | 3 +-
.../apache/phoenix/query/QueryServicesOptions.java | 1 +
.../java/org/apache/phoenix/util/IndexUtil.java | 63 ++-
.../java/org/apache/phoenix/util/TestUtil.java | 35 +-
10 files changed, 612 insertions(+), 276 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java
index 19ffe1a..a0956f2 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java
@@ -17,13 +17,10 @@
*/
package org.apache.phoenix.end2end;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
import java.sql.Connection;
import java.sql.DriverManager;
@@ -34,21 +31,18 @@ import java.sql.Statement;
import java.util.List;
import java.util.Map;
import java.util.Properties;
-import java.util.Set;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.coprocessor.ObserverContext;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
-import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.index.IndexTool;
-import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.PIndexState;
@@ -60,6 +54,7 @@ import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.StringUtil;
+import org.apache.phoenix.util.TestUtil;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -83,7 +78,6 @@ public class IndexToolForPartialBuildIT extends BaseOwnClusterIT {
public static Map<String, String> getServerProperties() {
Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(10);
serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
- serverProps.put("hbase.coprocessor.region.classes", FailingRegionObserver.class.getName());
serverProps.put(" yarn.scheduler.capacity.maximum-am-resource-percent", "1.0");
serverProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2");
serverProps.put(HConstants.HBASE_RPC_TIMEOUT_KEY, "10000");
@@ -105,14 +99,14 @@ public class IndexToolForPartialBuildIT extends BaseOwnClusterIT {
String schemaName = generateUniqueName();
String dataTableName = generateUniqueName();
String fullTableName = SchemaUtil.getTableName(schemaName, dataTableName);
- final String indxTable = String.format("%s_%s", dataTableName, FailingRegionObserver.INDEX_NAME);
+ final String indxTable = String.format("%s_IDX", dataTableName);
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
props.setProperty(QueryServices.TRANSACTIONS_ENABLED, Boolean.TRUE.toString());
props.setProperty(QueryServices.EXPLAIN_ROW_COUNT_ATTRIB, Boolean.FALSE.toString());
props.setProperty(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.toString(isNamespaceEnabled));
final Connection conn = DriverManager.getConnection(getUrl(), props);
Statement stmt = conn.createStatement();
- try {
+ try (Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();){
if (isNamespaceEnabled) {
conn.createStatement().execute("CREATE SCHEMA IF NOT EXISTS " + schemaName);
}
@@ -121,25 +115,36 @@ public class IndexToolForPartialBuildIT extends BaseOwnClusterIT {
fullTableName, tableDDLOptions));
String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", fullTableName);
PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
- FailingRegionObserver.FAIL_WRITE = false;
// insert two rows
upsertRow(stmt1, 1000);
upsertRow(stmt1, 2000);
conn.commit();
stmt.execute(String.format("CREATE INDEX %s ON %s (LPAD(UPPER(NAME),11,'x')||'_xyz') ", indxTable, fullTableName));
- FailingRegionObserver.FAIL_WRITE = true;
upsertRow(stmt1, 3000);
upsertRow(stmt1, 4000);
upsertRow(stmt1, 5000);
- try {
- conn.commit();
- fail();
- } catch (SQLException e) {} catch (Exception e) {}
+ conn.commit();
+
+ // delete these indexes
+ PTable pindexTable = PhoenixRuntime.getTable(conn, SchemaUtil.getTableName(schemaName, indxTable));
+ try (Table hTable = admin.getConnection().
+ getTable(TableName.valueOf(pindexTable.getPhysicalName().toString()));) {
+ Scan scan = new Scan();
+ ResultScanner scanner = hTable.getScanner(scan);
+ int cnt = 0;
+ for (Result res = scanner.next(); res != null; res = scanner.next()) {
+ cnt++;
+ if (cnt > 2) {
+ hTable.delete(new Delete(res.getRow()));
+ }
+ }
+ }
+ TestUtil.doMajorCompaction(conn, pindexTable.getPhysicalName().toString());
+
conn.createStatement()
.execute(String.format("ALTER INDEX %s on %s REBUILD ASYNC", indxTable, fullTableName));
- FailingRegionObserver.FAIL_WRITE = false;
ResultSet rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(schemaName), indxTable,
new String[] { PTableType.INDEX.toString() });
assertTrue(rs.next());
@@ -150,21 +155,6 @@ public class IndexToolForPartialBuildIT extends BaseOwnClusterIT {
upsertRow(stmt1, 6000);
upsertRow(stmt1, 7000);
conn.commit();
-
- rs = conn.createStatement()
- .executeQuery(String.format("SELECT " + PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP + ","
- + PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP + " FROM "
- +"\""+ SYSTEM_CATALOG_SCHEMA + "\"." + SYSTEM_CATALOG_TABLE + " ("
- + PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP + " bigint) where "
- + PhoenixDatabaseMetaData.TABLE_SCHEM + "='" + schemaName + "' and "
- + PhoenixDatabaseMetaData.TABLE_NAME + "='" + indxTable + "'"));
- rs.next();
- PTable pindexTable = PhoenixRuntime.getTable(conn, SchemaUtil.getTableName(schemaName, indxTable));
- assertEquals(PIndexState.BUILDING, pindexTable.getIndexState());
- assertEquals(rs.getLong(1), pindexTable.getTimeStamp());
-
- //assert disabled timestamp
- assertEquals(0, rs.getLong(2));
String selectSql = String.format("SELECT LPAD(UPPER(NAME),11,'x')||'_xyz',ID FROM %s", fullTableName);
rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql);
@@ -173,7 +163,7 @@ public class IndexToolForPartialBuildIT extends BaseOwnClusterIT {
// assert we are pulling from data table.
assertExplainPlan(actualExplainPlan, schemaName, dataTableName, null, isNamespaceEnabled);
- rs = stmt1.executeQuery(selectSql);
+ rs = stmt1.executeQuery(selectSql);
for (int i = 1; i <= 7; i++) {
assertTrue(rs.next());
assertEquals("xxUNAME" + i*1000 + "_xyz", rs.getString(1));
@@ -260,24 +250,4 @@ public class IndexToolForPartialBuildIT extends BaseOwnClusterIT {
stmt.executeUpdate();
}
-
- public static class FailingRegionObserver extends SimpleRegionObserver {
- public static volatile boolean FAIL_WRITE = false;
- public static final String INDEX_NAME = "IDX";
- @Override
- public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException {
- if (c.getEnvironment().getRegionInfo().getTable().getNameAsString().contains(INDEX_NAME) && FAIL_WRITE) {
- throw new DoNotRetryIOException();
- }
- Mutation operation = miniBatchOp.getOperation(0);
- Set<byte[]> keySet = operation.getFamilyMap().keySet();
- for(byte[] family: keySet) {
- if(Bytes.toString(family).startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX) && FAIL_WRITE) {
- throw new DoNotRetryIOException();
- }
- }
- }
-
- }
-
}
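With consistent immutable global indexes, the old FailingRegionObserver can no longer leave the index partially built, so the test above now removes index rows directly through the HBase client and major-compacts before kicking off the rebuild. A condensed sketch of that pattern (assuming the HBase 1.x client API; physicalIndexName is a placeholder for the index's physical table name):

    try (Table hIndex = admin.getConnection()
            .getTable(TableName.valueOf(physicalIndexName))) {
        try (ResultScanner scanner = hIndex.getScanner(new Scan())) {
            int row = 0;
            for (Result r = scanner.next(); r != null; r = scanner.next()) {
                // Keep the first two index rows and physically delete the rest,
                // so the index is genuinely missing data.
                if (++row > 2) {
                    hIndex.delete(new Delete(r.getRow()));
                }
            }
        }
    }
    // Major compaction purges the delete markers before ALTER INDEX ... REBUILD ASYNC runs.
    TestUtil.doMajorCompaction(conn, physicalIndexName);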
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
index 9b248e1..63c7337 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
@@ -63,6 +63,7 @@ import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TestUtil;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -136,14 +137,14 @@ public class IndexToolIT extends ParallelStatsEnabledIT {
}
}
else {
- // Due to PHOENIX-5375 and PHOENIX-5376, the useSnapshot and bulk load option are ignored for global indexes
- list.add(new Boolean[]{transactional, mutable, false, true, false, false});
+ // Due to PHOENIX-5375 and PHOENIX-5376, the snapshot and bulk load options are ignored for global indexes
+ list.add(new Boolean[]{transactional, mutable, localIndex, true, false, false});
}
}
}
}
// Add the usetenantId
- list.add(new Boolean[] { false, false, false, true, false, true});
+ list.add(new Boolean[] { null, false, false, true, false, true});
return list;
}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
index 1db9787..0a409d9 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
@@ -17,15 +17,22 @@
*/
package org.apache.phoenix.end2end.index;
+import static org.apache.phoenix.end2end.IndexToolIT.assertExplainPlan;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IMMUTABLE_STORAGE_SCHEME;
+import static org.apache.phoenix.schema.PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.apache.phoenix.util.TestUtil.getRowCount;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
@@ -40,23 +47,44 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
+import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
+import org.apache.phoenix.hbase.index.IndexRegionObserver;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.query.BaseTest;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableImpl;
+import org.apache.phoenix.transaction.PhoenixTransactionProvider;
+import org.apache.phoenix.transaction.PhoenixTransactionProvider.Feature;
+import org.apache.phoenix.transaction.TransactionFactory;
+import org.apache.phoenix.util.EncodedColumnsUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.TestUtil;
@@ -104,9 +132,12 @@ public class ImmutableIndexIT extends BaseUniqueNamesOwnClusterIT {
public static void doSetup() throws Exception {
Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(1);
serverProps.put("hbase.coprocessor.region.classes", CreateIndexRegionObserver.class.getName());
- Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(2);
+ Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(5);
clientProps.put(QueryServices.TRANSACTIONS_ENABLED, "true");
clientProps.put(QueryServices.INDEX_POPULATION_SLEEP_TIME, "15000");
+ clientProps.put(QueryServices.INDEX_REGION_OBSERVER_ENABLED_ATTRIB, "true");
+ clientProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "1");
+ clientProps.put(HConstants.HBASE_CLIENT_PAUSE, "1");
setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
}
@@ -254,7 +285,165 @@ public class ImmutableIndexIT extends BaseUniqueNamesOwnClusterIT {
iterator.next();
assertEquals(!localIndex, iterator.hasNext());
}
-
+
+ private void createAndPopulateTableAndIndexForConsistentIndex(Connection conn, String tableName, String indexName,
+ int numOfRowsToInsert, String storageProps)
+ throws Exception {
+ String tableOptions = tableDDLOptions;
+ if (storageProps != null) {
+ tableOptions += " ,IMMUTABLE_STORAGE_SCHEME=" + storageProps;
+ }
+ String ddl = "CREATE TABLE " + tableName + TestUtil.TEST_TABLE_SCHEMA + tableOptions;
+ INDEX_DDL =
+ "CREATE " + " INDEX IF NOT EXISTS " + SchemaUtil.getTableNameFromFullName(indexName)
+ + " ON " + tableName + " (long_pk, varchar_pk)"
+ + " INCLUDE (long_col1, long_col2) ";
+
+ conn.createStatement().execute(ddl);
+ conn.createStatement().execute(INDEX_DDL);
+ upsertRows(conn, tableName, numOfRowsToInsert);
+ conn.commit();
+
+ TestUtil.waitForIndexState(conn, indexName, PIndexState.ACTIVE);
+ }
+
+ @Test
+ public void testGlobalImmutableIndexCreate() throws Exception {
+ if (localIndex || transactional) {
+ return;
+ }
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+
+ ArrayList<String> immutableStorageProps = new ArrayList<String>();
+ immutableStorageProps.add(null);
+ if (!tableDDLOptions.contains(IMMUTABLE_STORAGE_SCHEME)) {
+ immutableStorageProps.add(SINGLE_CELL_ARRAY_WITH_OFFSETS.toString());
+ }
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(true);
+ for (String storageProp : immutableStorageProps) {
+ String tableName = "TBL_" + generateUniqueName();
+ String indexName = "IND_" + generateUniqueName();
+ String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
+ String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
+ TABLE_NAME = fullTableName;
+ int numRows = 1;
+ createAndPopulateTableAndIndexForConsistentIndex(conn, fullTableName, fullIndexName,
+ numRows, storageProp);
+
+ ResultSet rs;
+ rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ COUNT(*) FROM " + TABLE_NAME);
+ assertTrue(rs.next());
+ assertEquals(numRows, rs.getInt(1));
+ rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + fullIndexName);
+ assertTrue(rs.next());
+ assertEquals(numRows, rs.getInt(1));
+ assertEquals(true, verifyRowsForEmptyColValue(conn, fullIndexName,
+ IndexRegionObserver.VERIFIED_BYTES));
+
+ // Now try to fail Phase 1 and observe that the index state is not DISABLED
+ try (Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();) {
+ admin.disableTable(TableName.valueOf(fullIndexName));
+ boolean isWriteOnDisabledIndexFailed = false;
+ try {
+ upsertRows(conn, fullTableName, numRows);
+ } catch (SQLException ex) {
+ isWriteOnDisabledIndexFailed = true;
+ }
+ assertEquals(true, isWriteOnDisabledIndexFailed);
+ PIndexState indexState = TestUtil.getIndexState(conn, fullIndexName);
+ assertEquals(PIndexState.ACTIVE, indexState);
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testGlobalImmutableIndexDelete() throws Exception {
+ if (localIndex || transactional) {
+ return;
+ }
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ String tableName = "TBL_" + generateUniqueName();
+ String indexName = "IND_" + generateUniqueName();
+ String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
+ String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
+ TABLE_NAME = fullTableName;
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();) {
+ conn.setAutoCommit(true);
+ int numRows = 2;
+ createAndPopulateTableAndIndexForConsistentIndex(conn, fullTableName, fullIndexName, numRows, null);
+
+ String dml = "DELETE from " + fullTableName + " WHERE varchar_pk='varchar1'";
+ conn.createStatement().execute(dml);
+ conn.commit();
+ ResultSet rs;
+ rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ COUNT(*) FROM " + TABLE_NAME);
+ assertTrue(rs.next());
+ assertEquals(numRows - 1, rs.getInt(1));
+ rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + fullIndexName);
+ assertTrue(rs.next());
+ assertEquals(numRows - 1, rs.getInt(1));
+ assertEquals(true, verifyRowsForEmptyColValue(conn, fullIndexName, IndexRegionObserver.VERIFIED_BYTES));
+
+ // Force the delete to fail on the data table (Phase 2) and check that the index table row is left unverified
+ TestUtil.addCoprocessor(conn, fullTableName, DeleteFailingRegionObserver.class);
+ dml = "DELETE from " + fullTableName + " WHERE varchar_pk='varchar2'";
+ boolean isDeleteFailed = false;
+ try {
+ conn.createStatement().execute(dml);
+ } catch (Exception ex) {
+ isDeleteFailed = true;
+ }
+ assertEquals(true, isDeleteFailed);
+ TestUtil.removeCoprocessor(conn, fullTableName, DeleteFailingRegionObserver.class);
+ assertEquals(numRows - 1, getRowCount(conn.unwrap(PhoenixConnection.class).getQueryServices()
+ .getTable(Bytes.toBytes(fullIndexName)), false));
+ assertEquals(true, verifyRowsForEmptyColValue(conn, fullIndexName, IndexRegionObserver.UNVERIFIED_BYTES));
+
+ // Now remove the data table rows via HBase (truncate), read through the unverified index row, and verify that no data is returned
+ admin.disableTable(TableName.valueOf(fullTableName));
+ admin.truncateTable(TableName.valueOf(fullTableName), true);
+ String selectFromIndex = "SELECT long_pk, varchar_pk, long_col1 FROM " + TABLE_NAME + " WHERE varchar_pk='varchar2' AND long_pk=2";
+ rs =
+ conn.createStatement().executeQuery(
+ "EXPLAIN " + selectFromIndex);
+ String actualExplainPlan = QueryUtil.getExplainPlan(rs);
+ assertExplainPlan(false, actualExplainPlan, fullTableName, fullIndexName);
+
+ rs = conn.createStatement().executeQuery(selectFromIndex);
+ assertFalse(rs.next());
+ }
+ }
+
+ public static class DeleteFailingRegionObserver extends SimpleRegionObserver {
+ @Override
+ public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws
+ IOException {
+ throw new DoNotRetryIOException();
+ }
+ }
+
+ public static boolean verifyRowsForEmptyColValue(Connection conn, String tableName, byte[] valueBytes)
+ throws IOException, SQLException {
+ PTable table = PhoenixRuntime.getTable(conn, tableName);
+ byte[] emptyCF = SchemaUtil.getEmptyColumnFamily(table);
+ byte[] emptyCQ = EncodedColumnsUtil.getEmptyKeyValueInfo(table).getFirst();
+ HTable htable = (HTable) conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(table.getPhysicalName().getBytes());
+ Scan scan = new Scan();
+ scan.addColumn(emptyCF, emptyCQ);
+ ResultScanner resultScanner = htable.getScanner(scan);
+
+ for (Result result = resultScanner.next(); result != null; result = resultScanner.next()) {
+ if (Bytes.compareTo(result.getValue(emptyCF, emptyCQ), 0, valueBytes.length,
+ valueBytes, 0, valueBytes.length) != 0) {
+ return false;
+ }
+ }
+ return true;
+ }
+
// This test is known to flap. We need PHOENIX-2582 to be fixed before enabling it again.
@Ignore
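The new tests above hinge on the index's empty column acting as a verified/unverified marker, which verifyRowsForEmptyColValue scans for. As a hedged illustration (markRowUnverified is a hypothetical helper, not part of the patch), a test could flip a single index row to unverified directly through HBase; that is exactly the state GlobalIndexChecker refuses to serve as-is and instead skips or repairs at read time:

    // Hypothetical helper: overwrite one index row's empty column with
    // UNVERIFIED_BYTES so reads must take the checker's repair path.
    // emptyCF/emptyCQ come from SchemaUtil.getEmptyColumnFamily(table) and
    // EncodedColumnsUtil.getEmptyKeyValueInfo(table), as in verifyRowsForEmptyColValue.
    static void markRowUnverified(Table indexHTable, byte[] rowKey,
            byte[] emptyCF, byte[] emptyCQ) throws IOException {
        Put put = new Put(rowKey);
        put.addColumn(emptyCF, emptyCQ, IndexRegionObserver.UNVERIFIED_BYTES);
        indexHTable.put(put);
    }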
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index e54e892..1b84549 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -30,6 +30,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -40,7 +41,10 @@ import javax.annotation.concurrent.Immutable;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
@@ -53,6 +57,7 @@ import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.hbase.index.IndexRegionObserver;
import org.apache.phoenix.hbase.index.exception.IndexWriteException;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.index.IndexMaintainer;
@@ -92,10 +97,14 @@ import org.apache.phoenix.transaction.PhoenixTransactionContext;
import org.apache.phoenix.transaction.PhoenixTransactionContext.PhoenixVisibilityLevel;
import org.apache.phoenix.transaction.TransactionFactory;
import org.apache.phoenix.transaction.TransactionFactory.Provider;
+import org.apache.phoenix.util.EncodedColumnsUtil;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.KeyValueUtil;
import org.apache.phoenix.util.LogUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.SQLCloseable;
+import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.ServerUtil;
import org.apache.phoenix.util.SizedUtil;
import org.apache.phoenix.util.TransactionUtil;
@@ -891,13 +900,20 @@ public class MutationState implements SQLCloseable {
continue;
}
// Validate as we go if transactional since we can undo if a problem occurs (which is unlikely)
- long serverTimestamp = serverTimeStamps == null ? validateAndGetServerTimestamp(tableRef,
- multiRowMutationState) : serverTimeStamps[i++];
- Long scn = connection.getSCN();
- long mutationTimestamp = scn == null ? HConstants.LATEST_TIMESTAMP : scn;
+ long serverTimestamp = serverTimeStamps == null ?
+         validateAndGetServerTimestamp(tableRef, multiRowMutationState) : serverTimeStamps[i++];
final PTable table = tableRef.getTable();
- Iterator<Pair<PName, List<Mutation>>> mutationsIterator = addRowMutations(tableRef,
- multiRowMutationState, mutationTimestamp, serverTimestamp, false, sendAll);
+ Long scn = connection.getSCN();
+ long mutationTimestamp = scn == null ?
+         (table.isTransactional() ? HConstants.LATEST_TIMESTAMP : EnvironmentEdgeManager.currentTimeMillis()) : scn;
+ Iterator<Pair<PName, List<Mutation>>> mutationsIterator = addRowMutations(
+         tableRef, multiRowMutationState, mutationTimestamp, serverTimestamp, false, sendAll);
// build map from physical table to mutation list
boolean isDataTable = true;
while (mutationsIterator.hasNext()) {
@@ -905,7 +921,9 @@ public class MutationState implements SQLCloseable {
PName hTableName = pair.getFirst();
List<Mutation> mutationList = pair.getSecond();
TableInfo tableInfo = new TableInfo(isDataTable, hTableName, tableRef);
- List<Mutation> oldMutationList = physicalTableMutationMap.put(tableInfo, mutationList);
+ List<Mutation> oldMutationList = physicalTableMutationMap.put(tableInfo, mutationList);
if (oldMutationList != null) mutationList.addAll(0, oldMutationList);
isDataTable = false;
}
@@ -926,215 +944,304 @@ public class MutationState implements SQLCloseable {
joinMutationState(new TableRef(tableRef), multiRowMutationState, txMutations);
}
}
- long serverTimestamp = HConstants.LATEST_TIMESTAMP;
- Iterator<Entry<TableInfo, List<Mutation>>> mutationsIterator = physicalTableMutationMap.entrySet()
- .iterator();
- while (mutationsIterator.hasNext()) {
- Entry<TableInfo, List<Mutation>> pair = mutationsIterator.next();
- TableInfo tableInfo = pair.getKey();
- byte[] htableName = tableInfo.getHTableName().getBytes();
- List<Mutation> mutationList = pair.getValue();
- List<List<Mutation>> mutationBatchList =
- getMutationBatchList(batchSize, batchSizeBytes, mutationList);
-
- // create a span per target table
- // TODO maybe we can be smarter about the table name to string here?
- Span child = Tracing.child(span, "Writing mutation batch for table: " + Bytes.toString(htableName));
-
- int retryCount = 0;
- boolean shouldRetry = false;
- long numMutations = 0;
- long mutationSizeBytes = 0;
- long mutationCommitTime = 0;
- long numFailedMutations = 0;
- ;
- long startTime = 0;
- boolean shouldRetryIndexedMutation = false;
- IndexWriteException iwe = null;
- do {
- TableRef origTableRef = tableInfo.getOrigTableRef();
- PTable table = origTableRef.getTable();
- table.getIndexMaintainers(indexMetaDataPtr, connection);
- final ServerCache cache = tableInfo.isDataTable() ?
- IndexMetaDataCacheClient.setMetaDataOnMutations(connection, table,
- mutationList, indexMetaDataPtr) : null;
- // If we haven't retried yet, retry for this case only, as it's possible that
- // a split will occur after we send the index metadata cache to all known
- // region servers.
- shouldRetry = cache != null;
- SQLException sqlE = null;
- HTableInterface hTable = connection.getQueryServices().getTable(htableName);
- try {
- if (table.isTransactional()) {
- // Track tables to which we've sent uncommitted data
- if (tableInfo.isDataTable()) {
- uncommittedPhysicalNames.add(table.getPhysicalName().getString());
- phoenixTransactionContext.markDMLFence(table);
- }
- hTable = phoenixTransactionContext.getTransactionalTableWriter(connection, table, hTable, !tableInfo.isDataTable());
+
+ Map<TableInfo, List<Mutation>> unverifiedIndexMutations = new LinkedHashMap<>();
+ Map<TableInfo, List<Mutation>> verifiedOrDeletedIndexMutations = new LinkedHashMap<>();
+ filterIndexCheckerMutations(physicalTableMutationMap, unverifiedIndexMutations,
+ verifiedOrDeletedIndexMutations);
+
+ // Phase 1: Send index mutations with the empty column value = "unverified"
+ sendMutations(unverifiedIndexMutations.entrySet().iterator(), span, indexMetaDataPtr);
+
+ // Phase 2: Send data table and other indexes
+ sendMutations(physicalTableMutationMap.entrySet().iterator(), span, indexMetaDataPtr);
+
+ // Phase 3: Send index put mutations with the empty column value = "verified", and/or the deferred index delete mutations
+ try {
+ sendMutations(verifiedOrDeletedIndexMutations.entrySet().iterator(), span, indexMetaDataPtr);
+ } catch (SQLException ex) {
+ // TODO: add a metric here
+ logger.warn(
+ "Ignoring exception that happened during setting index verified value to verified=TRUE "
+ + verifiedOrDeletedIndexMutations.toString(),
+ ex);
+ }
+
+ }
+ }
+
+ private void sendMutations(Iterator<Entry<TableInfo, List<Mutation>>> mutationsIterator, Span span, ImmutableBytesWritable indexMetaDataPtr)
+ throws SQLException {
+ while (mutationsIterator.hasNext()) {
+ Entry<TableInfo, List<Mutation>> pair = mutationsIterator.next();
+ TableInfo tableInfo = pair.getKey();
+ byte[] htableName = tableInfo.getHTableName().getBytes();
+ List<Mutation> mutationList = pair.getValue();
+ List<List<Mutation>> mutationBatchList =
+ getMutationBatchList(batchSize, batchSizeBytes, mutationList);
+
+ // create a span per target table
+ // TODO maybe we can be smarter about the table name to string here?
+ Span child = Tracing.child(span, "Writing mutation batch for table: " + Bytes.toString(htableName));
+
+ int retryCount = 0;
+ boolean shouldRetry = false;
+ long numMutations = 0;
+ long mutationSizeBytes = 0;
+ long mutationCommitTime = 0;
+ long numFailedMutations = 0;
+
+ long startTime = 0;
+ boolean shouldRetryIndexedMutation = false;
+ IndexWriteException iwe = null;
+ do {
+ TableRef origTableRef = tableInfo.getOrigTableRef();
+ PTable table = origTableRef.getTable();
+ table.getIndexMaintainers(indexMetaDataPtr, connection);
+ final ServerCache cache = tableInfo.isDataTable() ?
+ IndexMetaDataCacheClient.setMetaDataOnMutations(connection, table,
+ mutationList, indexMetaDataPtr) : null;
+ // If we haven't retried yet, retry for this case only, as it's possible that
+ // a split will occur after we send the index metadata cache to all known
+ // region servers.
+ shouldRetry = cache != null;
+ SQLException sqlE = null;
+ HTableInterface hTable = connection.getQueryServices().getTable(htableName);
+ try {
+ if (table.isTransactional()) {
+ // Track tables to which we've sent uncommitted data
+ if (tableInfo.isDataTable()) {
+ uncommittedPhysicalNames.add(table.getPhysicalName().getString());
+ phoenixTransactionContext.markDMLFence(table);
}
- numMutations = mutationList.size();
- GLOBAL_MUTATION_BATCH_SIZE.update(numMutations);
- mutationSizeBytes = calculateMutationSize(mutationList);
-
- startTime = System.currentTimeMillis();
- child.addTimelineAnnotation("Attempt " + retryCount);
- Iterator<List<Mutation>> itrListMutation = mutationBatchList.iterator();
- while (itrListMutation.hasNext()) {
- final List<Mutation> mutationBatch = itrListMutation.next();
- if (shouldRetryIndexedMutation) {
- // if there was an index write failure, retry the mutation in a loop
- final Table finalHTable = hTable;
- final ImmutableBytesWritable finalindexMetaDataPtr =
- indexMetaDataPtr;
- final PTable finalPTable = table;
- PhoenixIndexFailurePolicy.doBatchWithRetries(new MutateCommand() {
- @Override
- public void doMutation() throws IOException {
- try {
- finalHTable.batch(mutationBatch);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new IOException(e);
- } catch (IOException e) {
- e = updateTableRegionCacheIfNecessary(e);
- throw e;
- }
+ // Only pass true for the last argument if the index is being written to on its own (i.e. initial
+ // index population), not if it's being written to for normal maintenance due to writes to
+ // the data table. This case is different because the initial index population does not need
+ // to be done transactionally since the index is only made active after all writes have
+ // occurred successfully.
+ hTable = phoenixTransactionContext.getTransactionalTableWriter(connection, table, hTable, tableInfo.isDataTable() && table.getType() == PTableType.INDEX);
+ }
+ numMutations = mutationList.size();
+ GLOBAL_MUTATION_BATCH_SIZE.update(numMutations);
+ mutationSizeBytes = calculateMutationSize(mutationList);
+
+ startTime = System.currentTimeMillis();
+ child.addTimelineAnnotation("Attempt " + retryCount);
+ Iterator<List<Mutation>> itrListMutation = mutationBatchList.iterator();
+ while (itrListMutation.hasNext()) {
+ final List<Mutation> mutationBatch = itrListMutation.next();
+ if (shouldRetryIndexedMutation) {
+ // if there was an index write failure, retry the mutation in a loop
+ final Table finalHTable = hTable;
+ final ImmutableBytesWritable finalindexMetaDataPtr =
+ indexMetaDataPtr;
+ final PTable finalPTable = table;
+ PhoenixIndexFailurePolicy.doBatchWithRetries(new MutateCommand() {
+ @Override
+ public void doMutation() throws IOException {
+ try {
+ finalHTable.batch(mutationBatch);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException(e);
+ } catch (IOException e) {
+ e = updateTableRegionCacheIfNecessary(e);
+ throw e;
}
+ }
- @Override
- public List<Mutation> getMutationList() {
- return mutationBatch;
- }
+ @Override
+ public List<Mutation> getMutationList() {
+ return mutationBatch;
+ }
- private IOException
- updateTableRegionCacheIfNecessary(IOException ioe) {
- SQLException sqlE =
- ServerUtil.parseLocalOrRemoteServerException(ioe);
- if (sqlE != null
- && sqlE.getErrorCode() == SQLExceptionCode.INDEX_METADATA_NOT_FOUND
- .getErrorCode()) {
- try {
- connection.getQueryServices().clearTableRegionCache(
+ private IOException
+ updateTableRegionCacheIfNecessary(IOException ioe) {
+ SQLException sqlE =
+ ServerUtil.parseLocalOrRemoteServerException(ioe);
+ if (sqlE != null
+ && sqlE.getErrorCode() == SQLExceptionCode.INDEX_METADATA_NOT_FOUND
+ .getErrorCode()) {
+ try {
+ connection.getQueryServices().clearTableRegionCache(
finalHTable.getName().getName());
- IndexMetaDataCacheClient.setMetaDataOnMutations(
+ IndexMetaDataCacheClient.setMetaDataOnMutations(
connection, finalPTable, mutationBatch,
finalindexMetaDataPtr);
- } catch (SQLException e) {
- return ServerUtil.createIOException(
+ } catch (SQLException e) {
+ return ServerUtil.createIOException(
"Exception during updating index meta data cache",
ioe);
- }
}
- return ioe;
}
- }, iwe, connection, connection.getQueryServices().getProps());
- shouldRetryIndexedMutation = false;
- } else {
- hTable.batch(mutationBatch);
- }
- // remove each batch from the list once it gets applied
- // so when failures happens for any batch we only start
- // from that batch only instead of doing duplicate reply of already
- // applied batches from entire list, also we can set
- // REPLAY_ONLY_INDEX_WRITES for first batch
- // only in case of 1121 SQLException
- itrListMutation.remove();
-
- batchCount++;
- if (logger.isDebugEnabled())
- logger.debug("Sent batch of " + mutationBatch.size() + " for "
- + Bytes.toString(htableName));
- }
- child.stop();
- child.stop();
- shouldRetry = false;
- mutationCommitTime = System.currentTimeMillis() - startTime;
- GLOBAL_MUTATION_COMMIT_TIME.update(mutationCommitTime);
- numFailedMutations = 0;
-
- // Remove batches as we process them
- mutations.remove(origTableRef);
- if (tableInfo.isDataTable()) {
- numRows -= numMutations;
- // recalculate the estimated size
- estimatedSize = KeyValueUtil.getEstimatedRowMutationSize(mutations);
+ return ioe;
+ }
+ }, iwe, connection, connection.getQueryServices().getProps());
+ shouldRetryIndexedMutation = false;
+ } else {
+ hTable.batch(mutationBatch);
}
- } catch (Exception e) {
- mutationCommitTime = System.currentTimeMillis() - startTime;
- serverTimestamp = ServerUtil.parseServerTimestamp(e);
- SQLException inferredE = ServerUtil.parseServerExceptionOrNull(e);
- if (inferredE != null) {
- if (shouldRetry
- && retryCount == 0
- && inferredE.getErrorCode() == SQLExceptionCode.INDEX_METADATA_NOT_FOUND
- .getErrorCode()) {
- // Swallow this exception once, as it's possible that we split after sending the index
- // metadata
- // and one of the region servers doesn't have it. This will cause it to have it the next
- // go around.
- // If it fails again, we don't retry.
- String msg = "Swallowing exception and retrying after clearing meta cache on connection. "
- + inferredE;
- logger.warn(LogUtil.addCustomAnnotations(msg, connection));
- connection.getQueryServices().clearTableRegionCache(htableName);
-
- // add a new child span as this one failed
- child.addTimelineAnnotation(msg);
- child.stop();
- child = Tracing.child(span, "Failed batch, attempting retry");
-
- continue;
- } else if (inferredE.getErrorCode() == SQLExceptionCode.INDEX_WRITE_FAILURE.getErrorCode()) {
- iwe = PhoenixIndexFailurePolicy.getIndexWriteException(inferredE);
- if (iwe != null && !shouldRetryIndexedMutation) {
- // For an index write failure, the data table write succeeded,
- // so when we retry we need to set REPLAY_WRITES
- // for first batch in list only.
- for (Mutation m : mutationBatchList.get(0)) {
- if (!PhoenixIndexMetaData.
- isIndexRebuild(m.getAttributesMap())) {
- m.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
+ // remove each batch from the list once it gets applied
+ // so when failures happens for any batch we only start
+ // from that batch only instead of doing duplicate reply of already
+ // applied batches from entire list, also we can set
+ // REPLAY_ONLY_INDEX_WRITES for first batch
+ // only in case of 1121 SQLException
+ itrListMutation.remove();
+
+ batchCount++;
+ if (logger.isDebugEnabled())
+ logger.debug("Sent batch of " + mutationBatch.size() + " for "
+ + Bytes.toString(htableName));
+ }
+ child.stop();
+ child.stop();
+ shouldRetry = false;
+ mutationCommitTime = System.currentTimeMillis() - startTime;
+ GLOBAL_MUTATION_COMMIT_TIME.update(mutationCommitTime);
+ numFailedMutations = 0;
+
+ // Remove batches as we process them
+ mutations.remove(origTableRef);
+ if (tableInfo.isDataTable()) {
+ numRows -= numMutations;
+ // recalculate the estimated size
+ estimatedSize = KeyValueUtil.getEstimatedRowMutationSize(mutations);
+ }
+ } catch (Exception e) {
+ mutationCommitTime = System.currentTimeMillis() - startTime;
+ long serverTimestamp = ServerUtil.parseServerTimestamp(e);
+ SQLException inferredE = ServerUtil.parseServerExceptionOrNull(e);
+ if (inferredE != null) {
+ if (shouldRetry
+ && retryCount == 0
+ && inferredE.getErrorCode() == SQLExceptionCode.INDEX_METADATA_NOT_FOUND
+ .getErrorCode()) {
+ // Swallow this exception once, as it's possible that we split after sending the index
+ // metadata
+ // and one of the region servers doesn't have it. This will cause it to have it the next
+ // go around.
+ // If it fails again, we don't retry.
+ String msg = "Swallowing exception and retrying after clearing meta cache on connection. "
+ + inferredE;
+ logger.warn(LogUtil.addCustomAnnotations(msg, connection));
+ connection.getQueryServices().clearTableRegionCache(htableName);
+
+ // add a new child span as this one failed
+ child.addTimelineAnnotation(msg);
+ child.stop();
+ child = Tracing.child(span, "Failed batch, attempting retry");
+
+ continue;
+ } else if (inferredE.getErrorCode() == SQLExceptionCode.INDEX_WRITE_FAILURE.getErrorCode()) {
+ iwe = PhoenixIndexFailurePolicy.getIndexWriteException(inferredE);
+ if (iwe != null && !shouldRetryIndexedMutation) {
+ // For an index write failure, the data table write succeeded,
+ // so when we retry we need to set REPLAY_WRITES
+ // for first batch in list only.
+ for (Mutation m : mutationBatchList.get(0)) {
+ if (!PhoenixIndexMetaData.isIndexRebuild(
+ m.getAttributesMap())){
+ m.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
BaseScannerRegionObserver.REPLAY_ONLY_INDEX_WRITES
- );
- }
- KeyValueUtil.setTimestamp(m, serverTimestamp);
+ );
}
- shouldRetry = true;
- shouldRetryIndexedMutation = true;
- continue;
+ KeyValueUtil.setTimestamp(m, serverTimestamp);
}
+ shouldRetry = true;
+ shouldRetryIndexedMutation = true;
+ continue;
}
- e = inferredE;
}
- // Throw to client an exception that indicates the statements that
- // were not committed successfully.
- int[] uncommittedStatementIndexes = getUncommittedStatementIndexes();
- sqlE = new CommitException(e, uncommittedStatementIndexes, serverTimestamp);
- numFailedMutations = uncommittedStatementIndexes.length;
- GLOBAL_MUTATION_BATCH_FAILED_COUNT.update(numFailedMutations);
+ e = inferredE;
+ }
+ // Throw to client an exception that indicates the statements that
+ // were not committed successfully.
+ int[] uncommittedStatementIndexes = getUncommittedStatementIndexes();
+ sqlE = new CommitException(e, uncommittedStatementIndexes, serverTimestamp);
+ numFailedMutations = uncommittedStatementIndexes.length;
+ GLOBAL_MUTATION_BATCH_FAILED_COUNT.update(numFailedMutations);
+ } finally {
+ MutationMetric mutationsMetric = new MutationMetric(numMutations, mutationSizeBytes,
+ mutationCommitTime, numFailedMutations);
+ mutationMetricQueue.addMetricsForTable(Bytes.toString(htableName), mutationsMetric);
+ try {
+ if (cache != null) cache.close();
} finally {
- MutationMetric mutationsMetric = new MutationMetric(numMutations, mutationSizeBytes,
- mutationCommitTime, numFailedMutations);
- mutationMetricQueue.addMetricsForTable(Bytes.toString(htableName), mutationsMetric);
try {
- if (cache != null) cache.close();
- } finally {
- try {
- hTable.close();
- } catch (IOException e) {
- if (sqlE != null) {
- sqlE.setNextException(ServerUtil.parseServerException(e));
- } else {
- sqlE = ServerUtil.parseServerException(e);
- }
+ hTable.close();
+ } catch (IOException e) {
+ if (sqlE != null) {
+ sqlE.setNextException(ServerUtil.parseServerException(e));
+ } else {
+ sqlE = ServerUtil.parseServerException(e);
}
- if (sqlE != null) { throw sqlE; }
}
+ if (sqlE != null) { throw sqlE; }
}
- } while (shouldRetry && retryCount++ < 1);
+ }
+ } while (shouldRetry && retryCount++ < 1);
+ }
+ }
+
+ private void filterIndexCheckerMutations(Map<TableInfo, List<Mutation>> mutationMap,
+ Map<TableInfo, List<Mutation>> unverifiedIndexMutations,
+ Map<TableInfo, List<Mutation>> verifiedOrDeletedIndexMutations) throws SQLException {
+ Iterator<Entry<TableInfo, List<Mutation>>> mapIter = mutationMap.entrySet().iterator();
+
+ while (mapIter.hasNext()) {
+ Entry<TableInfo, List<Mutation>> pair = mapIter.next();
+ TableInfo tableInfo = pair.getKey();
+ if (tableInfo.getOrigTableRef().getTable().isImmutableRows() && IndexUtil.isGlobalIndexCheckerEnabled(connection, tableInfo.getHTableName())) {
+ PTable table = PhoenixRuntime.getTable(connection, tableInfo.getHTableName().getString());
+ byte[] emptyCF = SchemaUtil.getEmptyColumnFamily(table);
+ byte[] emptyCQ = EncodedColumnsUtil.getEmptyKeyValueInfo(table).getFirst();
+
+ List<Mutation> mutations = pair.getValue();
+
+ for (Mutation m : mutations) {
+ if (m == null) {
+ continue;
+ }
+ if (m instanceof Delete) {
+ Put put = new Put(m.getRow());
+ put.addColumn(emptyCF, emptyCQ, IndexRegionObserver.getMaxTimestamp(m),
+ IndexRegionObserver.UNVERIFIED_BYTES);
+ // The row gets marked as unverified in Phase 1 and the actual Delete is applied in Phase 3.
+ addToMap(unverifiedIndexMutations, tableInfo, put);
+ addToMap(verifiedOrDeletedIndexMutations, tableInfo, m);
+ } else if (m instanceof Put) {
+ long timestamp = IndexRegionObserver.getMaxTimestamp(m);
+
+ // Phase 1 index mutations are set to unverified
+ ((Put) m).addColumn(emptyCF, emptyCQ, timestamp, IndexRegionObserver.UNVERIFIED_BYTES);
+ addToMap(unverifiedIndexMutations, tableInfo, m);
+
+ // Phase 3 mutations are verified
+ Put verifiedPut = new Put(m.getRow());
+ verifiedPut.addColumn(emptyCF, emptyCQ, timestamp,
+ IndexRegionObserver.VERIFIED_BYTES);
+ addToMap(verifiedOrDeletedIndexMutations, tableInfo, verifiedPut);
+ } else {
+ addToMap(unverifiedIndexMutations, tableInfo, m);
+ }
+ }
+
+ mapIter.remove();
}
+
+ }
+ }
+
+ private void addToMap(Map<TableInfo, List<Mutation>> map, TableInfo tableInfo, Mutation mutation) {
+ List<Mutation> mutations = null;
+ if (map.containsKey(tableInfo)) {
+ mutations = map.get(tableInfo);
+ } else {
+ mutations = Lists.newArrayList();
}
+ mutations.add(mutation);
+ map.put(tableInfo, mutations);
}
/**
@@ -1259,7 +1366,7 @@ public class MutationState implements SQLCloseable {
logger.info(e.getClass().getName() + " at timestamp " + getInitialWritePointer()
+ " with retry count of " + retryCount);
retryCommit = (e.getErrorCode() == SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION
- .getErrorCode() && retryCount < MAX_COMMIT_RETRIES);
+ .getErrorCode() && retryCount < MAX_COMMIT_RETRIES);
if (sqlE == null) {
sqlE = e;
} else {
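The heart of this patch is the commit ordering introduced in MutationState above. A condensed sketch of the control flow (simplified method name and surroundings, not the literal code):

    void commitWithConsistentIndexes(Map<TableInfo, List<Mutation>> tableMutations,
            Span span, ImmutableBytesWritable indexMetaDataPtr) throws SQLException {
        Map<TableInfo, List<Mutation>> unverified = new LinkedHashMap<>();
        Map<TableInfo, List<Mutation>> verifiedOrDeleted = new LinkedHashMap<>();
        // For each consistent global index: a Put becomes an unverified Put now
        // and a verified Put later; a Delete becomes an unverified Put now and
        // the actual Delete later.
        filterIndexCheckerMutations(tableMutations, unverified, verifiedOrDeleted);

        // Phase 1: index rows land first, marked unverified.
        sendMutations(unverified.entrySet().iterator(), span, indexMetaDataPtr);
        // Phase 2: the data table (and any non-consistent indexes).
        sendMutations(tableMutations.entrySet().iterator(), span, indexMetaDataPtr);
        // Phase 3: flip the index rows to verified, or apply the deferred deletes.
        try {
            sendMutations(verifiedOrDeleted.entrySet().iterator(), span, indexMetaDataPtr);
        } catch (SQLException e) {
            // Tolerable: rows left unverified are skipped or repaired by
            // GlobalIndexChecker at read time.
        }
    }

If Phase 1 fails, the client sees the failure and the data table is untouched; if Phase 2 fails, the only residue is unverified index rows, which readers never trust; if Phase 3 fails, the exception is logged and reads fall back to repair.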
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index 520aff3..516a088 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -376,7 +376,7 @@ public class IndexRegionObserver extends BaseRegionObserver {
"Somehow didn't return an index update but also didn't propagate the failure to the client!");
}
- private long getMaxTimestamp(Mutation m) {
+ public static long getMaxTimestamp(Mutation m) {
long maxTs = 0;
long ts = 0;
Iterator iterator = m.getFamilyCellMap().entrySet().iterator();
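The hunk shows only the opening of getMaxTimestamp; an equivalent sketch of what the now-public helper computes, i.e. the newest cell timestamp across all of a mutation's column families (MutationState uses it to timestamp the unverified/verified empty-column puts):

    public static long getMaxTimestamp(Mutation m) {
        long maxTs = 0;
        for (List<Cell> cells : m.getFamilyCellMap().values()) {
            for (Cell cell : cells) {
                // Track the newest timestamp seen in any cell of the mutation.
                maxTs = Math.max(maxTs, cell.getTimestamp());
            }
        }
        return maxTs;
    }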
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index cd7b97e..970d46b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -972,6 +972,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
QueryConstants.EMPTY_COLUMN_VALUE_BYTES_PTR));
put.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
}
+
ImmutableBytesPtr rowKey = new ImmutableBytesPtr(indexRowKey);
if (immutableStorageScheme != ImmutableStorageScheme.ONE_CELL_PER_COLUMN) {
// map from index column family to list of pair of index column and data column (for covered columns)
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index bca361c..fe64bce 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -857,7 +857,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
if (tableType == PTableType.INDEX && !isTransactional) {
if (!indexRegionObserverEnabled && descriptor.hasCoprocessor(GlobalIndexChecker.class.getName())) {
descriptor.removeCoprocessor(GlobalIndexChecker.class.getName());
- } else if (indexRegionObserverEnabled && !descriptor.hasCoprocessor(GlobalIndexChecker.class.getName())) {
+ } else if (indexRegionObserverEnabled && !descriptor.hasCoprocessor(GlobalIndexChecker.class.getName()) &&
+ !isLocalIndexTable(descriptor.getFamiliesKeys())) {
descriptor.addCoprocessor(GlobalIndexChecker.class.getName(), null, priority - 1, null);
}
}
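isLocalIndexTable is referenced but not shown in this hunk; a plausible sketch of such a guard (an assumption, based on the local index column family prefix used elsewhere in this patch) that keeps GlobalIndexChecker off local index tables:

    private boolean isLocalIndexTable(Collection<byte[]> familyNames) {
        for (byte[] family : familyNames) {
            // Local index data lives in column families carrying the local
            // index prefix, so any such family marks a local index table.
            if (Bytes.toString(family).startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX)) {
                return true;
            }
        }
        return false;
    }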
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 944ff35..333e14c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -185,6 +185,7 @@ public class QueryServicesOptions {
public static final long DEFAULT_GROUPBY_MAX_CACHE_MAX = 1024L*1024L*100L; // 100 Mb
public static final long DEFAULT_SEQUENCE_CACHE_SIZE = 100; // reserve 100 sequences at a time
+ public static final int GLOBAL_INDEX_CHECKER_ENABLED_MAP_EXPIRATION_MIN = 10;
public static final long DEFAULT_MAX_SERVER_METADATA_CACHE_TIME_TO_LIVE_MS = 60000 * 30; // 30 mins
public static final long DEFAULT_MAX_SERVER_METADATA_CACHE_SIZE = 1024L*1024L*20L; // 20 Mb
public static final long DEFAULT_MAX_CLIENT_METADATA_CACHE_SIZE = 1024L*1024L*10L; // 10 Mb
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
index 2e71487..7b0bf09 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
@@ -40,7 +40,10 @@ import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.NavigableSet;
+import java.util.concurrent.TimeUnit;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
@@ -96,6 +99,7 @@ import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
import org.apache.phoenix.hbase.index.util.VersionUtil;
+import org.apache.phoenix.index.GlobalIndexChecker;
import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.index.PhoenixIndexCodec;
import org.apache.phoenix.jdbc.PhoenixConnection;
@@ -106,6 +110,7 @@ import org.apache.phoenix.parse.SQLParser;
import org.apache.phoenix.parse.SelectStatement;
import org.apache.phoenix.protobuf.ProtobufUtil;
import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
import org.apache.phoenix.schema.ColumnNotFoundException;
import org.apache.phoenix.schema.ColumnRef;
@@ -113,6 +118,7 @@ import org.apache.phoenix.schema.KeyValueSchema;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PColumnFamily;
import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.schema.PName;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
@@ -134,7 +140,12 @@ import com.google.common.collect.Lists;
public class IndexUtil {
public static final String INDEX_COLUMN_NAME_SEP = ":";
public static final byte[] INDEX_COLUMN_NAME_SEP_BYTES = Bytes.toBytes(INDEX_COLUMN_NAME_SEP);
-
+
+ private static Cache<String, Boolean> indexNameGlobalIndexCheckerEnabledMap = CacheBuilder.newBuilder()
+ .expireAfterWrite(QueryServicesOptions.GLOBAL_INDEX_CHECKER_ENABLED_MAP_EXPIRATION_MIN,
+ TimeUnit.MINUTES)
+ .build();
+
private IndexUtil() {
}
@@ -280,11 +291,37 @@ public class IndexUtil {
.getLength()) == 0);
}
+
+ public static boolean isGlobalIndexCheckerEnabled(PhoenixConnection connection, PName index)
+ throws SQLException {
+ String indexName = index.getString();
+ Boolean entry = indexNameGlobalIndexCheckerEnabledMap.getIfPresent(indexName);
+ if (entry != null){
+ return entry;
+ }
+
+ boolean result = false;
+ try {
+ HTableDescriptor desc = connection.getQueryServices().getTableDescriptor(index.getBytes());
+
+ if (desc != null) {
+ if (desc.hasCoprocessor(GlobalIndexChecker.class.getName())) {
+ result = true;
+ }
+ }
+ indexNameGlobalIndexCheckerEnabledMap.put(indexName, result);
+ } catch (TableNotFoundException ex) {
+ // Swallow this: some index types, such as local indexes, do not have a separate physical table
+ }
+
+ return result;
+ }
+
public static List<Mutation> generateIndexData(final PTable table, PTable index,
final MultiRowMutationState multiRowMutationState, List<Mutation> dataMutations, final KeyValueBuilder kvBuilder, PhoenixConnection connection)
throws SQLException {
try {
- final ImmutableBytesPtr ptr = new ImmutableBytesPtr();
+ final ImmutableBytesPtr ptr = new ImmutableBytesPtr();
IndexMaintainer maintainer = index.getIndexMaintainer(table, connection);
List<Mutation> indexMutations = Lists.newArrayListWithExpectedSize(dataMutations.size());
for (final Mutation dataMutation : dataMutations) {
@@ -298,12 +335,12 @@ public class IndexUtil {
*/
if (dataMutation instanceof Put) {
ValueGetter valueGetter = new ValueGetter() {
-
- @Override
+
+ @Override
public byte[] getRowKey() {
- return dataMutation.getRow();
- }
-
+ return dataMutation.getRow();
+ }
+
@Override
public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts) {
// Always return null for our empty key value, as this will cause the index
@@ -320,15 +357,15 @@ public class IndexUtil {
}
for (Cell kv : kvs) {
if (Bytes.compareTo(kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength(), family, 0, family.length) == 0 &&
- Bytes.compareTo(kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength(), qualifier, 0, qualifier.length) == 0) {
- ImmutableBytesPtr ptr = new ImmutableBytesPtr();
- kvBuilder.getValueAsPtr(kv, ptr);
- return ptr;
+ Bytes.compareTo(kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength(), qualifier, 0, qualifier.length) == 0) {
+ ImmutableBytesPtr ptr = new ImmutableBytesPtr();
+ kvBuilder.getValueAsPtr(kv, ptr);
+ return ptr;
}
}
return null;
}
-
+
};
byte[] regionStartKey = null;
byte[] regionEndkey = null;
@@ -911,7 +948,7 @@ public class IndexUtil {
}
public static void setScanAttributesForIndexReadRepair(Scan scan, PTable table, PhoenixConnection phoenixConnection) throws SQLException {
- if (table.isTransactional() || table.isImmutableRows() || table.getType() != PTableType.INDEX) {
+ if (table.isTransactional() || table.getType() != PTableType.INDEX) {
return;
}
PTable indexTable = table;
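A short usage sketch of the new helper (dataTable and indexPhysicalName are placeholders): MutationState uses this check, together with the table's immutability, to decide whether index writes take the unverified-then-verified path; the Guava cache above keeps the answer for GLOBAL_INDEX_CHECKER_ENABLED_MAP_EXPIRATION_MIN minutes to avoid a table descriptor lookup on every commit.

    PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
    // Only immutable tables whose global index has GlobalIndexChecker attached
    // go through the three-phase consistent write path.
    boolean consistentPath = dataTable.isImmutableRows()
            && IndexUtil.isGlobalIndexCheckerEnabled(pconn, indexPhysicalName);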
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
index 811e209..5bbdbc0 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
@@ -62,6 +62,7 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
@@ -858,9 +859,13 @@ public class TestUtil {
System.out.println("-----------------------------------------------");
}
- public static int getRawRowCount(HTableInterface table) throws IOException {
+ public static int getRawRowCount(Table table) throws IOException {
+ return getRowCount(table, true);
+ }
+
+ public static int getRowCount(Table table, boolean isRaw) throws IOException {
Scan s = new Scan();
- s.setRaw(true);;
+ s.setRaw(isRaw);
s.setMaxVersions();
int rows = 0;
try (ResultScanner scanner = table.getScanner(s)) {
@@ -918,7 +923,31 @@ public class TestUtil {
this.success = success;
}
}
-
+
+ public static void waitForIndexState(Connection conn, String fullIndexName, PIndexState expectedIndexState) throws InterruptedException, SQLException {
+ int maxTries = 60, nTries = 0;
+ PIndexState actualIndexState = null;
+ do {
+ String schema = SchemaUtil.getSchemaNameFromFullName(fullIndexName);
+ String index = SchemaUtil.getTableNameFromFullName(fullIndexName);
+ Thread.sleep(1000); // sleep 1 sec
+ String query = "SELECT " + PhoenixDatabaseMetaData.INDEX_STATE + " FROM " +
+ PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME + " WHERE (" + PhoenixDatabaseMetaData.TABLE_SCHEM + "," + PhoenixDatabaseMetaData.TABLE_NAME
+ + ") = (" + "'" + schema + "','" + index + "') "
+ + "AND " + PhoenixDatabaseMetaData.COLUMN_FAMILY + " IS NULL AND " + PhoenixDatabaseMetaData.COLUMN_NAME + " IS NULL";
+ ResultSet rs = conn.createStatement().executeQuery(query);
+ if (rs.next()) {
+ actualIndexState = PIndexState.fromSerializedValue(rs.getString(1));
+ boolean matchesExpected = (actualIndexState == expectedIndexState);
+ if (matchesExpected) {
+ return;
+ }
+ }
+ } while (++nTries < maxTries);
+ fail("Ran out of time waiting for index state to become " + expectedIndexState + " last seen actual state is " +
+ (actualIndexState == null ? "Unknown" : actualIndexState.toString()));
+ }
+
public static void waitForIndexState(Connection conn, String fullIndexName, PIndexState expectedIndexState, Long expectedIndexDisableTimestamp) throws InterruptedException, SQLException {
int maxTries = 60, nTries = 0;
do {