You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by vi...@apache.org on 2018/01/31 18:27:57 UTC
[1/2] phoenix git commit: Revert "PHOENIX-4130 Avoid server retries for mutable indexes"
Repository: phoenix
Updated Branches:
refs/heads/4.x-HBase-1.2 d1241a09c -> 878a264e5
http://git-wip-us.apache.org/repos/asf/phoenix/blob/878a264e/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
index bc2b625..cd23dc5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
@@ -119,25 +119,6 @@ public class ServerUtil {
}
return new PhoenixIOException(t);
}
-
- /**
- * Return the first SQLException in the exception chain, otherwise parse it.
- * When we're receiving an exception locally, there's no need to string parse,
- * as the SQLException will already be part of the chain.
- * @param t
- * @return the SQLException, or null if none found
- */
- public static SQLException parseLocalOrRemoteServerException(Throwable t) {
- while (t.getCause() != null) {
- if (t instanceof NotServingRegionException) {
- return parseRemoteException(new StaleRegionBoundaryCacheException());
- } else if (t instanceof SQLException) {
- return (SQLException) t;
- }
- t = t.getCause();
- }
- return parseRemoteException(t);
- }
public static SQLException parseServerExceptionOrNull(Throwable t) {
while (t.getCause() != null) {
@@ -215,7 +196,7 @@ public class ServerUtil {
return parseTimestampFromRemoteException(t);
}
- public static long parseTimestampFromRemoteException(Throwable t) {
+ private static long parseTimestampFromRemoteException(Throwable t) {
String message = t.getLocalizedMessage();
if (message != null) {
// If the message matches the standard pattern, recover the SQLException and throw it.
@@ -235,7 +216,7 @@ public class ServerUtil {
msg = "";
}
if (t instanceof SQLException) {
- msg = t.getMessage() + " " + msg;
+ msg = constructSQLErrorMessage((SQLException) t, msg);
}
msg += String.format(FORMAT_FOR_TIMESTAMP, timestamp);
return new DoNotRetryIOException(msg, t);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/878a264e/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestIndexWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestIndexWriter.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestIndexWriter.java
index 918c411..b0e3780 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestIndexWriter.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestIndexWriter.java
@@ -105,10 +105,6 @@ public class TestIndexWriter {
Configuration conf =new Configuration();
Mockito.when(e.getConfiguration()).thenReturn(conf);
Mockito.when(e.getSharedData()).thenReturn(new ConcurrentHashMap<String,Object>());
- Region mockRegion = Mockito.mock(Region.class);
- Mockito.when(e.getRegion()).thenReturn(mockRegion);
- HTableDescriptor mockTableDesc = Mockito.mock(HTableDescriptor.class);
- Mockito.when(mockRegion.getTableDesc()).thenReturn(mockTableDesc);
ExecutorService exec = Executors.newFixedThreadPool(1);
Map<ImmutableBytesPtr, HTableInterface> tables = new HashMap<ImmutableBytesPtr, HTableInterface>();
FakeTableFactory factory = new FakeTableFactory(tables);
@@ -165,10 +161,6 @@ public class TestIndexWriter {
Configuration conf =new Configuration();
Mockito.when(e.getConfiguration()).thenReturn(conf);
Mockito.when(e.getSharedData()).thenReturn(new ConcurrentHashMap<String,Object>());
- Region mockRegion = Mockito.mock(Region.class);
- Mockito.when(e.getRegion()).thenReturn(mockRegion);
- HTableDescriptor mockTableDesc = Mockito.mock(HTableDescriptor.class);
- Mockito.when(mockRegion.getTableDesc()).thenReturn(mockTableDesc);
FakeTableFactory factory = new FakeTableFactory(tables);
byte[] tableName = this.testName.getTableName();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/878a264e/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleIndexWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleIndexWriter.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleIndexWriter.java
index bfe1d0d..3e2b47c 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleIndexWriter.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleIndexWriter.java
@@ -30,13 +30,11 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.phoenix.hbase.index.StubAbortable;
@@ -95,10 +93,6 @@ public class TestParalleIndexWriter {
Configuration conf =new Configuration();
Mockito.when(e.getConfiguration()).thenReturn(conf);
Mockito.when(e.getSharedData()).thenReturn(new ConcurrentHashMap<String,Object>());
- Region mockRegion = Mockito.mock(Region.class);
- Mockito.when(e.getRegion()).thenReturn(mockRegion);
- HTableDescriptor mockTableDesc = Mockito.mock(HTableDescriptor.class);
- Mockito.when(mockRegion.getTableDesc()).thenReturn(mockTableDesc);
ImmutableBytesPtr tableName = new ImmutableBytesPtr(this.test.getTableName());
Put m = new Put(row);
m.add(Bytes.toBytes("family"), Bytes.toBytes("qual"), null);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/878a264e/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleWriterIndexCommitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleWriterIndexCommitter.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleWriterIndexCommitter.java
index 6f0881b..32a6661 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleWriterIndexCommitter.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleWriterIndexCommitter.java
@@ -30,13 +30,11 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.phoenix.hbase.index.StubAbortable;
@@ -90,10 +88,6 @@ public class TestParalleWriterIndexCommitter {
Configuration conf =new Configuration();
Mockito.when(e.getConfiguration()).thenReturn(conf);
Mockito.when(e.getSharedData()).thenReturn(new ConcurrentHashMap<String,Object>());
- Region mockRegion = Mockito.mock(Region.class);
- Mockito.when(e.getRegion()).thenReturn(mockRegion);
- HTableDescriptor mockTableDesc = Mockito.mock(HTableDescriptor.class);
- Mockito.when(mockRegion.getTableDesc()).thenReturn(mockTableDesc);
Stoppable stop = Mockito.mock(Stoppable.class);
ExecutorService exec = Executors.newFixedThreadPool(1);
Map<ImmutableBytesPtr, HTableInterface> tables =
[2/2] phoenix git commit: Revert "PHOENIX-4130 Avoid server retries for mutable indexes"
Posted by vi...@apache.org.
Revert "PHOENIX-4130 Avoid server retries for mutable indexes"
This reverts commit d1241a09c24925a46c6a0e64252d0bbbcd991c58.
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/878a264e
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/878a264e
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/878a264e
Branch: refs/heads/4.x-HBase-1.2
Commit: 878a264e5d1f6316d611be1f31a4bcde620c4c8e
Parents: d1241a0
Author: Vincent Poon <vi...@apache.org>
Authored: Wed Jan 31 10:09:54 2018 -0800
Committer: Vincent Poon <vi...@apache.org>
Committed: Wed Jan 31 10:09:54 2018 -0800
----------------------------------------------------------------------
.../end2end/index/MutableIndexFailureIT.java | 12 +-
.../end2end/index/PartialIndexRebuilderIT.java | 76 ++------
.../coprocessor/MetaDataEndpointImpl.java | 53 ++----
.../phoenix/coprocessor/MetaDataProtocol.java | 6 +-
.../coprocessor/MetaDataRegionObserver.java | 19 +-
.../UngroupedAggregateRegionObserver.java | 82 ++------
.../phoenix/exception/SQLExceptionCode.java | 1 -
.../apache/phoenix/execute/MutationState.java | 39 +---
.../org/apache/phoenix/hbase/index/Indexer.java | 10 +
.../index/exception/IndexWriteException.java | 49 +----
.../MultiIndexWriteFailureException.java | 29 +--
.../SingleIndexWriteFailureException.java | 23 +--
.../hbase/index/write/IndexWriterUtils.java | 14 +-
.../write/ParallelWriterIndexCommitter.java | 5 +-
.../TrackingParallelWriterIndexCommitter.java | 5 +-
.../index/PhoenixIndexFailurePolicy.java | 189 ++-----------------
.../phoenix/jdbc/PhoenixDatabaseMetaData.java | 1 -
.../apache/phoenix/optimize/QueryOptimizer.java | 29 +--
.../org/apache/phoenix/query/QueryServices.java | 2 -
.../phoenix/query/QueryServicesOptions.java | 1 -
.../org/apache/phoenix/schema/PIndexState.java | 7 +-
.../org/apache/phoenix/util/KeyValueUtil.java | 12 --
.../org/apache/phoenix/util/ServerUtil.java | 23 +--
.../hbase/index/write/TestIndexWriter.java | 8 -
.../index/write/TestParalleIndexWriter.java | 6 -
.../write/TestParalleWriterIndexCommitter.java | 6 -
26 files changed, 116 insertions(+), 591 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/878a264e/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
index c2e0cb6..0318925 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
@@ -29,6 +29,7 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -129,6 +130,7 @@ public class MutableIndexFailureIT extends BaseTest {
public static void doSetup() throws Exception {
Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(10);
serverProps.put("hbase.coprocessor.region.classes", FailingRegionObserver.class.getName());
+ serverProps.put(IndexWriterUtils.INDEX_WRITER_RPC_RETRIES_NUMBER, "2");
serverProps.put(HConstants.HBASE_RPC_TIMEOUT_KEY, "10000");
serverProps.put(IndexWriterUtils.INDEX_WRITER_RPC_PAUSE, "5000");
serverProps.put("data.tx.snapshot.dir", "/tmp");
@@ -142,8 +144,7 @@ public class MutableIndexFailureIT extends BaseTest {
* because we want to control it's execution ourselves
*/
serverProps.put(QueryServices.INDEX_REBUILD_TASK_INITIAL_DELAY, Long.toString(Long.MAX_VALUE));
- Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(2);
- clientProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2");
+ Map<String, String> clientProps = Collections.singletonMap(QueryServices.TRANSACTIONS_ENABLED, Boolean.TRUE.toString());
NUM_SLAVES_BASE = 4;
setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
indexRebuildTaskRegionEnvironment =
@@ -160,8 +161,7 @@ public class MutableIndexFailureIT extends BaseTest {
@Parameters(name = "MutableIndexFailureIT_transactional={0},localIndex={1},isNamespaceMapped={2},disableIndexOnWriteFailure={3},failRebuildTask={4},throwIndexWriteFailure={5}") // name is used by failsafe as file name in reports
public static List<Object[]> data() {
return Arrays.asList(new Object[][] {
- // note - can't disableIndexOnWriteFailure without throwIndexWriteFailure, PHOENIX-4130
- { false, false, false, false, false, false},
+ { false, false, false, true, false, false},
{ false, false, true, true, false, null},
{ false, false, true, true, false, true},
{ false, false, false, true, false, null},
@@ -181,8 +181,8 @@ public class MutableIndexFailureIT extends BaseTest {
{ false, true, false, true, false, null},
{ false, false, false, true, true, null},
{ false, false, true, true, true, null},
- { false, false, false, false, true, false},
- { false, false, true, false, true, false},
+ { false, false, false, true, true, false},
+ { false, false, true, true, true, false},
}
);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/878a264e/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
index dd986aa..31649bd 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
@@ -36,7 +36,6 @@ import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Mutation;
@@ -47,13 +46,12 @@ import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.phoenix.coprocessor.MetaDataRegionObserver;
import org.apache.phoenix.coprocessor.MetaDataRegionObserver.BuildIndexScheduleTask;
import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
-import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.PhoenixIOException;
import org.apache.phoenix.execute.CommitException;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PMetaData;
import org.apache.phoenix.schema.PTable;
@@ -96,9 +94,7 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
serverProps.put(QueryServices.INDEX_REBUILD_DISABLE_TIMESTAMP_THRESHOLD, "50000000");
serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_PERIOD, Long.toString(REBUILD_PERIOD)); // batch at 50 seconds
serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_FORWARD_TIME_ATTRIB, Long.toString(WAIT_AFTER_DISABLED));
- Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(1);
- clientProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2");
- setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
+ setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), ReadOnlyProps.EMPTY_PROPS);
indexRebuildTaskRegionEnvironment =
(RegionCoprocessorEnvironment) getUtility()
.getRSForFirstRegionInTable(
@@ -1031,51 +1027,6 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
}
}
- // Tests that when we've been in PENDING_DISABLE for too long, queries don't use the index,
- // and the rebuilder should mark the index DISABLED
- @Test
- public void testPendingDisable() throws Throwable {
- String schemaName = generateUniqueName();
- String tableName = generateUniqueName();
- String indexName = generateUniqueName();
- final String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
- final String fullIndexName = SchemaUtil.getTableName(schemaName, indexName);
- final MyClock clock = new MyClock(1000);
- EnvironmentEdgeManager.injectEdge(clock);
- try (Connection conn = DriverManager.getConnection(getUrl())) {
- conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR, v3 VARCHAR) COLUMN_ENCODED_BYTES = 0, DISABLE_INDEX_ON_WRITE_FAILURE = TRUE");
- clock.time += 100;
- conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v1, v2)");
- clock.time += 100;
- conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','a','0')");
- conn.commit();
- clock.time += 100;
- HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);
- IndexUtil.updateIndexState(fullIndexName, clock.currentTime(), metaTable, PIndexState.PENDING_DISABLE);
- Configuration conf =
- conn.unwrap(PhoenixConnection.class).getQueryServices().getConfiguration();
- // under threshold should use the index
- PhoenixStatement stmt = conn.createStatement().unwrap(PhoenixStatement.class);
- ResultSet rs = stmt.executeQuery("SELECT V2 FROM " + fullTableName + " WHERE V1 = 'a'");
- assertTrue(rs.next());
- assertEquals("0", rs.getString(1));
- assertEquals(fullIndexName, stmt.getQueryPlan().getContext().getCurrentTable().getTable().getName().getString());
- // over threshold should not use the index
- long pendingDisableThreshold = conf.getLong(QueryServices.INDEX_PENDING_DISABLE_THRESHOLD,
- QueryServicesOptions.DEFAULT_INDEX_PENDING_DISABLE_THRESHOLD);
- clock.time += pendingDisableThreshold + 1000;
- stmt = conn.createStatement().unwrap(PhoenixStatement.class);
- rs = stmt.executeQuery("SELECT V2 FROM " + fullTableName + " WHERE V1 = 'a'");
- assertTrue(rs.next());
- assertEquals("0", rs.getString(1));
- assertEquals(fullTableName, stmt.getQueryPlan().getContext().getCurrentTable().getTable().getName().getString());
- // if we're over the threshold, the rebuilder should disable the index
- waitForIndexState(conn, fullTableName, fullIndexName, PIndexState.DISABLE);
- } finally {
- EnvironmentEdgeManager.reset();
- }
- }
-
//Tests that when we're updating an index from within the RS (e.g. UngruopedAggregateRegionObserver),
// if the index write fails the index gets disabled
@Test
@@ -1097,9 +1048,22 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
try {
conn.createStatement().execute("DELETE FROM " + fullTableName);
fail();
- } catch (SQLException e) {
+ } catch (CommitException|PhoenixIOException e) {
+ // Expected
+ }
+ assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.DISABLE, null));
+ // reset the index state to ACTIVE
+ HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);
+ IndexUtil.updateIndexState(fullIndexName, 0, metaTable, PIndexState.INACTIVE);
+ IndexUtil.updateIndexState(fullIndexName, 0, metaTable, PIndexState.ACTIVE);
+ TestUtil.removeCoprocessor(conn, fullIndexName, WriteFailingRegionObserver.class);
+ conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','a','0', 't')");
+ TestUtil.addCoprocessor(conn, fullIndexName, WriteFailingRegionObserver.class);
+ try {
+ conn.createStatement().execute("DELETE FROM " + fullTableName + " WHERE v1='a'");
+ fail();
+ } catch (CommitException|PhoenixIOException e) {
// Expected
- assertEquals(SQLExceptionCode.INDEX_WRITE_FAILURE.getErrorCode(), e.getErrorCode());
}
assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.DISABLE, null));
} finally {
@@ -1111,12 +1075,6 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
public static class WriteFailingRegionObserver extends SimpleRegionObserver {
@Override
public void postBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
- // we need to advance the clock, since the index retry logic (copied from HBase) has a time component
- EnvironmentEdge delegate = EnvironmentEdgeManager.getDelegate();
- if (delegate instanceof MyClock) {
- MyClock myClock = (MyClock) delegate;
- myClock.time += 1000;
- }
throw new DoNotRetryIOException("Simulating write failure on " + c.getEnvironment().getRegionInfo().getTable().getNameAsString());
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/878a264e/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index b75bf3d..bf8ba39 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -541,28 +541,23 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
TableName.valueOf(table.getPhysicalName().getBytes()));
builder.setReturnCode(MetaDataProtos.MutationCode.TABLE_ALREADY_EXISTS);
- builder.setMutationTime(currentTime);
- if (blockWriteRebuildIndex) {
- long disableIndexTimestamp = table.getIndexDisableTimestamp();
- long minNonZerodisableIndexTimestamp = disableIndexTimestamp > 0 ? disableIndexTimestamp : Long.MAX_VALUE;
- for (PTable index : table.getIndexes()) {
- disableIndexTimestamp = index.getIndexDisableTimestamp();
- if (disableIndexTimestamp > 0
- && (index.getIndexState() == PIndexState.ACTIVE
- || index.getIndexState() == PIndexState.PENDING_ACTIVE
- || index.getIndexState() == PIndexState.PENDING_DISABLE)
- && disableIndexTimestamp < minNonZerodisableIndexTimestamp) {
- minNonZerodisableIndexTimestamp = disableIndexTimestamp;
- }
- }
- // Freeze time for table at min non-zero value of INDEX_DISABLE_TIMESTAMP
- // This will keep the table consistent with index as the table has had one more
- // batch applied to it.
- if (minNonZerodisableIndexTimestamp != Long.MAX_VALUE) {
- // Subtract one because we add one due to timestamp granularity in Windows
- builder.setMutationTime(minNonZerodisableIndexTimestamp - 1);
+ long disableIndexTimestamp = table.getIndexDisableTimestamp();
+ long minNonZerodisableIndexTimestamp = disableIndexTimestamp > 0 ? disableIndexTimestamp : Long.MAX_VALUE;
+ for (PTable index : table.getIndexes()) {
+ disableIndexTimestamp = index.getIndexDisableTimestamp();
+ if (disableIndexTimestamp > 0 && (index.getIndexState() == PIndexState.ACTIVE || index.getIndexState() == PIndexState.PENDING_ACTIVE) && disableIndexTimestamp < minNonZerodisableIndexTimestamp) {
+ minNonZerodisableIndexTimestamp = disableIndexTimestamp;
}
}
+ // Freeze time for table at min non-zero value of INDEX_DISABLE_TIMESTAMP
+ // This will keep the table consistent with index as the table has had one more
+ // batch applied to it.
+ if (minNonZerodisableIndexTimestamp == Long.MAX_VALUE) {
+ builder.setMutationTime(currentTime);
+ } else {
+ // Subtract one because we add one due to timestamp granularity in Windows
+ builder.setMutationTime(minNonZerodisableIndexTimestamp - 1);
+ }
if (table.getTimeStamp() != tableTimeStamp) {
builder.setTable(PTableImpl.toProto(table));
@@ -928,12 +923,6 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
if (indexState == PIndexState.PENDING_ACTIVE && clientVersion < PhoenixDatabaseMetaData.MIN_PENDING_ACTIVE_INDEX) {
indexState = PIndexState.ACTIVE;
}
- // If client is not yet up to 4.14, then translate PENDING_DISABLE to DISABLE
- // since the client won't have this index state in its enum.
- if (indexState == PIndexState.PENDING_DISABLE && clientVersion < PhoenixDatabaseMetaData.MIN_PENDING_DISABLE_INDEX) {
- // note: for older clients, we have to rely on the rebuilder to transition PENDING_DISABLE -> DISABLE
- indexState = PIndexState.DISABLE;
- }
Cell immutableRowsKv = tableKeyValues[IMMUTABLE_ROWS_INDEX];
boolean isImmutableRows =
immutableRowsKv == null ? false : (Boolean) PBoolean.INSTANCE.toObject(
@@ -3672,7 +3661,6 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
// Timestamp of INDEX_STATE gets updated with each call
long actualTimestamp = currentStateKV.getTimestamp();
long curTimeStampVal = 0;
- long newDisableTimeStamp = 0;
if ((currentDisableTimeStamp != null && currentDisableTimeStamp.getValueLength() > 0)) {
curTimeStampVal = (Long) PLong.INSTANCE.toObject(currentDisableTimeStamp.getValueArray(),
currentDisableTimeStamp.getValueOffset(), currentDisableTimeStamp.getValueLength());
@@ -3689,7 +3677,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
done.run(builder.build());
return;
}
- newDisableTimeStamp = (Long) PLong.INSTANCE.toObject(newDisableTimeStampCell.getValueArray(),
+ long newDisableTimeStamp = (Long) PLong.INSTANCE.toObject(newDisableTimeStampCell.getValueArray(),
newDisableTimeStampCell.getValueOffset(), newDisableTimeStampCell.getValueLength());
// We use the sign of the INDEX_DISABLE_TIMESTAMP to differentiate the keep-index-active (negative)
// from block-writes-to-data-table case. In either case, we want to keep the oldest timestamp to
@@ -3698,7 +3686,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
// We do legitimately move the INDEX_DISABLE_TIMESTAMP to be newer when we're rebuilding the
// index in which case the state will be INACTIVE or PENDING_ACTIVE.
if (curTimeStampVal != 0
- && (newState == PIndexState.DISABLE || newState == PIndexState.PENDING_ACTIVE || newState == PIndexState.PENDING_DISABLE)
+ && (newState == PIndexState.DISABLE || newState == PIndexState.PENDING_ACTIVE)
&& Math.abs(curTimeStampVal) < Math.abs(newDisableTimeStamp)) {
// do not reset disable timestamp as we want to keep the min
newKVs.remove(disableTimeStampKVIndex);
@@ -3727,13 +3715,6 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
if (newState == PIndexState.ACTIVE) {
newState = PIndexState.DISABLE;
}
- // Can't transition from DISABLE to PENDING_DISABLE
- if (newState == PIndexState.PENDING_DISABLE) {
- builder.setReturnCode(MetaDataProtos.MutationCode.UNALLOWED_TABLE_MUTATION);
- builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
- done.run(builder.build());
- return;
- }
}
if (currentState == PIndexState.BUILDING && newState != PIndexState.ACTIVE) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/878a264e/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
index efad1e7..fe11ec7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
@@ -63,7 +63,7 @@ import com.google.protobuf.ByteString;
*/
public abstract class MetaDataProtocol extends MetaDataService {
public static final int PHOENIX_MAJOR_VERSION = 4;
- public static final int PHOENIX_MINOR_VERSION = 14;
+ public static final int PHOENIX_MINOR_VERSION = 13;
public static final int PHOENIX_PATCH_NUMBER = 0;
public static final int PHOENIX_VERSION =
VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER);
@@ -93,9 +93,8 @@ public abstract class MetaDataProtocol extends MetaDataService {
// Since there's no upgrade code, keep the version the same as the previous version
public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_12_0 = MIN_SYSTEM_TABLE_TIMESTAMP_4_11_0;
public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_13_0 = MIN_SYSTEM_TABLE_TIMESTAMP_4_11_0;
- public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_14_0 = MIN_SYSTEM_TABLE_TIMESTAMP_4_11_0;
// MIN_SYSTEM_TABLE_TIMESTAMP needs to be set to the max of all the MIN_SYSTEM_TABLE_TIMESTAMP_* constants
- public static final long MIN_SYSTEM_TABLE_TIMESTAMP = MIN_SYSTEM_TABLE_TIMESTAMP_4_14_0;
+ public static final long MIN_SYSTEM_TABLE_TIMESTAMP = MIN_SYSTEM_TABLE_TIMESTAMP_4_13_0;
// ALWAYS update this map whenever rolling out a new release (major, minor or patch release).
// Key is the SYSTEM.CATALOG timestamp for the version and value is the version string.
@@ -115,7 +114,6 @@ public abstract class MetaDataProtocol extends MetaDataService {
TIMESTAMP_VERSION_MAP.put(MIN_SYSTEM_TABLE_TIMESTAMP_4_11_0, "4.11.x");
TIMESTAMP_VERSION_MAP.put(MIN_SYSTEM_TABLE_TIMESTAMP_4_12_0, "4.12.x");
TIMESTAMP_VERSION_MAP.put(MIN_SYSTEM_TABLE_TIMESTAMP_4_13_0, "4.13.x");
- TIMESTAMP_VERSION_MAP.put(MIN_SYSTEM_TABLE_TIMESTAMP_4_14_0, "4.14.x");
}
public static final String CURRENT_CLIENT_VERSION = PHOENIX_MAJOR_VERSION + "." + PHOENIX_MINOR_VERSION + "." + PHOENIX_PATCH_NUMBER;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/878a264e/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
index e51a61e..af06235 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
@@ -229,7 +229,6 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
private final long rebuildIndexBatchSize;
private final long configuredBatches;
private final long indexDisableTimestampThreshold;
- private final long pendingDisableThreshold;
private final ReadOnlyProps props;
private final List<String> onlyTheseTables;
@@ -248,9 +247,6 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
this.indexDisableTimestampThreshold =
configuration.getLong(QueryServices.INDEX_REBUILD_DISABLE_TIMESTAMP_THRESHOLD,
QueryServicesOptions.DEFAULT_INDEX_REBUILD_DISABLE_TIMESTAMP_THRESHOLD);
- this.pendingDisableThreshold =
- configuration.getLong(QueryServices.INDEX_PENDING_DISABLE_THRESHOLD,
- QueryServicesOptions.DEFAULT_INDEX_PENDING_DISABLE_THRESHOLD);
this.props = new ReadOnlyProps(env.getConfiguration().iterator());
}
@@ -346,18 +342,6 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
}
PIndexState indexState = PIndexState.fromSerializedValue(indexStateBytes[0]);
- long elapsedSinceDisable = EnvironmentEdgeManager.currentTimeMillis() - Math.abs(indexDisableTimestamp);
-
- // on an index write failure, the server side transitions to PENDING_DISABLE, then the client
- // retries, and after retries are exhausted, disables the index
- if (indexState == PIndexState.PENDING_DISABLE) {
- if (elapsedSinceDisable > pendingDisableThreshold) {
- // too long in PENDING_DISABLE - client didn't disable the index, so we do it here
- IndexUtil.updateIndexState(conn, indexTableFullName, PIndexState.DISABLE, indexDisableTimestamp);
- }
- continue;
- }
-
// Only perform relatively expensive check for all regions online when index
// is disabled or pending active since that's the state it's placed into when
// an index write fails.
@@ -367,8 +351,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
+ indexPTable.getName() + " are online.");
continue;
}
-
- if (elapsedSinceDisable > indexDisableTimestampThreshold) {
+ if (EnvironmentEdgeManager.currentTimeMillis() - Math.abs(indexDisableTimestamp) > indexDisableTimestampThreshold) {
/*
* It has been too long since the index has been disabled and any future
* attempts to reenable it likely will fail. So we are going to mark the
http://git-wip-us.apache.org/repos/asf/phoenix/blob/878a264e/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 7692bc8..af50420 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -53,18 +53,22 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.CoprocessorHConnection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.ipc.controller.InterRegionServerIndexRpcControllerFactory;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
@@ -77,9 +81,9 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.WritableUtils;
import org.apache.phoenix.cache.ServerCacheClient;
+import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode;
import org.apache.phoenix.coprocessor.generated.PTableProtos;
import org.apache.phoenix.exception.DataExceedsCapacityException;
-import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.execute.TupleProjector;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.ExpressionType;
@@ -88,15 +92,13 @@ import org.apache.phoenix.expression.aggregator.Aggregators;
import org.apache.phoenix.expression.aggregator.ServerAggregators;
import org.apache.phoenix.hbase.index.ValueGetter;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
-import org.apache.phoenix.hbase.index.exception.IndexWriteException;
import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.index.PhoenixIndexCodec;
-import org.apache.phoenix.index.PhoenixIndexFailurePolicy;
-import org.apache.phoenix.index.PhoenixIndexFailurePolicy.MutateCommand;
import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.join.HashJoinInfo;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
@@ -134,19 +136,22 @@ import org.apache.phoenix.util.ExpressionUtil;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.KeyValueUtil;
import org.apache.phoenix.util.LogUtil;
+import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.QueryUtil;
-import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.ServerUtil;
import org.apache.phoenix.util.StringUtil;
+import org.apache.phoenix.util.TimeKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
@@ -200,8 +205,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
private KeyValueBuilder kvBuilder;
private Configuration upsertSelectConfig;
private Configuration compactionConfig;
- private Configuration indexWriteConfig;
- private ReadOnlyProps indexWriteProps;
@Override
public void start(CoprocessorEnvironment e) throws IOException {
@@ -231,13 +234,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
compactionConfig.setInt(HConstants.HBASE_CLIENT_PAUSE,
e.getConfiguration().getInt(QueryServices.METADATA_WRITE_RETRY_PAUSE,
QueryServicesOptions.DEFAULT_METADATA_WRITE_RETRY_PAUSE));
-
- // For retries of index write failures, use the same # of retries as the rebuilder
- indexWriteConfig = PropertiesUtil.cloneConfig(e.getConfiguration());
- indexWriteConfig.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
- e.getConfiguration().getInt(QueryServices.INDEX_REBUILD_RPC_RETRIES_COUNTER,
- QueryServicesOptions.DEFAULT_INDEX_REBUILD_RPC_RETRIES_COUNTER));
- indexWriteProps = new ReadOnlyProps(indexWriteConfig.iterator());
}
private void commitBatch(Region region, List<Mutation> mutations, long blockingMemstoreSize) throws IOException {
@@ -258,7 +254,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
}
}
// TODO: should we use the one that is all or none?
- logger.debug("Committing batch of " + mutations.size() + " mutations for " + region.getRegionInfo().getTable().getNameAsString());
+ logger.debug("Committing bactch of " + mutations.size() + " mutations for " + region.getRegionInfo().getTable().getNameAsString());
region.batchMutate(mutations.toArray(mutationArray), HConstants.NO_NONCE, HConstants.NO_NONCE);
}
@@ -864,65 +860,21 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
}
}
- private void commit(final Region region, List<Mutation> mutations, byte[] indexUUID, final long blockingMemStoreSize,
- byte[] indexMaintainersPtr, byte[] txState, final HTable targetHTable, boolean useIndexProto,
+ private void commit(Region region, List<Mutation> mutations, byte[] indexUUID, long blockingMemStoreSize,
+ byte[] indexMaintainersPtr, byte[] txState, HTable targetHTable, boolean useIndexProto,
boolean isPKChanging)
throws IOException {
- final List<Mutation> localRegionMutations = Lists.newArrayList();
- final List<Mutation> remoteRegionMutations = Lists.newArrayList();
+ List<Mutation> localRegionMutations = Lists.newArrayList();
+ List<Mutation> remoteRegionMutations = Lists.newArrayList();
setIndexAndTransactionProperties(mutations, indexUUID, indexMaintainersPtr, txState, useIndexProto);
separateLocalAndRemoteMutations(targetHTable, region, mutations, localRegionMutations, remoteRegionMutations,
isPKChanging);
- try {
- commitBatch(region, localRegionMutations, blockingMemStoreSize);
- } catch (IOException e) {
- handleIndexWriteException(localRegionMutations, e, new MutateCommand() {
- @Override
- public void doMutation() throws IOException {
- commitBatch(region, localRegionMutations, blockingMemStoreSize);
- }
- });
- }
- try {
- commitBatchWithHTable(targetHTable, remoteRegionMutations);
- } catch (IOException e) {
- handleIndexWriteException(remoteRegionMutations, e, new MutateCommand() {
- @Override
- public void doMutation() throws IOException {
- commitBatchWithHTable(targetHTable, remoteRegionMutations);
- }
- });
- }
+ commitBatch(region, localRegionMutations, blockingMemStoreSize);
+ commitBatchWithHTable(targetHTable, remoteRegionMutations);
localRegionMutations.clear();
remoteRegionMutations.clear();
}
- private void handleIndexWriteException(final List<Mutation> localRegionMutations, IOException origIOE,
- MutateCommand mutateCommand) throws IOException {
- long serverTimestamp = ServerUtil.parseTimestampFromRemoteException(origIOE);
- SQLException inferredE = ServerUtil.parseLocalOrRemoteServerException(origIOE);
- if (inferredE != null && inferredE.getErrorCode() == SQLExceptionCode.INDEX_WRITE_FAILURE.getErrorCode()) {
- // For an index write failure, the data table write succeeded,
- // so when we retry we need to set REPLAY_WRITES
- for (Mutation mutation : localRegionMutations) {
- mutation.setAttribute(REPLAY_WRITES, REPLAY_ONLY_INDEX_WRITES);
- // use the server timestamp for index write retrys
- KeyValueUtil.setTimestamp(mutation, serverTimestamp);
- }
- IndexWriteException iwe = PhoenixIndexFailurePolicy.getIndexWriteException(inferredE);
- try (PhoenixConnection conn =
- QueryUtil.getConnectionOnServer(indexWriteConfig)
- .unwrap(PhoenixConnection.class)) {
- PhoenixIndexFailurePolicy.doBatchWithRetries(mutateCommand, iwe, conn,
- indexWriteProps);
- } catch (Exception e) {
- throw new DoNotRetryIOException(e);
- }
- } else {
- throw origIOE;
- }
- }
-
private void separateLocalAndRemoteMutations(HTable targetHTable, Region region, List<Mutation> mutations,
List<Mutation> localRegionMutations, List<Mutation> remoteRegionMutations,
boolean isPKChanging){
http://git-wip-us.apache.org/repos/asf/phoenix/blob/878a264e/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index 0f29f3f..2301c32 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -367,7 +367,6 @@ public enum SQLExceptionCode {
CONNECTION_CLOSED(1111, "XCL11", "Connectioin is closed."),
INDEX_FAILURE_BLOCK_WRITE(1120, "XCL20", "Writes to table blocked until index can be updated."),
- INDEX_WRITE_FAILURE(1121, "XCL21", "Write to the index failed."),
UPDATE_CACHE_FREQUENCY_INVALID(1130, "XCL30", "UPDATE_CACHE_FREQUENCY cannot be set to ALWAYS if APPEND_ONLY_SCHEMA is true."),
CANNOT_DROP_COL_APPEND_ONLY_SCHEMA(1131, "XCL31", "Cannot drop column from table that with append only schema."),
http://git-wip-us.apache.org/repos/asf/phoenix/blob/878a264e/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 0719966..993438e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -53,14 +53,11 @@ import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
-import org.apache.phoenix.hbase.index.exception.IndexWriteException;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.index.IndexMetaDataCacheClient;
import org.apache.phoenix.index.PhoenixIndexBuilder;
import org.apache.phoenix.index.PhoenixIndexCodec;
-import org.apache.phoenix.index.PhoenixIndexFailurePolicy;
-import org.apache.phoenix.index.PhoenixIndexFailurePolicy.MutateCommand;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
import org.apache.phoenix.monitoring.GlobalClientMetrics;
@@ -986,8 +983,6 @@ public class MutationState implements SQLCloseable {
long mutationCommitTime = 0;
long numFailedMutations = 0;;
long startTime = 0;
- boolean shouldRetryIndexedMutation = false;
- IndexWriteException iwe = null;
do {
TableRef origTableRef = tableInfo.getOrigTableRef();
PTable table = origTableRef.getTable();
@@ -1021,25 +1016,8 @@ public class MutationState implements SQLCloseable {
startTime = System.currentTimeMillis();
child.addTimelineAnnotation("Attempt " + retryCount);
List<List<Mutation>> mutationBatchList = getMutationBatchList(batchSize, batchSizeBytes, mutationList);
- for (final List<Mutation> mutationBatch : mutationBatchList) {
- if (shouldRetryIndexedMutation) {
- // if there was an index write failure, retry the mutation in a loop
- final HTableInterface finalHTable = hTable;
- PhoenixIndexFailurePolicy.doBatchWithRetries(new MutateCommand() {
- @Override
- public void doMutation() throws IOException {
- try {
- finalHTable.batch(mutationBatch);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new IOException(e);
- }
- }}, iwe,
- connection, connection.getQueryServices().getProps());
- } else {
- hTable.batch(mutationBatch);
- }
-
+ for (List<Mutation> mutationBatch : mutationBatchList) {
+ hTable.batch(mutationBatch);
batchCount++;
if (logger.isDebugEnabled()) logger.debug("Sent batch of " + mutationBatch.size() + " for " + Bytes.toString(htableName));
}
@@ -1076,19 +1054,6 @@ public class MutationState implements SQLCloseable {
child = Tracing.child(span,"Failed batch, attempting retry");
continue;
- } else if (inferredE.getErrorCode() == SQLExceptionCode.INDEX_WRITE_FAILURE.getErrorCode()) {
- iwe = PhoenixIndexFailurePolicy.getIndexWriteException(inferredE);
- if (iwe != null && !shouldRetryIndexedMutation) {
- // For an index write failure, the data table write succeeded,
- // so when we retry we need to set REPLAY_WRITES
- for (Mutation m : mutationList) {
- m.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES, BaseScannerRegionObserver.REPLAY_ONLY_INDEX_WRITES);
- KeyValueUtil.setTimestamp(m, serverTimestamp);
- }
- shouldRetry = true;
- shouldRetryIndexedMutation = true;
- continue;
- }
}
e = inferredE;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/878a264e/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
index f8195f1..9686789 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
@@ -185,6 +185,7 @@ public class Indexer extends BaseRegionObserver {
private long slowPostOpenThreshold;
private long slowPreIncrementThreshold;
private int rowLockWaitDuration;
+ private Configuration compactionConfig;
public static final String RecoveryFailurePolicyKeyForTesting = INDEX_RECOVERY_FAILURE_POLICY_KEY;
@@ -241,6 +242,15 @@ public class Indexer extends BaseRegionObserver {
this.metricSource = MetricsIndexerSourceFactory.getInstance().create();
setSlowThresholds(e.getConfiguration());
+ compactionConfig = PropertiesUtil.cloneConfig(e.getConfiguration());
+ // lower the number of rpc retries, so we don't hang the compaction
+ compactionConfig.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+ e.getConfiguration().getInt(QueryServices.METADATA_WRITE_RETRIES_NUMBER,
+ QueryServicesOptions.DEFAULT_METADATA_WRITE_RETRIES_NUMBER));
+ compactionConfig.setInt(HConstants.HBASE_CLIENT_PAUSE,
+ e.getConfiguration().getInt(QueryServices.METADATA_WRITE_RETRY_PAUSE,
+ QueryServicesOptions.DEFAULT_METADATA_WRITE_RETRY_PAUSE));
+
try {
// get the specified failure policy. We only ever override it in tests, but we need to do it
// here
http://git-wip-us.apache.org/repos/asf/phoenix/blob/878a264e/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/exception/IndexWriteException.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/exception/IndexWriteException.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/exception/IndexWriteException.java
index 531baa6..2ec29bc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/exception/IndexWriteException.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/exception/IndexWriteException.java
@@ -17,10 +17,7 @@
*/
package org.apache.phoenix.hbase.index.exception;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
import org.apache.hadoop.hbase.HBaseIOException;
-import org.apache.phoenix.query.QueryServicesOptions;
/**
* Generic {@link Exception} that an index write has failed
@@ -28,57 +25,19 @@ import org.apache.phoenix.query.QueryServicesOptions;
@SuppressWarnings("serial")
public class IndexWriteException extends HBaseIOException {
- /*
- * We pass this message back to the client so that the config only needs to be set on the
- * server side.
- */
- private static final String DISABLE_INDEX_ON_FAILURE_MSG = "disableIndexOnFailure=";
- private boolean disableIndexOnFailure;
-
public IndexWriteException() {
super();
}
- /**
- * Used for the case where we cannot reach the index, but not sure of the table or the mutations
- * that caused the failure
- * @param message
- * @param cause
- */
public IndexWriteException(String message, Throwable cause) {
- super(message, cause);
- }
-
- public IndexWriteException(String message, Throwable cause, boolean disableIndexOnFailure) {
- super(prependDisableIndexMsg(message, disableIndexOnFailure), cause);
+ super(message, cause);
}
- public IndexWriteException(String message, boolean disableIndexOnFailure) {
- super(prependDisableIndexMsg(message, disableIndexOnFailure));
- this.disableIndexOnFailure = disableIndexOnFailure;
+ public IndexWriteException(String message) {
+ super(message);
}
- private static String prependDisableIndexMsg(String message, boolean disableIndexOnFailure) {
- return DISABLE_INDEX_ON_FAILURE_MSG + disableIndexOnFailure + " " + message;
-}
-
-public IndexWriteException(Throwable cause) {
+ public IndexWriteException(Throwable cause) {
super(cause);
}
-
- public static boolean parseDisableIndexOnFailure(String message) {
- Pattern p =
- Pattern.compile(DISABLE_INDEX_ON_FAILURE_MSG + "(true|false)",
- Pattern.CASE_INSENSITIVE);
- Matcher m = p.matcher(message);
- if (m.find()) {
- boolean disableIndexOnFailure = Boolean.parseBoolean(m.group(1));
- return disableIndexOnFailure;
- }
- return QueryServicesOptions.DEFAULT_INDEX_FAILURE_DISABLE_INDEX;
- }
-
- public boolean isDisableIndexOnFailure() {
- return disableIndexOnFailure;
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/878a264e/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/exception/MultiIndexWriteFailureException.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/exception/MultiIndexWriteFailureException.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/exception/MultiIndexWriteFailureException.java
index d593791..546b43d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/exception/MultiIndexWriteFailureException.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/exception/MultiIndexWriteFailureException.java
@@ -18,14 +18,8 @@
package org.apache.phoenix.hbase.index.exception;
import java.util.List;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
-import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
-
-import com.google.common.collect.Lists;
/**
* Indicate a failure to write to multiple index tables.
@@ -33,34 +27,15 @@ import com.google.common.collect.Lists;
@SuppressWarnings("serial")
public class MultiIndexWriteFailureException extends IndexWriteException {
- public static final String FAILURE_MSG = "Failed to write to multiple index tables: ";
private List<HTableInterfaceReference> failures;
/**
* @param failures the tables to which the index write did not succeed
*/
- public MultiIndexWriteFailureException(List<HTableInterfaceReference> failures, boolean disableIndexOnFailure) {
- super(FAILURE_MSG + failures, disableIndexOnFailure);
+ public MultiIndexWriteFailureException(List<HTableInterfaceReference> failures) {
+ super("Failed to write to multiple index tables");
this.failures = failures;
- }
- /**
- * This constructor used to rematerialize this exception when receiving
- * an rpc exception from the server
- * @param message detail message
- */
- public MultiIndexWriteFailureException(String message) {
- super(message, IndexWriteException.parseDisableIndexOnFailure(message));
- Pattern p = Pattern.compile(FAILURE_MSG + "\\[(.*)\\]");
- Matcher m = p.matcher(message);
- if (m.find()) {
- failures = Lists.newArrayList();
- String tablesStr = m.group(1);
- for (String tableName : tablesStr.split(",\\s")) {
- HTableInterfaceReference tableRef = new HTableInterfaceReference(new ImmutableBytesPtr(Bytes.toBytes(tableName)));
- failures.add(tableRef);
- }
- }
}
public List<HTableInterfaceReference> getFailedTables() {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/878a264e/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/exception/SingleIndexWriteFailureException.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/exception/SingleIndexWriteFailureException.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/exception/SingleIndexWriteFailureException.java
index 610a82a..eb3b521 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/exception/SingleIndexWriteFailureException.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/exception/SingleIndexWriteFailureException.java
@@ -18,8 +18,6 @@
package org.apache.phoenix.hbase.index.exception;
import java.util.List;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
import org.apache.hadoop.hbase.client.Mutation;
@@ -29,7 +27,6 @@ import org.apache.hadoop.hbase.client.Mutation;
@SuppressWarnings("serial")
public class SingleIndexWriteFailureException extends IndexWriteException {
- public static final String FAILED_MSG = "Failed to make index update:";
private String table;
/**
@@ -48,27 +45,13 @@ public class SingleIndexWriteFailureException extends IndexWriteException {
* @param cause underlying reason for the failure
*/
public SingleIndexWriteFailureException(String targetTableName, List<Mutation> mutations,
- Exception cause, boolean disableIndexOnFailure) {
- super(FAILED_MSG + "\n\t table: " + targetTableName + "\n\t edits: " + mutations
- + "\n\tcause: " + cause == null ? "UNKNOWN" : cause.getMessage(), cause, disableIndexOnFailure);
+ Exception cause) {
+ super("Failed to make index update:\n\t table: " + targetTableName + "\n\t edits: " + mutations
+ + "\n\tcause: " + cause == null ? "UNKNOWN" : cause.getMessage(), cause);
this.table = targetTableName;
}
/**
- * This constructor used to rematerialize this exception when receiving
- * an rpc exception from the server
- * @param message detail message
- */
- public SingleIndexWriteFailureException(String msg) {
- super(msg, IndexWriteException.parseDisableIndexOnFailure(msg));
- Pattern pattern = Pattern.compile(FAILED_MSG + ".* table: ([\\S]*)\\s.*", Pattern.DOTALL);
- Matcher m = pattern.matcher(msg);
- if (m.find()) {
- this.table = m.group(1);
- }
- }
-
- /**
* @return The table to which we failed to write the index updates. If unknown, returns
* <tt>null</tt>
*/
http://git-wip-us.apache.org/repos/asf/phoenix/blob/878a264e/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java
index 29b9faf..3649069 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java
@@ -70,13 +70,13 @@ public class IndexWriterUtils {
public static final String HTABLE_KEEP_ALIVE_KEY = "hbase.htable.threads.keepalivetime";
public static final String INDEX_WRITER_RPC_RETRIES_NUMBER = "phoenix.index.writes.rpc.retries.number";
- /**
- * Retry server-server index write rpc only once, and let the client retry the data write
- * instead to avoid typing up the handler
- */
- // note in HBase 2+, numTries = numRetries + 1
- // in prior versions, numTries = numRetries
- public static final int DEFAULT_INDEX_WRITER_RPC_RETRIES_NUMBER = 1;
+ /**
+ * Based on the logic in HBase's AsyncProcess, a default of 11 retries with a pause of 100ms
+ * approximates 48 sec total retry time (factoring in backoffs). The total time should be less
+ * than HBase's rpc timeout (default of 60 sec) or else the client will retry before receiving
+ * the response
+ */
+ public static final int DEFAULT_INDEX_WRITER_RPC_RETRIES_NUMBER = 11;
public static final String INDEX_WRITER_RPC_PAUSE = "phoenix.index.writes.rpc.pause";
public static final int DEFAULT_INDEX_WRITER_RPC_PAUSE = 100;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/878a264e/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
index 0bb8784..e4e8343 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
@@ -35,7 +35,6 @@ import org.apache.phoenix.hbase.index.parallel.ThreadPoolManager;
import org.apache.phoenix.hbase.index.table.HTableFactory;
import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
-import org.apache.phoenix.index.PhoenixIndexFailurePolicy;
import org.apache.phoenix.util.IndexUtil;
import com.google.common.collect.Multimap;
@@ -168,11 +167,11 @@ public class ParallelWriterIndexCommitter implements IndexCommitter {
} catch (SingleIndexWriteFailureException e) {
throw e;
} catch (IOException e) {
- throw new SingleIndexWriteFailureException(tableReference.toString(), mutations, e, PhoenixIndexFailurePolicy.getDisableIndexOnFailure(env));
+ throw new SingleIndexWriteFailureException(tableReference.toString(), mutations, e);
} catch (InterruptedException e) {
// reset the interrupt status on the thread
Thread.currentThread().interrupt();
- throw new SingleIndexWriteFailureException(tableReference.toString(), mutations, e, PhoenixIndexFailurePolicy.getDisableIndexOnFailure(env));
+ throw new SingleIndexWriteFailureException(tableReference.toString(), mutations, e);
}
finally{
if (table != null) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/878a264e/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/TrackingParallelWriterIndexCommitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/TrackingParallelWriterIndexCommitter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/TrackingParallelWriterIndexCommitter.java
index 94d4f0f..0449e9e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/TrackingParallelWriterIndexCommitter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/TrackingParallelWriterIndexCommitter.java
@@ -40,7 +40,6 @@ import org.apache.phoenix.hbase.index.parallel.WaitForCompletionTaskRunner;
import org.apache.phoenix.hbase.index.table.HTableFactory;
import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
-import org.apache.phoenix.index.PhoenixIndexFailurePolicy;
import org.apache.phoenix.util.IndexUtil;
import com.google.common.collect.Multimap;
@@ -111,7 +110,6 @@ public class TrackingParallelWriterIndexCommitter implements IndexCommitter {
this.factory = factory;
this.abortable = new CapturingAbortable(abortable);
this.stopped = stop;
- this.env = env;
}
@Override
@@ -228,8 +226,7 @@ public class TrackingParallelWriterIndexCommitter implements IndexCommitter {
// if any of the tasks failed, then we need to propagate the failure
if (failures.size() > 0) {
// make the list unmodifiable to avoid any more synchronization concerns
- throw new MultiIndexWriteFailureException(Collections.unmodifiableList(failures),
- PhoenixIndexFailurePolicy.getDisableIndexOnFailure(env));
+ throw new MultiIndexWriteFailureException(Collections.unmodifiableList(failures));
}
return;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/878a264e/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
index 14f8307..ba6371b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
@@ -30,28 +30,21 @@ import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode;
-import org.apache.phoenix.exception.SQLExceptionCode;
-import org.apache.phoenix.exception.SQLExceptionInfo;
-import org.apache.phoenix.hbase.index.exception.IndexWriteException;
import org.apache.phoenix.hbase.index.exception.MultiIndexWriteFailureException;
-import org.apache.phoenix.hbase.index.exception.SingleIndexWriteFailureException;
import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
import org.apache.phoenix.hbase.index.write.DelegateIndexFailurePolicy;
import org.apache.phoenix.hbase.index.write.KillServerOnFailurePolicy;
@@ -66,7 +59,6 @@ import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.QueryUtil;
-import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.ServerUtil;
@@ -110,8 +102,14 @@ public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy {
rebuildIndexOnFailure = Boolean.parseBoolean(value);
}
}
- disableIndexOnFailure = getDisableIndexOnFailure(env);
- String value = htd.getValue(BLOCK_DATA_TABLE_WRITES_ON_WRITE_FAILURE);
+ String value = htd.getValue(DISABLE_INDEX_ON_WRITE_FAILURE);
+ if (value == null) {
+ disableIndexOnFailure = env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_DISABLE_INDEX,
+ QueryServicesOptions.DEFAULT_INDEX_FAILURE_DISABLE_INDEX);
+ } else {
+ disableIndexOnFailure = Boolean.parseBoolean(value);
+ }
+ value = htd.getValue(BLOCK_DATA_TABLE_WRITES_ON_WRITE_FAILURE);
if (value == null) {
blockDataTableWritesOnFailure = env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_BLOCK_WRITE,
QueryServicesOptions.DEFAULT_INDEX_FAILURE_BLOCK_WRITE);
@@ -151,11 +149,7 @@ public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy {
throwing = false;
} finally {
if (!throwing) {
- SQLException sqlException =
- new SQLExceptionInfo.Builder(SQLExceptionCode.INDEX_WRITE_FAILURE)
- .setRootCause(cause).setMessage(cause.getLocalizedMessage()).build()
- .buildException();
- IOException ioException = ServerUtil.wrapInDoNotRetryIOException(null, sqlException, timestamp);
+ IOException ioException = ServerUtil.wrapInDoNotRetryIOException("Unable to update the following indexes: " + attempted.keySet(), cause, timestamp);
Mutation m = attempted.entries().iterator().next().getValue();
boolean isIndexRebuild = PhoenixIndexMetaData.isIndexRebuild(m.getAttributesMap());
// Always throw if rebuilding index since the rebuilder needs to know if it was successful
@@ -218,7 +212,7 @@ public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy {
return timestamp;
}
- final PIndexState newState = disableIndexOnFailure ? PIndexState.PENDING_DISABLE : PIndexState.PENDING_ACTIVE;
+ final PIndexState newState = disableIndexOnFailure ? PIndexState.DISABLE : PIndexState.PENDING_ACTIVE;
final long fTimestamp=timestamp;
// for all the index tables that we've found, try to disable them and if that fails, try to
return User.runAsLoginUser(new PrivilegedExceptionAction<Long>() {
@@ -260,9 +254,12 @@ public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy {
throw new DoNotRetryIOException("Attempt to disable " + indexTableName + " failed.");
}
}
- LOG.info("Successfully update INDEX_DISABLE_TIMESTAMP for " + indexTableName
- + " due to an exception while writing updates. indexState=" + newState,
- cause);
+ if (leaveIndexActive)
+ LOG.info("Successfully update INDEX_DISABLE_TIMESTAMP for " + indexTableName
+ + " due to an exception while writing updates.", cause);
+ else
+ LOG.info("Successfully disabled index " + indexTableName
+ + " due to an exception while writing updates.", cause);
} catch (Throwable t) {
if (t instanceof Exception) {
throw (Exception)t;
@@ -334,158 +331,4 @@ public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy {
}
return indexTableNames;
}
-
- /**
- * Check config for whether to disable index on index write failures
- * @param htd
- * @param config
- * @param connection
- * @return The table config for {@link PhoenixIndexFailurePolicy.DISABLE_INDEX_ON_WRITE_FAILURE}
- * @throws SQLException
- */
- public static boolean getDisableIndexOnFailure(RegionCoprocessorEnvironment env) {
- HTableDescriptor htd = env.getRegion().getTableDesc();
- Configuration config = env.getConfiguration();
- String value = htd.getValue(PhoenixIndexFailurePolicy.DISABLE_INDEX_ON_WRITE_FAILURE);
- boolean disableIndexOnFailure;
- if (value == null) {
- disableIndexOnFailure =
- config.getBoolean(QueryServices.INDEX_FAILURE_DISABLE_INDEX,
- QueryServicesOptions.DEFAULT_INDEX_FAILURE_DISABLE_INDEX);
- } else {
- disableIndexOnFailure = Boolean.parseBoolean(value);
- }
- return disableIndexOnFailure;
- }
-
- /**
- * If we're leaving the index active after index write failures on the server side, then we get
- * the exception on the client side here after hitting the max # of hbase client retries. We
- * disable the index as it may now be inconsistent. The indexDisableTimestamp was already set
- * on the server side, so the rebuilder will be run.
- */
- private static void handleIndexWriteFailureFromClient(IndexWriteException indexWriteException,
- PhoenixConnection conn) {
- handleExceptionFromClient(indexWriteException, conn, PIndexState.DISABLE);
- }
-
- private static void handleIndexWriteSuccessFromClient(IndexWriteException indexWriteException,
- PhoenixConnection conn) {
- handleExceptionFromClient(indexWriteException, conn, PIndexState.ACTIVE);
- }
-
- private static void handleExceptionFromClient(IndexWriteException indexWriteException,
- PhoenixConnection conn, PIndexState indexState) {
- try {
- Set<String> indexesToUpdate = new HashSet<>();
- if (indexWriteException instanceof MultiIndexWriteFailureException) {
- MultiIndexWriteFailureException indexException =
- (MultiIndexWriteFailureException) indexWriteException;
- List<HTableInterfaceReference> failedIndexes = indexException.getFailedTables();
- if (indexException.isDisableIndexOnFailure() && failedIndexes != null) {
- for (HTableInterfaceReference failedIndex : failedIndexes) {
- String failedIndexTable = failedIndex.getTableName();
- if (!indexesToUpdate.contains(failedIndexTable)) {
- updateIndex(failedIndexTable, conn, indexState);
- indexesToUpdate.add(failedIndexTable);
- }
- }
- }
- } else if (indexWriteException instanceof SingleIndexWriteFailureException) {
- SingleIndexWriteFailureException indexException =
- (SingleIndexWriteFailureException) indexWriteException;
- String failedIndex = indexException.getTableName();
- if (indexException.isDisableIndexOnFailure() && failedIndex != null) {
- updateIndex(failedIndex, conn, indexState);
- }
- }
- } catch (Exception handleE) {
- LOG.warn("Error while trying to handle index write exception", indexWriteException);
- }
- }
-
- public static interface MutateCommand {
- void doMutation() throws IOException;
- }
-
- /**
- * Retries a mutationBatch where the index write failed.
- * One attempt should have already been made before calling this.
- * Max retries and exponential backoff logic mimics that of HBase's client
- * If max retries are hit, the index is disabled.
- * If the write is successful on a subsequent retry, the index is set back to ACTIVE
- * @param mutateCommand mutation command to execute
- * @param iwe original IndexWriteException
- * @param connection connection to use
- * @param config config used to get retry settings
- * @throws Exception
- */
- public static void doBatchWithRetries(MutateCommand mutateCommand,
- IndexWriteException iwe, PhoenixConnection connection, ReadOnlyProps config)
- throws IOException {
- int maxTries = config.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
- HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
- long pause = config.getLong(HConstants.HBASE_CLIENT_PAUSE,
- HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
- int numRetry = 1; // already tried once
- // calculate max time to retry for
- int timeout = 0;
- for (int i = 0; i < maxTries; ++i) {
- timeout = (int) (timeout + ConnectionUtils.getPauseTime(pause, i));
- }
- long canRetryUntil = EnvironmentEdgeManager.currentTime() + timeout;
- while (canRetryMore(numRetry++, maxTries, canRetryUntil)) {
- try {
- Thread.sleep(ConnectionUtils.getPauseTime(pause, numRetry)); // HBase's exponential backoff
- mutateCommand.doMutation();
- // success - change the index state from PENDING_DISABLE back to ACTIVE
- handleIndexWriteSuccessFromClient(iwe, connection);
- return;
- } catch (IOException e) {
- SQLException inferredE = ServerUtil.parseLocalOrRemoteServerException(e);
- if (inferredE == null || inferredE.getErrorCode() != SQLExceptionCode.INDEX_WRITE_FAILURE.getErrorCode()) {
- // if it's not an index write exception, throw exception, to be handled normally in caller's try-catch
- throw e;
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new IOException(e);
- }
- }
- // max retries hit - disable the index
- handleIndexWriteFailureFromClient(iwe, connection);
- throw new DoNotRetryIOException(iwe); // send failure back to client
- }
-
- private static boolean canRetryMore(int numRetry, int maxRetries, long canRetryUntil) {
- // If there is a single try we must not take into account the time.
- return numRetry < maxRetries
- || (maxRetries > 1 && EnvironmentEdgeManager.currentTime() < canRetryUntil);
- }
-
- /**
- * Converts from SQLException to IndexWriteException
- * @param sqlE the SQLException
- * @return the IndexWriteException
- */
- public static IndexWriteException getIndexWriteException(SQLException sqlE) {
- String sqlMsg = sqlE.getMessage();
- if (sqlMsg.contains(MultiIndexWriteFailureException.FAILURE_MSG)) {
- return new MultiIndexWriteFailureException(sqlMsg);
- } else if (sqlMsg.contains(SingleIndexWriteFailureException.FAILED_MSG)) {
- return new SingleIndexWriteFailureException(sqlMsg);
- }
- return null;
- }
-
- private static void updateIndex(String indexFullName, PhoenixConnection conn,
- PIndexState indexState) throws SQLException {
- if (PIndexState.DISABLE.equals(indexState)) {
- LOG.info("Disabling index after hitting max number of index write retries: "
- + indexFullName);
- } else if (PIndexState.ACTIVE.equals(indexState)) {
- LOG.debug("Resetting index to active after subsequent success " + indexFullName);
- }
- IndexUtil.updateIndexState(conn, indexFullName, indexState, null);
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/878a264e/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index 094f743..23330d8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -322,7 +322,6 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
public static final int MIN_RENEW_LEASE_VERSION = VersionUtil.encodeVersion("1", "1", "3");
public static final int MIN_NAMESPACE_MAPPED_PHOENIX_VERSION = VersionUtil.encodeVersion("4", "8", "0");
public static final int MIN_PENDING_ACTIVE_INDEX = VersionUtil.encodeVersion("4", "12", "0");
- public static final int MIN_PENDING_DISABLE_INDEX = VersionUtil.encodeVersion("4", "14", "0");
// Version below which we should turn off essential column family.
public static final int ESSENTIAL_FAMILY_VERSION_THRESHOLD = VersionUtil.encodeVersion("0", "94", "7");
http://git-wip-us.apache.org/repos/asf/phoenix/blob/878a264e/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
index 3f5f5ed..4192869 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
@@ -57,9 +57,7 @@ import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTable.IndexType;
import org.apache.phoenix.schema.PTableType;
-import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.types.PDataType;
-import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.IndexUtil;
import com.google.common.collect.Lists;
@@ -70,14 +68,11 @@ public class QueryOptimizer {
private final QueryServices services;
private final boolean useIndexes;
private final boolean costBased;
- private long indexPendingDisabledThreshold;
public QueryOptimizer(QueryServices services) {
this.services = services;
this.useIndexes = this.services.getProps().getBoolean(QueryServices.USE_INDEXES_ATTRIB, QueryServicesOptions.DEFAULT_USE_INDEXES);
this.costBased = this.services.getProps().getBoolean(QueryServices.COST_BASED_OPTIMIZER_ENABLED, QueryServicesOptions.DEFAULT_COST_BASED_OPTIMIZER_ENABLED);
- this.indexPendingDisabledThreshold = this.services.getProps().getLong(QueryServices.INDEX_PENDING_DISABLE_THRESHOLD,
- QueryServicesOptions.DEFAULT_INDEX_PENDING_DISABLE_THRESHOLD);
}
public QueryPlan optimize(PhoenixStatement statement, QueryPlan dataPlan) throws SQLException {
@@ -163,7 +158,7 @@ public class QueryOptimizer {
return hintedPlan == null ? orderPlansBestToWorst(select, plans, stopAtBestPlan) : plans;
}
- private QueryPlan getHintedQueryPlan(PhoenixStatement statement, SelectStatement select, List<PTable> indexes, List<? extends PDatum> targetColumns, ParallelIteratorFactory parallelIteratorFactory, List<QueryPlan> plans) throws SQLException {
+ private static QueryPlan getHintedQueryPlan(PhoenixStatement statement, SelectStatement select, List<PTable> indexes, List<? extends PDatum> targetColumns, ParallelIteratorFactory parallelIteratorFactory, List<QueryPlan> plans) throws SQLException {
QueryPlan dataPlan = plans.get(0);
String indexHint = select.getHint().getHint(Hint.INDEX);
if (indexHint == null) {
@@ -220,7 +215,7 @@ public class QueryOptimizer {
return -1;
}
- private QueryPlan addPlan(PhoenixStatement statement, SelectStatement select, PTable index, List<? extends PDatum> targetColumns, ParallelIteratorFactory parallelIteratorFactory, QueryPlan dataPlan, boolean isHinted) throws SQLException {
+ private static QueryPlan addPlan(PhoenixStatement statement, SelectStatement select, PTable index, List<? extends PDatum> targetColumns, ParallelIteratorFactory parallelIteratorFactory, QueryPlan dataPlan, boolean isHinted) throws SQLException {
int nColumns = dataPlan.getProjector().getColumnCount();
String tableAlias = dataPlan.getTableRef().getTableAlias();
String alias = tableAlias==null ? null : '"' + tableAlias + '"'; // double quote in case it's case sensitive
@@ -234,11 +229,8 @@ public class QueryOptimizer {
// We will or will not do tuple projection according to the data plan.
boolean isProjected = dataPlan.getContext().getResolver().getTables().get(0).getTable().getType() == PTableType.PROJECTED;
// Check index state of now potentially updated index table to make sure it's active
- TableRef indexTableRef = resolver.getTables().get(0);
- PTable indexTable = indexTableRef.getTable();
- PIndexState indexState = indexTable.getIndexState();
- if (indexState == PIndexState.ACTIVE || indexState == PIndexState.PENDING_ACTIVE
- || (indexState == PIndexState.PENDING_DISABLE && isUnderPendingDisableThreshold(indexTableRef.getCurrentTime(), indexTable.getIndexDisableTimestamp()))) {
+ PIndexState indexState = resolver.getTables().get(0).getTable().getIndexState();
+ if (indexState == PIndexState.ACTIVE || indexState == PIndexState.PENDING_ACTIVE) {
try {
// translate nodes that match expressions that are indexed to the associated column parse node
indexSelect = ParseNodeRewriter.rewrite(indexSelect, new IndexExpressionParseNodeRewriter(index, null, statement.getConnection(), indexSelect.getUdfParseNodes()));
@@ -254,13 +246,10 @@ public class QueryOptimizer {
&& !plan.getContext().getDataColumns().isEmpty()) {
return null;
}
- indexTableRef = plan.getTableRef();
- indexTable = indexTableRef.getTable();
- indexState = indexTable.getIndexState();
+ indexState = plan.getTableRef().getTable().getIndexState();
// Checking number of columns handles the wildcard cases correctly, as in that case the index
// must contain all columns from the data table to be able to be used.
- if (indexState == PIndexState.ACTIVE || indexState == PIndexState.PENDING_ACTIVE
- || (indexState == PIndexState.PENDING_DISABLE && isUnderPendingDisableThreshold(indexTableRef.getCurrentTime(), indexTable.getIndexDisableTimestamp()))) {
+ if (indexState == PIndexState.ACTIVE || indexState == PIndexState.PENDING_ACTIVE) {
if (plan.getProjector().getColumnCount() == nColumns) {
return plan;
} else if (index.getIndexType() == IndexType.GLOBAL) {
@@ -323,12 +312,6 @@ public class QueryOptimizer {
return null;
}
- // returns true if we can still use the index
- // retuns false if we've been in PENDING_DISABLE too long - index should be considered disabled
- private boolean isUnderPendingDisableThreshold(long currentTimestamp, long indexDisableTimestamp) {
- return currentTimestamp - indexDisableTimestamp <= indexPendingDisabledThreshold;
- }
-
/**
* Order the plans among all the possible ones from best to worst.
* If option COST_BASED_OPTIMIZER_ENABLED is on and stats are available, we order the plans based on
http://git-wip-us.apache.org/repos/asf/phoenix/blob/878a264e/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 2a31f09..0b80f4d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -144,8 +144,6 @@ public interface QueryServices extends SQLCloseable {
public static final String INDEX_FAILURE_HANDLING_REBUILD_NUMBER_OF_BATCHES_PER_TABLE = "phoenix.index.rebuild.batch.perTable";
// If index disable timestamp is older than this threshold, then index rebuild task won't attempt to rebuild it
public static final String INDEX_REBUILD_DISABLE_TIMESTAMP_THRESHOLD = "phoenix.index.rebuild.disabletimestamp.threshold";
- // threshold number of ms an index has been in PENDING_DISABLE, beyond which we consider it disabled
- public static final String INDEX_PENDING_DISABLE_THRESHOLD = "phoenix.index.pending.disable.threshold";
// Block writes to data table when index write fails
public static final String INDEX_FAILURE_BLOCK_WRITE = "phoenix.index.failure.block.write";
http://git-wip-us.apache.org/repos/asf/phoenix/blob/878a264e/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index d749433..4d31974 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -198,7 +198,6 @@ public class QueryServicesOptions {
public static final long DEFAULT_INDEX_REBUILD_CLIENT_SCANNER_TIMEOUT = 30000 * 60; // 30 mins
public static final int DEFAULT_INDEX_REBUILD_RPC_RETRIES_COUNTER = 5; // 5 total tries at rpc level
public static final int DEFAULT_INDEX_REBUILD_DISABLE_TIMESTAMP_THRESHOLD = 30000 * 60; // 30 mins
- public static final long DEFAULT_INDEX_PENDING_DISABLE_THRESHOLD = 30000; // 30 secs
/**
* HConstants#HIGH_QOS is the max we will see to a standard table. We go higher to differentiate
http://git-wip-us.apache.org/repos/asf/phoenix/blob/878a264e/phoenix-core/src/main/java/org/apache/phoenix/schema/PIndexState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PIndexState.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PIndexState.java
index 2b6ac4a..d7dbeca 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PIndexState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PIndexState.java
@@ -27,12 +27,7 @@ public enum PIndexState {
INACTIVE("i"),
DISABLE("x"),
REBUILD("r"),
- PENDING_ACTIVE("p"),
- // Used when disabling an index on write failure (PHOENIX-4130)
- // When an index write fails, it is put in this state, and we let the client retry the mutation
- // After retries are exhausted, the client should mark the index as disabled, but if that
- // doesn't happen, then the index is considered disabled if it's been in this state too long
- PENDING_DISABLE("w");
+ PENDING_ACTIVE("p");
private final String serializedValue;
private final byte[] serializedBytes;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/878a264e/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java
index 4d8565f..df6a349 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.execute.MutationState.MultiRowMutationState;
import org.apache.phoenix.execute.MutationState.RowMutationState;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
@@ -133,17 +132,6 @@ public class KeyValueUtil {
return kvs[pos];
}
- public static void setTimestamp(Mutation m, long timestamp) {
- byte[] tsBytes = Bytes.toBytes(timestamp);
- for (List<Cell> family : m.getFamilyCellMap().values()) {
- List<KeyValue> familyKVs = org.apache.hadoop.hbase.KeyValueUtil.ensureKeyValues(family);
- for (KeyValue kv : familyKVs) {
- int tsOffset = kv.getTimestampOffset();
- System.arraycopy(tsBytes, 0, kv.getBuffer(), tsOffset, Bytes.SIZEOF_LONG);
- }
- }
- }
-
/*
* Special comparator, *only* works for binary search.
*