You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2017/05/12 00:09:19 UTC
[2/4] phoenix git commit: PHOENIX-3811 Do not disable index on write
failure by default
PHOENIX-3811 Do not disable index on write failure by default
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/1f1ecc97
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/1f1ecc97
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/1f1ecc97
Branch: refs/heads/master
Commit: 1f1ecc97bab1b081a2b18e78ad141385c837ecd3
Parents: f6fbb0d
Author: James Taylor <ja...@apache.org>
Authored: Wed May 10 09:52:23 2017 -0700
Committer: James Taylor <ja...@apache.org>
Committed: Thu May 11 17:04:09 2017 -0700
----------------------------------------------------------------------
.../phoenix/end2end/AutomaticRebuildIT.java | 221 --------------
.../end2end/IndexToolForPartialBuildIT.java | 15 +-
...olForPartialBuildWithNamespaceEnabledIT.java | 15 +-
.../end2end/index/MutableIndexFailureIT.java | 252 ++++++++++------
.../end2end/index/ReadOnlyIndexFailureIT.java | 291 -------------------
.../apache/phoenix/compile/DeleteCompiler.java | 5 +
.../apache/phoenix/compile/UpsertCompiler.java | 4 +
.../coprocessor/MetaDataEndpointImpl.java | 6 +-
.../coprocessor/MetaDataRegionObserver.java | 44 ++-
.../UngroupedAggregateRegionObserver.java | 7 +
.../phoenix/exception/SQLExceptionCode.java | 2 +
.../apache/phoenix/execute/CommitException.java | 8 +-
.../apache/phoenix/execute/MutationState.java | 13 +-
.../phoenix/hbase/index/write/IndexWriter.java | 4 +
.../write/LeaveIndexActiveFailurePolicy.java | 62 ++++
.../index/PhoenixIndexFailurePolicy.java | 82 +++++-
.../index/PhoenixTransactionalIndexer.java | 5 +-
.../apache/phoenix/jdbc/PhoenixConnection.java | 44 ++-
.../phoenix/mapreduce/index/IndexTool.java | 12 +-
.../query/ConnectionQueryServicesImpl.java | 2 +-
.../org/apache/phoenix/query/QueryServices.java | 3 +-
.../phoenix/query/QueryServicesOptions.java | 3 +-
.../apache/phoenix/schema/MetaDataClient.java | 7 +-
.../java/org/apache/phoenix/util/IndexUtil.java | 12 +-
.../java/org/apache/phoenix/util/JDBCUtil.java | 5 +
.../org/apache/phoenix/util/PhoenixRuntime.java | 14 +
.../org/apache/phoenix/util/ServerUtil.java | 37 +++
.../hbase/index/write/TestIndexWriter.java | 6 +
28 files changed, 500 insertions(+), 681 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1f1ecc97/phoenix-core/src/it/java/org/apache/phoenix/end2end/AutomaticRebuildIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AutomaticRebuildIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AutomaticRebuildIT.java
deleted file mode 100644
index 25cab35..0000000
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AutomaticRebuildIT.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.end2end;
-
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE;
-import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-
-import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HBaseIOException;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.coprocessor.ObserverContext;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
-import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
-import org.apache.phoenix.query.QueryConstants;
-import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.query.QueryServicesOptions;
-import org.apache.phoenix.schema.PIndexState;
-import org.apache.phoenix.schema.PTableType;
-import org.apache.phoenix.util.PropertiesUtil;
-import org.apache.phoenix.util.ReadOnlyProps;
-import org.apache.phoenix.util.SchemaUtil;
-import org.apache.phoenix.util.StringUtil;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-import com.google.common.collect.Maps;
-
-/**
- * Tests for the {@link AutomaticRebuildIT}
- */
-@RunWith(Parameterized.class)
-public class AutomaticRebuildIT extends BaseOwnClusterIT {
-
- private final boolean localIndex;
- protected boolean isNamespaceEnabled = false;
- protected final String tableDDLOptions;
-
- public AutomaticRebuildIT(boolean localIndex) {
- this.localIndex = localIndex;
- StringBuilder optionBuilder = new StringBuilder();
- optionBuilder.append(" SPLIT ON(1,2)");
- this.tableDDLOptions = optionBuilder.toString();
- }
-
- @BeforeClass
- public static void doSetup() throws Exception {
- Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(7);
- serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
- serverProps.put("hbase.coprocessor.region.classes", FailingRegionObserver.class.getName());
- serverProps.put(" yarn.scheduler.capacity.maximum-am-resource-percent", "1.0");
- serverProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2");
- serverProps.put(HConstants.HBASE_RPC_TIMEOUT_KEY, "10000");
- serverProps.put("hbase.client.pause", "5000");
- serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_PERIOD, "1000");
- serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_NUMBER_OF_BATCHES_PER_TABLE, "5");
- Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(1);
- setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()),
- new ReadOnlyProps(clientProps.entrySet().iterator()));
- }
-
- @Parameters(name = "localIndex = {0}")
- public static Collection<Boolean[]> data() {
- return Arrays.asList(new Boolean[][] { { false }, { true } });
- }
-
- @Test
- public void testSecondaryAutomaticRebuildIndex() throws Exception {
- String schemaName = generateUniqueName();
- String dataTableName = generateUniqueName();
- String fullTableName = SchemaUtil.getTableName(schemaName, dataTableName);
- final String indxTable = String.format("%s_%s", dataTableName, FailingRegionObserver.INDEX_NAME);
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- props.setProperty(QueryServices.TRANSACTIONS_ENABLED, Boolean.TRUE.toString());
- props.setProperty(QueryServices.EXPLAIN_ROW_COUNT_ATTRIB, Boolean.FALSE.toString());
- props.setProperty(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.toString(isNamespaceEnabled));
- final Connection conn = DriverManager.getConnection(getUrl(), props);
- Statement stmt = conn.createStatement();
- try {
- if (isNamespaceEnabled) {
- conn.createStatement().execute("CREATE SCHEMA IF NOT EXISTS " + schemaName);
- }
- stmt.execute(String.format(
- "CREATE TABLE %s (ID BIGINT NOT NULL, NAME VARCHAR, ZIP INTEGER CONSTRAINT PK PRIMARY KEY(ID ROW_TIMESTAMP)) %s",
- fullTableName, tableDDLOptions));
- String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", fullTableName);
- PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
- FailingRegionObserver.FAIL_WRITE = false;
- // insert two rows
- upsertRow(stmt1, 1000);
- upsertRow(stmt1, 2000);
-
- conn.commit();
- stmt.execute(String.format("CREATE %s INDEX %s ON %s (LPAD(UPPER(NAME),11,'x')||'_xyz') ",
- (localIndex ? "LOCAL" : ""), indxTable, fullTableName));
- FailingRegionObserver.FAIL_WRITE = true;
- upsertRow(stmt1, 3000);
- upsertRow(stmt1, 4000);
- upsertRow(stmt1, 5000);
- try {
- conn.commit();
- fail();
- } catch (SQLException e) {
- } catch (Exception e) {
- }
- FailingRegionObserver.FAIL_WRITE = false;
- ResultSet rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(schemaName), indxTable,
- new String[] { PTableType.INDEX.toString() });
- assertTrue(rs.next());
- assertEquals(indxTable, rs.getString(3));
- String indexState = rs.getString("INDEX_STATE");
- assertEquals(PIndexState.DISABLE.toString(), indexState);
- assertFalse(rs.next());
- upsertRow(stmt1, 6000);
- upsertRow(stmt1, 7000);
- conn.commit();
- int maxTries = 4, nTries = 0;
- boolean isInactive = false;
- do {
- rs = conn.createStatement()
- .executeQuery(String.format("SELECT " + PhoenixDatabaseMetaData.INDEX_STATE + ","
- + PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP + " FROM "
- +"\""+ SYSTEM_CATALOG_SCHEMA + "\"." + SYSTEM_CATALOG_TABLE + " ("
- + PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP + " bigint) where "
- + PhoenixDatabaseMetaData.TABLE_SCHEM + "='" + schemaName + "' and "
- + PhoenixDatabaseMetaData.TABLE_NAME + "='" + indxTable + "'"));
- rs.next();
- if (PIndexState.INACTIVE.getSerializedValue().equals(rs.getString(1)) && rs.getLong(2) > 3000) {
- isInactive = true;
- break;
- }
- Thread.sleep(10 * 1000); // sleep 10 secs
- } while (++nTries < maxTries);
- assertTrue(isInactive);
- nTries = 0;
- boolean isActive = false;
- do {
- Thread.sleep(15 * 1000); // sleep 15 secs
- rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(schemaName), indxTable,
- new String[] { PTableType.INDEX.toString() });
- assertTrue(rs.next());
- if (PIndexState.ACTIVE.toString().equals(rs.getString("INDEX_STATE"))) {
- isActive = true;
- break;
- }
- } while (++nTries < maxTries);
- assertTrue(isActive);
-
- } finally {
- conn.close();
- }
- }
-
- public static void upsertRow(PreparedStatement stmt, int i) throws SQLException {
- // insert row
- stmt.setInt(1, i);
- stmt.setString(2, "uname" + String.valueOf(i));
- stmt.setInt(3, 95050 + i);
- stmt.executeUpdate();
- }
-
- public static class FailingRegionObserver extends SimpleRegionObserver {
- public static volatile boolean FAIL_WRITE = false;
- public static final String INDEX_NAME = "IDX";
-
- @Override
- public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
- MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException {
- if (c.getEnvironment().getRegionInfo().getTable().getNameAsString().contains(INDEX_NAME) && FAIL_WRITE) {
- throw new DoNotRetryIOException();
- }
- Mutation operation = miniBatchOp.getOperation(0);
- Set<byte[]> keySet = operation.getFamilyMap().keySet();
- for (byte[] family : keySet) {
- if (Bytes.toString(family).startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX) && FAIL_WRITE) {
- throw new DoNotRetryIOException();
- }
- }
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1f1ecc97/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java
index 599e601..59a9106 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java
@@ -89,9 +89,8 @@ public class IndexToolForPartialBuildIT extends BaseOwnClusterIT {
this.tableDDLOptions = optionBuilder.toString();
}
- @BeforeClass
- public static void doSetup() throws Exception {
- Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(7);
+ public static Map<String, String> getServerProperties() {
+ Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(10);
serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
serverProps.put("hbase.coprocessor.region.classes", FailingRegionObserver.class.getName());
serverProps.put(" yarn.scheduler.capacity.maximum-am-resource-percent", "1.0");
@@ -99,8 +98,14 @@ public class IndexToolForPartialBuildIT extends BaseOwnClusterIT {
serverProps.put(HConstants.HBASE_RPC_TIMEOUT_KEY, "10000");
serverProps.put("hbase.client.pause", "5000");
serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_ATTRIB, Boolean.FALSE.toString());
- Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(1);
- setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
+ serverProps.put(QueryServices.INDEX_FAILURE_DISABLE_INDEX, Boolean.TRUE.toString());
+ return serverProps;
+ }
+
+ @BeforeClass
+ public static void doSetup() throws Exception {
+ Map<String, String> serverProps = getServerProperties();
+ setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), ReadOnlyProps.EMPTY_PROPS);
}
@Parameters(name="localIndex = {0}")
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1f1ecc97/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildWithNamespaceEnabledIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildWithNamespaceEnabledIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildWithNamespaceEnabledIT.java
index 4b2371c..a8c1f1e 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildWithNamespaceEnabledIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildWithNamespaceEnabledIT.java
@@ -21,13 +21,9 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.util.ReadOnlyProps;
import org.junit.BeforeClass;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import com.google.common.collect.Maps;
@@ -35,7 +31,6 @@ import com.google.common.collect.Maps;
/**
* Tests for the {@link IndexToolForPartialBuildWithNamespaceEnabled}
*/
-@RunWith(Parameterized.class)
public class IndexToolForPartialBuildWithNamespaceEnabledIT extends IndexToolForPartialBuildIT {
@@ -45,15 +40,9 @@ public class IndexToolForPartialBuildWithNamespaceEnabledIT extends IndexToolFor
}
@BeforeClass
+ @Shadower(classBeingShadowed = IndexToolForPartialBuildIT.class)
public static void doSetup() throws Exception {
- Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(7);
- serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
- serverProps.put("hbase.coprocessor.region.classes", FailingRegionObserver.class.getName());
- serverProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2");
- serverProps.put(HConstants.HBASE_RPC_TIMEOUT_KEY, "10000");
- serverProps.put("hbase.client.pause", "5000");
- serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_PERIOD, "2000");
- serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB, "1000");
+ Map<String, String> serverProps = getServerProperties();
serverProps.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, "true");
Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(1);
clientProps.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, "true");
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1f1ecc97/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
index e612f49..11573a5 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
@@ -24,16 +24,17 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.sql.Connection;
+import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
-import java.util.Set;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
@@ -44,18 +45,23 @@ import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
-import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.execute.CommitException;
+import org.apache.phoenix.index.PhoenixIndexFailurePolicy;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.query.BaseTest;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.util.MetaDataUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.StringUtil;
-import org.apache.phoenix.util.TestUtil;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -64,6 +70,7 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
/**
*
@@ -77,6 +84,7 @@ import com.google.common.collect.Maps;
@RunWith(Parameterized.class)
public class MutableIndexFailureIT extends BaseTest {
public static final String INDEX_NAME = "IDX";
+ public static final String TABLE_NAME = "T";
public static volatile boolean FAIL_WRITE = false;
public static volatile String fullTableName;
@@ -89,23 +97,29 @@ public class MutableIndexFailureIT extends BaseTest {
private final boolean localIndex;
private final String tableDDLOptions;
private final boolean isNamespaceMapped;
+ private final boolean leaveIndexActiveOnFailure;
+ private final boolean rebuildIndexOnWriteFailure;
private String schema = generateUniqueName();
+ private List<CommitException> exceptions = Lists.newArrayList();
@AfterClass
public static void doTeardown() throws Exception {
tearDownMiniCluster();
}
- public MutableIndexFailureIT(boolean transactional, boolean localIndex, boolean isNamespaceMapped) {
+ public MutableIndexFailureIT(boolean transactional, boolean localIndex, boolean isNamespaceMapped, Boolean disableIndexOnWriteFailure, Boolean rebuildIndexOnWriteFailure) {
this.transactional = transactional;
this.localIndex = localIndex;
- this.tableDDLOptions = " SALT_BUCKETS=2 " + (transactional ? ", TRANSACTIONAL=true " : "");
- this.tableName = (localIndex ? "L_" : "") + TestUtil.DEFAULT_DATA_TABLE_NAME + (transactional ? "_TXN" : "")
- + (isNamespaceMapped ? "_NM" : "");
- this.indexName = FailingRegionObserver.INDEX_NAME;
+ this.tableDDLOptions = " SALT_BUCKETS=2 " + (transactional ? ", TRANSACTIONAL=true " : "")
+ + (disableIndexOnWriteFailure == null ? "" : (", " + PhoenixIndexFailurePolicy.DISABLE_INDEX_ON_WRITE_FAILURE + "=" + disableIndexOnWriteFailure))
+ + (rebuildIndexOnWriteFailure == null ? "" : (", " + PhoenixIndexFailurePolicy.REBUILD_INDEX_ON_WRITE_FAILURE + "=" + rebuildIndexOnWriteFailure));
+ this.tableName = FailingRegionObserver.FAIL_TABLE_NAME;
+ this.indexName = "A_" + FailingRegionObserver.FAIL_INDEX_NAME;
fullTableName = SchemaUtil.getTableName(schema, tableName);
this.fullIndexName = SchemaUtil.getTableName(schema, indexName);
this.isNamespaceMapped = isNamespaceMapped;
+ this.leaveIndexActiveOnFailure = ! (disableIndexOnWriteFailure == null ? QueryServicesOptions.DEFAULT_INDEX_FAILURE_DISABLE_INDEX : disableIndexOnWriteFailure);
+ this.rebuildIndexOnWriteFailure = Boolean.TRUE.equals(rebuildIndexOnWriteFailure);
}
@BeforeClass
@@ -117,16 +131,30 @@ public class MutableIndexFailureIT extends BaseTest {
serverProps.put("hbase.client.pause", "5000");
serverProps.put("data.tx.snapshot.dir", "/tmp");
serverProps.put("hbase.balancer.period", String.valueOf(Integer.MAX_VALUE));
- Map<String, String> clientProps = Collections.singletonMap(QueryServices.TRANSACTIONS_ENABLED, "true");
+ serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_ATTRIB, Boolean.TRUE.toString());
+ serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB, "4000");
+ Map<String, String> clientProps = Collections.singletonMap(QueryServices.TRANSACTIONS_ENABLED, Boolean.TRUE.toString());
NUM_SLAVES_BASE = 4;
setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
}
- @Parameters(name = "MutableIndexFailureIT_transactional={0},localIndex={1},isNamespaceMapped={2}") // name is used by failsafe as file name in reports
- public static Collection<Boolean[]> data() {
- return Arrays.asList(new Boolean[][] { { false, false, true }, { false, false, false }, { false, true, true },
- { false, true, false }, { true, false, true }, { true, true, true }, { true, false, false },
- { true, true, false } });
+ @Parameters(name = "MutableIndexFailureIT_transactional={0},localIndex={1},isNamespaceMapped={2},disableIndexOnWriteFailure={3},rebuildIndexOnWriteFailure={4}") // name is used by failsafe as file name in reports
+ public static List<Object[]> data() {
+ return Arrays.asList(new Object[][] {
+ { false, false, true, true, true },
+ { false, false, false, true, true },
+ { true, false, false, true, true },
+ { true, false, true, true, true },
+ { false, true, true, true, true },
+ { false, true, false, true, true },
+ { true, true, false, true, true },
+ { true, true, true, true, true },
+
+ { false, false, false, null, true },
+ { false, true, false, false, true },
+ { false, false, false, false, null },
+ }
+ );
}
@Test
@@ -135,9 +163,9 @@ public class MutableIndexFailureIT extends BaseTest {
}
public void helpTestWriteFailureDisablesIndex() throws Exception {
- String secondTableName = fullTableName + "_2";
- String secondIndexName = indexName + "_2";
- String secondFullIndexName = fullIndexName + "_2";
+ String secondIndexName = "B_" + FailingRegionObserver.FAIL_INDEX_NAME;
+// String thirdIndexName = "C_" + INDEX_NAME;
+// String thirdFullIndexName = SchemaUtil.getTableName(schema, thirdIndexName);
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
props.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, String.valueOf(isNamespaceMapped));
try (Connection conn = driver.connect(url, props)) {
@@ -149,29 +177,26 @@ public class MutableIndexFailureIT extends BaseTest {
}
conn.createStatement().execute("CREATE TABLE " + fullTableName
+ " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) " + tableDDLOptions);
- conn.createStatement().execute("CREATE TABLE " + secondTableName
- + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) " + tableDDLOptions);
query = "SELECT * FROM " + fullTableName;
rs = conn.createStatement().executeQuery(query);
assertFalse(rs.next());
FailingRegionObserver.FAIL_WRITE = false;
conn.createStatement().execute(
- "CREATE " + (localIndex ? "LOCAL " : "") + "INDEX " + indexName + " ON " + fullTableName + " (v1) INCLUDE (v2)");
+ "CREATE " + (localIndex ? "LOCAL " : "") + " INDEX " + indexName + " ON " + fullTableName + " (v1) INCLUDE (v2)");
// Create other index which should be local/global if the other index is global/local to
// check the drop index.
conn.createStatement().execute(
- "CREATE " + (!localIndex ? "LOCAL " : "") + "INDEX " + indexName + "_3" + " ON "
- + fullTableName + " (v2) INCLUDE (v1)");
- conn.createStatement().execute(
- "CREATE " + (localIndex ? "LOCAL " : "") + "INDEX " + secondIndexName + " ON " + secondTableName + " (v1) INCLUDE (v2)");
+ "CREATE " + (!localIndex ? "LOCAL " : "") + " INDEX " + secondIndexName + " ON " + fullTableName + " (v2) INCLUDE (v1)");
+// conn.createStatement().execute(
+// "CREATE " + (localIndex ? "LOCAL " : "") + " INDEX " + thirdIndexName + " ON " + fullTableName + " (v1) INCLUDE (v2)");
query = "SELECT * FROM " + fullIndexName;
rs = conn.createStatement().executeQuery(query);
assertFalse(rs.next());
// Verify the metadata for index is correct.
- rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(schema), indexName+"%",
+ rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(schema), null,
new String[] { PTableType.INDEX.toString() });
assertTrue(rs.next());
assertEquals(indexName, rs.getString(3));
@@ -179,11 +204,10 @@ public class MutableIndexFailureIT extends BaseTest {
assertTrue(rs.next());
assertEquals(secondIndexName, rs.getString(3));
assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE"));
- assertTrue(rs.next());
- assertEquals(indexName+"_3", rs.getString(3));
- assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE"));
+// assertTrue(rs.next());
+// assertEquals(thirdIndexName, rs.getString(3));
+// assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE"));
initializeTable(conn, fullTableName);
- initializeTable(conn, secondTableName);
query = "SELECT /*+ NO_INDEX */ k,v1 FROM " + fullTableName;
rs = conn.createStatement().executeQuery("EXPLAIN " + query);
@@ -203,15 +227,14 @@ public class MutableIndexFailureIT extends BaseTest {
assertFalse(rs.next());
FailingRegionObserver.FAIL_WRITE = true;
- updateTable(conn, fullTableName);
- updateTable(conn, secondTableName);
+ updateTable(conn, true);
// Verify the metadata for index is correct.
- rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(schema), indexName,
+ rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(schema), StringUtil.escapeLike(indexName),
new String[] { PTableType.INDEX.toString() });
assertTrue(rs.next());
assertEquals(indexName, rs.getString(3));
// the index is only disabled for non-txn tables upon index table write failure
- if (transactional) {
+ if (transactional || leaveIndexActiveOnFailure) {
assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE"));
} else {
String indexState = rs.getString("INDEX_STATE");
@@ -223,19 +246,7 @@ public class MutableIndexFailureIT extends BaseTest {
// in an all or none manner. If the table is not transactional, then the data writes
// would have succeeded while the index writes would have failed.
if (!transactional) {
- // Verify UPSERT on data table still work after index is disabled
- PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
- stmt.setString(1, "a3");
- stmt.setString(2, "x3");
- stmt.setString(3, "3");
- stmt.execute();
- conn.commit();
- stmt = conn.prepareStatement("UPSERT INTO " + secondTableName + " VALUES(?,?,?)");
- stmt.setString(1, "a3");
- stmt.setString(2, "x3");
- stmt.setString(3, "3");
- stmt.execute();
- conn.commit();
+ updateTableAgain(conn, leaveIndexActiveOnFailure);
// Verify previous writes succeeded to data table
query = "SELECT /*+ NO_INDEX */ k,v1 FROM " + fullTableName;
rs = conn.createStatement().executeQuery("EXPLAIN " + query);
@@ -257,14 +268,20 @@ public class MutableIndexFailureIT extends BaseTest {
assertEquals("d", rs.getString(2));
assertFalse(rs.next());
}
+ // Comment back in when PHOENIX-3815 is fixed
+// validateDataWithIndex(conn, fullTableName, thirdFullIndexName, false);
// re-enable index table
FailingRegionObserver.FAIL_WRITE = false;
- waitForIndexToBeActive(conn,indexName);
- waitForIndexToBeActive(conn,indexName+"_2");
- waitForIndexToBeActive(conn,secondIndexName);
+ if (rebuildIndexOnWriteFailure) {
+ // wait for index to be rebuilt automatically
+ waitForIndexToBeRebuilt(conn,indexName);
+ } else {
+ // simulate replaying failed mutation
+ replayMutations();
+ }
- // Verify UPSERT on data table still work after index table is recreated
+ // Verify UPSERT on data table still works after index table is recreated
PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
stmt.setString(1, "a3");
stmt.setString(2, "x4");
@@ -272,33 +289,26 @@ public class MutableIndexFailureIT extends BaseTest {
stmt.execute();
conn.commit();
- stmt = conn.prepareStatement("UPSERT INTO " + secondTableName + " VALUES(?,?,?)");
- stmt.setString(1, "a3");
- stmt.setString(2, "x4");
- stmt.setString(3, "4");
- stmt.execute();
- conn.commit();
- // To clear the index name from connection.
- PhoenixConnection phoenixConn = conn.unwrap(PhoenixConnection.class);
- phoenixConn.getMetaDataCache().removeTable(null, fullTableName, null, HConstants.LATEST_TIMESTAMP);
- // verify index table has correct data
- validateDataWithIndex(conn, fullTableName, fullIndexName);
- validateDataWithIndex(conn, secondTableName, secondFullIndexName);
+ // verify index table has correct data (note that second index has been dropped)
+ validateDataWithIndex(conn, fullTableName, fullIndexName, localIndex);
} finally {
FAIL_WRITE = false;
}
}
- private void waitForIndexToBeActive(Connection conn, String index) throws InterruptedException, SQLException {
+ private void waitForIndexToBeRebuilt(Connection conn, String index) throws InterruptedException, SQLException {
boolean isActive = false;
if (!transactional) {
- int maxTries = 4, nTries = 0;
+ int maxTries = 12, nTries = 0;
do {
- Thread.sleep(15 * 1000); // sleep 15 secs
- ResultSet rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(schema), index,
- new String[] { PTableType.INDEX.toString() });
+ Thread.sleep(5 * 1000); // sleep 5 secs
+ String query = "SELECT CAST(" + PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP + " AS BIGINT) FROM " +
+ PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME + " WHERE (" + PhoenixDatabaseMetaData.TABLE_SCHEM + "," + PhoenixDatabaseMetaData.TABLE_NAME
+ + ") = (" + "'" + schema + "','" + index + "') "
+ + "AND " + PhoenixDatabaseMetaData.COLUMN_FAMILY + " IS NULL AND " + PhoenixDatabaseMetaData.COLUMN_NAME + " IS NULL";
+ ResultSet rs = conn.createStatement().executeQuery(query);
assertTrue(rs.next());
- if (PIndexState.ACTIVE.toString().equals(rs.getString("INDEX_STATE"))) {
+ if (rs.getLong(1) == 0 && !rs.wasNull()) {
isActive = true;
break;
}
@@ -325,14 +335,14 @@ public class MutableIndexFailureIT extends BaseTest {
}
- private void validateDataWithIndex(Connection conn, String tableName, String indexName) throws SQLException {
- String query = "SELECT /*+ INDEX(" + indexName + ") */ k,v1 FROM " + tableName;
+ private void validateDataWithIndex(Connection conn, String fullTableName, String fullIndexName, boolean localIndex) throws SQLException {
+ String query = "SELECT /*+ INDEX(" + fullTableName + " " + SchemaUtil.getTableNameFromFullName(fullIndexName) + ") */ k,v1 FROM " + fullTableName;
ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + query);
String expectedPlan = " OVER "
+ (localIndex
? Bytes.toString(
- SchemaUtil.getPhysicalTableName(tableName.getBytes(), isNamespaceMapped).getName())
- : SchemaUtil.getPhysicalTableName(indexName.getBytes(), isNamespaceMapped).getNameAsString());
+ SchemaUtil.getPhysicalTableName(fullTableName.getBytes(), isNamespaceMapped).getName())
+ : SchemaUtil.getPhysicalTableName(fullIndexName.getBytes(), isNamespaceMapped).getNameAsString());
String explainPlan = QueryUtil.getExplainPlan(rs);
assertTrue(explainPlan, explainPlan.contains(expectedPlan));
rs = conn.createStatement().executeQuery(query);
@@ -367,8 +377,26 @@ public class MutableIndexFailureIT extends BaseTest {
}
}
- private void updateTable(Connection conn, String tableName) throws SQLException {
- PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES(?,?,?)");
+ private void replayMutations() throws SQLException {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ for (int i = 0; i < exceptions.size(); i++) {
+ CommitException e = exceptions.get(i);
+ long ts = e.getServerTimestamp();
+ props.setProperty(PhoenixRuntime.REPLAY_AT_ATTRIB, Long.toString(ts));
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ if (i == 0) {
+ updateTable(conn, false);
+ } else if (i == 1) {
+ updateTableAgain(conn, false);
+ } else {
+ fail();
+ }
+ }
+ }
+ }
+
+ private void updateTable(Connection conn, boolean commitShouldFail) throws SQLException {
+ PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
// Insert new row
stmt.setString(1, "d");
stmt.setString(2, "d");
@@ -380,35 +408,79 @@ public class MutableIndexFailureIT extends BaseTest {
stmt.setString(3, "2");
stmt.execute();
// Delete existing row
- stmt = conn.prepareStatement("DELETE FROM " + tableName + " WHERE k=?");
+ stmt = conn.prepareStatement("DELETE FROM " + fullTableName + " WHERE k=?");
stmt.setString(1, "b");
stmt.execute();
try {
conn.commit();
- fail();
- } catch (SQLException e) {
- } catch (Exception e) {
+ if (commitShouldFail) {
+ fail();
+ }
+ } catch (CommitException e) {
+ if (!commitShouldFail) {
+ throw e;
+ }
+ exceptions.add(e);
}
}
+ private void updateTableAgain(Connection conn, boolean commitShouldFail) throws SQLException {
+ // Verify UPSERT on data table still work after index is disabled
+ PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
+ stmt.setString(1, "a3");
+ stmt.setString(2, "x3");
+ stmt.setString(3, "3");
+ stmt.execute();
+ try {
+ conn.commit();
+ if (commitShouldFail) {
+ fail();
+ }
+ } catch (CommitException e) {
+ if (!commitShouldFail) {
+ throw e;
+ }
+ exceptions.add(e);
+ }
+ }
+
public static class FailingRegionObserver extends SimpleRegionObserver {
public static volatile boolean FAIL_WRITE = false;
- public static final String INDEX_NAME = "IDX";
+ public static final String FAIL_INDEX_NAME = "FAIL_IDX";
+ public static final String FAIL_TABLE_NAME = "FAIL_TABLE";
+
@Override
public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException {
- if (c.getEnvironment().getRegionInfo().getTable().getNameAsString().contains(INDEX_NAME) && FAIL_WRITE) {
+ boolean throwException = false;
+ if (c.getEnvironment().getRegionInfo().getTable().getNameAsString().endsWith("A_" + FAIL_INDEX_NAME)
+ && FAIL_WRITE) {
+ throwException = true;
+ } else {
+ // When local index updates are atomic with data updates, testing a write failure to a local
+ // index won't make sense.
+ Mutation operation = miniBatchOp.getOperation(0);
+ if (FAIL_WRITE) {
+ Map<byte[],List<Cell>>cellMap = operation.getFamilyCellMap();
+ for (Map.Entry<byte[],List<Cell>> entry : cellMap.entrySet()) {
+ byte[] family = entry.getKey();
+ if (Bytes.toString(family).startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX)) {
+ int regionStartKeyLen = c.getEnvironment().getRegionInfo().getStartKey().length;
+ Cell firstCell = entry.getValue().get(0);
+ short indexId = MetaDataUtil.getViewIndexIdDataType().getCodec().decodeShort(firstCell.getRowArray(), firstCell.getRowOffset() + regionStartKeyLen, SortOrder.getDefault());
+ // Only throw for first local index as the test may have multiple local indexes
+ if (indexId == Short.MIN_VALUE) {
+ throwException = true;
+ break;
+ }
+ }
+ }
+ }
+ }
+ if (throwException) {
dropIndex(c);
throw new DoNotRetryIOException();
}
- Mutation operation = miniBatchOp.getOperation(0);
- Set<byte[]> keySet = operation.getFamilyMap().keySet();
- for(byte[] family: keySet) {
- if(Bytes.toString(family).startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX) && FAIL_WRITE) {
- dropIndex(c);
- throw new DoNotRetryIOException();
- }
- }
}
private void dropIndex(ObserverContext<RegionCoprocessorEnvironment> c) {
@@ -416,7 +488,7 @@ public class MutableIndexFailureIT extends BaseTest {
Connection connection =
QueryUtil.getConnection(c.getEnvironment().getConfiguration());
connection.createStatement().execute(
- "DROP INDEX IF EXISTS " + INDEX_NAME + "_3" + " ON "
+ "DROP INDEX IF EXISTS " + "B_" + FAIL_INDEX_NAME + " ON "
+ fullTableName);
} catch (ClassNotFoundException e) {
} catch (SQLException e) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1f1ecc97/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ReadOnlyIndexFailureIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ReadOnlyIndexFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ReadOnlyIndexFailureIT.java
deleted file mode 100644
index cf3cb29..0000000
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ReadOnlyIndexFailureIT.java
+++ /dev/null
@@ -1,291 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.end2end.index;
-
-import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-
-import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HBaseIOException;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.coprocessor.ObserverContext;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
-import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.end2end.BaseOwnClusterIT;
-import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
-import org.apache.phoenix.exception.SQLExceptionCode;
-import org.apache.phoenix.hbase.index.Indexer;
-import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
-import org.apache.phoenix.query.QueryConstants;
-import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.schema.PIndexState;
-import org.apache.phoenix.schema.PTableType;
-import org.apache.phoenix.util.PropertiesUtil;
-import org.apache.phoenix.util.ReadOnlyProps;
-import org.apache.phoenix.util.SchemaUtil;
-import org.apache.phoenix.util.StringUtil;
-import org.apache.phoenix.util.TestUtil;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-import com.google.common.collect.Maps;
-/**
- *
- * Test for failure of region server to write to index table.
- * For some reason dropping tables after running this test
- * fails unless it runs its own mini cluster.
- *
- *
- * @since 2.1
- */
-
-@Category(NeedsOwnMiniClusterTest.class)
-@RunWith(Parameterized.class)
-public class ReadOnlyIndexFailureIT extends BaseOwnClusterIT {
- public static volatile boolean FAIL_WRITE = false;
- public static final String INDEX_NAME = "IDX";
-
- private String tableName;
- private String indexName;
- private String fullTableName;
- private String fullIndexName;
- private final boolean localIndex;
-
- public ReadOnlyIndexFailureIT(boolean localIndex) {
- this.localIndex = localIndex;
- this.tableName = (localIndex ? "L_" : "") + TestUtil.DEFAULT_DATA_TABLE_NAME;
- this.indexName = INDEX_NAME;
- this.fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
- this.fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
- }
-
- @Parameters(name = "ReadOnlyIndexFailureIT_localIndex={0}") // name is used by failsafe as file name in reports
- public static Collection<Boolean[]> data() {
- return Arrays.asList(new Boolean[][] { { false }, { true } });
- }
-
- @BeforeClass
- public static void doSetup() throws Exception {
- Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(10);
- serverProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2");
- serverProps.put(HConstants.HBASE_RPC_TIMEOUT_KEY, "10000");
- serverProps.put("hbase.client.pause", "5000");
- serverProps.put("hbase.balancer.period", String.valueOf(Integer.MAX_VALUE));
- serverProps.put(QueryServices.INDEX_FAILURE_BLOCK_WRITE, "true");
- serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_ATTRIB, "true");
- serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB, "1000");
- serverProps.put("hbase.coprocessor.region.classes", FailingRegionObserver.class.getName());
- serverProps.put("hbase.coprocessor.abortonerror", "false");
- serverProps.put(Indexer.CHECK_VERSION_CONF_KEY, "false");
- NUM_SLAVES_BASE = 4;
- setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()),
- ReadOnlyProps.EMPTY_PROPS);
- }
-
- @Test
- public void testWriteFailureReadOnlyIndex() throws Exception {
- helpTestWriteFailureReadOnlyIndex();
- }
-
- public void helpTestWriteFailureReadOnlyIndex() throws Exception {
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- try (Connection conn = driver.connect(url, props)) {
- String query;
- ResultSet rs;
- conn.setAutoCommit(false);
- conn.createStatement().execute(
- "CREATE TABLE " + fullTableName + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
- query = "SELECT * FROM " + fullTableName;
- rs = conn.createStatement().executeQuery(query);
- assertFalse(rs.next());
-
- FAIL_WRITE = false;
- if(localIndex) {
- conn.createStatement().execute(
- "CREATE LOCAL INDEX " + indexName + " ON " + fullTableName
- + " (v1) INCLUDE (v2)");
- } else {
- conn.createStatement().execute(
- "CREATE INDEX " + indexName + " ON " + fullTableName
- + " (v1) INCLUDE (v2)");
- }
-
- query = "SELECT * FROM " + fullIndexName;
- rs = conn.createStatement().executeQuery(query);
- assertFalse(rs.next());
-
- // Verify the metadata for index is correct.
- rs = conn.getMetaData().getTables(null,
- StringUtil.escapeLike(TestUtil.DEFAULT_SCHEMA_NAME), indexName,
- new String[] { PTableType.INDEX.toString() });
- assertTrue(rs.next());
- assertEquals(indexName, rs.getString(3));
- assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE"));
- assertFalse(rs.next());
-
- PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName
- + " VALUES(?,?,?)");
- stmt.setString(1, "1");
- stmt.setString(2, "aaa");
- stmt.setString(3, "a1");
- stmt.execute();
- conn.commit();
-
- FAIL_WRITE = true;
- stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
- stmt.setString(1, "2");
- stmt.setString(2, "bbb");
- stmt.setString(3, "b2");
- stmt.execute();
- try {
- conn.commit();
- fail();
- } catch (SQLException e) {
- }
-
- // Only successfully committed row should be seen
- query = "SELECT /*+ NO_INDEX*/ v1 FROM " + fullTableName;
- rs = conn.createStatement().executeQuery(query);
- assertTrue(rs.next());
- assertEquals("aaa", rs.getString(1));
- assertFalse(rs.next());
-
- // Verify the metadata for index is correct.
- rs = conn.getMetaData().getTables(null,
- StringUtil.escapeLike(TestUtil.DEFAULT_SCHEMA_NAME), indexName,
- new String[] { PTableType.INDEX.toString() });
- assertTrue(rs.next());
- assertEquals(indexName, rs.getString(3));
- // the index is always active for tables upon index table write failure
- assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE"));
- assertFalse(rs.next());
-
- // if the table is transactional the write to the index table will fail because the
- // index has not been disabled
- // Verify UPSERT on data table is blocked after index write failed
- stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
- stmt.setString(1, "3");
- stmt.setString(2, "ccc");
- stmt.setString(3, "3c");
- try {
- stmt.execute();
- /* Writes would be blocked */
- conn.commit();
- fail();
- } catch (SQLException e) {
- assertEquals(SQLExceptionCode.INDEX_FAILURE_BLOCK_WRITE.getErrorCode(), e.getErrorCode());
- }
-
- FAIL_WRITE = false;
- // Second attempt at writing will succeed
- int retries = 0;
- do {
- Thread.sleep(5 * 1000); // sleep 5 secs
- if(!hasIndexDisableTimestamp(conn, indexName)){
- break;
- }
- if (++retries == 5) {
- fail("Failed to rebuild index with allowed time");
- }
- } while(true);
-
- // Verify UPSERT on data table still work after index table is recreated
- stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
- stmt.setString(1, "4");
- stmt.setString(2, "ddd");
- stmt.setString(3, "4d");
- stmt.execute();
- conn.commit();
-
- // verify index table has data
- query = "SELECT count(1) FROM " + fullIndexName;
- rs = conn.createStatement().executeQuery(query);
- assertTrue(rs.next());
- assertEquals(3, rs.getInt(1));
-
- query = "SELECT /*+ INDEX(" + indexName + ") */ v1 FROM " + fullTableName;
- rs = conn.createStatement().executeQuery(query);
- assertTrue(rs.next());
- assertEquals("aaa", rs.getString(1));
- assertTrue(rs.next());
- assertEquals("bbb", rs.getString(1));
- assertTrue(rs.next());
- assertEquals("ddd", rs.getString(1));
- assertFalse(rs.next());
-
- query = "SELECT /*+ NO_INDEX*/ v1 FROM " + fullTableName;
- rs = conn.createStatement().executeQuery(query);
- assertTrue(rs.next());
- assertEquals("aaa", rs.getString(1));
- assertTrue(rs.next());
- assertEquals("bbb", rs.getString(1));
- assertTrue(rs.next());
- assertEquals("ddd", rs.getString(1));
- assertFalse(rs.next());
- }
- }
-
- private static boolean hasIndexDisableTimestamp(Connection conn, String indexName) throws SQLException {
- ResultSet rs = conn.createStatement().executeQuery("SELECT " + PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP +
- " FROM " + PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME +
- " WHERE " + PhoenixDatabaseMetaData.COLUMN_NAME + " IS NULL" +
- " AND " + PhoenixDatabaseMetaData.TENANT_ID + " IS NULL" +
- " AND " + PhoenixDatabaseMetaData.TABLE_SCHEM + " IS NULL" +
- " AND " + PhoenixDatabaseMetaData.TABLE_NAME + " = '" + indexName + "'");
- assertTrue(rs.next());
- long ts = rs.getLong(1);
- return (!rs.wasNull() && ts > 0);
- }
-
-
- public static class FailingRegionObserver extends SimpleRegionObserver {
- @Override
- public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException {
- if (c.getEnvironment().getRegionInfo().getTable().getNameAsString().contains(INDEX_NAME) && FAIL_WRITE) {
- throw new DoNotRetryIOException();
- }
- Mutation operation = miniBatchOp.getOperation(0);
- Set<byte[]> keySet = operation.getFamilyMap().keySet();
- for(byte[] family: keySet) {
- if(Bytes.toString(family).startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX) && FAIL_WRITE) {
- throw new DoNotRetryIOException();
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1f1ecc97/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index cee545a..fe9be6e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -79,6 +79,7 @@ import org.apache.phoenix.schema.ReadOnlyTableException;
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.schema.types.PLong;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.IndexUtil;
@@ -533,6 +534,10 @@ public class DeleteCompiler {
} else if (runOnServer) {
// TODO: better abstraction
Scan scan = context.getScan();
+ // Propagate IGNORE_NEWER_MUTATIONS when replaying mutations since there will be
+ // future dated data row mutations that will get in the way of generating the
+ // correct index rows on replay.
+ scan.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS, PDataType.TRUE_BYTES);
scan.setAttribute(BaseScannerRegionObserver.DELETE_AGG, QueryConstants.TRUE);
// Build an ungrouped aggregate query: select COUNT(*) from <table> where <where>
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1f1ecc97/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index bbbd483..e5307d3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -713,6 +713,10 @@ public class UpsertCompiler {
*/
final StatementContext context = queryPlan.getContext();
final Scan scan = context.getScan();
+ // Propagate IGNORE_NEWER_MUTATIONS when replaying mutations since there will be
+ // future dated data row mutations that will get in the way of generating the
+ // correct index rows on replay.
+ scan.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS, PDataType.TRUE_BYTES);
scan.setAttribute(BaseScannerRegionObserver.UPSERT_SELECT_TABLE, UngroupedAggregateRegionObserver.serialize(projectedTable));
scan.setAttribute(BaseScannerRegionObserver.UPSERT_SELECT_EXPRS, UngroupedAggregateRegionObserver.serialize(projectedExpressions));
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1f1ecc97/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 4ad3a8c..d36bd7e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -3418,7 +3418,11 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
Cell newDisableTimeStampCell = newKVs.get(disableTimeStampKVIndex);
long newDisableTimeStamp = (Long) PLong.INSTANCE.toObject(newDisableTimeStampCell.getValueArray(),
newDisableTimeStampCell.getValueOffset(), newDisableTimeStampCell.getValueLength());
- if(curTimeStampVal > 0 && curTimeStampVal < newDisableTimeStamp){
+ // We use the sign of the INDEX_DISABLE_TIMESTAMP to differentiate the keep-index-active (negative)
+ // from block-writes-to-data-table case. In either case, we want to keep the oldest timestamp to
+ // drive the partial index rebuild rather than update it with each attempt to update the index
+ // when a new data table write occurs.
+ if (curTimeStampVal != 0 && Math.abs(curTimeStampVal) < Math.abs(newDisableTimeStamp)) {
// not reset disable timestamp
newKVs.remove(disableTimeStampKVIndex);
disableTimeStampKVIndex = -1;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1f1ecc97/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
index 9482d37..ce42de6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
@@ -20,7 +20,6 @@ package org.apache.phoenix.coprocessor;
import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
import java.io.IOException;
-import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
@@ -74,7 +73,6 @@ import org.apache.phoenix.schema.MetaDataClient;
import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PName;
import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.TableNotFoundException;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.types.PLong;
@@ -100,7 +98,6 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
protected ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
private boolean enableRebuildIndex = QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD;
private long rebuildIndexTimeInterval = QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_INTERVAL;
- private boolean blockWriteRebuildIndex = false;
private static Map<PName, Long> batchExecutedPerTableMap = new HashMap<PName, Long>();
@Override
@@ -128,8 +125,6 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD);
rebuildIndexTimeInterval = env.getConfiguration().getLong(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB,
QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_INTERVAL);
- blockWriteRebuildIndex = env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_BLOCK_WRITE,
- QueryServicesOptions.DEFAULT_INDEX_FAILURE_BLOCK_WRITE);
}
@@ -172,7 +167,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
t.setDaemon(true);
t.start();
- if (!enableRebuildIndex && !blockWriteRebuildIndex) {
+ if (!enableRebuildIndex) {
LOG.info("Failure Index Rebuild is skipped by configuration.");
return;
}
@@ -229,7 +224,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
Scan scan = new Scan();
SingleColumnValueFilter filter = new SingleColumnValueFilter(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES,
- CompareFilter.CompareOp.GREATER, PLong.INSTANCE.toBytes(0L));
+ CompareFilter.CompareOp.NOT_EQUAL, PLong.INSTANCE.toBytes(0L));
filter.setFilterIfMissing(true);
scan.setFilter(filter);
scan.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
@@ -240,10 +235,8 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
PhoenixDatabaseMetaData.INDEX_STATE_BYTES);
scan.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES);
- PreparedStatement updateDisabledTimeStampSmt = null;
Map<PTable, List<PTable>> dataTableToIndexesMap = null;
- MetaDataClient client = null;
boolean hasMore = false;
List<Cell> results = new ArrayList<Cell>();
scanner = this.env.getRegion().getScanner(scan);
@@ -259,17 +252,10 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
byte[] indexState = r.getValue(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
PhoenixDatabaseMetaData.INDEX_STATE_BYTES);
- if (disabledTimeStamp == null || disabledTimeStamp.length == 0 || (indexState != null
- && PIndexState.BUILDING == PIndexState.fromSerializedValue(Bytes.toString(indexState)))) {
- // Don't rebuild the building index , because they are marked for aysnc
+ if (disabledTimeStamp == null || disabledTimeStamp.length == 0) {
continue;
}
- // disableTimeStamp has to be a positive value
- long disabledTimeStampVal = PLong.INSTANCE.getCodec().decodeLong(disabledTimeStamp, 0, SortOrder.getDefault());
- if (disabledTimeStampVal <= 0) {
- continue;
- }
byte[] dataTable = r.getValue(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
PhoenixDatabaseMetaData.DATA_TABLE_NAME_BYTES);
if ((dataTable == null || dataTable.length == 0) || (indexState == null || indexState.length == 0)) {
@@ -302,7 +288,6 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
// don't run a second index populations upsert select
props.setProperty(QueryServices.INDEX_POPULATION_SLEEP_TIME, "0");
conn = QueryUtil.getConnectionOnServer(props, env.getConfiguration()).unwrap(PhoenixConnection.class);
- client = new MetaDataClient(conn);
dataTableToIndexesMap = Maps.newHashMap();
}
String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTable);
@@ -331,7 +316,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
dataTableToIndexesMap.put(dataPTable, indexesToPartiallyRebuild);
}
LOG.debug("We have found " + indexPTable.getIndexState() + " Index:" + indexPTable.getName()
- + " on data table:" + dataPTable.getName() + " which was disabled at "
+ + " on data table:" + dataPTable.getName() + " which failed to be updated at "
+ indexPTable.getIndexDisableTimestamp());
indexesToPartiallyRebuild.add(indexPTable);
} while (hasMore);
@@ -349,9 +334,22 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
long earliestDisableTimestamp = Long.MAX_VALUE;
List<IndexMaintainer> maintainers = Lists
.newArrayListWithExpectedSize(indexesToPartiallyRebuild.size());
+ int signOfDisableTimeStamp = 0;
for (PTable index : indexesToPartiallyRebuild) {
+ // We need a way of differentiating the block writes to data table case from
+ // the leave index active case. In either case, we need to know the time stamp
+ // at which writes started failing so we can rebuild from that point. If we
+ // keep the index active *and* have a positive INDEX_DISABLE_TIMESTAMP_BYTES,
+ // then writes to the data table will be blocked (this is client side logic
+ // and we can't change this in a minor release). So we use the sign of the
+ // time stamp to differentiate.
long disabledTimeStampVal = index.getIndexDisableTimestamp();
- if (disabledTimeStampVal > 0) {
+ if (disabledTimeStampVal != 0) {
+ if (signOfDisableTimeStamp != 0 && signOfDisableTimeStamp != Long.signum(disabledTimeStampVal)) {
+ LOG.warn("Found unexpected mix of signs with INDEX_DISABLE_TIMESTAMP for " + dataPTable.getName().getString() + " with " + indexesToPartiallyRebuild);
+ }
+ signOfDisableTimeStamp = Long.signum(disabledTimeStampVal);
+ disabledTimeStampVal = Math.abs(disabledTimeStampVal);
if (disabledTimeStampVal < earliestDisableTimestamp) {
earliestDisableTimestamp = disabledTimeStampVal;
}
@@ -409,8 +407,8 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
batchExecutedPerTableMap.remove(dataPTable.getName());
LOG.info("Making Index:" + indexPTable.getTableName() + " active after rebuilding");
} else {
-
- updateDisableTimestamp(conn, indexTableFullName, env, scanEndTime, metaTable);
+ // Maintain sign of INDEX_DISABLE_TIMESTAMP (see comment above)
+ updateDisableTimestamp(conn, indexTableFullName, env, scanEndTime * signOfDisableTimeStamp, metaTable);
Long noOfBatches = batchExecutedPerTableMap.get(dataPTable.getName());
if (noOfBatches == null) {
noOfBatches = 0l;
@@ -507,7 +505,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
put.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES,
PLong.INSTANCE.toBytes(disabledTimestamp));
metaTable.checkAndPut(indexTableKey, PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
- PhoenixDatabaseMetaData.INDEX_STATE_BYTES, CompareOp.EQUAL, PIndexState.INACTIVE.getSerializedBytes(),
+ PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES, CompareOp.NOT_EQUAL, PLong.INSTANCE.toBytes(0),
put);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1f1ecc97/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 49ef884..a056807 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -379,6 +379,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
RegionScanner theScanner = s;
+ boolean replayMutations = scan.getAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS) != null;
byte[] indexUUID = scan.getAttribute(PhoenixIndexCodec.INDEX_UUID);
byte[] txState = scan.getAttribute(BaseScannerRegionObserver.TX_STATE);
List<Expression> selectExpressions = null;
@@ -610,6 +611,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
Cell firstKV = results.get(0);
Delete delete = new Delete(firstKV.getRowArray(),
firstKV.getRowOffset(), firstKV.getRowLength(),ts);
+ if (replayMutations) {
+ delete.setAttribute(IGNORE_NEWER_MUTATIONS, PDataType.TRUE_BYTES);
+ }
mutations.add(delete);
// force tephra to ignore this deletes
delete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
@@ -661,6 +665,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
}
}
for (Mutation mutation : row.toRowMutations()) {
+ if (replayMutations) {
+ mutation.setAttribute(IGNORE_NEWER_MUTATIONS, PDataType.TRUE_BYTES);
+ }
mutations.add(mutation);
}
for (i = 0; i < selectExpressions.size(); i++) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1f1ecc97/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index 2836c45..35ba187 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -182,6 +182,8 @@ public enum SQLExceptionCode {
ROWTIMESTAMP_COL_INVALID_TYPE(530, "42907", "A column can be added as ROW_TIMESTAMP only if it is of type DATE, BIGINT, TIME OR TIMESTAMP."),
ROWTIMESTAMP_NOT_ALLOWED_ON_VIEW(531, "42908", "Declaring a column as row_timestamp is not allowed for views."),
INVALID_SCN(532, "42909", "Value of SCN cannot be less than zero."),
+ INVALID_REPLAY_AT(533, "42910", "Value of REPLAY_AT cannot be less than zero."),
+ UNEQUAL_SCN_AND_REPLAY_AT(534, "42911", "If both specified, values of CURRENT_SCN and REPLAY_AT must be equal."),
/**
* HBase and Phoenix specific implementation defined sub-classes.
* Column family related exceptions.
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1f1ecc97/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java
index a9d8311..b0d22d3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java
@@ -24,10 +24,16 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
public class CommitException extends SQLException {
private static final long serialVersionUID = 2L;
private final int[] uncommittedStatementIndexes;
+ private final long serverTimestamp;
- public CommitException(Exception e, int[] uncommittedStatementIndexes) {
+ public CommitException(Exception e, int[] uncommittedStatementIndexes, long serverTimestamp) {
super(e);
this.uncommittedStatementIndexes = uncommittedStatementIndexes;
+ this.serverTimestamp = serverTimestamp;
+ }
+
+ public long getServerTimestamp() {
+ return this.serverTimestamp;
}
/**
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1f1ecc97/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index d32199b..6144c7f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -82,6 +82,7 @@ import org.apache.phoenix.schema.RowKeySchema;
import org.apache.phoenix.schema.TableNotFoundException;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.ValueSchema.Field;
+import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.schema.types.PLong;
import org.apache.phoenix.trace.util.Tracing;
import org.apache.phoenix.util.ByteUtil;
@@ -673,6 +674,14 @@ public class MutationState implements SQLCloseable {
rowMutationsPertainingToIndex = rowMutations;
}
mutationList.addAll(rowMutations);
+ if (connection.isReplayMutations()) {
+ // Propagate IGNORE_NEWER_MUTATIONS when replaying mutations since there will be
+ // future dated data row mutations that will get in the way of generating the
+ // correct index rows on replay.
+ for (Mutation mutation : rowMutations) {
+ mutation.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS, PDataType.TRUE_BYTES);
+ }
+ }
if (mutationsPertainingToIndex != null) mutationsPertainingToIndex
.addAll(rowMutationsPertainingToIndex);
}
@@ -1030,6 +1039,7 @@ public class MutationState implements SQLCloseable {
joinMutationState(new TableRef(tableRef), valuesMap, txMutations);
}
}
+ long serverTimestamp = HConstants.LATEST_TIMESTAMP;
Iterator<Entry<TableInfo, List<Mutation>>> mutationsIterator = physicalTableMutationMap.entrySet().iterator();
while (mutationsIterator.hasNext()) {
Entry<TableInfo, List<Mutation>> pair = mutationsIterator.next();
@@ -1106,6 +1116,7 @@ public class MutationState implements SQLCloseable {
// Remove batches as we process them
mutations.remove(origTableRef);
} catch (Exception e) {
+ serverTimestamp = ServerUtil.parseServerTimestamp(e);
SQLException inferredE = ServerUtil.parseServerExceptionOrNull(e);
if (inferredE != null) {
if (shouldRetry && retryCount == 0 && inferredE.getErrorCode() == SQLExceptionCode.INDEX_METADATA_NOT_FOUND.getErrorCode()) {
@@ -1127,7 +1138,7 @@ public class MutationState implements SQLCloseable {
}
// Throw to client an exception that indicates the statements that
// were not committed successfully.
- sqlE = new CommitException(e, getUncommittedStatementIndexes());
+ sqlE = new CommitException(e, getUncommittedStatementIndexes(), serverTimestamp);
} finally {
try {
if (cache!=null)
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1f1ecc97/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java
index 831aa16..a037e92 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java
@@ -61,6 +61,10 @@ public class IndexWriter implements Stoppable {
this(getCommitter(env), getFailurePolicy(env), env, name);
}
+ public IndexWriter(IndexFailurePolicy failurePolicy, RegionCoprocessorEnvironment env, String name) throws IOException {
+ this(getCommitter(env), failurePolicy, env, name);
+ }
+
public static IndexCommitter getCommitter(RegionCoprocessorEnvironment env) throws IOException {
Configuration conf = env.getConfiguration();
try {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1f1ecc97/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/LeaveIndexActiveFailurePolicy.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/LeaveIndexActiveFailurePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/LeaveIndexActiveFailurePolicy.java
new file mode 100644
index 0000000..edacd3a
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/LeaveIndexActiveFailurePolicy.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.write;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
+import org.apache.phoenix.util.ServerUtil;
+
+import com.google.common.collect.Multimap;
+
+/**
+ *
+ * Implementation of IndexFailurePolicy which takes no action when an
+ * index cannot be updated. As with the standard flow of control, an
+ * exception will still be thrown back to the client. Using this failure
+ * policy means that the action to take upon failure is completely up
+ * to the client.
+ *
+ */
+public class LeaveIndexActiveFailurePolicy implements IndexFailurePolicy {
+
+ @Override
+ public boolean isStopped() {
+ return false;
+ }
+
+ @Override
+ public void stop(String arg0) {
+ }
+
+ @Override
+ public void setup(Stoppable parent, RegionCoprocessorEnvironment env) {
+ }
+
+ @Override
+ public void handleFailure(Multimap<HTableInterfaceReference, Mutation> attempted, Exception cause)
+ throws IOException {
+ // get timestamp of first cell
+ long ts = attempted.values().iterator().next().getFamilyCellMap().values().iterator().next().get(0).getTimestamp();
+ throw ServerUtil.wrapInDoNotRetryIOException("Unable to update the following indexes: " + attempted.keySet(), cause, ts);
+ }
+
+}