You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by sk...@apache.org on 2019/07/10 04:12:08 UTC
[phoenix] branch 4.14-HBase-1.3 updated: PHOENIX-5333: A tool to
upgrade existing tables/indexes to use self-consistent global indexes
design
This is an automated email from the ASF dual-hosted git repository.
skadam pushed a commit to branch 4.14-HBase-1.3
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.14-HBase-1.3 by this push:
new 95d7bba PHOENIX-5333: A tool to upgrade existing tables/indexes to use self-consistent global indexes design
95d7bba is described below
commit 95d7bba2a29742edabf49bb559beb722604194f3
Author: s.kadam <s....@salesforce.com>
AuthorDate: Tue Jul 9 16:51:09 2019 -0700
PHOENIX-5333: A tool to upgrade existing tables/indexes to use self-consistent global indexes design
---
.../end2end/ParameterizedIndexUpgradeToolIT.java | 327 ++++++++++++
.../phoenix/mapreduce/index/IndexUpgradeTool.java | 547 +++++++++++++++++++++
2 files changed, 874 insertions(+)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParameterizedIndexUpgradeToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParameterizedIndexUpgradeToolIT.java
new file mode 100644
index 0000000..400df93
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParameterizedIndexUpgradeToolIT.java
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.cli.CommandLine;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.phoenix.hbase.index.IndexRegionObserver;
+import org.apache.phoenix.hbase.index.Indexer;
+import org.apache.phoenix.index.GlobalIndexChecker;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.mapreduce.index.IndexUpgradeTool;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized.Parameters;
+import org.junit.runners.Parameterized;
+import org.mockito.Mockito;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+
+import static org.apache.phoenix.mapreduce.index.IndexUpgradeTool.ROLLBACK_OP;
+import static org.apache.phoenix.mapreduce.index.IndexUpgradeTool.UPGRADE_OP;
+
+@RunWith(Parameterized.class)
+@Category(NeedsOwnMiniClusterTest.class)
+public class ParameterizedIndexUpgradeToolIT extends BaseTest {
+ //Please do not remove/uncomment commented items in the list until PHOENIX-5385 is fixed
+ private static final String [] INDEXES_LIST = {"TEST.INDEX1", "TEST.INDEX2", "TEST1.INDEX3",
+ "TEST1.INDEX2","TEST1.INDEX1","TEST.INDEX3"/*, "_IDX_TEST.MOCK1", "_IDX_TEST1.MOCK2"*/};
+ private static final String [] INDEXES_LIST_NAMESPACE = {"TEST:INDEX1", "TEST:INDEX2", "TEST1:INDEX3",
+ "TEST1:INDEX2","TEST1:INDEX1","TEST:INDEX3"/*, "TEST:_IDX_MOCK1", "TEST1:_IDX_MOCK2"*/};
+ private static final String [] TABLE_LIST = {"TEST.MOCK1","TEST1.MOCK2","TEST.MOCK3"};
+ private static final String [] TABLE_LIST_NAMESPACE = {"TEST:MOCK1","TEST1:MOCK2","TEST:MOCK3"};
+
+ private static final String INPUT_LIST = "TEST.MOCK1,TEST1.MOCK2,TEST.MOCK3";
+ private static final String INPUT_FILE = "/tmp/input_file_index_upgrade.csv";
+
+ private static Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(1),
+ clientProps = Maps.newHashMapWithExpectedSize(1);
+
+ private final boolean mutable;
+ private final boolean upgrade;
+ private final boolean isNamespaceEnabled;
+
+ private StringBuilder optionsBuilder;
+ private String tableDDLOptions;
+ private Connection conn;
+ private Admin admin;
+ private IndexUpgradeTool iut;
+
+ /**
+  * Per-test fixture. Applies the cluster properties for the current parameter
+  * combination, (re)creates the test driver, opens an auto-commit connection,
+  * builds the {@link IndexUpgradeTool} under test and creates the tables and
+  * indexes the tests operate on.
+  */
+ @Before
+ public void setup () throws Exception {
+ optionsBuilder = new StringBuilder();
+
+ // Must run before the driver is set up: it fills serverProps/clientProps.
+ setClusterProperties();
+
+ setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()),
+ new ReadOnlyProps(clientProps.entrySet().iterator()));
+
+ conn = DriverManager.getConnection(getUrl(), new Properties());
+ conn.setAutoCommit(true);
+ ConnectionQueryServices queryServices = conn.unwrap(PhoenixConnection.class)
+ .getQueryServices();
+ admin = queryServices.getAdmin();
+ // The tool is created in dry-run mode (fifth argument) with a mocked IndexTool,
+ // so no real index rebuild is launched; tests that need a real run call setDryRun(false).
+ iut = new IndexUpgradeTool(upgrade ? UPGRADE_OP : ROLLBACK_OP, INPUT_LIST,
+ null, "/tmp/index_upgrade_" + UUID.randomUUID().toString(),true, Mockito.mock(IndexTool.class));
+ iut.setConf(getUtility().getConfiguration());
+ // Test mode makes the tool keep the injected IndexTool and skip its cache-expiry sleep.
+ iut.setTest(true);
+ if (!mutable) {
+ optionsBuilder.append(" IMMUTABLE_ROWS=true");
+ }
+ tableDDLOptions = optionsBuilder.toString();
+ prepareSetup();
+ }
+
+ /**
+  * Populates the static server/client property maps for the current parameter
+  * combination. The maps are static, so on entry they still hold the values
+  * written by the previously executed test instance (or are empty on the first run).
+  */
+ private void setClusterProperties() {
+ // we need to destroy the cluster if it was initiated using property as true
+ // The client property is always stored as !upgrade (see below), so a stored value
+ // equal to 'upgrade' means the previous run used the opposite setting. Likewise a
+ // stored namespace flag equal to !isNamespaceEnabled differs from what is needed now.
+ // NOTE(review): tearDownMiniClusterAsync is inherited and not visible here;
+ // presumably it shuts the mini cluster down so it is restarted with the new props.
+ if (Boolean.toString(upgrade).equals(clientProps
+ .get(QueryServices.INDEX_REGION_OBSERVER_ENABLED_ATTRIB))
+ || Boolean.toString(!isNamespaceEnabled).equals(serverProps
+ .get(QueryServices.IS_NAMESPACE_MAPPING_ENABLED))) {
+ tearDownMiniClusterAsync(1);
+ }
+ //setting up properties for namespace
+ clientProps.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED,
+ Boolean.toString(isNamespaceEnabled));
+ clientProps.put(QueryServices.IS_SYSTEM_TABLE_MAPPED_TO_NAMESPACE,
+ Boolean.toString(isNamespaceEnabled));
+ // Copied before the index-observer flag is added, so that flag is client-only here.
+ serverProps.putAll(clientProps);
+ //To mimic the upgrade/rollback scenario, so that table creation uses old/new design
+ clientProps.put(QueryServices.INDEX_REGION_OBSERVER_ENABLED_ATTRIB,
+ Boolean.toString(!upgrade));
+ }
+
+ /**
+  * Creates the schemas (only when namespace mapping is enabled), the three data
+  * tables named in INPUT_LIST and six indexes on them. The table DDL is suffixed
+  * with {@code tableDDLOptions}, which carries IMMUTABLE_ROWS=true for the
+  * immutable parameterization.
+  *
+  * @throws SQLException if any DDL statement fails
+  */
+ private void prepareSetup() throws SQLException {
+ //inputList is "TEST.MOCK1,TEST1.MOCK2,TEST.MOCK3";
+ if (isNamespaceEnabled) {
+ conn.createStatement().execute("CREATE SCHEMA TEST");
+ conn.createStatement().execute("CREATE SCHEMA TEST1");
+ }
+ conn.createStatement().execute("CREATE TABLE TEST.MOCK1 (id bigint NOT NULL "
+ + "PRIMARY KEY, a.name varchar, sal bigint, address varchar)"+tableDDLOptions);
+ conn.createStatement().execute("CREATE TABLE TEST1.MOCK2 (id bigint NOT NULL "
+ + "PRIMARY KEY, name varchar, city varchar, phone bigint)"+tableDDLOptions);
+ conn.createStatement().execute("CREATE TABLE TEST.MOCK3 (id bigint NOT NULL "
+ + "PRIMARY KEY, name varchar, age bigint)"+tableDDLOptions);
+
+ //Please do not remove/uncomment commented code until PHOENIX-5385 is fixed
+ //views
+ /*conn.createStatement().execute("CREATE VIEW TEST.MOCK1_VIEW (view_column varchar) "
+ + "AS SELECT * FROM TEST.MOCK1 WHERE a.name = 'a'");
+ conn.createStatement().execute("CREATE VIEW TEST.MOCK1_VIEW1 (view_column varchar,"
+ + " zip varchar) AS SELECT * FROM TEST.MOCK1 WHERE a.name = 'a'");
+ conn.createStatement().execute("CREATE VIEW TEST1.MOCK2_VIEW (view_column varchar,"
+ + " state varchar) AS SELECT * FROM TEST1.MOCK2 WHERE name = 'c'");
+ //view-indexes
+ conn.createStatement().execute("CREATE INDEX MOCK1_INDEX1 ON TEST.MOCK1_VIEW1 "
+ + "(view_column)");
+ conn.createStatement().execute("CREATE INDEX MOCK1_INDEX2 ON TEST.MOCK1_VIEW1 "
+ + "(zip)");
+ conn.createStatement().execute("CREATE INDEX MOCK2_INDEX1 ON TEST1.MOCK2_VIEW "
+ + "(state, city)");
+ conn.createStatement().execute("CREATE INDEX MOCK1_INDEX3 ON TEST.MOCK1_VIEW "
+ + "(view_column)");*/
+ //indexes
+ // These six indexes correspond to the entries of INDEXES_LIST / INDEXES_LIST_NAMESPACE.
+ conn.createStatement().execute("CREATE INDEX INDEX1 ON TEST.MOCK1 (sal, a.name)");
+ conn.createStatement().execute("CREATE INDEX INDEX2 ON TEST.MOCK1 (a.name)");
+ conn.createStatement().execute("CREATE INDEX INDEX1 ON TEST1.MOCK2 (city)");
+ conn.createStatement().execute("CREATE INDEX INDEX2 ON TEST1.MOCK2 (phone)");
+ conn.createStatement().execute("CREATE INDEX INDEX3 ON TEST1.MOCK2 (name)");
+ conn.createStatement().execute("CREATE INDEX INDEX3 ON TEST.MOCK3 (age, name)");
+ }
+
+ private void validate(boolean pre) throws IOException {
+ String [] indexList = INDEXES_LIST;
+ String [] tableList = TABLE_LIST;
+ if(isNamespaceEnabled) {
+ indexList = INDEXES_LIST_NAMESPACE;
+ tableList = TABLE_LIST_NAMESPACE;
+ }
+ if (pre) {
+ if (upgrade) {
+ checkOldIndexingCoprocessors(indexList,tableList);
+ } else {
+ checkNewIndexingCoprocessors(indexList,tableList);
+ }
+ } else {
+ if (upgrade) {
+ checkNewIndexingCoprocessors(indexList,tableList);
+ } else {
+ checkOldIndexingCoprocessors(indexList,tableList);
+ }
+ }
+ }
+
+ private void checkNewIndexingCoprocessors(String [] indexList, String [] tableList) throws IOException {
+ if (mutable) {
+ for (String table : tableList) {
+ Assert.assertTrue(admin.getTableDescriptor(TableName.valueOf(table))
+ .hasCoprocessor(IndexRegionObserver.class.getName()));
+ Assert.assertFalse(admin.getTableDescriptor(TableName.valueOf(table))
+ .hasCoprocessor(Indexer.class.getName()));
+ }
+ }
+ for (String index : indexList) {
+ Assert.assertTrue(admin.getTableDescriptor(TableName.valueOf(index))
+ .hasCoprocessor(GlobalIndexChecker.class.getName()));
+ }
+ }
+
+ private void checkOldIndexingCoprocessors(String [] indexList, String [] tableList) throws IOException {
+ if (mutable) {
+ for (String table : tableList) {
+ Assert.assertTrue(admin.getTableDescriptor(TableName.valueOf(table))
+ .hasCoprocessor(Indexer.class.getName()));
+ Assert.assertFalse(admin.getTableDescriptor(TableName.valueOf(table))
+ .hasCoprocessor(IndexRegionObserver.class.getName()));
+ }
+ }
+ for (String index : indexList) {
+ Assert.assertFalse(admin.getTableDescriptor(TableName.valueOf(index))
+ .hasCoprocessor(GlobalIndexChecker.class.getName()));
+ }
+ }
+
+ /**
+  * Supplies every combination of (mutable, upgrade, isNamespaceEnabled).
+  * The namespace-enabled combinations come first.
+  *
+  * @return one {@code Object[]} of three booleans per parameterized run
+  */
+ @Parameters(name ="IndexUpgradeToolIT_mutable={0},upgrade={1},isNamespaceEnabled={2}")
+ public static Collection<Object[]> data() {
+     // columns: mutable, upgrade, isNamespaceEnabled
+     final Object[][] combinations = {
+         {false, false, true},
+         {true, false, true},
+         {false, true, true},
+         {true, true, true},
+         {false, false, false},
+         {true, false, false},
+         {false, true, false},
+         {true, true, false}
+     };
+     return Arrays.asList(combinations);
+ }
+
+ /**
+  * Invoked by the Parameterized runner with one row from {@link #data()}.
+  *
+  * @param mutable whether the data tables are created without IMMUTABLE_ROWS=true
+  * @param upgrade {@code true} to exercise the upgrade operation, {@code false} for rollback
+  * @param isNamespaceEnabled whether namespace mapping is enabled for the run
+  */
+ public ParameterizedIndexUpgradeToolIT(boolean mutable, boolean upgrade, boolean isNamespaceEnabled) {
+     this.isNamespaceEnabled = isNamespaceEnabled;
+     this.upgrade = upgrade;
+     this.mutable = mutable;
+ }
+
+ @Test
+ public void testNonDryRunToolWithMultiTables() throws Exception {
+ validate(true);
+ iut.setDryRun(false);
+ iut.setLogFile(null);
+ iut.prepareToolSetup();
+ iut.executeTool();
+ //testing actual run
+ validate(false);
+ }
+
+ /**
+  * A table name that does not exist must make the tool return -1 and leave
+  * the existing tables in their pre-operation state.
+  */
+ @Test
+ public void testToolWithIncorrectTables() throws Exception {
+     validate(true);
+
+     iut.setInputTables("TEST3.TABLE_NOT_PRESENT");
+     iut.prepareToolSetup();
+     final int exitStatus = iut.executeTool();
+
+     Assert.assertEquals(-1, exitStatus);
+     validate(true);
+ }
+
+ /**
+  * Feeds the table list through an input file instead of the table option.
+  * The tool is still in dry-run mode here (see setup), so the tables must remain
+  * in their pre-operation state afterwards.
+  */
+ @Test
+ public void testToolWithInputFileParameter() throws Exception {
+     // try-with-resources guarantees the file handle is released even if write() throws
+     try (BufferedWriter writer = new BufferedWriter(new FileWriter(new File(INPUT_FILE)))) {
+         writer.write(INPUT_LIST);
+     }
+
+     validate(true);
+
+     iut.setInputTables(null);
+     iut.setInputFile(INPUT_FILE);
+     iut.prepareToolSetup();
+     iut.executeTool();
+
+     validate(true);
+ }
+
+ /**
+  * Verifies that command line arguments are parsed and copied into the tool's
+  * fields by parseOptions/initializeTool.
+  */
+ @Test
+ public void testCommandLineParsing() {
+
+     String outputFile = "/tmp/index_upgrade_" + UUID.randomUUID().toString();
+     String expectedOperation = upgrade ? UPGRADE_OP : ROLLBACK_OP;
+     String [] args = {"-o", expectedOperation, "-tb",
+             INPUT_LIST, "-lf", outputFile, "-d"};
+     IndexUpgradeTool iut = new IndexUpgradeTool();
+
+     CommandLine cmd = iut.parseOptions(args);
+     iut.initializeTool(cmd);
+     // JUnit convention is (expected, actual); the reverse order yields
+     // misleading "expected X but was Y" messages on failure.
+     Assert.assertTrue(iut.getDryRun());
+     Assert.assertEquals(INPUT_LIST, iut.getInputTables());
+     Assert.assertEquals(expectedOperation, iut.getOperation());
+     Assert.assertEquals(outputFile, iut.getLogFile());
+ }
+
+ /**
+  * Drops everything created by prepareSetup() - indexes first, then tables,
+  * then (when namespace mapping is enabled) the schemas - and closes the connection.
+  *
+  * @throws SQLException if any DROP statement or the close fails
+  */
+ @After
+ public void cleanup() throws SQLException {
+ //TEST.MOCK1,TEST1.MOCK2,TEST.MOCK3
+ conn.createStatement().execute("DROP INDEX INDEX1 ON TEST.MOCK1");
+ conn.createStatement().execute("DROP INDEX INDEX2 ON TEST.MOCK1");
+ conn.createStatement().execute("DROP INDEX INDEX1 ON TEST1.MOCK2");
+ conn.createStatement().execute("DROP INDEX INDEX2 ON TEST1.MOCK2");
+ conn.createStatement().execute("DROP INDEX INDEX3 ON TEST1.MOCK2");
+ conn.createStatement().execute("DROP INDEX INDEX3 ON TEST.MOCK3");
+
+
+ //Please do not remove/uncomment commented code until PHOENIX-5385 is fixed
+ /*conn.createStatement().execute("DROP INDEX MOCK1_INDEX3 ON TEST.MOCK1_VIEW");
+ conn.createStatement().execute("DROP INDEX MOCK1_INDEX1 ON TEST.MOCK1_VIEW1");
+ conn.createStatement().execute("DROP INDEX MOCK1_INDEX2 ON TEST.MOCK1_VIEW1");
+ conn.createStatement().execute("DROP INDEX MOCK2_INDEX1 ON TEST1.MOCK2_VIEW");
+
+ conn.createStatement().execute("DROP VIEW TEST.MOCK1_VIEW");
+ conn.createStatement().execute("DROP VIEW TEST.MOCK1_VIEW1");
+ conn.createStatement().execute("DROP VIEW TEST1.MOCK2_VIEW");*/
+
+ conn.createStatement().execute("DROP TABLE TEST.MOCK1");
+ conn.createStatement().execute("DROP TABLE TEST1.MOCK2");
+ conn.createStatement().execute("DROP TABLE TEST.MOCK3");
+
+ // Schemas were only created when namespace mapping is on (see prepareSetup).
+ if (isNamespaceEnabled) {
+ conn.createStatement().execute("DROP SCHEMA TEST");
+ conn.createStatement().execute("DROP SCHEMA TEST1");
+ }
+ conn.close();
+ }
+}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexUpgradeTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexUpgradeTool.java
new file mode 100644
index 0000000..400747f
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexUpgradeTool.java
@@ -0,0 +1,547 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.index;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.PosixParser;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configured;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.phoenix.hbase.index.IndexRegionObserver;
+import org.apache.phoenix.hbase.index.Indexer;
+import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder;
+import org.apache.phoenix.index.GlobalIndexChecker;
+import org.apache.phoenix.index.PhoenixIndexBuilder;
+import org.apache.phoenix.index.PhoenixIndexCodec;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.query.ConnectionQueryServices;
+
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.util.MetaDataUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
+
+import java.sql.ResultSet;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.logging.Logger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.util.SchemaUtil;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.logging.FileHandler;
+import java.util.logging.SimpleFormatter;
+
+import static org.apache.phoenix.query.QueryServicesOptions.GLOBAL_INDEX_CHECKER_ENABLED_MAP_EXPIRATION_MIN;
+
+public class IndexUpgradeTool extends Configured {
+
+ private static final Logger LOGGER = Logger.getLogger(IndexUpgradeTool.class.getName());
+
+ private static final Option OPERATION_OPTION = new Option("o", "operation",
+ true,
+ "[Required]Operation to perform (upgrade/rollback)");
+ private static final Option TABLE_OPTION = new Option("tb", "table", true,
+ "[Required]Tables list ex. table1,table2");
+ private static final Option TABLE_CSV_FILE_OPTION = new Option("f", "file",
+ true,
+ "[Optional]Tables list in a csv file");
+ private static final Option DRY_RUN_OPTION = new Option("d", "dry-run",
+ false,
+ "[Optional]If passed this will output steps that will be executed");
+ private static final Option HELP_OPTION = new Option("h", "help",
+ false, "Help");
+ private static final Option LOG_FILE_OPTION = new Option("lf", "logfile",
+ true,
+ "Log file path where the logs are written");
+ private static final Option INDEX_SYNC_REBUILD_OPTION = new Option("sr", "index-sync-rebuild",
+ false,
+ "[Optional]Whether or not synchronously rebuild the indexes; default rebuild asynchronous");
+
+ // Accepted values of the operation option (compared case-insensitively in prepareToolSetup).
+ public static final String UPGRADE_OP = "upgrade";
+ public static final String ROLLBACK_OP = "rollback";
+ // Placeholder stored in rebuildMap in place of a tenant id for global indexes.
+ private static final String GLOBAL_INDEX_ID = "#NA#";
+ // Tool used to rebuild indexes after an upgrade; injected via the constructor in test mode.
+ private IndexTool indexingTool;
+
+ // physical data table name -> physical names of the index tables to modify
+ private HashMap<String, HashSet<String>> tablesAndIndexes = new HashMap<>();
+ // physical data table name -> (index name -> tenant id, or GLOBAL_INDEX_ID)
+ private HashMap<String, HashMap<String,String>> rebuildMap = new HashMap<>();
+ // key/value arguments handed to every coprocessor added by addCoprocessor
+ private HashMap<String, String> prop = new HashMap<>();
+
+ // dryRun: only log the steps; upgrade: derived from 'operation';
+ // syncRebuild: pass "-runfg" to the index rebuild
+ private boolean dryRun, upgrade, syncRebuild;
+ private String operation;
+ private String inputTables;
+ private String logFile;
+ private String inputFile;
+
+ // When true the cache-expiry sleep is skipped and the injected IndexTool is kept.
+ private boolean test = false;
+
+ /** Enables or disables dry-run mode, in which steps are logged but not applied. */
+ public void setDryRun(boolean dryRun) {
+     this.dryRun = dryRun;
+ }
+
+ /** Sets the comma-separated list of data tables to process. */
+ public void setInputTables(String inputTables) {
+     this.inputTables = inputTables;
+ }
+
+ /** Sets the path of the log file; {@code null} means no file handler is attached. */
+ public void setLogFile(String logFile) {
+     this.logFile = logFile;
+ }
+
+ /** Sets the path of the file from which the table list is read when no list is given. */
+ public void setInputFile(String inputFile) {
+     this.inputFile = inputFile;
+ }
+
+ /** Switches test mode on or off. */
+ public void setTest(boolean test) {
+     this.test = test;
+ }
+
+ /** @return whether dry-run mode is active */
+ public boolean getDryRun() {
+     return dryRun;
+ }
+
+ /** @return the comma-separated table list currently configured */
+ public String getInputTables() {
+     return inputTables;
+ }
+
+ /** @return the configured log file path, possibly {@code null} */
+ public String getLogFile() {
+     return logFile;
+ }
+
+ /** @return the configured operation string */
+ public String getOperation() {
+     return this.operation;
+ }
+
+ public static void main (String[] args) {
+ CommandLine cmdLine = null;
+
+ IndexUpgradeTool iut = new IndexUpgradeTool();
+ try {
+ cmdLine = iut.parseOptions(args);
+ LOGGER.info("Index Upgrade tool initiated: "+ StringUtils.join( args, ","));
+ } catch (IllegalStateException e) {
+ iut.printHelpAndExit(e.getMessage(), iut.getOptions());
+ }
+ iut.initializeTool(cmdLine);
+ iut.prepareToolSetup();
+ iut.executeTool();
+ }
+
+ /**
+  * Creates a fully configured tool, bypassing command line parsing.
+  *
+  * @param mode operation to perform ({@link #UPGRADE_OP} or {@link #ROLLBACK_OP})
+  * @param tables comma-separated list of data tables, or {@code null} to use {@code inputFile}
+  * @param inputFile path of a file holding the table list, used when {@code tables} is {@code null}
+  * @param outputFile path of the log file, or {@code null} for none
+  * @param dryRun whether to only log the steps instead of applying them
+  * @param indexTool tool used for index rebuilds (kept only in test mode)
+  */
+ public IndexUpgradeTool(String mode, String tables, String inputFile,
+         String outputFile, boolean dryRun, IndexTool indexTool) {
+     this.indexingTool = indexTool;
+     this.dryRun = dryRun;
+     this.logFile = outputFile;
+     this.inputFile = inputFile;
+     this.inputTables = tables;
+     this.operation = mode;
+ }
+
+ /** Creates an unconfigured tool; fields are filled in later by initializeTool. */
+ public IndexUpgradeTool () {
+ }
+
+ /**
+ * Parses the commandline arguments, throws IllegalStateException if mandatory arguments are
+ * missing.
+ * @param args supplied command line arguments
+ * @return the parsed command line
+ */
+ @VisibleForTesting
+ public CommandLine parseOptions(String[] args) {
+
+ final Options options = getOptions();
+
+ CommandLineParser parser = new PosixParser();
+ CommandLine cmdLine = null;
+ try {
+ cmdLine = parser.parse(options, args);
+ } catch (ParseException e) {
+ printHelpAndExit("severe parsing command line options: " + e.getMessage(),
+ options);
+ }
+ if (cmdLine.hasOption(HELP_OPTION.getOpt())) {
+ printHelpAndExit(options, 0);
+ }
+ if (!cmdLine.hasOption(OPERATION_OPTION.getOpt())) {
+ throw new IllegalStateException(OPERATION_OPTION.getLongOpt()
+ +" is a mandatory parameter");
+ }
+ if (cmdLine.hasOption(DRY_RUN_OPTION.getOpt())
+ && !cmdLine.hasOption(LOG_FILE_OPTION.getOpt())) {
+ throw new IllegalStateException("Log file with "+TABLE_OPTION.getLongOpt()
+ + " is mandatory if " + DRY_RUN_OPTION.getLongOpt() +" is passed");
+ }
+ if (!(cmdLine.hasOption(TABLE_OPTION.getOpt()))
+ && !(cmdLine.hasOption(TABLE_CSV_FILE_OPTION.getOpt()))) {
+ throw new IllegalStateException("Tables list should be passed in either with"
+ +TABLE_OPTION.getLongOpt() + " or " + TABLE_CSV_FILE_OPTION.getLongOpt());
+ }
+ if ((cmdLine.hasOption(TABLE_OPTION.getOpt()))
+ && (cmdLine.hasOption(TABLE_CSV_FILE_OPTION.getOpt()))) {
+ throw new IllegalStateException("Tables list passed in with"
+ +TABLE_OPTION.getLongOpt() + " and " + TABLE_CSV_FILE_OPTION.getLongOpt()
+ + "; specify only one.");
+ }
+ return cmdLine;
+ }
+
+ /**
+  * Writes the given message to standard error, prints the usage and exits
+  * the JVM with status 1.
+  */
+ private void printHelpAndExit(String severeMessage, Options options) {
+     System.err.println(severeMessage);
+     this.printHelpAndExit(options, 1);
+ }
+
+ /** Prints the usage for the given options and exits the JVM with {@code exitCode}. */
+ private void printHelpAndExit(Options options, int exitCode) {
+     new HelpFormatter().printHelp("help", options);
+     System.exit(exitCode);
+ }
+
+ private Options getOptions() {
+ final Options options = new Options();
+ options.addOption(OPERATION_OPTION);
+ TABLE_OPTION.setOptionalArg(true);
+ options.addOption(TABLE_OPTION);
+ TABLE_CSV_FILE_OPTION.setOptionalArg(true);
+ options.addOption(TABLE_CSV_FILE_OPTION);
+ DRY_RUN_OPTION.setOptionalArg(true);
+ options.addOption(DRY_RUN_OPTION);
+ LOG_FILE_OPTION.setOptionalArg(true);
+ options.addOption(LOG_FILE_OPTION);
+ options.addOption(HELP_OPTION);
+ INDEX_SYNC_REBUILD_OPTION.setOptionalArg(true);
+ options.addOption(INDEX_SYNC_REBUILD_OPTION);
+
+ return options;
+ }
+
+ /**
+  * Copies the parsed command line values into the tool's fields.
+  *
+  * @param cmdLine command line previously produced by parseOptions
+  */
+ @VisibleForTesting
+ public void initializeTool(CommandLine cmdLine) {
+     // flag-style options
+     this.dryRun = cmdLine.hasOption(DRY_RUN_OPTION.getOpt());
+     this.syncRebuild = cmdLine.hasOption(INDEX_SYNC_REBUILD_OPTION.getOpt());
+     // options carrying a value
+     this.operation = cmdLine.getOptionValue(OPERATION_OPTION.getOpt());
+     this.inputTables = cmdLine.getOptionValue(TABLE_OPTION.getOpt());
+     this.inputFile = cmdLine.getOptionValue(TABLE_CSV_FILE_OPTION.getOpt());
+     this.logFile = cmdLine.getOptionValue(LOG_FILE_OPTION.getOpt());
+ }
+
+ /**
+  * Prepares the tool for execution: attaches the log file handler (if a log file
+  * is configured), fills the coprocessor properties, resolves the table list
+  * (reading it from the input file when no list was given) and derives the
+  * upgrade/rollback flag from the operation string.
+  *
+  * Exits the JVM with status -1 if the log file or input file cannot be accessed.
+  *
+  * @throws IllegalStateException if neither a table list nor an input file is
+  *         available, or if the operation is not upgrade/rollback
+  */
+ @VisibleForTesting
+ public void prepareToolSetup() {
+     try {
+         if (logFile != null) {
+             FileHandler fh = new FileHandler(logFile);
+             fh.setFormatter(new SimpleFormatter());
+             LOGGER.addHandler(fh);
+         }
+
+         prop.put(Indexer.INDEX_BUILDER_CONF_KEY, PhoenixIndexBuilder.class.getName());
+         prop.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, PhoenixIndexCodec.class.getName());
+
+         if (inputTables == null) {
+             // Check the file path first: Paths.get(null) would throw a bare
+             // NullPointerException and the old post-read null check could never fire.
+             if (inputFile == null) {
+                 String message = "Tables' list is not available; use -tb or -f option";
+                 LOGGER.severe(message);
+                 throw new IllegalStateException(message);
+             }
+             // Decode explicitly as UTF-8 rather than with the platform default charset.
+             inputTables = new String(Files.readAllBytes(Paths.get(inputFile)),
+                     java.nio.charset.StandardCharsets.UTF_8);
+         }
+         LOGGER.info("list of tables passed: " + inputTables);
+
+         // Constant-first comparison so a missing operation reaches the
+         // IllegalStateException below instead of a NullPointerException.
+         if (UPGRADE_OP.equalsIgnoreCase(operation)) {
+             upgrade = true;
+         } else if (ROLLBACK_OP.equalsIgnoreCase(operation)) {
+             upgrade = false;
+         } else {
+             throw new IllegalStateException("Invalid option provided for "
+                     + OPERATION_OPTION.getOpt() + " expected values: {upgrade, rollback}");
+         }
+         if (dryRun) {
+             LOGGER.info("This is the beginning of the tool with dry run.");
+         }
+     } catch (IOException e) {
+         LOGGER.severe("Something went wrong "+e);
+         System.exit(-1);
+     }
+ }
+
+ /**
+  * Opens a Phoenix connection, resolves the tables and indexes to process and
+  * runs the configured operation on them.
+  *
+  * @return 0 on success, -1 if the tables could not be resolved, a SQL error
+  *         occurred, or a step of the operation failed
+  */
+ @VisibleForTesting
+ public int executeTool() {
+     final Configuration conf = HBaseConfiguration.addHbaseResources(getConf());
+
+     try (Connection conn = ConnectionUtil.getInputConnection(conf)) {
+         final PhoenixConnection phoenixConn = conn.unwrap(PhoenixConnection.class);
+         final ConnectionQueryServices queryServices = phoenixConn.getQueryServices();
+
+         if (!extractTablesAndIndexes(conn.unwrap(PhoenixConnection.class))) {
+             return -1;
+         }
+         return executeTool(conn, queryServices, conf);
+     } catch (SQLException e) {
+         LOGGER.severe("Something went wrong in executing tool "+ e);
+         return -1;
+     }
+ }
+
+ /**
+  * Applies the configured operation to every table collected in
+  * {@code tablesAndIndexes}: disables the data table and its indexes, swaps the
+  * coprocessors, re-enables the tables and, for an upgrade, triggers an index rebuild.
+  * Processing stops at the first table for which a step throws.
+  *
+  * @param conn open Phoenix connection used to look up table metadata
+  * @param queryServices source of the HBase Admin handle
+  * @param conf configuration handed to the index rebuild tool
+  * @return 0 if every table was processed, -1 on the first failure or interruption
+  */
+ private int executeTool(Connection conn, ConnectionQueryServices queryServices,
+         Configuration conf) {
+
+     LOGGER.info("Executing " + operation);
+     for (Map.Entry<String, HashSet<String>> entry : tablesAndIndexes.entrySet()) {
+         String dataTableFullName = entry.getKey();
+         HashSet<String> indexes = entry.getValue();
+
+         try (Admin admin = queryServices.getAdmin()) {
+
+             PTable dataTable = PhoenixRuntime.getTableNoCache(conn, dataTableFullName);
+             LOGGER.fine("Executing " + operation + " for " + dataTableFullName);
+
+             boolean mutable = !(dataTable.isImmutableRows());
+             if (!mutable) {
+                 // Parenthesized so the minutes are added numerically; without the
+                 // parentheses "+ 1" is string-concatenated (e.g. "101" instead of "11").
+                 LOGGER.fine("Data table is immutable, waiting for "
+                         + (GLOBAL_INDEX_CHECKER_ENABLED_MAP_EXPIRATION_MIN + 1)
+                         + " minutes for client cache to expire");
+                 if (!test) {
+                     // long arithmetic avoids int overflow for large expiration values
+                     Thread.sleep(
+                             (GLOBAL_INDEX_CHECKER_ENABLED_MAP_EXPIRATION_MIN + 1) * 60L * 1000L);
+                 }
+             }
+             disableTable(admin, dataTableFullName, indexes);
+             modifyTable(admin, dataTableFullName, indexes);
+             enableTable(admin, dataTableFullName, indexes);
+             if (upgrade) {
+                 // In test mode the IndexTool injected through the constructor is kept.
+                 if (!test) {
+                     indexingTool = new IndexTool();
+                 }
+                 indexingTool.setConf(conf);
+                 // NOTE(review): the status returned by rebuildIndexes is not checked here,
+                 // so a failed rebuild does not affect this method's result - confirm intended.
+                 rebuildIndexes(dataTableFullName, indexingTool);
+             }
+         } catch (InterruptedException e) {
+             // Restore the interrupt flag so callers can observe the interruption.
+             Thread.currentThread().interrupt();
+             LOGGER.severe("Something went wrong while executing " + operation + " steps " + e);
+             return -1;
+         } catch (IOException | SQLException e) {
+             LOGGER.severe("Something went wrong while executing " + operation + " steps " + e);
+             return -1;
+         }
+     }
+     return 0;
+ }
+
+ /**
+  * Swaps the coprocessors on a data table and its index tables. For an upgrade
+  * the index tables are modified before the data table; for a rollback the
+  * data table is modified first.
+  *
+  * @throws IOException if a descriptor cannot be read or modified
+  */
+ private void modifyTable(Admin admin, String dataTableFullName, HashSet<String> indexes)
+         throws IOException {
+     if (!upgrade) {
+         modifyDataTable(admin, dataTableFullName);
+         modifyIndexTable(admin, indexes);
+         return;
+     }
+     modifyIndexTable(admin, indexes);
+     modifyDataTable(admin, dataTableFullName);
+ }
+
+ private void disableTable(Admin admin, String dataTable, HashSet<String>indexes)
+ throws IOException {
+ if (admin.isTableEnabled(TableName.valueOf(dataTable))) {
+ if (!dryRun) {
+ admin.disableTable(TableName.valueOf(dataTable));
+ }
+ LOGGER.info("Disabled data table " + dataTable);
+ } else {
+ LOGGER.info( "Data table " + dataTable +" is already disabled");
+ }
+ for (String indexName : indexes) {
+ if (admin.isTableEnabled(TableName.valueOf(indexName))) {
+ if (!dryRun) {
+ admin.disableTable(TableName.valueOf(indexName));
+ }
+ LOGGER.info("Disabled index table " + indexName);
+ } else {
+ LOGGER.info( "Index table " + indexName +" is already disabled");
+ }
+ }
+ }
+
+ private void enableTable(Admin admin, String dataTable, HashSet<String>indexes)
+ throws IOException {
+ if (!admin.isTableEnabled(TableName.valueOf(dataTable))) {
+ if (!dryRun) {
+ admin.enableTable(TableName.valueOf(dataTable));
+ }
+ LOGGER.info("Enabled data table " + dataTable);
+ } else {
+ LOGGER.info( "Data table " + dataTable +" is already enabled");
+ }
+ for (String indexName : indexes) {
+ if(!admin.isTableEnabled(TableName.valueOf(indexName))) {
+ if (!dryRun) {
+ admin.enableTable(TableName.valueOf(indexName));
+ }
+ LOGGER.info("Enabled index table " + indexName);
+ } else {
+ LOGGER.info( "Index table " + indexName +" is already enabled");
+ }
+ }
+ }
+
+ private void modifyDataTable(Admin admin, String tableName)
+ throws IOException {
+ HTableDescriptor tableDesc = admin.getTableDescriptor(TableName.valueOf(tableName));
+ if (upgrade) {
+ removeCoprocessor(admin, tableName, tableDesc, Indexer.class.getName());
+ addCoprocessor(admin, tableName, tableDesc, IndexRegionObserver.class.getName());
+ } else {
+ removeCoprocessor(admin, tableName, tableDesc, IndexRegionObserver.class.getName());
+ addCoprocessor(admin, tableName, tableDesc, Indexer.class.getName());
+ }
+ if (!dryRun) {
+ admin.modifyTable(TableName.valueOf(tableName), tableDesc);
+ }
+ }
+
+ /**
+  * Adds the named coprocessor to {@code tableDesc} unless the table's current
+  * descriptor (as reported by {@code admin}) already has it. In dry-run mode
+  * the descriptor is left untouched and the step is only logged.
+  *
+  * @param admin used to read the table's current descriptor
+  * @param tableName physical name of the table
+  * @param tableDesc descriptor to which the coprocessor is added
+  * @param coprocName fully qualified class name of the coprocessor
+  * @throws IOException if the current descriptor cannot be read or the coprocessor cannot be added
+  */
+ private void addCoprocessor(Admin admin, String tableName, HTableDescriptor tableDesc, String coprocName) throws IOException {
+     if (!admin.getTableDescriptor(TableName.valueOf(tableName)).hasCoprocessor(coprocName)) {
+         if (!dryRun) {
+             tableDesc.addCoprocessor(coprocName,
+                     null, QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY, prop);
+         }
+         LOGGER.info("Loaded " + coprocName + " coprocessor on table " + tableName);
+     } else {
+         // A space was missing before "is", which fused the table name with the text.
+         LOGGER.info(coprocName + " coprocessor on table " + tableName + " is already loaded");
+     }
+ }
+
+ /**
+  * Removes the named coprocessor from {@code tableDesc} if the table's current
+  * descriptor (as reported by {@code admin}) has it. In dry-run mode the
+  * descriptor is left untouched and the step is only logged.
+  *
+  * @param admin used to read the table's current descriptor
+  * @param tableName physical name of the table
+  * @param tableDesc descriptor from which the coprocessor is removed
+  * @param coprocName fully qualified class name of the coprocessor
+  * @throws IOException if the current descriptor cannot be read
+  */
+ private void removeCoprocessor(Admin admin, String tableName, HTableDescriptor tableDesc, String coprocName) throws IOException {
+     if (admin.getTableDescriptor(TableName.valueOf(tableName)).hasCoprocessor(coprocName)) {
+         if (!dryRun) {
+             tableDesc.removeCoprocessor(coprocName);
+         }
+         // A space was missing before "coprocessor", which fused it with the class name.
+         LOGGER.info("Unloaded " + coprocName + " coprocessor on table " + tableName);
+     } else {
+         LOGGER.info(coprocName + " coprocessor on table " + tableName + " is already unloaded");
+     }
+ }
+
+ private void modifyIndexTable(Admin admin, HashSet<String> indexes)
+ throws IOException {
+ for (String indexName : indexes) {
+ HTableDescriptor indexTableDesc = admin.getTableDescriptor(TableName.valueOf(indexName));
+ if (upgrade) {
+ addCoprocessor(admin, indexName, indexTableDesc, GlobalIndexChecker.class.getName());
+ } else {
+ removeCoprocessor(admin, indexName, indexTableDesc, GlobalIndexChecker.class.getName());
+ }
+ if (!dryRun) {
+ admin.modifyTable(TableName.valueOf(indexName),indexTableDesc);
+ }
+ }
+ }
+
+ private int rebuildIndexes(String dataTable, IndexTool indexingTool) {
+ String schema = SchemaUtil.getSchemaNameFromFullName(dataTable);
+ String table = SchemaUtil.getTableNameFromFullName(dataTable);
+ for(Map.Entry<String, String> indexMap : rebuildMap.get(dataTable).entrySet()) {
+ String index = indexMap.getKey();
+ String tenantId = indexMap.getValue();
+ String indexName = SchemaUtil.getTableNameFromFullName(index);
+ String outFile = "/tmp/index_rebuild_" + indexName +
+ (GLOBAL_INDEX_ID.equals(tenantId)?"":"_"+tenantId) +"_"+ UUID.randomUUID().toString();
+ String[] args =
+ { "-s", schema, "-dt", table, "-it", indexName, "-direct", "-op", outFile };
+ ArrayList<String> list = new ArrayList<>(Arrays.asList(args));
+ if (!GLOBAL_INDEX_ID.equals(tenantId)) {
+ list.add("-tenant");
+ list.add(tenantId);
+ }
+ if (syncRebuild) {
+ list.add("-runfg");
+ }
+ args = list.toArray(new String[list.size()]);
+
+ try {
+ LOGGER.info("Rebuilding index " + indexName);
+ if (!dryRun) {
+ indexingTool.run(args);
+ }
+ } catch (Exception e) {
+ LOGGER.severe("Something went wrong while building the index " + index + " " + e);
+ return -1;
+ }
+ }
+ return 0;
+ }
+
+ /**
+  * Resolves every table named in {@code inputTables} and records, per physical
+  * data table, the physical index tables to modify ({@code tablesAndIndexes})
+  * and the indexes to rebuild ({@code rebuildMap}). Transactional tables and
+  * anything that is not of type TABLE are skipped with a log message.
+  *
+  * @param conn Phoenix connection used for metadata lookups
+  * @return {@code true} if all tables were resolved, {@code false} if a SQL error occurred
+  */
+ private boolean extractTablesAndIndexes(PhoenixConnection conn) {
+     String [] tables = inputTables.trim().split(",");
+     PTable dataTable = null;
+     try {
+         for (String rawTableName : tables) {
+             // Tolerate whitespace around individual entries, e.g. "A, B".
+             String tableName = rawTableName.trim();
+             if (tableName.isEmpty()) {
+                 continue;
+             }
+             // Reset so the catch block can tell whether THIS lookup failed.
+             dataTable = null;
+             HashSet<String> physicalIndexes = new HashSet<>();
+             dataTable = PhoenixRuntime.getTableNoCache(conn, tableName);
+             String physicalTableName = dataTable.getPhysicalName().getString();
+             HashMap<String, String> rebuildIndexes = new HashMap<>();
+
+             if (!dataTable.isTransactional() && dataTable.getType().equals(PTableType.TABLE)) {
+                 for (PTable indexTable : dataTable.getIndexes()) {
+                     if (indexTable.getIndexType().equals(PTable.IndexType.GLOBAL)) {
+                         physicalIndexes.add(indexTable.getPhysicalName().getString());
+                         rebuildIndexes.put(indexTable.getPhysicalName().getString(), GLOBAL_INDEX_ID);
+                     }
+                 }
+
+                 if (MetaDataUtil.hasViewIndexTable(conn, dataTable.getPhysicalName())) {
+                     String viewIndexPhysicalName = MetaDataUtil.getViewIndexPhysicalName(physicalTableName);
+                     physicalIndexes.add(viewIndexPhysicalName);
+
+                     String viewIndexQuery = "SELECT DISTINCT TABLE_NAME, TENANT_ID FROM "
+                             + "SYSTEM.CATALOG WHERE COLUMN_FAMILY = \'"
+                             + viewIndexPhysicalName +"\' AND TABLE_TYPE = \'i\'";
+                     // try-with-resources closes the statement and result set,
+                     // which were previously never closed.
+                     try (java.sql.Statement stmt = conn.createStatement();
+                             ResultSet rs = stmt.executeQuery(viewIndexQuery)) {
+                         while (rs.next()) {
+                             String viewIndexName = rs.getString(1);
+                             // NOTE(review): presumably TENANT_ID can be null for
+                             // non-tenant views - confirm how rebuildIndexes should treat it.
+                             String tenantId = rs.getString(2);
+                             rebuildIndexes.put(viewIndexName, tenantId);
+                         }
+                     }
+                 }
+                 rebuildMap.put(physicalTableName, rebuildIndexes);
+                 tablesAndIndexes.put(physicalTableName, physicalIndexes);
+             } else {
+                 LOGGER.info("Skipping Table " + tableName + " because it is "+
+                         (dataTable.isTransactional() ? "transactional" : "not a data table"));
+             }
+         }
+         return true;
+     } catch (SQLException e) {
+         LOGGER.severe("Failed to find list of indexes "+e);
+         if (dataTable == null) {
+             LOGGER.severe("Unable to find the provided data table");
+         }
+         return false;
+     }
+ }
+}
\ No newline at end of file