You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by st...@apache.org on 2021/01/13 12:57:30 UTC
[phoenix] branch 4.x updated: PHOENIX-6298 : Use timestamp of
PENDING_DISABLE_COUNT to calculate elapse time for PENDING_DISABLE state
This is an automated email from the ASF dual-hosted git repository.
stoty pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x by this push:
new 67ffa6a PHOENIX-6298 : Use timestamp of PENDING_DISABLE_COUNT to calculate elapse time for PENDING_DISABLE state
67ffa6a is described below
commit 67ffa6a347f222cd4f297b3ef36abd691cd2e76f
Author: Viraj Jasani <vj...@apache.org>
AuthorDate: Tue Jan 5 16:28:04 2021 +0530
PHOENIX-6298 : Use timestamp of PENDING_DISABLE_COUNT to calculate elapse time for PENDING_DISABLE state
---
.../end2end/index/PartialIndexRebuilderIT.java | 84 ++++++++++++++++++++--
.../coprocessor/MetaDataRegionObserver.java | 16 +++--
.../java/org/apache/phoenix/util/IndexUtil.java | 21 ++++++
3 files changed, 109 insertions(+), 12 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
index d0811ed..bc9bcfe 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
@@ -17,6 +17,7 @@
*/
package org.apache.phoenix.end2end.index;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -30,6 +31,7 @@ import java.sql.SQLException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -63,6 +65,7 @@ import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.IndexScrutiny;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.MetaDataUtil;
+import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.Repeat;
import org.apache.phoenix.util.SchemaUtil;
@@ -83,9 +86,7 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
private static final long REBUILD_PERIOD = 50000;
private static final long REBUILD_INTERVAL = 2000;
private static RegionCoprocessorEnvironment indexRebuildTaskRegionEnvironment;
- private static Boolean runRebuildOnce = true;
-
@BeforeClass
public static synchronized void doSetup() throws Exception {
Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(10);
@@ -128,8 +129,8 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
private static void runIndexRebuilderAsync(final int interval, final boolean[] cancel, String table) {
runIndexRebuilderAsync(interval, cancel, Collections.<String>singletonList(table));
}
+
private static void runIndexRebuilderAsync(final int interval, final boolean[] cancel, final List<String> tables) {
- runRebuildOnce = true;
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
@@ -142,8 +143,6 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
throw new RuntimeException(e);
} catch (SQLException e) {
LOGGER.error(e.getMessage(),e);
- } finally {
- runRebuildOnce = false;
}
}
}
@@ -561,7 +560,13 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
@Override
public long currentTime() {
- return time++;
+ synchronized (this) {
+ return time++;
+ }
+ }
+
+ private synchronized void addTime(long diff) {
+ time += diff;
}
}
@@ -1088,4 +1093,71 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
throw new DoNotRetryIOException("Simulating write failure on " + c.getEnvironment().getRegionInfo().getTable().getNameAsString());
}
}
+
+ @Test
+ public void testPendingDisableWithDisableCountTs() throws Throwable {
+ final String schemaName = generateUniqueName();
+ final String tableName = generateUniqueName();
+ final String indexName = generateUniqueName();
+ final String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+ final String fullIndexName = SchemaUtil.getTableName(schemaName, indexName);
+ final MyClock clock =
+ new MyClock(EnvironmentEdgeManager.currentTimeMillis());
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.createStatement().execute(String.format(
+ "CREATE TABLE %s (k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR, "
+ + "v3 VARCHAR, v4 VARCHAR) COLUMN_ENCODED_BYTES = 0, "
+ + "DISABLE_INDEX_ON_WRITE_FAILURE = TRUE", fullTableName));
+ EnvironmentEdgeManager.injectEdge(clock);
+ clock.addTime(100);
+ conn.createStatement().execute(
+ String.format("CREATE INDEX %s ON %s (v1, v2)", indexName,
+ fullTableName));
+ clock.addTime(100);
+ conn.createStatement().execute(
+ String.format("UPSERT INTO %s VALUES('k01', 'v01', 'v02', 'v03', 'v04')",
+ fullTableName));
+ conn.commit();
+ clock.addTime(100);
+
+ try (HTableInterface systemCatalog = conn.unwrap(PhoenixConnection.class)
+ .getQueryServices()
+ .getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES)) {
+ IndexUtil.updateIndexState(fullIndexName, clock.currentTime(),
+ systemCatalog, PIndexState.PENDING_DISABLE);
+ }
+
+ Configuration conf =
+ conn.unwrap(PhoenixConnection.class).getQueryServices().getConfiguration();
+
+ PhoenixStatement stmt = conn.createStatement().unwrap(PhoenixStatement.class);
+ ResultSet rs = stmt.executeQuery(
+ String.format("SELECT V2 FROM %s WHERE V1 = 'v01'", fullTableName));
+ assertTrue(rs.next());
+ assertEquals("v02", rs.getString(1));
+
+ long pendingDisableThreshold = conf.getLong(
+ QueryServices.INDEX_PENDING_DISABLE_THRESHOLD,
+ QueryServicesOptions.DEFAULT_INDEX_PENDING_DISABLE_THRESHOLD);
+ long pendingDisableCountLastUpdatedTs =
+ IndexUtil.getIndexPendingDisableCountLastUpdatedTimestamp(
+ conn.unwrap(PhoenixConnection.class), fullIndexName);
+ clock.addTime(pendingDisableThreshold + pendingDisableCountLastUpdatedTs);
+
+ stmt = conn.createStatement().unwrap(PhoenixStatement.class);
+ rs = stmt.executeQuery(
+ String.format("SELECT V2 FROM %s WHERE V1 = 'v01'", fullTableName));
+ assertTrue(rs.next());
+ assertEquals("v02", rs.getString(1));
+
+ Thread.sleep(1000);
+ waitForIndexState(conn, fullTableName, fullIndexName,
+ PIndexState.DISABLE);
+ } finally {
+ EnvironmentEdgeManager.reset();
+ }
+ }
+
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
index b4e2b84..4058dde 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
@@ -314,9 +314,6 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
continue;
}
- long indexDisableTimestamp =
- PLong.INSTANCE.getCodec().decodeLong(disabledTimeStamp, 0,
- SortOrder.ASC);
byte[] dataTable = r.getValue(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
PhoenixDatabaseMetaData.DATA_TABLE_NAME_BYTES);
if ((dataTable == null || dataTable.length == 0) || indexStateCell == null) {
@@ -361,14 +358,21 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
}
PIndexState indexState = PIndexState.fromSerializedValue(indexStateBytes[0]);
- long elapsedSinceDisable = EnvironmentEdgeManager.currentTimeMillis() - Math.abs(indexDisableTimestamp);
+ long pendingDisableCountLastUpdatedTs =
+ IndexUtil.getIndexPendingDisableCountLastUpdatedTimestamp(conn, indexTableFullName);
+ long elapsedSinceDisable =
+ EnvironmentEdgeManager.currentTimeMillis() - pendingDisableCountLastUpdatedTs;
// on an index write failure, the server side transitions to PENDING_DISABLE, then the client
// retries, and after retries are exhausted, disables the index
if (indexState == PIndexState.PENDING_DISABLE) {
if (elapsedSinceDisable > pendingDisableThreshold) {
- // too long in PENDING_DISABLE - client didn't disable the index, so we do it here
- IndexUtil.updateIndexState(conn, indexTableFullName, PIndexState.DISABLE, indexDisableTimestamp);
+ // too long in PENDING_DISABLE -
+ // client didn't disable the index because last time when
+ // PENDING_DISABLE_COUNT was updated is greater than pendingDisableThreshold,
+ // so we do it here
+ IndexUtil.updateIndexState(conn, indexTableFullName,
+ PIndexState.DISABLE, pendingDisableCountLastUpdatedTs);
}
continue;
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
index 6e9e8f0..ec1a4f2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
@@ -913,4 +913,25 @@ public class IndexUtil {
throw new IOException(e);
}
}
+
+ public static long getIndexPendingDisableCountLastUpdatedTimestamp(
+ PhoenixConnection conn, String failedIndexTable)
+ throws IOException {
+ byte[] indexTableKey =
+ SchemaUtil.getTableKeyFromFullName(failedIndexTable);
+ Get get = new Get(indexTableKey);
+ get.addColumn(TABLE_FAMILY_BYTES,
+ PhoenixDatabaseMetaData.PENDING_DISABLE_COUNT_BYTES);
+ byte[] systemCatalog = SchemaUtil.getPhysicalTableName(
+ PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME,
+ conn.getQueryServices().getProps()).getName();
+ try (Table table = conn.getQueryServices().getTable(systemCatalog)) {
+ Result result = table.get(get);
+ Cell cell = result.listCells().get(0);
+ return cell.getTimestamp();
+ } catch (SQLException e) {
+ throw new IOException(e);
+ }
+ }
+
}