You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by st...@apache.org on 2021/01/13 12:57:30 UTC

[phoenix] branch 4.x updated: PHOENIX-6298 : Use timestamp of PENDING_DISABLE_COUNT to calculate elapse time for PENDING_DISABLE state

This is an automated email from the ASF dual-hosted git repository.

stoty pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.x by this push:
     new 67ffa6a  PHOENIX-6298 : Use timestamp of PENDING_DISABLE_COUNT to calculate elapse time for PENDING_DISABLE state
67ffa6a is described below

commit 67ffa6a347f222cd4f297b3ef36abd691cd2e76f
Author: Viraj Jasani <vj...@apache.org>
AuthorDate: Tue Jan 5 16:28:04 2021 +0530

    PHOENIX-6298 : Use timestamp of PENDING_DISABLE_COUNT to calculate elapse time for PENDING_DISABLE state
---
 .../end2end/index/PartialIndexRebuilderIT.java     | 84 ++++++++++++++++++++--
 .../coprocessor/MetaDataRegionObserver.java        | 16 +++--
 .../java/org/apache/phoenix/util/IndexUtil.java    | 21 ++++++
 3 files changed, 109 insertions(+), 12 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
index d0811ed..bc9bcfe 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.end2end.index;
 
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -30,6 +31,7 @@ import java.sql.SQLException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Random;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -63,6 +65,7 @@ import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.IndexScrutiny;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.MetaDataUtil;
+import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.Repeat;
 import org.apache.phoenix.util.SchemaUtil;
@@ -83,9 +86,7 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
     private static final long REBUILD_PERIOD = 50000;
     private static final long REBUILD_INTERVAL = 2000;
     private static RegionCoprocessorEnvironment indexRebuildTaskRegionEnvironment;
-    private static Boolean runRebuildOnce = true;
 
-    
     @BeforeClass
     public static synchronized void doSetup() throws Exception {
         Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(10);
@@ -128,8 +129,8 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
     private static void runIndexRebuilderAsync(final int interval, final boolean[] cancel, String table) {
         runIndexRebuilderAsync(interval, cancel, Collections.<String>singletonList(table));
     }
+
     private static void runIndexRebuilderAsync(final int interval, final boolean[] cancel, final List<String> tables) {
-        runRebuildOnce = true;
         Thread thread = new Thread(new Runnable() {
             @Override
             public void run() {
@@ -142,8 +143,6 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
                         throw new RuntimeException(e);
                     } catch (SQLException e) {
                         LOGGER.error(e.getMessage(),e);
-                    } finally {
-                        runRebuildOnce = false;
                     }
                 }
             }
@@ -561,7 +560,13 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
         
         @Override
         public long currentTime() {
-            return time++;
+            synchronized (this) {
+                return time++;
+            }
+        }
+
+        private synchronized void addTime(long diff) {
+            time += diff;
         }
     }
     
@@ -1088,4 +1093,71 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
             throw new DoNotRetryIOException("Simulating write failure on " + c.getEnvironment().getRegionInfo().getTable().getNameAsString());
         }
     }
