You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by sk...@apache.org on 2020/01/08 23:34:16 UTC
[phoenix] branch 4.x-HBase-1.4 updated: PHOENIX-5644:
IndexUpgradeTool should sleep only once if there is at least one immutable
table provided
This is an automated email from the ASF dual-hosted git repository.
skadam pushed a commit to branch 4.x-HBase-1.4
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x-HBase-1.4 by this push:
new 4fb6cc6 PHOENIX-5644: IndexUpgradeTool should sleep only once if there is at least one immutable table provided
4fb6cc6 is described below
commit 4fb6cc667946d3e1aa998d16488d5af717f4dad2
Author: s.kadam <s....@apache.org>
AuthorDate: Tue Jan 7 15:53:18 2020 -0800
PHOENIX-5644: IndexUpgradeTool should sleep only once if there is at least one immutable table provided
---
.../end2end/ParameterizedIndexUpgradeToolIT.java | 2 +
.../phoenix/mapreduce/index/IndexUpgradeTool.java | 202 +++++++++++++++++----
2 files changed, 164 insertions(+), 40 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParameterizedIndexUpgradeToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParameterizedIndexUpgradeToolIT.java
index a529f33..fe8ee3c 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParameterizedIndexUpgradeToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParameterizedIndexUpgradeToolIT.java
@@ -312,6 +312,8 @@ public class ParameterizedIndexUpgradeToolIT extends BaseTest {
iut.executeTool();
//testing actual run
validate(false);
+ Assert.assertEquals("Index upgrade tool didn't wait for client cache to expire",
+ true, iut.getWaited());
}
@Test
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexUpgradeTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexUpgradeTool.java
index 0a1f276..6ea8e3b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexUpgradeTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexUpgradeTool.java
@@ -49,6 +49,8 @@ import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.util.EnvironmentEdge;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.PhoenixRuntime;
@@ -72,7 +74,8 @@ import java.util.UUID;
import java.util.logging.FileHandler;
import java.util.logging.SimpleFormatter;
-import static org.apache.phoenix.query.QueryServicesOptions.GLOBAL_INDEX_CHECKER_ENABLED_MAP_EXPIRATION_MIN;
+import static org.apache.phoenix.query.QueryServicesOptions.
+ GLOBAL_INDEX_CHECKER_ENABLED_MAP_EXPIRATION_MIN;
public class IndexUpgradeTool extends Configured implements Tool {
@@ -116,6 +119,7 @@ public class IndexUpgradeTool extends Configured implements Tool {
private String inputFile;
private boolean test = false;
+ private boolean waited = false;
public void setDryRun(boolean dryRun) {
this.dryRun = dryRun;
@@ -134,6 +138,7 @@ public class IndexUpgradeTool extends Configured implements Tool {
}
public void setTest(boolean test) { this.test = test; }
+ public boolean getWaited() { return this.waited; }
public boolean getDryRun() {
return this.dryRun;
}
@@ -318,53 +323,153 @@ public class IndexUpgradeTool extends Configured implements Tool {
return -1;
}
- private int executeTool(Connection conn, ConnectionQueryServices queryServices,
+ private int executeTool(Connection conn,
+ ConnectionQueryServices queryServices,
Configuration conf) {
- LOGGER.info("Executing " + operation);
+ ArrayList<String> immutableList = new ArrayList<>();
+ ArrayList<String> mutableList = new ArrayList<>();
for (Map.Entry<String, HashSet<String>> entry :tablesAndIndexes.entrySet()) {
String dataTableFullName = entry.getKey();
- HashSet<String> indexes = entry.getValue();
+ try {
+ PTable dataTable = PhoenixRuntime.getTableNoCache(conn, dataTableFullName);
+ if (dataTable.isImmutableRows()) {
+ //add to list where immutable tables are processed in a different function
+ immutableList.add(dataTableFullName);
+ } else {
+ mutableList.add(dataTableFullName);
+ }
+ } catch (SQLException e) {
+ LOGGER.severe("Something went wrong while getting the PTable "
+ + dataTableFullName + " "+e);
+ return -1;
+ }
+ }
+ long startWaitTime = executeToolForImmutableTables(queryServices, immutableList);
+ executeToolForMutableTables(conn, queryServices, conf, mutableList);
+ enableImmutableTables(queryServices, immutableList, startWaitTime);
+ rebuildIndexes(conn, conf, immutableList);
+ return 0;
+ }
+ private long executeToolForImmutableTables(ConnectionQueryServices queryServices,
+ ArrayList<String> immutableList) {
+ LOGGER.info("Started " + operation + " for immutable tables");
+ for (String dataTableFullName : immutableList) {
try (Admin admin = queryServices.getAdmin()) {
- PTable dataTable = PhoenixRuntime.getTableNoCache(conn, dataTableFullName);
- LOGGER.info("Executing " + operation + " of " + dataTableFullName);
+ HashSet<String> indexes = tablesAndIndexes.get(dataTableFullName);
+ LOGGER.info("Executing " + operation + " of " + dataTableFullName
+ + " (immutable)");
+ disableTable(admin, dataTableFullName, indexes);
+ modifyTable(admin, dataTableFullName, indexes);
+ } catch (IOException | SQLException e) {
+ LOGGER.severe("Something went wrong while disabling "
+ + "or modifying immutable table " + e);
+ handleFailure(queryServices, dataTableFullName, immutableList);
+ }
+ }
+ long startWaitTime = EnvironmentEdgeManager.currentTimeMillis();
+ return startWaitTime;
+ }
+ private void executeToolForMutableTables(Connection conn,
+ ConnectionQueryServices queryServices,
+ Configuration conf,
+ ArrayList<String> mutableTables) {
+ LOGGER.info("Started " + operation + " for mutable tables");
+ for (String dataTableFullName : mutableTables) {
+ try (Admin admin = queryServices.getAdmin()) {
+ HashSet<String> indexes = tablesAndIndexes.get(dataTableFullName);
+ LOGGER.info("Executing " + operation + " of " + dataTableFullName);
disableTable(admin, dataTableFullName, indexes);
modifyTable(admin, dataTableFullName, indexes);
- if (dataTable.isImmutableRows()) {
- // If the table is immutable, we need to wait for clients to purge
- // their caches of table metadata
- LOGGER.info(dataTableFullName + " is an immutable table, waiting for "
- + (GLOBAL_INDEX_CHECKER_ENABLED_MAP_EXPIRATION_MIN + 1)
- + " minutes for client cache to expire");
- if (!(test || dryRun || indexes.isEmpty())) {
- try {
- Thread.sleep((GLOBAL_INDEX_CHECKER_ENABLED_MAP_EXPIRATION_MIN + 1)
- * 60 * 1000);
- } catch (InterruptedException e) {
- LOGGER.warning("Sleep before starting index rebuild interrupted. "
- + e.getMessage());
- }
- }
- }
enableTable(admin, dataTableFullName, indexes);
LOGGER.info("Completed " + operation + " of " + dataTableFullName);
-
} catch (IOException | SQLException e) {
- LOGGER.severe("Something went wrong while executing " + operation
- + " steps " + e);
- return -1;
+ LOGGER.severe("Something went wrong while executing "
+ + operation + " steps for "+ dataTableFullName + " " + e);
+ handleFailure(queryServices, dataTableFullName, mutableTables);
}
}
// Opportunistically kick-off index rebuilds after upgrade operation
- if (upgrade) {
- for (String dataTableFullName : tablesAndIndexes.keySet()) {
- rebuildIndexes(conn, conf, dataTableFullName);
- LOGGER.info("Started index rebuild post " + operation + " of "
- + dataTableFullName);
+ rebuildIndexes(conn, conf, mutableTables);
+ }
+
+ private void handleFailure(ConnectionQueryServices queryServices,
+ String dataTableFullName,
+ ArrayList<String> tableList) {
+ LOGGER.info("Performing error handling to revert the steps taken during " +operation);
+ HashSet<String> indexes = tablesAndIndexes.get(dataTableFullName);
+ try (Admin admin = queryServices.getAdmin()) {
+ upgrade = !upgrade;
+ disableTable(admin, dataTableFullName, indexes);
+ modifyTable(admin, dataTableFullName, indexes);
+ enableTable(admin, dataTableFullName, indexes);
+ upgrade = !upgrade;
+
+ tablesAndIndexes.remove(dataTableFullName); //removing from the map
+ tableList.remove(dataTableFullName); //removing from the list
+
+ LOGGER.severe(dataTableFullName+" has been removed from the list as tool failed"
+ + " to perform "+operation);
+ } catch (IOException | SQLException e) {
+ LOGGER.severe("Revert of the "+operation +" failed in error handling, "
+ + "throwing runtime exception");
+ LOGGER.severe("Confirm the state for "+getSubListString(tableList, dataTableFullName));
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void enableImmutableTables(ConnectionQueryServices queryServices,
+ ArrayList<String> immutableList,
+ long startWaitTime) {
+ long endWaitTime = EnvironmentEdgeManager.currentTimeMillis();
+ long waitMore = getWaitMoreTime(endWaitTime, startWaitTime);
+ while (waitMore>0) {
+ // If the table is immutable, we need to wait for clients to purge
+ // their caches of table metadata
+ LOGGER.info("waiting for more " + waitMore + " ms for client cache "
+ + "to expire for immutable tables");
+ try {
+ startWaitTime = EnvironmentEdgeManager.currentTimeMillis();
+ Thread.sleep(waitMore);
+ waited = true;
+ } catch (InterruptedException e) {
+ endWaitTime = EnvironmentEdgeManager.currentTimeMillis();
+ waitMore = getWaitMoreTime(endWaitTime, startWaitTime);
+ LOGGER.warning("Sleep before starting index rebuild is interrupted. "
+ + "Attempting to sleep again! " + e.getMessage());
}
}
- return 0;
+ for (String dataTableFullName: immutableList) {
+ try (Admin admin = queryServices.getAdmin()) {
+ HashSet<String> indexes = tablesAndIndexes.get(dataTableFullName);
+ enableTable(admin, dataTableFullName, indexes);
+ } catch (IOException | SQLException e) {
+ LOGGER.severe("Something went wrong while enabling immutable table " + e);
+ //removing to avoid any rebuilds after upgrade
+ tablesAndIndexes.remove(dataTableFullName);
+ immutableList.remove(dataTableFullName);
+ throw new RuntimeException("Manually enable the following tables "
+ + getSubListString(immutableList, dataTableFullName)
+ + " and run the index rebuild ", e);
+ }
+ }
+ }
+
+ private String getSubListString(ArrayList<String> tableList, String dataTableFullName) {
+ return StringUtils.join(tableList.subList(tableList.indexOf(dataTableFullName),
+ tableList.size()), ",");
+ }
+
+ private long getWaitMoreTime(long endWaitTime, long startWaitTime) {
+ int waitTime = GLOBAL_INDEX_CHECKER_ENABLED_MAP_EXPIRATION_MIN+1;
+ if(test) {
+ return 1;
+ }
+ if(dryRun) {
+ return 0; //no wait
+ }
+ return (((waitTime) * 60000) - Math.abs(endWaitTime-startWaitTime));
}
private void disableTable(Admin admin, String dataTable, HashSet<String>indexes)
@@ -422,9 +527,19 @@ public class IndexUpgradeTool extends Configured implements Tool {
}
}
+ private void rebuildIndexes(Connection conn, Configuration conf, ArrayList<String> tableList) {
+ if (!upgrade) {
+ return;
+ }
+ for (String table: tableList) {
+ rebuildIndexes(conn, conf, table);
+ }
+ }
+
private void rebuildIndexes(Connection conn, Configuration conf, String dataTableFullName) {
try {
- HashMap<String, IndexInfo> rebuildMap = prepareToRebuildIndexes(conn, dataTableFullName);
+ HashMap<String, IndexInfo>
+ rebuildMap = prepareToRebuildIndexes(conn, dataTableFullName);
//for rebuilding indexes in case of upgrade and if there are indexes on the table/view.
if (rebuildMap.isEmpty()) {
@@ -441,7 +556,8 @@ public class IndexUpgradeTool extends Configured implements Tool {
} catch (SQLException e) {
LOGGER.severe("Failed to prepare the map for index rebuilds " + e);
- throw new RuntimeException("Failed to prepare the map for index rebuilds");
+ throw new RuntimeException("Failed to prepare the map for index rebuilds, "
+ + "manually rebuild the indexes");
}
}
@@ -475,7 +591,8 @@ public class IndexUpgradeTool extends Configured implements Tool {
}
LOGGER.info("Loaded " + coprocName + " coprocessor on table " + tableName);
} else {
- LOGGER.info(coprocName + " coprocessor on table " + tableName + "is already loaded");
+ LOGGER.info(coprocName + " coprocessor on table " + tableName
+ + " is already loaded");
}
}
@@ -487,7 +604,8 @@ public class IndexUpgradeTool extends Configured implements Tool {
}
LOGGER.info("Unloaded "+ coprocName +"coprocessor on table " + tableName);
} else {
- LOGGER.info(coprocName + " coprocessor on table " + tableName + " is already unloaded");
+ LOGGER.info(coprocName + " coprocessor on table " + tableName
+ + " is already unloaded");
}
}
@@ -531,8 +649,10 @@ public class IndexUpgradeTool extends Configured implements Tool {
try {
LOGGER.info("Rebuilding index: " + StringUtils.join( args,","));
if (!dryRun) {
- // If the index is in DISABLED state, indexTool will fail. First to ALTER REBUILD ASYNC.
- // ALTER REBUILD ASYNC will set the index state to BUILDING which is safe to make ACTIVE later.
+ // If the index is in DISABLED state, indexTool will fail.
+ // First to ALTER REBUILD ASYNC.
+ // ALTER REBUILD ASYNC will set the index state to BUILDING
+ // which is safe to make ACTIVE later.
if (!Strings.isNullOrEmpty(tenantId) && !GLOBAL_INDEX_ID.equals(tenantId)) {
Configuration conf = HBaseConfiguration.addHbaseResources(getConf());
conf.set(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
@@ -540,7 +660,8 @@ public class IndexUpgradeTool extends Configured implements Tool {
tenantConnection = newConnection;
}
- PTable indexPTable = PhoenixRuntime.getTable(newConnection, indexInfo.getPhysicalIndexTableName());
+ PTable indexPTable = PhoenixRuntime.getTable(newConnection,
+ indexInfo.getPhysicalIndexTableName());
if (indexPTable.getIndexState() == PIndexState.DISABLE) {
String dataTableFullName = dataTable;
if (!dataTableFullName.contains(":") && !dataTableFullName.contains(".")) {
@@ -659,7 +780,8 @@ public class IndexUpgradeTool extends Configured implements Tool {
tenantId);
for (String viewIndex : viewIndexes) {
IndexInfo indexInfo = new IndexInfo(schemaName, viewName,
- tenantId == null ? GLOBAL_INDEX_ID : tenantId, viewIndex, viewIndexPhysicalName);
+ tenantId == null ? GLOBAL_INDEX_ID : tenantId, viewIndex,
+ viewIndexPhysicalName);
indexInfos.put(viewIndex, indexInfo);
}
}