You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by sk...@apache.org on 2020/01/08 23:36:38 UTC

[phoenix] branch 4.15-HBase-1.4 updated: PHOENIX-5644: IndexUpgradeTool should sleep only once if there is at least one immutable table provided

This is an automated email from the ASF dual-hosted git repository.

skadam pushed a commit to branch 4.15-HBase-1.4
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.15-HBase-1.4 by this push:
     new 411cf3f  PHOENIX-5644: IndexUpgradeTool should sleep only once if there is at least one immutable table provided
411cf3f is described below

commit 411cf3f63419e49d774d91a87fe47e01033f8b44
Author: s.kadam <s....@apache.org>
AuthorDate: Tue Jan 7 15:53:18 2020 -0800

    PHOENIX-5644: IndexUpgradeTool should sleep only once if there is at least one immutable table provided
---
 .../end2end/ParameterizedIndexUpgradeToolIT.java   |   2 +
 .../phoenix/mapreduce/index/IndexUpgradeTool.java  | 202 +++++++++++++++++----
 2 files changed, 164 insertions(+), 40 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParameterizedIndexUpgradeToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParameterizedIndexUpgradeToolIT.java
index a529f33..fe8ee3c 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParameterizedIndexUpgradeToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParameterizedIndexUpgradeToolIT.java
@@ -312,6 +312,8 @@ public class ParameterizedIndexUpgradeToolIT extends BaseTest {
         iut.executeTool();
         //testing actual run
         validate(false);
+        Assert.assertEquals("Index upgrade tool didn't wait for client cache to expire",
+                true, iut.getWaited());
     }
 
     @Test
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexUpgradeTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexUpgradeTool.java
index 0a1f276..6ea8e3b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexUpgradeTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexUpgradeTool.java
@@ -49,6 +49,8 @@ import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.util.EnvironmentEdge;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 
@@ -72,7 +74,8 @@ import java.util.UUID;
 import java.util.logging.FileHandler;
 import java.util.logging.SimpleFormatter;
 
