Posted to commits@phoenix.apache.org by gj...@apache.org on 2019/02/28 04:28:11 UTC
[phoenix] branch 4.x-HBase-1.2 updated: PHOENIX-5089 Add tenantId parameter to IndexScrutinyTool
This is an automated email from the ASF dual-hosted git repository.
gjacoby pushed a commit to branch 4.x-HBase-1.2
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x-HBase-1.2 by this push:
new 33f2a3d PHOENIX-5089 Add tenantId parameter to IndexScrutinyTool
33f2a3d is described below
commit 33f2a3d69e0f82e12c8f286040c167313217604a
Author: Gokcen Iskender <gi...@salesforce.com>
AuthorDate: Wed Feb 27 16:35:05 2019 -0800
PHOENIX-5089 Add tenantId parameter to IndexScrutinyTool
Signed-off-by: Geoffrey Jacoby <gj...@apache.org>
---
.../phoenix/end2end/IndexScrutinyToolIT.java | 1406 +++++++++++---------
.../phoenix/mapreduce/index/IndexScrutinyTool.java | 58 +-
.../apache/phoenix/mapreduce/index/IndexTool.java | 5 +-
.../phoenix/mapreduce/util/IndexColumnNames.java | 3 +
4 files changed, 821 insertions(+), 651 deletions(-)
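Before the diff itself, it may help to see how the new -tenant option composes with the flags the tests already exercise (-s, -dt, -it, -b, -src, -o, -of, -om, -t, -run-foreground). A minimal launcher sketch, assuming placeholder view/index names and tenant id; the flag set mirrors how the IT's getArgValues() assembles arguments, not the project's documented invocation:

    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.util.ToolRunner;
    import org.apache.phoenix.mapreduce.index.IndexScrutinyTool;

    public class ScrutinyLauncherSketch {
        public static void main(String[] args) throws Exception {
            // Flag set mirrors what getArgValues() builds in the IT below;
            // table names and tenant id are illustrative placeholders.
            String[] cmdArgs = {
                    "-dt", "MY_TENANT_VIEW",   // data table (here: a tenant view)
                    "-it", "MY_TENANT_INDEX",  // index to scrutinize
                    "-src", "BOTH",            // check data->index and index->data
                    "-t", String.valueOf(System.currentTimeMillis()),
                    "-run-foreground",
                    "-tenant", "tenant1"       // the option added by PHOENIX-5089
            };
            // IndexScrutinyTool extends Configured and implements Tool,
            // so ToolRunner can drive it like any Hadoop tool.
            int status = ToolRunner.run(HBaseConfiguration.create(),
                    new IndexScrutinyTool(), cmdArgs);
            System.exit(status);
        }
    }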
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java
index cbce7b2..318a076 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java
@@ -47,9 +47,12 @@ import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.CsvBulkImportUtil;
import org.apache.phoenix.mapreduce.index.IndexScrutinyTableOutput;
import org.apache.phoenix.mapreduce.index.IndexScrutinyTool;
@@ -59,6 +62,7 @@ import org.apache.phoenix.mapreduce.index.PhoenixScrutinyJobCounters;
import org.apache.phoenix.mapreduce.index.SourceTargetColumnNames;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.PhoenixRuntime;
@@ -73,6 +77,7 @@ import org.junit.experimental.categories.Category;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import org.junit.experimental.runners.Enclosed;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -80,680 +85,859 @@ import org.junit.runners.Parameterized;
* Tests for the {@link IndexScrutinyTool}
*/
@Category(NeedsOwnMiniClusterTest.class)
-@RunWith(Parameterized.class)
-public class IndexScrutinyToolIT extends BaseTest {
+@RunWith(Enclosed.class)
+public class IndexScrutinyToolIT {
+
+ abstract public static class SharedIndexToolIT extends BaseTest {
+ protected String outputDir;
+
+ @BeforeClass public static void doSetup() throws Exception {
+ Map<String, String> serverProps = Maps.newHashMap();
+ //disable major compactions
+ serverProps.put(HConstants.MAJOR_COMPACTION_PERIOD, "0");
+ Map<String, String> clientProps = Maps.newHashMap();
+ setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()),
+ new ReadOnlyProps(clientProps.entrySet().iterator()));
+ }
- private String dataTableDdl;
- private String indexTableDdl;
+ protected List<Job> runScrutiny(String[] cmdArgs) throws Exception {
+ IndexScrutinyTool scrutiny = new IndexScrutinyTool();
+ Configuration conf = new Configuration(getUtility().getConfiguration());
+ scrutiny.setConf(conf);
+ int status = scrutiny.run(cmdArgs);
+ assertEquals(0, status);
+ for (Job job : scrutiny.getJobs()) {
+ assertTrue(job.waitForCompletion(true));
+ }
+ return scrutiny.getJobs();
+ }
- private static final String UPSERT_SQL = "UPSERT INTO %s VALUES(?,?,?,?)";
+ protected String[] getArgValues(String schemaName, String dataTable, String indxTable, Long batchSize,
+ SourceTable sourceTable, boolean outputInvalidRows, OutputFormat outputFormat, Long maxOutputRows, String tenantId, Long scrutinyTs) {
+ final List<String> args = Lists.newArrayList();
+ if (schemaName != null) {
+ args.add("-s");
+ args.add(schemaName);
+ }
+ args.add("-dt");
+ args.add(dataTable);
+ args.add("-it");
+ args.add(indxTable);
+
+ // TODO test snapshot reads
+ // if(useSnapshot) {
+ // args.add("-snap");
+ // }
+
+ if (OutputFormat.FILE.equals(outputFormat)) {
+ args.add("-op");
+ outputDir = "/tmp/" + UUID.randomUUID().toString();
+ args.add(outputDir);
+ }
+ args.add("-t");
+ args.add(String.valueOf(scrutinyTs));
+ args.add("-run-foreground");
+ if (batchSize != null) {
+ args.add("-b");
+ args.add(String.valueOf(batchSize));
+ }
- private static final String INDEX_UPSERT_SQL =
- "UPSERT INTO %s (\"0:NAME\", \":ID\", \"0:ZIP\", \"0:EMPLOY_DATE\") values (?,?,?,?)";
+ // default to using data table as the source table
+ args.add("-src");
+ if (sourceTable == null) {
+ args.add(SourceTable.DATA_TABLE_SOURCE.name());
+ } else {
+ args.add(sourceTable.name());
+ }
+ if (outputInvalidRows) {
+ args.add("-o");
+ }
+ if (outputFormat != null) {
+ args.add("-of");
+ args.add(outputFormat.name());
+ }
+ if (maxOutputRows != null) {
+ args.add("-om");
+ args.add(maxOutputRows.toString());
+ }
+ if (tenantId != null) {
+ args.add("-tenant");
+ args.add(tenantId);
+ }
+ return args.toArray(new String[0]);
+ }
- private static final String DELETE_SQL = "DELETE FROM %s ";
+ protected long getCounterValue(Counters counters, Enum<PhoenixScrutinyJobCounters> counter) {
+ return counters.findCounter(counter).getValue();
+ }
+ }
- private String schemaName;
- private String dataTableName;
- private String dataTableFullName;
- private String indexTableName;
- private String indexTableFullName;
- private String outputDir;
+ @RunWith(Parameterized.class)
+ public static class IndexScrutinyToolNonTenantIT extends SharedIndexToolIT {
- private Connection conn;
+ private String dataTableDdl;
+ private String indexTableDdl;
- private PreparedStatement dataTableUpsertStmt;
+ private static final String UPSERT_SQL = "UPSERT INTO %s VALUES(?,?,?,?)";
- private PreparedStatement indexTableUpsertStmt;
+ private static final String INDEX_UPSERT_SQL =
+ "UPSERT INTO %s (\"0:NAME\", \":ID\", \"0:ZIP\", \"0:EMPLOY_DATE\") values (?,?,?,?)";
- private long testTime;
- private Properties props;
+ private static final String DELETE_SQL = "DELETE FROM %s ";
- @Parameterized.Parameters
- public static Collection<Object[]> data() {
- return Arrays.asList(new Object[][] {
- { "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER, EMPLOY_DATE TIMESTAMP, EMPLOYER VARCHAR)", "CREATE LOCAL INDEX %s ON %s (NAME, EMPLOY_DATE) INCLUDE (ZIP)" },
- { "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER, EMPLOY_DATE TIMESTAMP, EMPLOYER VARCHAR) SALT_BUCKETS=2", "CREATE INDEX %s ON %s (NAME, EMPLOY_DATE) INCLUDE (ZIP)" },
- { "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER, EMPLOY_DATE TIMESTAMP, EMPLOYER VARCHAR) SALT_BUCKETS=2", "CREATE LOCAL INDEX %s ON %s (NAME, EMPLOY_DATE) INCLUDE (ZIP)" }
- });
- }
+ private String schemaName;
+ private String dataTableName;
+ private String dataTableFullName;
+ private String indexTableName;
+ private String indexTableFullName;
- public IndexScrutinyToolIT(String dataTableDdl, String indexTableDdl) {
- this.dataTableDdl = dataTableDdl;
- this.indexTableDdl = indexTableDdl;
- }
+ private Connection conn;
- @BeforeClass
- public static void doSetup() throws Exception {
- Map<String, String> serverProps = Maps.newHashMap();
- //disable major compactions
- serverProps.put(HConstants.MAJOR_COMPACTION_PERIOD, "0");
- Map<String, String> clientProps = Maps.newHashMap();
- setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
- }
+ private PreparedStatement dataTableUpsertStmt;
- /**
- * Create the test data and index tables
- */
- @Before
- public void setup() throws SQLException {
- generateUniqueTableNames();
- createTestTable(getUrl(), String.format(dataTableDdl, dataTableFullName));
- createTestTable(getUrl(),
- String.format(indexTableDdl, indexTableName, dataTableFullName));
- props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- conn = DriverManager.getConnection(getUrl(), props);
- String dataTableUpsert = String.format(UPSERT_SQL, dataTableFullName);
- dataTableUpsertStmt = conn.prepareStatement(dataTableUpsert);
- String indexTableUpsert = String.format(INDEX_UPSERT_SQL, indexTableFullName);
- indexTableUpsertStmt = conn.prepareStatement(indexTableUpsert);
- conn.setAutoCommit(false);
- testTime = EnvironmentEdgeManager.currentTimeMillis() - 1000;
+ private PreparedStatement indexTableUpsertStmt;
- }
+ private long testTime;
+ private Properties props;
- @After
- public void teardown() throws SQLException {
- if (conn != null) {
- conn.close();
+ @Parameterized.Parameters public static Collection<Object[]> data() {
+ return Arrays.asList(new Object[][] { { "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER, EMPLOY_DATE TIMESTAMP, EMPLOYER VARCHAR)",
+ "CREATE LOCAL INDEX %s ON %s (NAME, EMPLOY_DATE) INCLUDE (ZIP)" }, { "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER, EMPLOY_DATE TIMESTAMP, EMPLOYER VARCHAR) SALT_BUCKETS=2",
+ "CREATE INDEX %s ON %s (NAME, EMPLOY_DATE) INCLUDE (ZIP)" }, { "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER, EMPLOY_DATE TIMESTAMP, EMPLOYER VARCHAR) SALT_BUCKETS=2",
+ "CREATE LOCAL INDEX %s ON %s (NAME, EMPLOY_DATE) INCLUDE (ZIP)" } });
}
- }
- /**
- * Tests a data table that is correctly indexed. Scrutiny should report all rows as valid.
- */
- @Test
- public void testValidIndex() throws Exception {
- // insert two rows
- upsertRow(dataTableUpsertStmt, 1, "name-1", 94010);
- upsertRow(dataTableUpsertStmt, 2, "name-2", 95123);
- conn.commit();
-
- int numDataRows = countRows(dataTableFullName);
- int numIndexRows = countRows(indexTableFullName);
-
- // scrutiny should report everything as ok
- List<Job> completedJobs = runScrutiny(schemaName, dataTableName, indexTableName);
- Job job = completedJobs.get(0);
- assertTrue(job.isSuccessful());
- Counters counters = job.getCounters();
- assertEquals(2, getCounterValue(counters, VALID_ROW_COUNT));
- assertEquals(0, getCounterValue(counters, INVALID_ROW_COUNT));
-
- // make sure row counts weren't modified by scrutiny
- assertEquals(numDataRows, countRows(dataTableFullName));
- assertEquals(numIndexRows, countRows(indexTableFullName));
- }
+ public IndexScrutinyToolNonTenantIT(String dataTableDdl, String indexTableDdl) {
+ this.dataTableDdl = dataTableDdl;
+ this.indexTableDdl = indexTableDdl;
+ }
- /**
- * Tests running a scrutiny while updates and deletes are happening.
- * Since CURRENT_SCN is set, the scrutiny shouldn't report any issue.
- */
- @Test
- public void testScrutinyWhileTakingWrites() throws Exception {
- int id = 0;
- while (id < 1000) {
- int index = 1;
- dataTableUpsertStmt.setInt(index++, id);
- dataTableUpsertStmt.setString(index++, "name-" + id);
- dataTableUpsertStmt.setInt(index++, id);
- dataTableUpsertStmt.setTimestamp(index++, new Timestamp(testTime));
- dataTableUpsertStmt.executeUpdate();
- id++;
- }
- conn.commit();
-
- //CURRENT_SCN for scrutiny
- long scrutinyTS = EnvironmentEdgeManager.currentTimeMillis();
-
- // launch background upserts and deletes
- final Random random = new Random(0);
- Runnable backgroundUpserts = new Runnable() {
- @Override
- public void run() {
- int idToUpsert = random.nextInt(1000);
- try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
- PreparedStatement dataPS =
- conn.prepareStatement(String.format(UPSERT_SQL, dataTableFullName));
- upsertRow(dataPS, idToUpsert, "modified-" + idToUpsert, idToUpsert + 1000);
- conn.commit();
- } catch (SQLException e) {
- e.printStackTrace();
- }
+ /**
+ * Create the test data and index tables
+ */
+ @Before public void setup() throws SQLException {
+ generateUniqueTableNames();
+ createTestTable(getUrl(), String.format(dataTableDdl, dataTableFullName));
+ createTestTable(getUrl(), String.format(indexTableDdl, indexTableName, dataTableFullName));
+ props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ conn = DriverManager.getConnection(getUrl(), props);
+ String dataTableUpsert = String.format(UPSERT_SQL, dataTableFullName);
+ dataTableUpsertStmt = conn.prepareStatement(dataTableUpsert);
+ String indexTableUpsert = String.format(INDEX_UPSERT_SQL, indexTableFullName);
+ indexTableUpsertStmt = conn.prepareStatement(indexTableUpsert);
+ conn.setAutoCommit(false);
+ testTime = EnvironmentEdgeManager.currentTimeMillis() - 1000;
+
+ }
+
+ @After public void teardown() throws SQLException {
+ if (conn != null) {
+ conn.close();
}
- };
- Runnable backgroundDeletes = new Runnable() {
- @Override
- public void run() {
- int idToDelete = random.nextInt(1000);
- try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
- String deleteSql =
- String.format(DELETE_SQL, indexTableFullName) + "WHERE \":ID\"="
- + idToDelete;
- conn.createStatement().executeUpdate(deleteSql);
- conn.commit();
- } catch (SQLException e) {
- e.printStackTrace();
- }
+ }
+
+ /**
+ * Tests a data table that is correctly indexed. Scrutiny should report all rows as valid.
+ */
+ @Test public void testValidIndex() throws Exception {
+ // insert two rows
+ upsertRow(dataTableUpsertStmt, 1, "name-1", 94010);
+ upsertRow(dataTableUpsertStmt, 2, "name-2", 95123);
+ conn.commit();
+
+ int numDataRows = countRows(dataTableFullName);
+ int numIndexRows = countRows(indexTableFullName);
+
+ // scrutiny should report everything as ok
+ List<Job> completedJobs = runScrutiny(schemaName, dataTableName, indexTableName);
+ Job job = completedJobs.get(0);
+ assertTrue(job.isSuccessful());
+ Counters counters = job.getCounters();
+ assertEquals(2, getCounterValue(counters, VALID_ROW_COUNT));
+ assertEquals(0, getCounterValue(counters, INVALID_ROW_COUNT));
+
+ // make sure row counts weren't modified by scrutiny
+ assertEquals(numDataRows, countRows(dataTableFullName));
+ assertEquals(numIndexRows, countRows(indexTableFullName));
+ }
+
+ /**
+ * Tests running a scrutiny while updates and deletes are happening.
+ * Since CURRENT_SCN is set, the scrutiny shouldn't report any issue.
+ */
+ @Test public void testScrutinyWhileTakingWrites() throws Exception {
+ int id = 0;
+ while (id < 1000) {
+ int index = 1;
+ dataTableUpsertStmt.setInt(index++, id);
+ dataTableUpsertStmt.setString(index++, "name-" + id);
+ dataTableUpsertStmt.setInt(index++, id);
+ dataTableUpsertStmt.setTimestamp(index++, new Timestamp(testTime));
+ dataTableUpsertStmt.executeUpdate();
+ id++;
}
- };
- ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(2);
- scheduledThreadPool.scheduleWithFixedDelay(backgroundUpserts, 200, 200,
- TimeUnit.MILLISECONDS);
- scheduledThreadPool.scheduleWithFixedDelay(backgroundDeletes, 200, 200,
- TimeUnit.MILLISECONDS);
-
- // scrutiny should report everything as ok
- List<Job> completedJobs =
- runScrutinyCurrentSCN(schemaName, dataTableName, indexTableName,
- scrutinyTS);
- Job job = completedJobs.get(0);
- assertTrue(job.isSuccessful());
- Counters counters = job.getCounters();
- assertEquals(1000, getCounterValue(counters, VALID_ROW_COUNT));
- assertEquals(0, getCounterValue(counters, INVALID_ROW_COUNT));
- scheduledThreadPool.shutdown();
- scheduledThreadPool.awaitTermination(10000, TimeUnit.MILLISECONDS);
- }
+ conn.commit();
+
+ //CURRENT_SCN for scrutiny
+ long scrutinyTS = EnvironmentEdgeManager.currentTimeMillis();
+
+ // launch background upserts and deletes
+ final Random random = new Random(0);
+ Runnable backgroundUpserts = new Runnable() {
+ @Override public void run() {
+ int idToUpsert = random.nextInt(1000);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ PreparedStatement dataPS =
+ conn.prepareStatement(String.format(UPSERT_SQL, dataTableFullName));
+ upsertRow(dataPS, idToUpsert, "modified-" + idToUpsert, idToUpsert + 1000);
+ conn.commit();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ }
+ }
+ };
+ Runnable backgroundDeletes = new Runnable() {
+ @Override public void run() {
+ int idToDelete = random.nextInt(1000);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ String deleteSql =
+ String.format(DELETE_SQL, indexTableFullName) + "WHERE \":ID\"=" + idToDelete;
+ conn.createStatement().executeUpdate(deleteSql);
+ conn.commit();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ }
+ }
+ };
+ ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(2);
+ scheduledThreadPool.scheduleWithFixedDelay(backgroundUpserts, 200, 200, TimeUnit.MILLISECONDS);
+ scheduledThreadPool.scheduleWithFixedDelay(backgroundDeletes, 200, 200, TimeUnit.MILLISECONDS);
+
+ // scrutiny should report everything as ok
+ List<Job> completedJobs =
+ runScrutinyCurrentSCN(schemaName, dataTableName, indexTableName, scrutinyTS);
+ Job job = completedJobs.get(0);
+ assertTrue(job.isSuccessful());
+ Counters counters = job.getCounters();
+ assertEquals(1000, getCounterValue(counters, VALID_ROW_COUNT));
+ assertEquals(0, getCounterValue(counters, INVALID_ROW_COUNT));
+ scheduledThreadPool.shutdown();
+ scheduledThreadPool.awaitTermination(10000, TimeUnit.MILLISECONDS);
+ }
- /**
- * Tests an index with the same # of rows as the data table, but one of the index rows is
- * incorrect. Scrutiny should report the invalid rows.
- */
- @Test
- public void testEqualRowCountIndexIncorrect() throws Exception {
- // insert one valid row
- upsertRow(dataTableUpsertStmt, 1, "name-1", 94010);
- conn.commit();
-
- // disable the index and insert another row which is not indexed
- disableIndex();
- upsertRow(dataTableUpsertStmt, 2, "name-2", 95123);
- conn.commit();
-
- // insert a bad row into the index
- upsertIndexRow("badName", 2, 9999);
- conn.commit();
-
- // scrutiny should report the bad row
- List<Job> completedJobs = runScrutiny(schemaName, dataTableName, indexTableName);
- Job job = completedJobs.get(0);
- assertTrue(job.isSuccessful());
- Counters counters = job.getCounters();
- assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
- assertEquals(1, getCounterValue(counters, INVALID_ROW_COUNT));
- }
+ /**
+ * Tests an index with the same # of rows as the data table, but one of the index rows is
+ * incorrect. Scrutiny should report the invalid rows.
+ */
+ @Test public void testEqualRowCountIndexIncorrect() throws Exception {
+ // insert one valid row
+ upsertRow(dataTableUpsertStmt, 1, "name-1", 94010);
+ conn.commit();
+
+ // disable the index and insert another row which is not indexed
+ disableIndex();
+ upsertRow(dataTableUpsertStmt, 2, "name-2", 95123);
+ conn.commit();
+
+ // insert a bad row into the index
+ upsertIndexRow("badName", 2, 9999);
+ conn.commit();
+
+ // scrutiny should report the bad row
+ List<Job> completedJobs = runScrutiny(schemaName, dataTableName, indexTableName);
+ Job job = completedJobs.get(0);
+ assertTrue(job.isSuccessful());
+ Counters counters = job.getCounters();
+ assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
+ assertEquals(1, getCounterValue(counters, INVALID_ROW_COUNT));
+ }
- /**
- * Tests an index where the index pk is correct (indexed col values are indexed correctly), but
- * a covered index value is incorrect. Scrutiny should report the invalid row
- */
- @Test
- public void testCoveredValueIncorrect() throws Exception {
- // insert one valid row
- upsertRow(dataTableUpsertStmt, 1, "name-1", 94010);
- conn.commit();
-
- // disable index and insert another data row
- disableIndex();
- upsertRow(dataTableUpsertStmt, 2, "name-2", 95123);
- conn.commit();
-
- // insert a bad index row for the above data row
- upsertIndexRow("name-2", 2, 9999);
- conn.commit();
-
- // scrutiny should report the bad row
- List<Job> completedJobs = runScrutiny(schemaName, dataTableName, indexTableName);
- Job job = completedJobs.get(0);
- assertTrue(job.isSuccessful());
- Counters counters = job.getCounters();
- assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
- assertEquals(1, getCounterValue(counters, INVALID_ROW_COUNT));
- assertEquals(1, getCounterValue(counters, BAD_COVERED_COL_VAL_COUNT));
- }
+ /**
+ * Tests an index where the index pk is correct (indexed col values are indexed correctly), but
+ * a covered index value is incorrect. Scrutiny should report the invalid row
+ */
+ @Test public void testCoveredValueIncorrect() throws Exception {
+ // insert one valid row
+ upsertRow(dataTableUpsertStmt, 1, "name-1", 94010);
+ conn.commit();
+
+ // disable index and insert another data row
+ disableIndex();
+ upsertRow(dataTableUpsertStmt, 2, "name-2", 95123);
+ conn.commit();
+
+ // insert a bad index row for the above data row
+ upsertIndexRow("name-2", 2, 9999);
+ conn.commit();
+
+ // scrutiny should report the bad row
+ List<Job> completedJobs = runScrutiny(schemaName, dataTableName, indexTableName);
+ Job job = completedJobs.get(0);
+ assertTrue(job.isSuccessful());
+ Counters counters = job.getCounters();
+ assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
+ assertEquals(1, getCounterValue(counters, INVALID_ROW_COUNT));
+ assertEquals(1, getCounterValue(counters, BAD_COVERED_COL_VAL_COUNT));
+ }
- /**
- * Tests batching of row comparisons. Inserts 1001 rows, with some random bad rows, and runs
- * scrutiny with a batch size of 10.
- */
- @Test
- public void testBatching() throws Exception {
- // insert 1001 data and index rows
- int numTestRows = 1001;
- for (int i = 0; i < numTestRows; i++) {
- upsertRow(dataTableUpsertStmt, i, "name-" + i, i + 1000);
- }
- conn.commit();
-
- disableIndex();
-
- // randomly delete some rows from the index
- Random random = new Random();
- for (int i = 0; i < 100; i++) {
- int idToDelete = random.nextInt(numTestRows);
- deleteRow(indexTableFullName, "WHERE \":ID\"=" + idToDelete);
- }
- conn.commit();
- int numRows = countRows(indexTableFullName);
- int numDeleted = numTestRows - numRows;
-
- // run scrutiny with batch size of 10
- List<Job> completedJobs =
- runScrutiny(schemaName, dataTableName, indexTableName, 10L);
- Job job = completedJobs.get(0);
- assertTrue(job.isSuccessful());
- Counters counters = job.getCounters();
- assertEquals(numTestRows - numDeleted, getCounterValue(counters, VALID_ROW_COUNT));
- assertEquals(numDeleted, getCounterValue(counters, INVALID_ROW_COUNT));
- assertEquals(numTestRows / 10 + numTestRows % 10,
- getCounterValue(counters, BATCHES_PROCESSED_COUNT));
- }
+ /**
+ * Tests batching of row comparisons. Inserts 1001 rows, with some random bad rows, and runs
+ * scrutiny with a batch size of 10.
+ */
+ @Test public void testBatching() throws Exception {
+ // insert 1001 data and index rows
+ int numTestRows = 1001;
+ for (int i = 0; i < numTestRows; i++) {
+ upsertRow(dataTableUpsertStmt, i, "name-" + i, i + 1000);
+ }
+ conn.commit();
- /**
- * Tests when there are more data table rows than index table rows. Scrutiny should report the
- * number of incorrect rows.
- */
- @Test
- public void testMoreDataRows() throws Exception {
- upsertRow(dataTableUpsertStmt, 1, "name-1", 95123);
- conn.commit();
- disableIndex();
- // these rows won't have a corresponding index row
- upsertRow(dataTableUpsertStmt, 2, "name-2", 95124);
- upsertRow(dataTableUpsertStmt, 3, "name-3", 95125);
- conn.commit();
-
- List<Job> completedJobs = runScrutiny(schemaName, dataTableName, indexTableName);
- Job job = completedJobs.get(0);
- assertTrue(job.isSuccessful());
- Counters counters = job.getCounters();
- assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
- assertEquals(2, getCounterValue(counters, INVALID_ROW_COUNT));
- }
+ disableIndex();
- /**
- * Tests when there are more index table rows than data table rows. Scrutiny should report the
- * number of incorrect rows when run with the index as the source table.
- */
- @Test
- public void testMoreIndexRows() throws Exception {
- upsertRow(dataTableUpsertStmt, 1, "name-1", 95123);
- conn.commit();
- disableIndex();
- // these index rows won't have a corresponding data row
- upsertIndexRow("name-2", 2, 95124);
- upsertIndexRow("name-3", 3, 95125);
- conn.commit();
-
- List<Job> completedJobs =
- runScrutiny(schemaName, dataTableName, indexTableName, 10L,
- SourceTable.INDEX_TABLE_SOURCE);
- Job job = completedJobs.get(0);
- assertTrue(job.isSuccessful());
- Counters counters = job.getCounters();
- assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
- assertEquals(2, getCounterValue(counters, INVALID_ROW_COUNT));
- }
+ // randomly delete some rows from the index
+ Random random = new Random();
+ for (int i = 0; i < 100; i++) {
+ int idToDelete = random.nextInt(numTestRows);
+ deleteRow(indexTableFullName, "WHERE \":ID\"=" + idToDelete);
+ }
+ conn.commit();
+ int numRows = countRows(indexTableFullName);
+ int numDeleted = numTestRows - numRows;
+
+ // run scrutiny with batch size of 10
+ List<Job> completedJobs = runScrutiny(schemaName, dataTableName, indexTableName, 10L);
+ Job job = completedJobs.get(0);
+ assertTrue(job.isSuccessful());
+ Counters counters = job.getCounters();
+ assertEquals(numTestRows - numDeleted, getCounterValue(counters, VALID_ROW_COUNT));
+ assertEquals(numDeleted, getCounterValue(counters, INVALID_ROW_COUNT));
+ assertEquals(numTestRows / 10 + numTestRows % 10,
+ getCounterValue(counters, BATCHES_PROCESSED_COUNT));
+ }
- /**
- * Tests running with both the index and data tables as the source table. If we have an
- * incorrectly indexed row, it should be reported in each direction.
- */
- @Test
- public void testBothDataAndIndexAsSource() throws Exception {
- // insert one valid row
- upsertRow(dataTableUpsertStmt, 1, "name-1", 94010);
- conn.commit();
-
- // disable the index and insert another row which is not indexed
- disableIndex();
- upsertRow(dataTableUpsertStmt, 2, "name-2", 95123);
- conn.commit();
-
- // insert a bad row into the index
- upsertIndexRow("badName", 2, 9999);
- conn.commit();
-
- List<Job> completedJobs =
- runScrutiny(schemaName, dataTableName, indexTableName, 10L,
- SourceTable.BOTH);
- assertEquals(2, completedJobs.size());
- for (Job job : completedJobs) {
+ /**
+ * Tests when there are more data table rows than index table rows. Scrutiny should report the
+ * number of incorrect rows.
+ */
+ @Test public void testMoreDataRows() throws Exception {
+ upsertRow(dataTableUpsertStmt, 1, "name-1", 95123);
+ conn.commit();
+ disableIndex();
+ // these rows won't have a corresponding index row
+ upsertRow(dataTableUpsertStmt, 2, "name-2", 95124);
+ upsertRow(dataTableUpsertStmt, 3, "name-3", 95125);
+ conn.commit();
+
+ List<Job> completedJobs = runScrutiny(schemaName, dataTableName, indexTableName);
+ Job job = completedJobs.get(0);
assertTrue(job.isSuccessful());
Counters counters = job.getCounters();
assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
- assertEquals(1, getCounterValue(counters, INVALID_ROW_COUNT));
+ assertEquals(2, getCounterValue(counters, INVALID_ROW_COUNT));
+ }
+
+ /**
+ * Tests when there are more index table rows than data table rows. Scrutiny should report the
+ * number of incorrect rows when run with the index as the source table.
+ */
+ @Test public void testMoreIndexRows() throws Exception {
+ upsertRow(dataTableUpsertStmt, 1, "name-1", 95123);
+ conn.commit();
+ disableIndex();
+ // these index rows won't have a corresponding data row
+ upsertIndexRow("name-2", 2, 95124);
+ upsertIndexRow("name-3", 3, 95125);
+ conn.commit();
+
+ List<Job> completedJobs =
+ runScrutiny(schemaName, dataTableName, indexTableName, 10L, SourceTable.INDEX_TABLE_SOURCE);
+ Job job = completedJobs.get(0);
+ assertTrue(job.isSuccessful());
+ Counters counters = job.getCounters();
+ assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
+ assertEquals(2, getCounterValue(counters, INVALID_ROW_COUNT));
+ }
+
+ /**
+ * Tests running with both the index and data tables as the source table. If we have an
+ * incorrectly indexed row, it should be reported in each direction.
+ */
+ @Test public void testBothDataAndIndexAsSource() throws Exception {
+ // insert one valid row
+ upsertRow(dataTableUpsertStmt, 1, "name-1", 94010);
+ conn.commit();
+
+ // disable the index and insert another row which is not indexed
+ disableIndex();
+ upsertRow(dataTableUpsertStmt, 2, "name-2", 95123);
+ conn.commit();
+
+ // insert a bad row into the index
+ upsertIndexRow("badName", 2, 9999);
+ conn.commit();
+
+ List<Job> completedJobs =
+ runScrutiny(schemaName, dataTableName, indexTableName, 10L, SourceTable.BOTH);
+ assertEquals(2, completedJobs.size());
+ for (Job job : completedJobs) {
+ assertTrue(job.isSuccessful());
+ Counters counters = job.getCounters();
+ assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
+ assertEquals(1, getCounterValue(counters, INVALID_ROW_COUNT));
+ }
}
- }
- /**
- * Tests that with the output to file option set, the scrutiny tool outputs invalid rows to file
- */
- @Test
- public void testOutputInvalidRowsToFile() throws Exception {
- insertOneValid_OneBadVal_OneMissingTarget();
-
- String[] argValues =
- getArgValues(schemaName, dataTableName, indexTableName, 10L,
- SourceTable.DATA_TABLE_SOURCE, true, OutputFormat.FILE, null);
- runScrutiny(argValues);
-
- // check the output files
- Path outputPath = CsvBulkImportUtil.getOutputPath(new Path(outputDir), dataTableFullName);
- DistributedFileSystem fs = getUtility().getDFSCluster().getFileSystem();
- List<Path> paths = Lists.newArrayList();
- Path firstPart = null;
- for (FileStatus outputFile : fs.listStatus(outputPath)) {
- if (outputFile.getPath().getName().startsWith("part")) {
- if (firstPart == null) {
- firstPart = outputFile.getPath();
- } else {
- paths.add(outputFile.getPath());
+ /**
+ * Tests that with the output to file option set, the scrutiny tool outputs invalid rows to file
+ */
+ @Test public void testOutputInvalidRowsToFile() throws Exception {
+ insertOneValid_OneBadVal_OneMissingTarget();
+
+ String[] argValues =
+ getArgValues(schemaName, dataTableName, indexTableName, 10L, SourceTable.DATA_TABLE_SOURCE, true, OutputFormat.FILE, null);
+ runScrutiny(argValues);
+
+ // check the output files
+ Path outputPath = CsvBulkImportUtil.getOutputPath(new Path(outputDir), dataTableFullName);
+ DistributedFileSystem fs = getUtility().getDFSCluster().getFileSystem();
+ List<Path> paths = Lists.newArrayList();
+ Path firstPart = null;
+ for (FileStatus outputFile : fs.listStatus(outputPath)) {
+ if (outputFile.getPath().getName().startsWith("part")) {
+ if (firstPart == null) {
+ firstPart = outputFile.getPath();
+ } else {
+ paths.add(outputFile.getPath());
+ }
}
}
+ if (dataTableDdl.contains("SALT_BUCKETS")) {
+ fs.concat(firstPart, paths.toArray(new Path[0]));
+ }
+ Path outputFilePath = firstPart;
+ assertTrue(fs.exists(outputFilePath));
+ FSDataInputStream fsDataInputStream = fs.open(outputFilePath);
+ BufferedReader reader = new BufferedReader(new InputStreamReader(fsDataInputStream));
+ TreeSet<String> lines = Sets.newTreeSet();
+ try {
+ String line = null;
+ while ((line = reader.readLine()) != null) {
+ lines.add(line);
+ }
+ } finally {
+ IOUtils.closeQuietly(reader);
+ IOUtils.closeQuietly(fsDataInputStream);
+ }
+ Iterator<String> lineIterator = lines.iterator();
+ assertEquals(
+ "[2, name-2, " + new Timestamp(testTime).toString() + ", 95123]\t[2, name-2, " + new Timestamp(testTime).toString() + ", 9999]", lineIterator.next());
+ assertEquals("[3, name-3, " + new Timestamp(testTime).toString() + ", 95123]\tTarget row not found",
+ lineIterator.next());
+
}
- if (dataTableDdl.contains("SALT_BUCKETS")) {
- fs.concat(firstPart, paths.toArray(new Path[0]));
- }
- Path outputFilePath = firstPart;
- assertTrue(fs.exists(outputFilePath));
- FSDataInputStream fsDataInputStream = fs.open(outputFilePath);
- BufferedReader reader = new BufferedReader(new InputStreamReader(fsDataInputStream));
- TreeSet<String> lines = Sets.newTreeSet();
- try {
- String line = null;
- while ((line = reader.readLine()) != null) {
- lines.add(line);
+
+ /**
+ * Tests writing of results to the output table
+ */
+ @Test public void testOutputInvalidRowsToTable() throws Exception {
+ insertOneValid_OneBadVal_OneMissingTarget();
+ String[] argValues =
+ getArgValues(schemaName, dataTableName, indexTableName, 10L, SourceTable.DATA_TABLE_SOURCE, true, OutputFormat.TABLE, null);
+ List<Job> completedJobs = runScrutiny(argValues);
+
+ // check that the output table contains the invalid rows
+ long scrutinyTimeMillis =
+ PhoenixConfigurationUtil.getScrutinyExecuteTimestamp(completedJobs.get(0).getConfiguration());
+ String invalidRowsQuery =
+ IndexScrutinyTableOutput.getSqlQueryAllInvalidRows(conn, getColNames(), scrutinyTimeMillis);
+ ResultSet rs = conn.createStatement().executeQuery(invalidRowsQuery + " ORDER BY ID asc");
+ assertTrue(rs.next());
+ assertEquals(dataTableFullName, rs.getString(IndexScrutinyTableOutput.SOURCE_TABLE_COL_NAME));
+ assertEquals(indexTableFullName, rs.getString(IndexScrutinyTableOutput.TARGET_TABLE_COL_NAME));
+ assertTrue(rs.getBoolean("HAS_TARGET_ROW"));
+ assertEquals(2, rs.getInt("ID"));
+ assertEquals(2, rs.getInt(":ID"));
+ assertEquals(95123, rs.getInt("ZIP"));
+ assertEquals(9999, rs.getInt("0:ZIP")); // covered col zip incorrect
+ assertTrue(rs.next());
+ assertEquals(dataTableFullName, rs.getString(IndexScrutinyTableOutput.SOURCE_TABLE_COL_NAME));
+ assertEquals(indexTableFullName, rs.getString(IndexScrutinyTableOutput.TARGET_TABLE_COL_NAME));
+ assertFalse(rs.getBoolean("HAS_TARGET_ROW"));
+ assertEquals(3, rs.getInt("ID"));
+ assertEquals(null, rs.getObject(":ID")); // null for missing target row
+ assertFalse(rs.next());
+
+ // check that the job results were written correctly to the metadata table
+ assertMetadataTableValues(argValues, scrutinyTimeMillis, invalidRowsQuery);
+ }
+
+ /**
+ * Tests that the config for max number of output rows is observed
+ */
+ @Test public void testMaxOutputRows() throws Exception {
+ insertOneValid_OneBadVal_OneMissingTarget();
+ // set max to 1. There are two bad rows, but only 1 should get written to output table
+ String[] argValues =
+ getArgValues(schemaName, dataTableName, indexTableName, 10L, SourceTable.DATA_TABLE_SOURCE, true, OutputFormat.TABLE, new Long(1));
+ List<Job> completedJobs = runScrutiny(argValues);
+ long scrutinyTimeMillis =
+ PhoenixConfigurationUtil.getScrutinyExecuteTimestamp(completedJobs.get(0).getConfiguration());
+ String invalidRowsQuery =
+ IndexScrutinyTableOutput.getSqlQueryAllInvalidRows(conn, getColNames(), scrutinyTimeMillis);
+ ResultSet rs = conn.createStatement().executeQuery(invalidRowsQuery);
+ assertTrue(rs.next());
+ if (dataTableDdl.contains("SALT_BUCKETS")) {
+ assertTrue(rs.next());
+ assertFalse(rs.next());
+ } else {
+ assertFalse(rs.next());
}
- } finally {
- IOUtils.closeQuietly(reader);
- IOUtils.closeQuietly(fsDataInputStream);
}
- Iterator<String> lineIterator = lines.iterator();
- assertEquals(
- "[2, name-2, " + new Timestamp(testTime).toString() + ", 95123]\t[2, name-2, " + new Timestamp(testTime)
- .toString() + ", 9999]", lineIterator.next());
- assertEquals("[3, name-3, " + new Timestamp(testTime).toString() + ", 95123]\tTarget row not found",
- lineIterator.next());
- }
+ private SourceTargetColumnNames getColNames() throws SQLException {
+ PTable pdataTable = PhoenixRuntime.getTable(conn, dataTableFullName);
+ PTable pindexTable = PhoenixRuntime.getTable(conn, indexTableFullName);
+ SourceTargetColumnNames columnNames =
+ new SourceTargetColumnNames.DataSourceColNames(pdataTable, pindexTable);
+ return columnNames;
+ }
- /**
- * Tests writing of results to the output table
- */
- @Test
- public void testOutputInvalidRowsToTable() throws Exception {
- insertOneValid_OneBadVal_OneMissingTarget();
- String[] argValues =
- getArgValues(schemaName, dataTableName, indexTableName, 10L,
- SourceTable.DATA_TABLE_SOURCE, true, OutputFormat.TABLE, null);
- List<Job> completedJobs = runScrutiny(argValues);
-
- // check that the output table contains the invalid rows
- long scrutinyTimeMillis =
- PhoenixConfigurationUtil
- .getScrutinyExecuteTimestamp(completedJobs.get(0).getConfiguration());
- String invalidRowsQuery =
- IndexScrutinyTableOutput.getSqlQueryAllInvalidRows(conn, getColNames(),
- scrutinyTimeMillis);
- ResultSet rs = conn.createStatement().executeQuery(invalidRowsQuery + " ORDER BY ID asc");
- assertTrue(rs.next());
- assertEquals(dataTableFullName,
- rs.getString(IndexScrutinyTableOutput.SOURCE_TABLE_COL_NAME));
- assertEquals(indexTableFullName,
- rs.getString(IndexScrutinyTableOutput.TARGET_TABLE_COL_NAME));
- assertTrue(rs.getBoolean("HAS_TARGET_ROW"));
- assertEquals(2, rs.getInt("ID"));
- assertEquals(2, rs.getInt(":ID"));
- assertEquals(95123, rs.getInt("ZIP"));
- assertEquals(9999, rs.getInt("0:ZIP")); // covered col zip incorrect
- assertTrue(rs.next());
- assertEquals(dataTableFullName,
- rs.getString(IndexScrutinyTableOutput.SOURCE_TABLE_COL_NAME));
- assertEquals(indexTableFullName,
- rs.getString(IndexScrutinyTableOutput.TARGET_TABLE_COL_NAME));
- assertFalse(rs.getBoolean("HAS_TARGET_ROW"));
- assertEquals(3, rs.getInt("ID"));
- assertEquals(null, rs.getObject(":ID")); // null for missing target row
- assertFalse(rs.next());
-
- // check that the job results were written correctly to the metadata table
- assertMetadataTableValues(argValues, scrutinyTimeMillis, invalidRowsQuery);
- }
+ // inserts one valid data/index row, one data row with a missing index row,
+ // and one data row with an index row that has a bad covered col val
+ private void insertOneValid_OneBadVal_OneMissingTarget() throws SQLException {
+ // insert one valid row
+ upsertRow(dataTableUpsertStmt, 1, "name-1", 94010);
+ conn.commit();
+
+ // disable the index and insert another row which is not indexed
+ disableIndex();
+ upsertRow(dataTableUpsertStmt, 2, "name-2", 95123);
+ upsertRow(dataTableUpsertStmt, 3, "name-3", 95123);
+ conn.commit();
+
+ // insert a bad index row for one of the above data rows
+ upsertIndexRow("name-2", 2, 9999);
+ conn.commit();
+ }
- /**
- * Tests that the config for max number of output rows is observed
- */
- @Test
- public void testMaxOutputRows() throws Exception {
- insertOneValid_OneBadVal_OneMissingTarget();
- // set max to 1. There are two bad rows, but only 1 should get written to output table
- String[] argValues =
- getArgValues(schemaName, dataTableName, indexTableName, 10L,
- SourceTable.DATA_TABLE_SOURCE, true, OutputFormat.TABLE, new Long(1));
- List<Job> completedJobs = runScrutiny(argValues);
- long scrutinyTimeMillis =
- PhoenixConfigurationUtil
- .getScrutinyExecuteTimestamp(completedJobs.get(0).getConfiguration());
- String invalidRowsQuery =
- IndexScrutinyTableOutput.getSqlQueryAllInvalidRows(conn, getColNames(),
- scrutinyTimeMillis);
- ResultSet rs = conn.createStatement().executeQuery(invalidRowsQuery);
- assertTrue(rs.next());
- if (dataTableDdl.contains("SALT_BUCKETS")) {
+ private void assertMetadataTableValues(String[] argValues, long scrutinyTimeMillis, String invalidRowsQuery) throws SQLException {
+ ResultSet rs;
+ ResultSet metadataRs =
+ IndexScrutinyTableOutput.queryAllMetadata(conn, dataTableFullName, indexTableFullName,
+ scrutinyTimeMillis);
+ assertTrue(metadataRs.next());
+ List<? extends Object> expected =
+ Arrays.asList(dataTableFullName, indexTableFullName, scrutinyTimeMillis,
+ SourceTable.DATA_TABLE_SOURCE.name(), Arrays.toString(argValues), 3L, 0L, 1L,
+ 2L, 1L, 1L,
+ "[\"ID\" INTEGER, \"NAME\" VARCHAR, \"EMPLOY_DATE\" TIMESTAMP, \"ZIP\" INTEGER]",
+ "[\":ID\" INTEGER, \"0:NAME\" VARCHAR, \"0:EMPLOY_DATE\" DECIMAL, \"0:ZIP\" INTEGER]",
+ invalidRowsQuery);
+ if (dataTableDdl.contains("SALT_BUCKETS")) {
+ expected =
+ Arrays.asList(dataTableFullName, indexTableFullName, scrutinyTimeMillis,
+ SourceTable.DATA_TABLE_SOURCE.name(), Arrays.toString(argValues), 3L, 0L, 1L,
+ 2L, 1L, 2L,
+ "[\"ID\" INTEGER, \"NAME\" VARCHAR, \"EMPLOY_DATE\" TIMESTAMP, \"ZIP\" INTEGER]",
+ "[\":ID\" INTEGER, \"0:NAME\" VARCHAR, \"0:EMPLOY_DATE\" DECIMAL, \"0:ZIP\" INTEGER]",
+ invalidRowsQuery);
+ }
+
+ assertRsValues(metadataRs, expected);
+ String missingTargetQuery = metadataRs.getString("INVALID_ROWS_QUERY_MISSING_TARGET");
+ rs = conn.createStatement().executeQuery(missingTargetQuery);
assertTrue(rs.next());
+ assertEquals(3, rs.getInt("ID"));
assertFalse(rs.next());
- } else {
+ String badCoveredColQuery = metadataRs.getString("INVALID_ROWS_QUERY_BAD_COVERED_COL_VAL");
+ rs = conn.createStatement().executeQuery(badCoveredColQuery);
+ assertTrue(rs.next());
+ assertEquals(2, rs.getInt("ID"));
assertFalse(rs.next());
}
- }
- private SourceTargetColumnNames getColNames() throws SQLException {
- PTable pdataTable = PhoenixRuntime.getTable(conn, dataTableFullName);
- PTable pindexTable = PhoenixRuntime.getTable(conn, indexTableFullName);
- SourceTargetColumnNames columnNames =
- new SourceTargetColumnNames.DataSourceColNames(pdataTable, pindexTable);
- return columnNames;
- }
+ // assert the result set contains the expected values in the given order
+ private void assertRsValues(ResultSet rs, List<? extends Object> expected)
+ throws SQLException {
+ for (int i = 0; i < expected.size(); i++) {
+ assertEquals(expected.get(i), rs.getObject(i + 1));
+ }
+ }
- // inserts one valid data/index row, one data row with a missing index row,
- // and one data row with an index row that has a bad covered col val
- private void insertOneValid_OneBadVal_OneMissingTarget() throws SQLException {
- // insert one valid row
- upsertRow(dataTableUpsertStmt, 1, "name-1", 94010);
- conn.commit();
-
- // disable the index and insert another row which is not indexed
- disableIndex();
- upsertRow(dataTableUpsertStmt, 2, "name-2", 95123);
- upsertRow(dataTableUpsertStmt, 3, "name-3", 95123);
- conn.commit();
-
- // insert a bad index row for one of the above data rows
- upsertIndexRow("name-2", 2, 9999);
- conn.commit();
- }
+ private void generateUniqueTableNames() {
+ schemaName = generateUniqueName();
+ dataTableName = generateUniqueName();
+ dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+ indexTableName = generateUniqueName();
+ indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
+ }
- private void assertMetadataTableValues(String[] argValues, long scrutinyTimeMillis,
- String invalidRowsQuery) throws SQLException {
- ResultSet rs;
- ResultSet metadataRs =
- IndexScrutinyTableOutput.queryAllMetadata(conn, dataTableFullName,
- indexTableFullName, scrutinyTimeMillis);
- assertTrue(metadataRs.next());
- List<? extends Object> expected =
- Arrays.asList(dataTableFullName, indexTableFullName, scrutinyTimeMillis,
- SourceTable.DATA_TABLE_SOURCE.name(), Arrays.toString(argValues), 3L, 0L, 1L,
- 2L, 1L, 1L, "[\"ID\" INTEGER, \"NAME\" VARCHAR, \"EMPLOY_DATE\" TIMESTAMP, \"ZIP\" INTEGER]",
- "[\":ID\" INTEGER, \"0:NAME\" VARCHAR, \"0:EMPLOY_DATE\" DECIMAL, \"0:ZIP\" INTEGER]", invalidRowsQuery);
- if (dataTableDdl.contains("SALT_BUCKETS")) {
- expected = Arrays.asList(dataTableFullName, indexTableFullName, scrutinyTimeMillis,
- SourceTable.DATA_TABLE_SOURCE.name(), Arrays.toString(argValues), 3L, 0L, 1L,
- 2L, 1L, 2L, "[\"ID\" INTEGER, \"NAME\" VARCHAR, \"EMPLOY_DATE\" TIMESTAMP, \"ZIP\" INTEGER]",
- "[\":ID\" INTEGER, \"0:NAME\" VARCHAR, \"0:EMPLOY_DATE\" DECIMAL, \"0:ZIP\" INTEGER]", invalidRowsQuery);
- }
-
- assertRsValues(metadataRs, expected);
- String missingTargetQuery = metadataRs.getString("INVALID_ROWS_QUERY_MISSING_TARGET");
- rs = conn.createStatement().executeQuery(missingTargetQuery);
- assertTrue(rs.next());
- assertEquals(3, rs.getInt("ID"));
- assertFalse(rs.next());
- String badCoveredColQuery = metadataRs.getString("INVALID_ROWS_QUERY_BAD_COVERED_COL_VAL");
- rs = conn.createStatement().executeQuery(badCoveredColQuery);
- assertTrue(rs.next());
- assertEquals(2, rs.getInt("ID"));
- assertFalse(rs.next());
- }
+ private int countRows(String tableFullName) throws SQLException {
+ ResultSet count = conn.createStatement().executeQuery("select count(*) from " + tableFullName);
+ count.next();
+ int numRows = count.getInt(1);
+ return numRows;
+ }
- // assert the result set contains the expected values in the given order
- private void assertRsValues(ResultSet rs, List<? extends Object> expected) throws SQLException {
- for (int i = 0; i < expected.size(); i++) {
- assertEquals(expected.get(i), rs.getObject(i + 1));
+ private void upsertIndexRow(String name, int id, int zip) throws SQLException {
+ indexTableUpsertStmt.setString(1, name);
+ indexTableUpsertStmt.setInt(2, id); // id
+ indexTableUpsertStmt.setInt(3, zip); // bad zip
+ indexTableUpsertStmt.setTimestamp(4, new Timestamp(testTime));
+ indexTableUpsertStmt.executeUpdate();
}
- }
- private void generateUniqueTableNames() {
- schemaName = generateUniqueName();
- dataTableName = generateUniqueName();
- dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
- indexTableName = generateUniqueName();
- indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
- }
+ private void disableIndex() throws SQLException {
+ conn.createStatement().execute(
+ String.format("ALTER INDEX %s ON %S disable", indexTableName, dataTableFullName));
+ conn.commit();
+ }
- private int countRows(String tableFullName) throws SQLException {
- ResultSet count = conn.createStatement().executeQuery("select count(*) from " + tableFullName);
- count.next();
- int numRows = count.getInt(1);
- return numRows;
- }
+ private String[] getArgValues(String schemaName, String dataTable, String indxTable, Long batchSize,
+ SourceTable sourceTable, boolean outputInvalidRows, OutputFormat outputFormat, Long maxOutputRows) {
+ return getArgValues(schemaName, dataTable, indxTable, batchSize, sourceTable,
+ outputInvalidRows, outputFormat, maxOutputRows, null, Long.MAX_VALUE);
+ }
- private void upsertIndexRow(String name, int id, int zip) throws SQLException {
- indexTableUpsertStmt.setString(1, name);
- indexTableUpsertStmt.setInt(2, id); // id
- indexTableUpsertStmt.setInt(3, zip); // bad zip
- indexTableUpsertStmt.setTimestamp(4, new Timestamp(testTime));
- indexTableUpsertStmt.executeUpdate();
- }
+ private List<Job> runScrutinyCurrentSCN(String schemaName, String dataTableName, String indexTableName, Long scrutinyTS) throws Exception {
+ return runScrutiny(
+ getArgValues(schemaName, dataTableName, indexTableName, null, SourceTable.BOTH,
+ false, null, null, null, scrutinyTS));
+ }
- private void disableIndex() throws SQLException {
- conn.createStatement().execute(
- String.format("ALTER INDEX %s ON %S disable", indexTableName, dataTableFullName));
- conn.commit();
- }
+ private List<Job> runScrutiny(String schemaName, String dataTableName, String indexTableName) throws Exception {
+ return runScrutiny(schemaName, dataTableName, indexTableName, null, null);
+ }
- private long getCounterValue(Counters counters, Enum<PhoenixScrutinyJobCounters> counter) {
- return counters.findCounter(counter).getValue();
- }
+ private List<Job> runScrutiny(String schemaName, String dataTableName, String indexTableName,
+ Long batchSize) throws Exception {
+ return runScrutiny(schemaName, dataTableName, indexTableName, batchSize, null);
+ }
- private String[] getArgValues(String schemaName, String dataTable, String indxTable, Long batchSize,
- SourceTable sourceTable, boolean outputInvalidRows, OutputFormat outputFormat,
- Long maxOutputRows) {
- return getArgValues(schemaName, dataTable, indxTable, batchSize, sourceTable,
- outputInvalidRows, outputFormat, maxOutputRows, Long.MAX_VALUE);
- }
+ private List<Job> runScrutiny(String schemaName, String dataTableName, String indexTableName,
+ Long batchSize, SourceTable sourceTable) throws Exception {
+ final String[] cmdArgs =
+ getArgValues(schemaName, dataTableName, indexTableName, batchSize, sourceTable,
+ false, null, null, null, Long.MAX_VALUE);
+ return runScrutiny(cmdArgs);
+ }
- private String[] getArgValues(String schemaName, String dataTable, String indxTable, Long batchSize,
- SourceTable sourceTable, boolean outputInvalidRows, OutputFormat outputFormat,
- Long maxOutputRows, Long scrutinyTs) {
- final List<String> args = Lists.newArrayList();
- if (schemaName != null) {
- args.add("-s");
- args.add(schemaName);
- }
- args.add("-dt");
- args.add(dataTable);
- args.add("-it");
- args.add(indxTable);
-
- // TODO test snapshot reads
- // if(useSnapshot) {
- // args.add("-snap");
- // }
-
- if (OutputFormat.FILE.equals(outputFormat)) {
- args.add("-op");
- outputDir = "/tmp/" + UUID.randomUUID().toString();
- args.add(outputDir);
- }
- args.add("-t");
- args.add(String.valueOf(scrutinyTs));
- args.add("-run-foreground");
- if (batchSize != null) {
- args.add("-b");
- args.add(String.valueOf(batchSize));
- }
-
- // default to using data table as the source table
- args.add("-src");
- if (sourceTable == null) {
- args.add(SourceTable.DATA_TABLE_SOURCE.name());
- } else {
- args.add(sourceTable.name());
- }
- if (outputInvalidRows) {
- args.add("-o");
- }
- if (outputFormat != null) {
- args.add("-of");
- args.add(outputFormat.name());
- }
- if (maxOutputRows != null) {
- args.add("-om");
- args.add(maxOutputRows.toString());
- }
- return args.toArray(new String[0]);
- }
+ private void upsertRow(PreparedStatement stmt, int id, String name, int zip)
+ throws SQLException {
+ int index = 1;
+ // insert row
+ stmt.setInt(index++, id);
+ stmt.setString(index++, name);
+ stmt.setInt(index++, zip);
+ stmt.setTimestamp(index++, new Timestamp(testTime));
+ stmt.executeUpdate();
+ }
- private List<Job> runScrutinyCurrentSCN(String schemaName, String dataTableName, String indexTableName, Long scrutinyTS) throws Exception {
- return runScrutiny(getArgValues(schemaName, dataTableName, indexTableName, null, SourceTable.BOTH, false, null, null, scrutinyTS));
+ private int deleteRow(String fullTableName, String whereCondition) throws SQLException {
+ String deleteSql = String.format(DELETE_SQL, indexTableFullName) + whereCondition;
+ PreparedStatement deleteStmt = conn.prepareStatement(deleteSql);
+ return deleteStmt.executeUpdate();
+ }
}
- private List<Job> runScrutiny(String schemaName, String dataTableName, String indexTableName) throws Exception {
- return runScrutiny(schemaName, dataTableName, indexTableName, null, null);
- }
+ public static class IndexScrutinyToolTenantIT extends SharedIndexToolIT {
+ private Connection connGlobal = null;
+ private Connection connTenant = null;
+
+ private String tenantId;
+ private String tenantViewName;
+ private String indexNameTenant;
+ private String multiTenantTable;
+ private String viewIndexTableName;
+
+ private final String createViewStr = "CREATE VIEW %s AS SELECT * FROM %s";
+ private final String upsertQueryStr =
+ "UPSERT INTO %s (COL1, ID, NAME) VALUES('%s' , %d, '%s')";
+ private final String createIndexStr = "CREATE INDEX %s ON %s (NAME) ";
+
+ /**
+ * Create the test data
+ */
+ @Before public void setup() throws SQLException {
+ tenantId = generateUniqueName();
+ tenantViewName = generateUniqueName();
+ indexNameTenant = generateUniqueName();
+ multiTenantTable = generateUniqueName();
+ viewIndexTableName = "_IDX_" + multiTenantTable;
+
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ connGlobal = DriverManager.getConnection(getUrl(), props);
+
+ props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+ connTenant = DriverManager.getConnection(getUrl(), props);
+ String createTblStr =
+ "CREATE TABLE %s (COL1 VARCHAR(15) NOT NULL,ID INTEGER NOT NULL" + ", NAME VARCHAR, CONSTRAINT PK_1 PRIMARY KEY (COL1, ID)) MULTI_TENANT=true";
+
+ createTestTable(getUrl(), String.format(createTblStr, multiTenantTable));
+
+ connTenant.createStatement().execute(
+ String.format(createViewStr, tenantViewName, multiTenantTable));
+
+ String idxStmtTenant = String.format(createIndexStr, indexNameTenant, tenantViewName);
+ connTenant.createStatement().execute(idxStmtTenant);
+ }
- private List<Job> runScrutiny(String schemaName, String dataTableName, String indexTableName,
- Long batchSize) throws Exception {
- return runScrutiny(schemaName, dataTableName, indexTableName, batchSize, null);
- }
+ @After public void teardown() throws SQLException {
+ if (connGlobal != null) {
+ connGlobal.close();
+ }
+ if (connTenant != null) {
+ connTenant.close();
+ }
+ }
- private List<Job> runScrutiny(String schemaName, String dataTableName, String indexTableName,
- Long batchSize, SourceTable sourceTable) throws Exception {
- final String[] cmdArgs =
- getArgValues(schemaName, dataTableName, indexTableName, batchSize, sourceTable, false,
- null, null, Long.MAX_VALUE);
- return runScrutiny(cmdArgs);
- }
+ /**
+ * Tests that a tenant view and its index are consistent (the sunny-day case)
+ */
+ @Test public void testTenantViewAndIndexEqual() throws Exception {
+ connTenant.createStatement()
+ .execute(String.format(upsertQueryStr, tenantViewName, tenantId, 1, "x"));
+ connTenant.commit();
+
+ String[] argValues =
+ getArgValues("", tenantViewName, indexNameTenant, 10L,
+ SourceTable.INDEX_TABLE_SOURCE, false, null, null, tenantId,
+ EnvironmentEdgeManager.currentTimeMillis());
+
+ List<Job> completedJobs = runScrutiny(argValues);
+ // Sunny case, both index and view are equal. 1 row
+ assertEquals(1, completedJobs.size());
+ for (Job job : completedJobs) {
+ assertTrue(job.isSuccessful());
+ Counters counters = job.getCounters();
+ assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
+ assertEquals(0, getCounterValue(counters, INVALID_ROW_COUNT));
+ }
+ }
- private List<Job> runScrutiny(String[] cmdArgs) throws Exception {
- IndexScrutinyTool scrutiny = new IndexScrutinyTool();
- Configuration conf = new Configuration(getUtility().getConfiguration());
- scrutiny.setConf(conf);
- int status = scrutiny.run(cmdArgs);
- assertEquals(0, status);
- for (Job job : scrutiny.getJobs()) {
- assertTrue(job.waitForCompletion(true));
+ /**
+ * Tests that a global view on a multi-tenant table works too
+ **/
+ @Test public void testGlobalViewOnMultiTenantTable() throws Exception {
+ String globalViewName = generateUniqueName();
+ String indexNameGlobal = generateUniqueName();
+
+ connGlobal.createStatement().execute(
+ String.format(createViewStr, globalViewName, multiTenantTable));
+
+ String idxStmtGlobal = String.format(createIndexStr, indexNameGlobal, globalViewName);
+ connGlobal.createStatement().execute(idxStmtGlobal);
+ connGlobal.createStatement()
+ .execute(String.format(upsertQueryStr, globalViewName, "global", 5, "x"));
+ connGlobal.commit();
+ String[] argValues =
+ getArgValues("", globalViewName, indexNameGlobal, 10L,
+ SourceTable.INDEX_TABLE_SOURCE, false, null, null, null,
+ EnvironmentEdgeManager.currentTimeMillis());
+ List<Job> completedJobs = runScrutiny(argValues);
+ // Sunny case, both index and view are equal. 1 row
+ assertEquals(1, completedJobs.size());
+ for (Job job : completedJobs) {
+ assertTrue(job.isSuccessful());
+ Counters counters = job.getCounters();
+ assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
+ assertEquals(0, getCounterValue(counters, INVALID_ROW_COUNT));
+ }
}
- return scrutiny.getJobs();
- }
- private void upsertRow(PreparedStatement stmt, int id, String name, int zip)
- throws SQLException {
- int index = 1;
- // insert row
- stmt.setInt(index++, id);
- stmt.setString(index++, name);
- stmt.setInt(index++, zip);
- stmt.setTimestamp(index++, new Timestamp(testTime));
- stmt.executeUpdate();
- }
+ /**
+ * Uses BOTH as the source. Adds 1 row to the tenant view and disables the index,
+ * then adds 1 more row to the view and a wrong row to the index.
+ * Each direction then has 1 valid row and 1 invalid row.
+ **/
+ @Test
+ public void testOneValidOneInvalidUsingBothAsSource() throws Exception {
+ connTenant.createStatement()
+ .execute(String.format(upsertQueryStr, tenantViewName, tenantId, 1, "x"));
+ connTenant.commit();
+ connTenant.createStatement().execute(
+ String.format("ALTER INDEX %s ON %S disable", indexNameTenant, tenantViewName));
+
+ connTenant.createStatement()
+ .execute(String.format(upsertQueryStr, tenantViewName, tenantId, 2, "x2"));
+
+ connTenant.createStatement().execute(String.format("UPSERT INTO %s (\":ID\", \"0:NAME\") values (%d, '%s')",
+ indexNameTenant, 5555, "wrongName"));
+ connTenant.commit();
+
+ String[] argValues =
+ getArgValues("", tenantViewName, indexNameTenant, 10L, SourceTable.BOTH, false,
+ null, null, tenantId, EnvironmentEdgeManager.currentTimeMillis());
+ List<Job> completedJobs = runScrutiny(argValues);
+
+ assertEquals(2, completedJobs.size());
+ for (Job job : completedJobs) {
+ assertTrue(job.isSuccessful());
+ Counters counters = job.getCounters();
+ assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
+ assertEquals(1, getCounterValue(counters, INVALID_ROW_COUNT));
+ }
+ }
- private int deleteRow(String fullTableName, String whereCondition) throws SQLException {
- String deleteSql = String.format(DELETE_SQL, indexTableFullName) + whereCondition;
- PreparedStatement deleteStmt = conn.prepareStatement(deleteSql);
- return deleteStmt.executeUpdate();
+ /**
+ * Adds 3 rows to the tenant view, then truncates the index table so the two no
+ * longer match. Uses the data table as the source and writes results to a file;
+ * output to a table doesn't work on a tenant connection because it cannot
+ * create the scrutiny output table as a tenant.
+ */
+ @Test public void testWithEmptyIndexTableOutputToFile() throws Exception {
+ connTenant.createStatement()
+ .execute(String.format(upsertQueryStr, tenantViewName, tenantId, 1, "x"));
+ connTenant.createStatement()
+ .execute(String.format(upsertQueryStr, tenantViewName, tenantId, 2, "x2"));
+ connTenant.createStatement()
+ .execute(String.format(upsertQueryStr, tenantViewName, tenantId, 3, "x3"));
+ connTenant.createStatement().execute(String.format("UPSERT INTO %s (\":ID\", \"0:NAME\") values (%d, '%s')",
+ indexNameTenant, 5555, "wrongName"));
+ connTenant.commit();
+
+ ConnectionQueryServices queryServices = connGlobal.unwrap(PhoenixConnection.class).getQueryServices();
+ Admin admin = queryServices.getAdmin();
+ TableName tableName = TableName.valueOf(viewIndexTableName);
+ admin.disableTable(tableName);
+ admin.truncateTable(tableName, false);
+
+ String[] argValues =
+ getArgValues("", tenantViewName, indexNameTenant, 10L, SourceTable.DATA_TABLE_SOURCE, true, OutputFormat.FILE, null,
+ tenantId, EnvironmentEdgeManager.currentTimeMillis());
+ List<Job> completedJobs = runScrutiny(argValues);
+
+ assertEquals(1, completedJobs.size());
+ for (Job job : completedJobs) {
+ assertTrue(job.isSuccessful());
+ Counters counters = job.getCounters();
+ assertEquals(0, getCounterValue(counters, VALID_ROW_COUNT));
+ assertEquals(3, getCounterValue(counters, INVALID_ROW_COUNT));
+ }
+ }
}
-
}
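For context: connTenant and connGlobal used by the tests above come from the
test fixture, which is outside this hunk. A minimal sketch of how such a
tenant-specific connection is typically opened in Phoenix tests (getUrl() is
BaseTest's JDBC-URL helper; treat the exact setup here as an assumption, not
part of this commit):

    Properties props = new Properties();
    props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
    Connection connTenant = DriverManager.getConnection(getUrl(), props);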
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTool.java
index d9a14bf..637ffb5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTool.java
@@ -19,11 +19,10 @@ package org.apache.phoenix.mapreduce.index;
import java.io.IOException;
import java.sql.Connection;
-import java.sql.DatabaseMetaData;
-import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
+import com.google.common.base.Strings;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
@@ -111,6 +110,8 @@ public class IndexScrutinyTool extends Configured implements Tool {
private static final Option OUTPUT_PATH_OPTION =
new Option("op", "output-path", true, "Output path where the files are written");
private static final Option OUTPUT_MAX = new Option("om", "output-max", true, "Max number of invalid rows to output per mapper. Defaults to 1M");
+ private static final Option TENANT_ID_OPTION = new Option("tenant", "tenant-id", true,
+ "If specified, uses a tenant connection for tenant-view index scrutiny (optional)");
public static final String INDEX_JOB_NAME_TEMPLATE = "PHOENIX_SCRUTINY_[%s]_[%s]";
/**
@@ -145,6 +146,7 @@ public class IndexScrutinyTool extends Configured implements Tool {
options.addOption(TIMESTAMP);
options.addOption(BATCH_SIZE_OPTION);
options.addOption(SOURCE_TABLE_OPTION);
+ options.addOption(TENANT_ID_OPTION);
return options;
}
@@ -202,10 +204,11 @@ public class IndexScrutinyTool extends Configured implements Tool {
private String basePath;
private long scrutinyExecuteTime;
private long outputMaxRows; // per mapper
+ private String tenantId;
public JobFactory(Connection connection, Configuration configuration, long batchSize,
boolean useSnapshot, long ts, boolean outputInvalidRows, OutputFormat outputFormat,
- String basePath, long outputMaxRows) {
+ String basePath, long outputMaxRows, String tenantId) {
this.outputInvalidRows = outputInvalidRows;
this.outputFormat = outputFormat;
this.basePath = basePath;
@@ -214,12 +217,16 @@ public class IndexScrutinyTool extends Configured implements Tool {
this.connection = connection;
this.configuration = configuration;
this.useSnapshot = useSnapshot;
+ this.tenantId = tenantId;
this.ts = ts; // CURRENT_SCN to set
scrutinyExecuteTime = EnvironmentEdgeManager.currentTimeMillis(); // time at which scrutiny was run.
// Same for
// all jobs created from this factory
PhoenixConfigurationUtil.setScrutinyExecuteTimestamp(configuration,
scrutinyExecuteTime);
+ if (!Strings.isNullOrEmpty(tenantId)) {
+ PhoenixConfigurationUtil.setTenantId(configuration, tenantId);
+ }
}
public Job createSubmittableJob(String schemaName, String indexTable, String dataTable,
@@ -363,10 +370,17 @@ public class IndexScrutinyTool extends Configured implements Tool {
printHelpAndExit(e.getMessage(), getOptions());
}
final Configuration configuration = HBaseConfiguration.addHbaseResources(getConf());
+ boolean useTenantId = cmdLine.hasOption(TENANT_ID_OPTION.getOpt());
+ String tenantId = null;
+ if (useTenantId) {
+ tenantId = cmdLine.getOptionValue(TENANT_ID_OPTION.getOpt());
+ configuration.set(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+ LOG.info(String.format("IndexScrutinyTool running for tenantId %s", tenantId));
+ }
connection = ConnectionUtil.getInputConnection(configuration);
final String schemaName = cmdLine.getOptionValue(SCHEMA_NAME_OPTION.getOpt());
final String dataTable = cmdLine.getOptionValue(DATA_TABLE_OPTION.getOpt());
- final String indexTable = cmdLine.getOptionValue(INDEX_TABLE_OPTION.getOpt());
+ String indexTable = cmdLine.getOptionValue(INDEX_TABLE_OPTION.getOpt());
final String qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable);
String basePath = cmdLine.getOptionValue(OUTPUT_PATH_OPTION.getOpt());
boolean isForeground = cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt());
@@ -389,7 +403,7 @@ public class IndexScrutinyTool extends Configured implements Tool {
: EnvironmentEdgeManager.currentTimeMillis() - 60000;
if (indexTable != null) {
- if (!isValidIndexTable(connection, qDataTable, indexTable)) {
+ if (!IndexTool.isValidIndexTable(connection, qDataTable, indexTable, tenantId)) {
throw new IllegalArgumentException(String
.format(" %s is not an index table for %s ", indexTable, qDataTable));
}
@@ -421,7 +435,7 @@ public class IndexScrutinyTool extends Configured implements Tool {
outputFormat, outputMaxRows));
JobFactory jobFactory =
new JobFactory(connection, configuration, batchSize, useSnapshot, ts,
- outputInvalidRows, outputFormat, basePath, outputMaxRows);
+ outputInvalidRows, outputFormat, basePath, outputMaxRows, tenantId);
// If we are running the scrutiny with both tables as the source, run two separate jobs,
// one for each direction
if (SourceTable.BOTH.equals(sourceTable)) {
@@ -482,38 +496,6 @@ public class IndexScrutinyTool extends Configured implements Tool {
return jobs;
}
- /**
- * Checks for the validity of the index table passed to the job.
- * @param connection
- * @param masterTable
- * @param indexTable
- * @return
- * @throws SQLException
- */
- private boolean isValidIndexTable(final Connection connection, final String masterTable,
- final String indexTable) throws SQLException {
- final DatabaseMetaData dbMetaData = connection.getMetaData();
- final String schemaName = SchemaUtil.getSchemaNameFromFullName(masterTable);
- final String tableName =
- SchemaUtil.normalizeIdentifier(SchemaUtil.getTableNameFromFullName(masterTable));
-
- ResultSet rs = null;
- try {
- rs = dbMetaData.getIndexInfo("", schemaName, tableName, false, false);
- while (rs.next()) {
- final String indexName = rs.getString(6);
- if (indexTable.equalsIgnoreCase(indexName)) {
- return true;
- }
- }
- } finally {
- if (rs != null) {
- rs.close();
- }
- }
- return false;
- }
-
public static void main(final String[] args) throws Exception {
int result = ToolRunner.run(new IndexScrutinyTool(), args);
System.exit(result);
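Usage sketch for the new option: a tenant-view index scrutiny can now be
launched roughly as below. Only -tenant is added by this commit; the -dt and
-it short flags for the data and index tables are assumptions based on the
tool's pre-existing options, so check the tool's help output for exact names:

    hbase org.apache.phoenix.mapreduce.index.IndexScrutinyTool \
        -dt MY_TENANT_VIEW -it MY_VIEW_INDEX \
        -tenant tenant1 -om 1000 -op /tmp/scrutiny

When -tenant is given, the tool sets TenantId on the HBase configuration and
on the job (via PhoenixConfigurationUtil.setTenantId), so both the input
connection and the mappers run as that tenant.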
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index d1d6ca2..a71bc27 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -33,6 +33,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import com.google.common.base.Strings;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
@@ -337,7 +338,7 @@ public class IndexTool extends Configured implements Tool {
ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable(ByteUtil.EMPTY_BYTE_ARRAY);
IndexMaintainer.serializeAdditional(pDataTable, indexMetaDataPtr, disabledPIndexes, connection.unwrap(PhoenixConnection.class));
PhoenixConfigurationUtil.setIndexMaintainers(configuration, indexMetaDataPtr);
- if (tenantId != null) {
+ if (!Strings.isNullOrEmpty(tenantId)) {
PhoenixConfigurationUtil.setTenantId(configuration, tenantId);
}
@@ -792,7 +793,7 @@ public class IndexTool extends Configured implements Tool {
* @return
* @throws SQLException
*/
- private boolean isValidIndexTable(final Connection connection, final String masterTable,
+ public static boolean isValidIndexTable(final Connection connection, final String masterTable,
final String indexTable, final String tenantId) throws SQLException {
final DatabaseMetaData dbMetaData = connection.getMetaData();
final String schemaName = SchemaUtil.getSchemaNameFromFullName(masterTable);
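Two things worth noting in this hunk. First, isValidIndexTable becomes public
static so IndexScrutinyTool can reuse the tenant-aware validation instead of
keeping its own non-tenant copy (removed above). Second, the guard switches
from a bare null check to Guava's Strings.isNullOrEmpty, which also covers an
empty -tenant argument; an illustrative sketch:

    String tenantId = "";                      // e.g. -tenant "" on the CLI
    if (!Strings.isNullOrEmpty(tenantId)) {    // false: TenantId is left unset
        PhoenixConfigurationUtil.setTenantId(configuration, tenantId);
    }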
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/IndexColumnNames.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/IndexColumnNames.java
index 6f2959f..df8c7ab 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/IndexColumnNames.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/IndexColumnNames.java
@@ -61,6 +61,9 @@ public class IndexColumnNames {
if (pindexTable.getViewIndexId() != null) {
offset++;
}
+ if (pindexTable.isMultiTenant()) {
+ offset++;
+ }
if (offset > 0) {
pindexCols = pindexCols.subList(offset, pindexCols.size());
pkColumns = pkColumns.subList(offset, pkColumns.size());
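The extra increment accounts for the leading tenant-id PK column that a view
index on a multi-tenant table carries in addition to the view-index id. A
rough picture of the resulting skip logic (the PK layout in the comment is an
assumption about how Phoenix orders these columns, not spelled out in this
diff):

    // Assumed PK layout for a tenant view index:
    //   (_INDEX_ID, TENANT_ID, indexed columns..., remaining data PK columns...)
    int offset = 0;
    if (pindexTable.getViewIndexId() != null) offset++; // skip the view-index id
    if (pindexTable.isMultiTenant()) offset++;          // skip the tenant id

Without the second increment, index columns would be paired off-by-one against
the data-table columns when scrutinizing a multi-tenant view index.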