+
+    @Test
+    public void testPendingDisableWithDisableCountTs() throws Throwable {
+        final String schemaName = generateUniqueName();
+        final String tableName = generateUniqueName();
+        final String indexName = generateUniqueName();
+        final String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+        final String fullIndexName = SchemaUtil.getTableName(schemaName, indexName);
+        final MyClock clock =
+            new MyClock(EnvironmentEdgeManager.currentTimeMillis());
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.createStatement().execute(String.format(
+                "CREATE TABLE %s (k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR, "
+                    + "v3 VARCHAR, v4 VARCHAR) COLUMN_ENCODED_BYTES = 0, "
+                    + "DISABLE_INDEX_ON_WRITE_FAILURE = TRUE", fullTableName));
+            EnvironmentEdgeManager.injectEdge(clock);
+            clock.addTime(100);
+            conn.createStatement().execute(
+                String.format("CREATE INDEX %s ON %s (v1, v2)", indexName,
+                    fullTableName));
+            clock.addTime(100);
+            conn.createStatement().execute(
+                String.format("UPSERT INTO %s VALUES('k01', 'v01', 'v02', 'v03', 'v04')",
+                    fullTableName));
+            conn.commit();
+            clock.addTime(100);
+
+            try (HTableInterface systemCatalog = conn.unwrap(PhoenixConnection.class)
+                    .getQueryServices()
+                    .getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES)) {
+                IndexUtil.updateIndexState(fullIndexName, clock.currentTime(),
+                    systemCatalog, PIndexState.PENDING_DISABLE);
+            }
+
+            Configuration conf =
+                conn.unwrap(PhoenixConnection.class).getQueryServices().getConfiguration();
+
+            PhoenixStatement stmt = conn.createStatement().unwrap(PhoenixStatement.class);
+            ResultSet rs = stmt.executeQuery(
+                String.format("SELECT V2 FROM %s WHERE V1 = 'v01'", fullTableName));
+            assertTrue(rs.next());
+            assertEquals("v02", rs.getString(1));
+
+            long pendingDisableThreshold = conf.getLong(
+                QueryServices.INDEX_PENDING_DISABLE_THRESHOLD,
+                QueryServicesOptions.DEFAULT_INDEX_PENDING_DISABLE_THRESHOLD);
+            long pendingDisableCountLastUpdatedTs =
+                IndexUtil.getIndexPendingDisableCountLastUpdatedTimestamp(
+                    conn.unwrap(PhoenixConnection.class), fullIndexName);
+            clock.addTime(pendingDisableThreshold + pendingDisableCountLastUpdatedTs);
+
+            stmt = conn.createStatement().unwrap(PhoenixStatement.class);
+            rs = stmt.executeQuery(
+                String.format("SELECT V2 FROM %s WHERE V1 = 'v01'", fullTableName));
+            assertTrue(rs.next());
+            assertEquals("v02", rs.getString(1));
+
+            Thread.sleep(1000);
+            waitForIndexState(conn, fullTableName, fullIndexName,
+                PIndexState.DISABLE);
+        } finally {
+            EnvironmentEdgeManager.reset();
+        }
+    }
+
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
index b4e2b84..4058dde 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
@@ -314,9 +314,6 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
                         continue;
                     }
 
-                    long indexDisableTimestamp =
-                            PLong.INSTANCE.getCodec().decodeLong(disabledTimeStamp, 0,
-                                SortOrder.ASC);
                     byte[] dataTable = r.getValue(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
                         PhoenixDatabaseMetaData.DATA_TABLE_NAME_BYTES);
                     if ((dataTable == null || dataTable.length == 0) || indexStateCell == null) {
@@ -361,14 +358,21 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
                     }
                     
                     PIndexState indexState = PIndexState.fromSerializedValue(indexStateBytes[0]);
-                    long elapsedSinceDisable = EnvironmentEdgeManager.currentTimeMillis() - Math.abs(indexDisableTimestamp);
+                    long pendingDisableCountLastUpdatedTs =
+                        IndexUtil.getIndexPendingDisableCountLastUpdatedTimestamp(conn, indexTableFullName);
+                    long elapsedSinceDisable =
+                        EnvironmentEdgeManager.currentTimeMillis() - pendingDisableCountLastUpdatedTs;
 
                     // on an index write failure, the server side transitions to PENDING_DISABLE, then the client
                     // retries, and after retries are exhausted, disables the index
                     if (indexState == PIndexState.PENDING_DISABLE) {
                         if (elapsedSinceDisable > pendingDisableThreshold) {
-                            // too long in PENDING_DISABLE - client didn't disable the index, so we do it here
-                            IndexUtil.updateIndexState(conn, indexTableFullName, PIndexState.DISABLE, indexDisableTimestamp);
+                            // too long in PENDING_DISABLE -
+                            // the time elapsed since PENDING_DISABLE_COUNT was last updated
+                            // exceeds pendingDisableThreshold and the client has not disabled
+                            // the index, so we do it here
+                            IndexUtil.updateIndexState(conn, indexTableFullName,
+                                PIndexState.DISABLE, pendingDisableCountLastUpdatedTs);
                         }
                         continue;
                     }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
index 6e9e8f0..ec1a4f2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
@@ -913,4 +913,25 @@ public class IndexUtil {
             throw new IOException(e);
         }
     }
+
+    public static long getIndexPendingDisableCountLastUpdatedTimestamp(
+            PhoenixConnection conn, String failedIndexTable)
+            throws IOException {
+        byte[] indexTableKey =
+            SchemaUtil.getTableKeyFromFullName(failedIndexTable);
+        Get get = new Get(indexTableKey);
+        get.addColumn(TABLE_FAMILY_BYTES,
+            PhoenixDatabaseMetaData.PENDING_DISABLE_COUNT_BYTES);
+        byte[] systemCatalog = SchemaUtil.getPhysicalTableName(
+            PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME,
+            conn.getQueryServices().getProps()).getName();
+        try (Table table = conn.getQueryServices().getTable(systemCatalog)) {
+            Result result = table.get(get);
+            Cell cell = result.listCells().get(0);
+            return cell.getTimestamp();
+        } catch (SQLException e) {
+            throw new IOException(e);
+        }
+    }
+
 }