-import static org.apache.phoenix.query.QueryServicesOptions.GLOBAL_INDEX_CHECKER_ENABLED_MAP_EXPIRATION_MIN;
+import static org.apache.phoenix.query.QueryServicesOptions.
+        GLOBAL_INDEX_CHECKER_ENABLED_MAP_EXPIRATION_MIN;
 
 public class IndexUpgradeTool extends Configured implements Tool {
 
@@ -116,6 +119,7 @@ public class IndexUpgradeTool extends Configured implements Tool {
     private String inputFile;
 
     private boolean test = false;
+    private boolean waited = false;
 
     public void setDryRun(boolean dryRun) {
         this.dryRun = dryRun;
@@ -134,6 +138,7 @@ public class IndexUpgradeTool extends Configured implements Tool {
     }
 
     public void setTest(boolean test) { this.test = test; }
+    public boolean getWaited() { return this.waited; }
     public boolean getDryRun() {
         return this.dryRun;
     }
@@ -318,53 +323,153 @@ public class IndexUpgradeTool extends Configured implements Tool {
         return -1;
     }
 
-    private int executeTool(Connection conn, ConnectionQueryServices queryServices,
+    private int executeTool(Connection conn,
+            ConnectionQueryServices queryServices,
             Configuration conf) {
-        LOGGER.info("Executing " + operation);
+        ArrayList<String> immutableList = new ArrayList<>();
+        ArrayList<String> mutableList = new ArrayList<>();
         for (Map.Entry<String, HashSet<String>> entry :tablesAndIndexes.entrySet()) {
             String dataTableFullName = entry.getKey();
-            HashSet<String> indexes = entry.getValue();
+            try {
+                PTable dataTable = PhoenixRuntime.getTableNoCache(conn, dataTableFullName);
+                if (dataTable.isImmutableRows()) {
+                    //add to list where immutable tables are processed in a different function
+                    immutableList.add(dataTableFullName);
+                } else {
+                    mutableList.add(dataTableFullName);
+                }
+            } catch (SQLException e) {
+                LOGGER.severe("Something went wrong while getting the PTable "
+                        + dataTableFullName + " "+e);
+                return -1;
+            }
+        }
+        long startWaitTime = executeToolForImmutableTables(queryServices, immutableList);
+        executeToolForMutableTables(conn, queryServices, conf, mutableList);
+        enableImmutableTables(queryServices, immutableList, startWaitTime);
+        rebuildIndexes(conn, conf, immutableList);
+        return 0;
+    }
 
+    private long executeToolForImmutableTables(ConnectionQueryServices queryServices,
+            ArrayList<String> immutableList) {
+        LOGGER.info("Started " + operation + " for immutable tables");
+        for (String dataTableFullName : immutableList) {
             try (Admin admin = queryServices.getAdmin()) {
-                PTable dataTable = PhoenixRuntime.getTableNoCache(conn, dataTableFullName);
-                LOGGER.info("Executing " + operation + " of " + dataTableFullName);
+                HashSet<String> indexes = tablesAndIndexes.get(dataTableFullName);
+                LOGGER.info("Executing " + operation + " of " + dataTableFullName
+                        + " (immutable)");
+                disableTable(admin, dataTableFullName, indexes);
+                modifyTable(admin, dataTableFullName, indexes);
+            } catch (IOException | SQLException e) {
+                LOGGER.severe("Something went wrong while disabling "
+                        + "or modifying immutable table " + e);
+                handleFailure(queryServices, dataTableFullName, immutableList);
+            }
+        }
+        long startWaitTime = EnvironmentEdgeManager.currentTimeMillis();
+        return startWaitTime;
+    }
 
+    private void executeToolForMutableTables(Connection conn,
+            ConnectionQueryServices queryServices,
+            Configuration conf,
+            ArrayList<String> mutableTables) {
+        LOGGER.info("Started " + operation + " for mutable tables");
+        for (String dataTableFullName : mutableTables) {
+            try (Admin admin = queryServices.getAdmin()) {
+                HashSet<String> indexes = tablesAndIndexes.get(dataTableFullName);
+                LOGGER.info("Executing " + operation + " of " + dataTableFullName);
                 disableTable(admin, dataTableFullName, indexes);
                 modifyTable(admin, dataTableFullName, indexes);
-                if (dataTable.isImmutableRows()) {
-                    // If the table is immutable, we need to wait for clients to purge
-                    // their caches of table metadata
-                    LOGGER.info(dataTableFullName + " is an immutable table, waiting for "
-                            + (GLOBAL_INDEX_CHECKER_ENABLED_MAP_EXPIRATION_MIN + 1)
-                            + " minutes for client cache to expire");
-                    if (!(test || dryRun || indexes.isEmpty())) {
-                        try {
-                            Thread.sleep((GLOBAL_INDEX_CHECKER_ENABLED_MAP_EXPIRATION_MIN + 1)
-                                    * 60 * 1000);
-                        } catch (InterruptedException e) {
-                            LOGGER.warning("Sleep before starting index rebuild interrupted. "
-                                    + e.getMessage());
-                        }
-                    }
-                }
                 enableTable(admin, dataTableFullName, indexes);
                 LOGGER.info("Completed " + operation + " of " + dataTableFullName);
-
             } catch (IOException | SQLException e) {
-                LOGGER.severe("Something went wrong while executing " + operation
-                        + " steps " + e);
-                return -1;
+                LOGGER.severe("Something went wrong while executing "
+                    + operation + " steps for "+ dataTableFullName + " " + e);
+                handleFailure(queryServices, dataTableFullName, mutableTables);
             }
         }
         // Opportunistically kick-off index rebuilds after upgrade operation
-        if (upgrade) {
-            for (String dataTableFullName : tablesAndIndexes.keySet()) {
-                rebuildIndexes(conn, conf, dataTableFullName);
-                LOGGER.info("Started index rebuild post " + operation + " of "
-                        + dataTableFullName);
+        rebuildIndexes(conn, conf, mutableTables);
+    }
+
+    private void handleFailure(ConnectionQueryServices queryServices,
+            String dataTableFullName,
+            ArrayList<String> tableList) {
+        LOGGER.info("Performing error handling to revert the steps taken during " +operation);
+        HashSet<String> indexes = tablesAndIndexes.get(dataTableFullName);
+        try (Admin admin = queryServices.getAdmin()) {
+            upgrade = !upgrade;
+            disableTable(admin, dataTableFullName, indexes);
+            modifyTable(admin, dataTableFullName, indexes);
+            enableTable(admin, dataTableFullName, indexes);
+            upgrade = !upgrade;
+
+            tablesAndIndexes.remove(dataTableFullName); //removing from the map
+            tableList.remove(dataTableFullName); //removing from the list
+
+            LOGGER.severe(dataTableFullName+" has been removed from the list as tool failed"
+                    + " to perform "+operation);
+        } catch (IOException | SQLException e) {
+            LOGGER.severe("Revert of the "+operation +" failed in error handling, "
+                    + "throwing runtime exception");
+            LOGGER.severe("Confirm the state for "+getSubListString(tableList, dataTableFullName));
+            throw new RuntimeException(e);
+        }
+    }
+
+    private void enableImmutableTables(ConnectionQueryServices queryServices,
+            ArrayList<String> immutableList,
+            long startWaitTime) {
+        long endWaitTime = EnvironmentEdgeManager.currentTimeMillis();
+        long waitMore = getWaitMoreTime(endWaitTime, startWaitTime);
+        while (waitMore>0) {
+            // If the table is immutable, we need to wait for clients to purge
+            // their caches of table metadata
+            LOGGER.info("waiting for more " + waitMore + " ms for client cache "
+                    + "to expire for immutable tables");
+            try {
+                startWaitTime = EnvironmentEdgeManager.currentTimeMillis();
+                Thread.sleep(waitMore);
+                waited = true;
+            } catch (InterruptedException e) {
+                endWaitTime = EnvironmentEdgeManager.currentTimeMillis();
+                waitMore = getWaitMoreTime(endWaitTime, startWaitTime);
+                LOGGER.warning("Sleep before starting index rebuild is interrupted. "
+                        + "Attempting to sleep again! " + e.getMessage());
             }
         }
-        return 0;
+        for (String dataTableFullName: immutableList) {
+            try (Admin admin = queryServices.getAdmin()) {
+                HashSet<String> indexes = tablesAndIndexes.get(dataTableFullName);
+                enableTable(admin, dataTableFullName, indexes);
+            } catch (IOException | SQLException e) {
+                LOGGER.severe("Something went wrong while enabling immutable table " + e);
+                //removing to avoid any rebuilds after upgrade
+                tablesAndIndexes.remove(dataTableFullName);
+                immutableList.remove(dataTableFullName);
+                throw new RuntimeException("Manually enable the following tables "
+                    + getSubListString(immutableList, dataTableFullName)
+                    + " and run the index rebuild ", e);
+            }
+        }
+    }
+
+    private String getSubListString(ArrayList<String> tableList, String dataTableFullName) {
+        return StringUtils.join(tableList.subList(tableList.indexOf(dataTableFullName),
+                tableList.size()), ",");
+    }
+
+    private long getWaitMoreTime(long endWaitTime, long startWaitTime) {
+        int waitTime = GLOBAL_INDEX_CHECKER_ENABLED_MAP_EXPIRATION_MIN+1;
+        if(test) {
+            return 1;
+        }
+        if(dryRun) {
+            return 0; //no wait
+        }
+        return (((waitTime) * 60000) - Math.abs(endWaitTime-startWaitTime));
     }
 
     private void disableTable(Admin admin, String dataTable, HashSet<String>indexes)
@@ -422,9 +527,19 @@ public class IndexUpgradeTool extends Configured implements Tool {
         }
     }
 
+    private void rebuildIndexes(Connection conn, Configuration conf, ArrayList<String> tableList) {
+        if (!upgrade) {
+            return;
+        }
+        for (String table: tableList) {
+            rebuildIndexes(conn, conf, table);
+        }
+    }
+
     private void rebuildIndexes(Connection conn, Configuration conf, String dataTableFullName) {
         try {
-            HashMap<String, IndexInfo> rebuildMap = prepareToRebuildIndexes(conn, dataTableFullName);
+            HashMap<String, IndexInfo>
+                    rebuildMap = prepareToRebuildIndexes(conn, dataTableFullName);
 
             //for rebuilding indexes in case of upgrade and if there are indexes on the table/view.
             if (rebuildMap.isEmpty()) {
@@ -441,7 +556,8 @@ public class IndexUpgradeTool extends Configured implements Tool {
 
         } catch (SQLException e) {
             LOGGER.severe("Failed to prepare the map for index rebuilds " + e);
-            throw new RuntimeException("Failed to prepare the map for index rebuilds");
+            throw new RuntimeException("Failed to prepare the map for index rebuilds, "
+                    + "manually rebuild the indexes");
         }
     }
 
@@ -475,7 +591,8 @@ public class IndexUpgradeTool extends Configured implements Tool {
             }
             LOGGER.info("Loaded " + coprocName + " coprocessor on table " + tableName);
         } else {
-            LOGGER.info(coprocName + " coprocessor on table " + tableName + "is already loaded");
+            LOGGER.info(coprocName + " coprocessor on table " + tableName
+                    + " is already loaded");
         }
     }
 
@@ -487,7 +604,8 @@ public class IndexUpgradeTool extends Configured implements Tool {
             }
             LOGGER.info("Unloaded "+ coprocName +"coprocessor on table " + tableName);
         } else {
-            LOGGER.info(coprocName + " coprocessor on table " + tableName + " is already unloaded");
+            LOGGER.info(coprocName + " coprocessor on table " + tableName
+                    + " is already unloaded");
         }
     }
 
@@ -531,8 +649,10 @@ public class IndexUpgradeTool extends Configured implements Tool {
             try {
                 LOGGER.info("Rebuilding index: " + StringUtils.join( args,","));
                 if (!dryRun) {
-                    // If the index is in DISABLED state, indexTool will fail. First to ALTER REBUILD ASYNC.
-                    // ALTER REBUILD ASYNC will set the index state to BUILDING which is safe to make ACTIVE later.
+                    // If the index is in DISABLED state, indexTool will fail.
+                    // First to ALTER REBUILD ASYNC.
+                    // ALTER REBUILD ASYNC will set the index state to BUILDING
+                    // which is safe to make ACTIVE later.
                     if (!Strings.isNullOrEmpty(tenantId) && !GLOBAL_INDEX_ID.equals(tenantId)) {
                         Configuration conf = HBaseConfiguration.addHbaseResources(getConf());
                         conf.set(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
@@ -540,7 +660,8 @@ public class IndexUpgradeTool extends Configured implements Tool {
                         tenantConnection = newConnection;
                     }
 
-                    PTable indexPTable = PhoenixRuntime.getTable(newConnection, indexInfo.getPhysicalIndexTableName());
+                    PTable indexPTable = PhoenixRuntime.getTable(newConnection,
+                            indexInfo.getPhysicalIndexTableName());
                     if (indexPTable.getIndexState() == PIndexState.DISABLE) {
                         String dataTableFullName = dataTable;
                         if (!dataTableFullName.contains(":") && !dataTableFullName.contains(".")) {
@@ -659,7 +780,8 @@ public class IndexUpgradeTool extends Configured implements Tool {
                         tenantId);
                 for (String viewIndex : viewIndexes) {
                     IndexInfo indexInfo = new IndexInfo(schemaName, viewName,
-                            tenantId == null ? GLOBAL_INDEX_ID : tenantId, viewIndex, viewIndexPhysicalName);
+                            tenantId == null ? GLOBAL_INDEX_ID : tenantId, viewIndex,
+                            viewIndexPhysicalName);
                     indexInfos.put(viewIndex, indexInfo);
                 }
             }