You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@phoenix.apache.org by ka...@apache.org on 2019/09/09 12:02:57 UTC
[phoenix] branch 4.x-HBase-1.4 updated: PHOENIX-5456
IndexScrutinyTool slow for indexes on multitenant tables and
IndexScrutinyIT doesn't run
This is an automated email from the ASF dual-hosted git repository.
kadir pushed a commit to branch 4.x-HBase-1.4
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x-HBase-1.4 by this push:
new 781a420 PHOENIX-5456 IndexScrutinyTool slow for indexes on multitenant tables and IndexScrutinyIT doesn't run
781a420 is described below
commit 781a420ba9f2447f095e2fb289ad1423bd296ebf
Author: Gokcen Iskender <gi...@salesforce.com>
AuthorDate: Thu Aug 29 14:55:28 2019 -0700
PHOENIX-5456 IndexScrutinyTool slow for indexes on multitenant tables and IndexScrutinyIT doesn't run
---
.../phoenix/end2end/IndexScrutinyToolBaseIT.java | 137 ++
.../end2end/IndexScrutinyToolForTenantIT.java | 258 ++++
.../phoenix/end2end/IndexScrutinyToolIT.java | 1339 ++++++++------------
.../phoenix/mapreduce/util/IndexColumnNames.java | 3 +-
4 files changed, 916 insertions(+), 821 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolBaseIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolBaseIT.java
new file mode 100644
index 0000000..9536a12
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolBaseIT.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.phoenix.mapreduce.index.IndexScrutinyTool;
+import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.OutputFormat;
+import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.SourceTable;
+import org.apache.phoenix.mapreduce.index.PhoenixScrutinyJobCounters;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the {@link IndexScrutinyTool}
+ */
+public class IndexScrutinyToolBaseIT extends BaseTest {
+ protected String outputDir;
+
+ @BeforeClass public static void doSetup() throws Exception {
+ Map<String, String> serverProps = Maps.newHashMap();
+ //disable major compactions
+ serverProps.put(HConstants.MAJOR_COMPACTION_PERIOD, "0");
+ Map<String, String> clientProps = Maps.newHashMap();
+
+ clientProps.put(QueryServices.INDEX_REGION_OBSERVER_ENABLED_ATTRIB, Boolean.FALSE.toString());
+
+ setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()),
+ new ReadOnlyProps(clientProps.entrySet().iterator()));
+ }
+
+ protected List<Job> runScrutiny(String[] cmdArgs) throws Exception {
+ IndexScrutinyTool scrutiny = new IndexScrutinyTool();
+ Configuration conf = new Configuration(getUtility().getConfiguration());
+ scrutiny.setConf(conf);
+ int status = scrutiny.run(cmdArgs);
+ assertEquals(0, status);
+ for (Job job : scrutiny.getJobs()) {
+ assertTrue(job.waitForCompletion(true));
+ }
+ return scrutiny.getJobs();
+ }
+
+ protected String[] getArgValues(String schemaName, String dataTable, String indxTable, Long batchSize,
+ SourceTable sourceTable, boolean outputInvalidRows, OutputFormat outputFormat, Long maxOutputRows, String tenantId, Long scrutinyTs) {
+ final List<String> args = Lists.newArrayList();
+ if (schemaName != null) {
+ args.add("-s");
+ args.add(schemaName);
+ }
+ args.add("-dt");
+ args.add(dataTable);
+ args.add("-it");
+ args.add(indxTable);
+
+ // TODO test snapshot reads
+ // if(useSnapshot) {
+ // args.add("-snap");
+ // }
+
+ if (OutputFormat.FILE.equals(outputFormat)) {
+ args.add("-op");
+ outputDir = "/tmp/" + UUID.randomUUID().toString();
+ args.add(outputDir);
+ }
+
+ args.add("-t");
+ args.add(String.valueOf(scrutinyTs));
+ args.add("-run-foreground");
+ if (batchSize != null) {
+ args.add("-b");
+ args.add(String.valueOf(batchSize));
+ }
+
+ // default to using data table as the source table
+ args.add("-src");
+ if (sourceTable == null) {
+ args.add(SourceTable.DATA_TABLE_SOURCE.name());
+ } else {
+ args.add(sourceTable.name());
+ }
+ if (outputInvalidRows) {
+ args.add("-o");
+ }
+ if (outputFormat != null) {
+ args.add("-of");
+ args.add(outputFormat.name());
+ }
+ if (maxOutputRows != null) {
+ args.add("-om");
+ args.add(maxOutputRows.toString());
+ }
+ if (tenantId != null) {
+ args.add("-tenant");
+ args.add(tenantId);
+ }
+ return args.toArray(new String[0]);
+ }
+
+ protected long getCounterValue(Counters counters, Enum<PhoenixScrutinyJobCounters> counter) {
+ return counters.findCounter(counter).getValue();
+ }
+
+ protected int countRows(Connection conn, String tableFullName) throws SQLException {
+ ResultSet count = conn.createStatement().executeQuery("select count(*) from " + tableFullName);
+ count.next();
+ int numRows = count.getInt(1);
+ return numRows;
+ }
+
+}
+
+
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolForTenantIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolForTenantIT.java
new file mode 100644
index 0000000..206e793
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolForTenantIT.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import com.google.common.base.Joiner;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.index.IndexScrutinyTool;
+import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.OutputFormat;
+import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.SourceTable;
+import org.apache.phoenix.mapreduce.index.SourceTargetColumnNames;
+import org.apache.phoenix.parse.HintNode;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableKey;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.phoenix.mapreduce.index.IndexScrutinyTableOutput.OUTPUT_TABLE_NAME;
+import static org.apache.phoenix.mapreduce.index.PhoenixScrutinyJobCounters.INVALID_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixScrutinyJobCounters.VALID_ROW_COUNT;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests {@link IndexScrutinyTool} on a multi-tenant table, tenant and global views on it, and their indexes.
+ */
+@Category(NeedsOwnMiniClusterTest.class)
+public class IndexScrutinyToolForTenantIT extends IndexScrutinyToolBaseIT {
+ private Connection connGlobal = null; // connection opened without a tenant id
+ private Connection connTenant = null; // connection opened with TENANT_ID_ATTRIB = tenantId
+
+ private String tenantId;
+ private String tenantViewName;
+ private String indexNameTenant;
+ private String multiTenantTable;
+ private String viewIndexTableName; // HBase table holding the view indexes: "_IDX_" + multiTenantTable
+
+ private final String createViewStr = "CREATE VIEW %s AS SELECT * FROM %s"; // (view, base table)
+ private final String upsertQueryStr = "UPSERT INTO %s (COL1, ID, NAME) VALUES('%s' , %d, '%s')"; // (target, COL1, ID, NAME)
+ private final String createIndexStr = "CREATE INDEX %s ON %s (NAME) "; // (index, table or view)
+
+ /**
+ * Creates a multi-tenant table, a tenant-owned view on it and an index on that view.
+ */
+ @Before public void setup() throws SQLException {
+ tenantId = generateUniqueName();
+ tenantViewName = generateUniqueName();
+ indexNameTenant = generateUniqueName();
+ multiTenantTable = generateUniqueName();
+ viewIndexTableName = "_IDX_" + multiTenantTable;
+
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ connGlobal = DriverManager.getConnection(getUrl(), props);
+
+ props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+ connTenant = DriverManager.getConnection(getUrl(), props);
+ String createTblStr = "CREATE TABLE %s (COL1 VARCHAR(15) NOT NULL,ID INTEGER NOT NULL" + ", NAME VARCHAR, CONSTRAINT PK_1 PRIMARY KEY (COL1, ID)) MULTI_TENANT=true";
+
+ createTestTable(getUrl(), String.format(createTblStr, multiTenantTable));
+
+ connTenant.createStatement()
+ .execute(String.format(createViewStr, tenantViewName, multiTenantTable));
+
+ String idxStmtTenant = String.format(createIndexStr, indexNameTenant, tenantViewName);
+ connTenant.createStatement().execute(idxStmtTenant);
+ }
+
+ @After public void teardown() throws SQLException {
+ if (connGlobal != null) {
+ connGlobal.close();
+ }
+ if (connTenant != null) {
+ connTenant.close();
+ }
+ }
+
+ /**
+ * Tests that scrutiny reports one valid and no invalid row when the tenant view and its index agree.
+ */
+ @Test public void testTenantViewAndIndexEqual() throws Exception {
+ connTenant.createStatement()
+ .execute(String.format(upsertQueryStr, tenantViewName, tenantId, 1, "x"));
+ connTenant.commit();
+
+ String[]
+ argValues =
+ getArgValues("", tenantViewName, indexNameTenant, 10L, SourceTable.INDEX_TABLE_SOURCE, false, null, null, tenantId,
+ EnvironmentEdgeManager.currentTimeMillis());
+
+ List<Job> completedJobs = runScrutiny(argValues);
+ // Sunny case, both index and view are equal. 1 row
+ assertEquals(1, completedJobs.size());
+ for (Job job : completedJobs) {
+ assertTrue(job.isSuccessful());
+ Counters counters = job.getCounters();
+ assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
+ assertEquals(0, getCounterValue(counters, INVALID_ROW_COUNT));
+ }
+ }
+
+ /**
+ * Tests global view on multi-tenant table should work too
+ **/
+ @Test public void testGlobalViewOnMultiTenantTable() throws Exception {
+ String globalViewName = generateUniqueName();
+ String indexNameGlobal = generateUniqueName();
+
+ connGlobal.createStatement()
+ .execute(String.format(createViewStr, globalViewName, multiTenantTable));
+
+ String idxStmtGlobal = String.format(createIndexStr, indexNameGlobal, globalViewName);
+ connGlobal.createStatement().execute(idxStmtGlobal);
+ connGlobal.createStatement()
+ .execute(String.format(upsertQueryStr, globalViewName, "global", 5, "x"));
+ connGlobal.commit();
+ String[]
+ argValues =
+ getArgValues("", globalViewName, indexNameGlobal, 10L, SourceTable.INDEX_TABLE_SOURCE, false, null, null, null,
+ EnvironmentEdgeManager.currentTimeMillis());
+ List<Job> completedJobs = runScrutiny(argValues);
+ // Sunny case, both index and view are equal. 1 row
+ assertEquals(1, completedJobs.size());
+ for (Job job : completedJobs) {
+ assertTrue(job.isSuccessful());
+ Counters counters = job.getCounters();
+ assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
+ assertEquals(0, getCounterValue(counters, INVALID_ROW_COUNT));
+ }
+ }
+
+ @Test public void testColumnsForSelectQueryOnMultiTenantTable() throws Exception { // checks the SELECT built for a global index on the multi-tenant table
+ String indexNameGlobal = generateUniqueName();
+ connGlobal.createStatement()
+ .execute(String.format(createIndexStr, indexNameGlobal, multiTenantTable));
+
+ PhoenixConnection pconn = connGlobal.unwrap(PhoenixConnection.class);
+ PTable pDataTable = pconn.getTable(new PTableKey(null, multiTenantTable));
+ PTable pIndexTable = pconn.getTable(new PTableKey(null, indexNameGlobal));
+
+ SourceTargetColumnNames
+ columnNames =
+ new SourceTargetColumnNames.IndexSourceColNames(pDataTable, pIndexTable);
+ String targetPksCsv = Joiner.on(",").join(SchemaUtil.getEscapedFullColumnNames(columnNames.getTargetPkColNames()));
+ String
+ selectQuery =
+ QueryUtil.constructSelectStatement(indexNameGlobal, columnNames.getCastedTargetColNames(), targetPksCsv,
+ HintNode.Hint.NO_INDEX, false);
+ assertEquals(
+ "SELECT /*+ NO_INDEX */ CAST(\"COL1\" AS VARCHAR(15)) , CAST(\"ID\" AS INTEGER) , CAST(\"0\".\"NAME\" AS VARCHAR) FROM "
+ + indexNameGlobal + " WHERE (\"COL1\",\"ID\")", selectQuery);
+ }
+
+ /**
+ * Use Both as source. Add 1 row to tenant view and disable index.
+ * Add 1 more to view and add a wrong row to index.
+ * Both have 1 invalid row, 1 valid row.
+ **/
+ @Test public void testOneValidOneInvalidUsingBothAsSource() throws Exception {
+ connTenant.createStatement()
+ .execute(String.format(upsertQueryStr, tenantViewName, tenantId, 1, "x"));
+ connTenant.commit();
+ connTenant.createStatement().execute(
+ String.format("ALTER INDEX %s ON %s disable", indexNameTenant, tenantViewName));
+
+ connTenant.createStatement()
+ .execute(String.format(upsertQueryStr, tenantViewName, tenantId, 2, "x2"));
+
+ connTenant.createStatement().execute(String.format("UPSERT INTO %s (\":ID\", \"0:NAME\") values (%d, '%s')",
+ indexNameTenant, 5555, "wrongName"));
+ connTenant.commit();
+
+ String[]
+ argValues =
+ getArgValues("", tenantViewName, indexNameTenant, 10L, SourceTable.BOTH, false, null, null, tenantId, EnvironmentEdgeManager.currentTimeMillis());
+ List<Job> completedJobs = runScrutiny(argValues);
+
+ assertEquals(2, completedJobs.size());
+ for (Job job : completedJobs) {
+ assertTrue(job.isSuccessful());
+ Counters counters = job.getCounters();
+ assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
+ assertEquals(1, getCounterValue(counters, INVALID_ROW_COUNT));
+ }
+ }
+
+ /**
+ * Add 3 rows to Tenant view.
+ * Empty index table and observe they are not equal.
+ * Use data table as source and output to file.
+ **/
+ @Test public void testWithEmptyIndexTableOutputToFile() throws Exception {
+ testWithOutput(OutputFormat.FILE);
+ }
+
+ @Test public void testWithEmptyIndexTableOutputToTable() throws Exception { // as above, then checks 3 rows were written to the output table
+ testWithOutput(OutputFormat.TABLE);
+ assertEquals(3, countRows(connGlobal, OUTPUT_TABLE_NAME));
+ }
+
+ private void testWithOutput(OutputFormat outputFormat) throws Exception { // 3 view rows, truncated view-index table, data table as source
+ connTenant.createStatement()
+ .execute(String.format(upsertQueryStr, tenantViewName, tenantId, 1, "x"));
+ connTenant.createStatement()
+ .execute(String.format(upsertQueryStr, tenantViewName, tenantId, 2, "x2"));
+ connTenant.createStatement()
+ .execute(String.format(upsertQueryStr, tenantViewName, tenantId, 3, "x3"));
+ connTenant.createStatement().execute(String.format("UPSERT INTO %s (\":ID\", \"0:NAME\") values (%d, '%s')",
+ indexNameTenant, 5555, "wrongName"));
+ connTenant.commit();
+
+ ConnectionQueryServices queryServices = connGlobal.unwrap(PhoenixConnection.class).getQueryServices();
+ try (Admin admin = queryServices.getAdmin()) { // Admin is Closeable; release it once the truncate is done
+ TableName tableName = TableName.valueOf(viewIndexTableName);
+ admin.disableTable(tableName);
+ admin.truncateTable(tableName, false); }
+
+ String[]
+ argValues =
+ getArgValues("", tenantViewName, indexNameTenant, 10L, SourceTable.DATA_TABLE_SOURCE, true, outputFormat, null,
+ tenantId, EnvironmentEdgeManager.currentTimeMillis());
+ List<Job> completedJobs = runScrutiny(argValues);
+
+ assertEquals(1, completedJobs.size());
+ for (Job job : completedJobs) {
+ assertTrue(job.isSuccessful());
+ Counters counters = job.getCounters();
+ assertEquals(0, getCounterValue(counters, VALID_ROW_COUNT));
+ assertEquals(3, getCounterValue(counters, INVALID_ROW_COUNT));
+ }
+ }
+}
+
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java
index 9444444..c6f5418 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java
@@ -10,7 +10,6 @@
*/
package org.apache.phoenix.end2end;
-import static org.apache.phoenix.mapreduce.index.IndexScrutinyTableOutput.OUTPUT_TABLE_NAME;
import static org.apache.phoenix.mapreduce.index.PhoenixScrutinyJobCounters.BAD_COVERED_COL_VAL_COUNT;
import static org.apache.phoenix.mapreduce.index.PhoenixScrutinyJobCounters.BATCHES_PROCESSED_COUNT;
import static org.apache.phoenix.mapreduce.index.PhoenixScrutinyJobCounters.INVALID_ROW_COUNT;
@@ -32,922 +31,622 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.TreeSet;
-import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import com.google.common.collect.Sets;
import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
-import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.CsvBulkImportUtil;
import org.apache.phoenix.mapreduce.index.IndexScrutinyTableOutput;
import org.apache.phoenix.mapreduce.index.IndexScrutinyTool;
import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.OutputFormat;
import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.SourceTable;
-import org.apache.phoenix.mapreduce.index.PhoenixScrutinyJobCounters;
import org.apache.phoenix.mapreduce.index.SourceTargetColumnNames;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
-import org.apache.phoenix.query.BaseTest;
-import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
-import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.SchemaUtil;
import org.junit.After;
import org.junit.Before;
-import org.junit.BeforeClass;
+
+import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.junit.experimental.runners.Enclosed;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
/**
* Tests for the {@link IndexScrutinyTool}
*/
@Category(NeedsOwnMiniClusterTest.class)
-@RunWith(Enclosed.class)
-public class IndexScrutinyToolIT {
-
- abstract public static class SharedIndexToolIT extends BaseTest {
- protected String outputDir;
-
- @BeforeClass public static void doSetup() throws Exception {
- Map<String, String> serverProps = Maps.newHashMap();
- //disable major compactions
- serverProps.put(HConstants.MAJOR_COMPACTION_PERIOD, "0");
- Map<String, String> clientProps = Maps.newHashMap();
- setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()),
- new ReadOnlyProps(clientProps.entrySet().iterator()));
- }
-
- protected List<Job> runScrutiny(String[] cmdArgs) throws Exception {
- IndexScrutinyTool scrutiny = new IndexScrutinyTool();
- Configuration conf = new Configuration(getUtility().getConfiguration());
- scrutiny.setConf(conf);
- int status = scrutiny.run(cmdArgs);
- assertEquals(0, status);
- for (Job job : scrutiny.getJobs()) {
- assertTrue(job.waitForCompletion(true));
- }
- return scrutiny.getJobs();
- }
-
- protected String[] getArgValues(String schemaName, String dataTable, String indxTable, Long batchSize,
- SourceTable sourceTable, boolean outputInvalidRows, OutputFormat outputFormat, Long maxOutputRows, String tenantId, Long scrutinyTs) {
- final List<String> args = Lists.newArrayList();
- if (schemaName != null) {
- args.add("-s");
- args.add(schemaName);
- }
- args.add("-dt");
- args.add(dataTable);
- args.add("-it");
- args.add(indxTable);
-
- // TODO test snapshot reads
- // if(useSnapshot) {
- // args.add("-snap");
- // }
-
- if (OutputFormat.FILE.equals(outputFormat)) {
- args.add("-op");
- outputDir = "/tmp/" + UUID.randomUUID().toString();
- args.add(outputDir);
- }
- args.add("-t");
- args.add(String.valueOf(scrutinyTs));
- args.add("-run-foreground");
- if (batchSize != null) {
- args.add("-b");
- args.add(String.valueOf(batchSize));
- }
-
- // default to using data table as the source table
- args.add("-src");
- if (sourceTable == null) {
- args.add(SourceTable.DATA_TABLE_SOURCE.name());
- } else {
- args.add(sourceTable.name());
- }
- if (outputInvalidRows) {
- args.add("-o");
- }
- if (outputFormat != null) {
- args.add("-of");
- args.add(outputFormat.name());
- }
- if (maxOutputRows != null) {
- args.add("-om");
- args.add(maxOutputRows.toString());
- }
- if (tenantId != null) {
- args.add("-tenant");
- args.add(tenantId);
- }
- return args.toArray(new String[0]);
- }
-
- protected long getCounterValue(Counters counters, Enum<PhoenixScrutinyJobCounters> counter) {
- return counters.findCounter(counter).getValue();
- }
-
- protected int countRows(Connection conn, String tableFullName) throws SQLException {
- ResultSet count = conn.createStatement().executeQuery("select count(*) from " + tableFullName);
- count.next();
- int numRows = count.getInt(1);
- return numRows;
- }
-
- }
-
- @RunWith(Parameterized.class)
- public static class IndexScrutinyToolNonTenantIT extends SharedIndexToolIT {
-
- private String dataTableDdl;
- private String indexTableDdl;
+@RunWith(Parameterized.class)
+public class IndexScrutinyToolIT extends IndexScrutinyToolBaseIT {
+ private String dataTableDdl;
+ private String indexTableDdl;
- private static final String UPSERT_SQL = "UPSERT INTO %s VALUES(?,?,?,?)";
+ private static final String UPSERT_SQL = "UPSERT INTO %s VALUES(?,?,?,?)";
- private static final String
- INDEX_UPSERT_SQL =
- "UPSERT INTO %s (\"0:NAME\", \":ID\", \"0:ZIP\", \"0:EMPLOY_DATE\") values (?,?,?,?)";
+ private static final String
+ INDEX_UPSERT_SQL =
+ "UPSERT INTO %s (\"0:NAME\", \":ID\", \"0:ZIP\", \"0:EMPLOY_DATE\") values (?,?,?,?)";
- private static final String DELETE_SQL = "DELETE FROM %s ";
+ private static final String DELETE_SQL = "DELETE FROM %s ";
- private String schemaName;
- private String dataTableName;
- private String dataTableFullName;
- private String indexTableName;
- private String indexTableFullName;
+ private String schemaName;
+ private String dataTableName;
+ private String dataTableFullName;
+ private String indexTableName;
+ private String indexTableFullName;
- private Connection conn;
+ private Connection conn;
- private PreparedStatement dataTableUpsertStmt;
+ private PreparedStatement dataTableUpsertStmt;
- private PreparedStatement indexTableUpsertStmt;
+ private PreparedStatement indexTableUpsertStmt;
- private long testTime;
- private Properties props;
+ private long testTime;
+ private Properties props;
- @Parameterized.Parameters public static Collection<Object[]> data() {
- return Arrays.asList(new Object[][] { { "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER, EMPLOY_DATE TIMESTAMP, EMPLOYER VARCHAR)",
- "CREATE LOCAL INDEX %s ON %s (NAME, EMPLOY_DATE) INCLUDE (ZIP)" }, { "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER, EMPLOY_DATE TIMESTAMP, EMPLOYER VARCHAR) SALT_BUCKETS=2",
- "CREATE INDEX %s ON %s (NAME, EMPLOY_DATE) INCLUDE (ZIP)" }, { "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER, EMPLOY_DATE TIMESTAMP, EMPLOYER VARCHAR) SALT_BUCKETS=2",
- "CREATE LOCAL INDEX %s ON %s (NAME, EMPLOY_DATE) INCLUDE (ZIP)" } });
- }
+ @Parameterized.Parameters public static Collection<Object[]> data() {
+ return Arrays.asList(new Object[][] { { "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER, EMPLOY_DATE TIMESTAMP, EMPLOYER VARCHAR)",
+ "CREATE LOCAL INDEX %s ON %s (NAME, EMPLOY_DATE) INCLUDE (ZIP)" }, { "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER, EMPLOY_DATE TIMESTAMP, EMPLOYER VARCHAR) SALT_BUCKETS=2",
+ "CREATE INDEX %s ON %s (NAME, EMPLOY_DATE) INCLUDE (ZIP)" }, { "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER, EMPLOY_DATE TIMESTAMP, EMPLOYER VARCHAR) SALT_BUCKETS=2",
+ "CREATE LOCAL INDEX %s ON %s (NAME, EMPLOY_DATE) INCLUDE (ZIP)" } });
+ }
- public IndexScrutinyToolNonTenantIT(String dataTableDdl, String indexTableDdl) {
- this.dataTableDdl = dataTableDdl;
- this.indexTableDdl = indexTableDdl;
- }
+ public IndexScrutinyToolIT(String dataTableDdl, String indexTableDdl) {
+ this.dataTableDdl = dataTableDdl;
+ this.indexTableDdl = indexTableDdl;
+ }
- /**
- * Create the test data and index tables
- */
- @Before public void setup() throws SQLException {
- generateUniqueTableNames();
- createTestTable(getUrl(), String.format(dataTableDdl, dataTableFullName));
- createTestTable(getUrl(), String.format(indexTableDdl, indexTableName, dataTableFullName));
- props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- conn = DriverManager.getConnection(getUrl(), props);
- String dataTableUpsert = String.format(UPSERT_SQL, dataTableFullName);
- dataTableUpsertStmt = conn.prepareStatement(dataTableUpsert);
- String indexTableUpsert = String.format(INDEX_UPSERT_SQL, indexTableFullName);
- indexTableUpsertStmt = conn.prepareStatement(indexTableUpsert);
- conn.setAutoCommit(false);
- testTime = EnvironmentEdgeManager.currentTimeMillis() - 1000;
+ /**
+ * Create the test data and index tables
+ */
+ @Before public void setup() throws SQLException {
+ generateUniqueTableNames();
+ createTestTable(getUrl(), String.format(dataTableDdl, dataTableFullName));
+ createTestTable(getUrl(), String.format(indexTableDdl, indexTableName, dataTableFullName));
+ props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ conn = DriverManager.getConnection(getUrl(), props);
+ String dataTableUpsert = String.format(UPSERT_SQL, dataTableFullName);
+ dataTableUpsertStmt = conn.prepareStatement(dataTableUpsert);
+ String indexTableUpsert = String.format(INDEX_UPSERT_SQL, indexTableFullName);
+ indexTableUpsertStmt = conn.prepareStatement(indexTableUpsert);
+ conn.setAutoCommit(false);
+ testTime = EnvironmentEdgeManager.currentTimeMillis() - 1000;
- }
+ }
- @After public void teardown() throws SQLException {
- if (conn != null) {
- conn.close();
- }
+ @After public void teardown() throws SQLException {
+ if (conn != null) {
+ conn.close();
}
+ }
- /**
- * Tests a data table that is correctly indexed. Scrutiny should report all rows as valid.
- */
- @Test public void testValidIndex() throws Exception {
- // insert two rows
- upsertRow(dataTableUpsertStmt, 1, "name-1", 94010);
- upsertRow(dataTableUpsertStmt, 2, "name-2", 95123);
- conn.commit();
-
- int numDataRows = countRows(conn, dataTableFullName);
- int numIndexRows = countRows(conn, indexTableFullName);
-
- // scrutiny should report everything as ok
- List<Job> completedJobs = runScrutiny(schemaName, dataTableName, indexTableName);
- Job job = completedJobs.get(0);
- assertTrue(job.isSuccessful());
- Counters counters = job.getCounters();
- assertEquals(2, getCounterValue(counters, VALID_ROW_COUNT));
- assertEquals(0, getCounterValue(counters, INVALID_ROW_COUNT));
-
- // make sure row counts weren't modified by scrutiny
- assertEquals(numDataRows, countRows(conn, dataTableFullName));
- assertEquals(numIndexRows, countRows(conn, indexTableFullName));
- }
+ /**
+ * Tests a data table that is correctly indexed. Scrutiny should report all rows as valid.
+ */
+ @Test public void testValidIndex() throws Exception {
+ // insert two rows
+ upsertRow(dataTableUpsertStmt, 1, "name-1", 94010);
+ upsertRow(dataTableUpsertStmt, 2, "name-2", 95123);
+ conn.commit();
+
+ int numDataRows = countRows(conn, dataTableFullName);
+ int numIndexRows = countRows(conn, indexTableFullName);
+
+ // scrutiny should report everything as ok
+ List<Job> completedJobs = runScrutiny(schemaName, dataTableName, indexTableName);
+ Job job = completedJobs.get(0);
+ assertTrue(job.isSuccessful());
+ Counters counters = job.getCounters();
+ assertEquals(2, getCounterValue(counters, VALID_ROW_COUNT));
+ assertEquals(0, getCounterValue(counters, INVALID_ROW_COUNT));
+
+ // make sure row counts weren't modified by scrutiny
+ assertEquals(numDataRows, countRows(conn, dataTableFullName));
+ assertEquals(numIndexRows, countRows(conn, indexTableFullName));
+ }
- /**
- * Tests running a scrutiny while updates and deletes are happening.
- * Since CURRENT_SCN is set, the scrutiny shouldn't report any issue.
- */
- @Test public void testScrutinyWhileTakingWrites() throws Exception {
- int id = 0;
- while (id < 1000) {
- int index = 1;
- dataTableUpsertStmt.setInt(index++, id);
- dataTableUpsertStmt.setString(index++, "name-" + id);
- dataTableUpsertStmt.setInt(index++, id);
- dataTableUpsertStmt.setTimestamp(index++, new Timestamp(testTime));
- dataTableUpsertStmt.executeUpdate();
- id++;
- }
- conn.commit();
-
- //CURRENT_SCN for scrutiny
- long scrutinyTS = EnvironmentEdgeManager.currentTimeMillis();
-
- // launch background upserts and deletes
- final Random random = new Random(0);
- Runnable backgroundUpserts = new Runnable() {
- @Override public void run() {
- int idToUpsert = random.nextInt(1000);
- try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
- PreparedStatement
- dataPS =
- conn.prepareStatement(String.format(UPSERT_SQL, dataTableFullName));
- upsertRow(dataPS, idToUpsert, "modified-" + idToUpsert, idToUpsert + 1000);
- conn.commit();
- } catch (SQLException e) {
- e.printStackTrace();
- }
+ /**
+ * Tests running a scrutiny while updates and deletes are happening.
+ * Since CURRENT_SCN is set, the scrutiny shouldn't report any issue.
+ */
+ @Test @Ignore("PHOENIX-4378 Unable to set KEEP_DELETED_CELLS to true on RS scanner") public void testScrutinyWhileTakingWrites() throws Exception {
+ int id = 0;
+ while (id < 1000) {
+ int index = 1;
+ dataTableUpsertStmt.setInt(index++, id);
+ dataTableUpsertStmt.setString(index++, "name-" + id);
+ dataTableUpsertStmt.setInt(index++, id);
+ dataTableUpsertStmt.setTimestamp(index++, new Timestamp(testTime));
+ dataTableUpsertStmt.executeUpdate();
+ id++;
+ }
+ conn.commit();
+
+ //CURRENT_SCN for scrutiny
+ long scrutinyTS = EnvironmentEdgeManager.currentTimeMillis();
+
+ // launch background upserts and deletes
+ final Random random = new Random(0);
+ Runnable backgroundUpserts = new Runnable() {
+ @Override public void run() {
+ int idToUpsert = random.nextInt(1000);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ PreparedStatement
+ dataPS =
+ conn.prepareStatement(String.format(UPSERT_SQL, dataTableFullName));
+ upsertRow(dataPS, idToUpsert, "modified-" + idToUpsert, idToUpsert + 1000);
+ conn.commit();
+ } catch (SQLException e) {
+ e.printStackTrace();
}
- };
- Runnable backgroundDeletes = new Runnable() {
- @Override public void run() {
- int idToDelete = random.nextInt(1000);
- try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
- String
- deleteSql =
- String.format(DELETE_SQL, indexTableFullName) + "WHERE \":ID\"=" + idToDelete;
- conn.createStatement().executeUpdate(deleteSql);
- conn.commit();
- } catch (SQLException e) {
- e.printStackTrace();
- }
+ }
+ };
+ Runnable backgroundDeletes = new Runnable() {
+ @Override public void run() {
+ int idToDelete = random.nextInt(1000);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ String
+ deleteSql =
+ String.format(DELETE_SQL, indexTableFullName) + "WHERE \":ID\"=" + idToDelete;
+ conn.createStatement().executeUpdate(deleteSql);
+ conn.commit();
+ } catch (SQLException e) {
+ e.printStackTrace();
}
- };
- ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(2);
- scheduledThreadPool.scheduleWithFixedDelay(backgroundUpserts, 200, 200, TimeUnit.MILLISECONDS);
- scheduledThreadPool.scheduleWithFixedDelay(backgroundDeletes, 200, 200, TimeUnit.MILLISECONDS);
-
- // scrutiny should report everything as ok
- List<Job>
- completedJobs =
- runScrutinyCurrentSCN(schemaName, dataTableName, indexTableName, scrutinyTS);
- Job job = completedJobs.get(0);
- assertTrue(job.isSuccessful());
- Counters counters = job.getCounters();
- assertEquals(1000, getCounterValue(counters, VALID_ROW_COUNT));
- assertEquals(0, getCounterValue(counters, INVALID_ROW_COUNT));
- scheduledThreadPool.shutdown();
- scheduledThreadPool.awaitTermination(10000, TimeUnit.MILLISECONDS);
- }
-
- /**
- * Tests an index with the same # of rows as the data table, but one of the index rows is
- * incorrect Scrutiny should report the invalid rows.
- */
- @Test public void testEqualRowCountIndexIncorrect() throws Exception {
- // insert one valid row
- upsertRow(dataTableUpsertStmt, 1, "name-1", 94010);
- conn.commit();
-
- // disable the index and insert another row which is not indexed
- disableIndex();
- upsertRow(dataTableUpsertStmt, 2, "name-2", 95123);
- conn.commit();
-
- // insert a bad row into the index
- upsertIndexRow("badName", 2, 9999);
- conn.commit();
-
- // scrutiny should report the bad row
- List<Job> completedJobs = runScrutiny(schemaName, dataTableName, indexTableName);
- Job job = completedJobs.get(0);
- assertTrue(job.isSuccessful());
- Counters counters = job.getCounters();
- assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
- assertEquals(1, getCounterValue(counters, INVALID_ROW_COUNT));
- }
-
- /**
- * Tests an index where the index pk is correct (indexed col values are indexed correctly), but
- * a covered index value is incorrect. Scrutiny should report the invalid row
- */
- @Test public void testCoveredValueIncorrect() throws Exception {
- // insert one valid row
- upsertRow(dataTableUpsertStmt, 1, "name-1", 94010);
- conn.commit();
-
- // disable index and insert another data row
- disableIndex();
- upsertRow(dataTableUpsertStmt, 2, "name-2", 95123);
- conn.commit();
-
- // insert a bad index row for the above data row
- upsertIndexRow("name-2", 2, 9999);
- conn.commit();
-
- // scrutiny should report the bad row
- List<Job> completedJobs = runScrutiny(schemaName, dataTableName, indexTableName);
- Job job = completedJobs.get(0);
- assertTrue(job.isSuccessful());
- Counters counters = job.getCounters();
- assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
- assertEquals(1, getCounterValue(counters, INVALID_ROW_COUNT));
- assertEquals(1, getCounterValue(counters, BAD_COVERED_COL_VAL_COUNT));
- }
-
- /**
- * Test batching of row comparisons Inserts 1001 rows, with some random bad rows, and runs
- * scrutiny with batchsize of 10,
- */
- @Test public void testBatching() throws Exception {
- // insert 1001 data and index rows
- int numTestRows = 1001;
- for (int i = 0; i < numTestRows; i++) {
- upsertRow(dataTableUpsertStmt, i, "name-" + i, i + 1000);
}
- conn.commit();
+ };
+ ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(2);
+ scheduledThreadPool.scheduleWithFixedDelay(backgroundUpserts, 200, 200, TimeUnit.MILLISECONDS);
+ scheduledThreadPool.scheduleWithFixedDelay(backgroundDeletes, 200, 200, TimeUnit.MILLISECONDS);
+
+ // scrutiny should report everything as ok
+ List<Job>
+ completedJobs =
+ runScrutinyCurrentSCN(schemaName, dataTableName, indexTableName, scrutinyTS);
+ Job job = completedJobs.get(0);
+ assertTrue(job.isSuccessful());
+ Counters counters = job.getCounters();
+ assertEquals(1000, getCounterValue(counters, VALID_ROW_COUNT));
+ assertEquals(0, getCounterValue(counters, INVALID_ROW_COUNT));
+ scheduledThreadPool.shutdown();
+ scheduledThreadPool.awaitTermination(10000, TimeUnit.MILLISECONDS);
+ }
- disableIndex();
+ /**
+ * Tests an index with the same # of rows as the data table, but one of the index rows is
+ * incorrect Scrutiny should report the invalid rows.
+ */
+ @Test public void testEqualRowCountIndexIncorrect() throws Exception {
+ // insert one valid row
+ upsertRow(dataTableUpsertStmt, 1, "name-1", 94010);
+ conn.commit();
+
+ // disable the index and insert another row which is not indexed
+ disableIndex();
+ upsertRow(dataTableUpsertStmt, 2, "name-2", 95123);
+ conn.commit();
+
+ // insert a bad row into the index
+ upsertIndexRow("badName", 2, 9999);
+ conn.commit();
+
+ // scrutiny should report the bad row
+ List<Job> completedJobs = runScrutiny(schemaName, dataTableName, indexTableName);
+ Job job = completedJobs.get(0);
+ assertTrue(job.isSuccessful());
+ Counters counters = job.getCounters();
+ assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
+ assertEquals(1, getCounterValue(counters, INVALID_ROW_COUNT));
+ }
- // randomly delete some rows from the index
- Random random = new Random();
- for (int i = 0; i < 100; i++) {
- int idToDelete = random.nextInt(numTestRows);
- deleteRow(indexTableFullName, "WHERE \":ID\"=" + idToDelete);
- }
- conn.commit();
- int numRows = countRows(conn, indexTableFullName);
- int numDeleted = numTestRows - numRows;
+ /**
+ * Tests an index where the index pk is correct (indexed col values are indexed correctly), but
+ * a covered index value is incorrect. Scrutiny should report the invalid row
+ */
+ @Test public void testCoveredValueIncorrect() throws Exception {
+ // insert one valid row
+ upsertRow(dataTableUpsertStmt, 1, "name-1", 94010);
+ conn.commit();
+
+ // disable index and insert another data row
+ disableIndex();
+ upsertRow(dataTableUpsertStmt, 2, "name-2", 95123);
+ conn.commit();
+
+ // insert a bad index row for the above data row
+ upsertIndexRow("name-2", 2, 9999);
+ conn.commit();
+
+ // scrutiny should report the bad row
+ List<Job> completedJobs = runScrutiny(schemaName, dataTableName, indexTableName);
+ Job job = completedJobs.get(0);
+ assertTrue(job.isSuccessful());
+ Counters counters = job.getCounters();
+ assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
+ assertEquals(1, getCounterValue(counters, INVALID_ROW_COUNT));
+ assertEquals(1, getCounterValue(counters, BAD_COVERED_COL_VAL_COUNT));
+ }
- // run scrutiny with batch size of 10
- List<Job> completedJobs = runScrutiny(schemaName, dataTableName, indexTableName, 10L);
- Job job = completedJobs.get(0);
- assertTrue(job.isSuccessful());
- Counters counters = job.getCounters();
- assertEquals(numTestRows - numDeleted, getCounterValue(counters, VALID_ROW_COUNT));
- assertEquals(numDeleted, getCounterValue(counters, INVALID_ROW_COUNT));
- assertEquals(numTestRows / 10 + numTestRows % 10,
- getCounterValue(counters, BATCHES_PROCESSED_COUNT));
- }
+ /**
+ * Test batching of row comparisons Inserts 1001 rows, with some random bad rows, and runs
+ * scrutiny with batchsize of 10,
+ */
+ @Test public void testBatching() throws Exception {
+ // insert 1001 data and index rows
+ int numTestRows = 1001;
+ for (int i = 0; i < numTestRows; i++) {
+ upsertRow(dataTableUpsertStmt, i, "name-" + i, i + 1000);
+ }
+ conn.commit();
+
+ disableIndex();
+
+ // randomly delete some rows from the index
+ Random random = new Random();
+ for (int i = 0; i < 100; i++) {
+ int idToDelete = random.nextInt(numTestRows);
+ deleteRow(indexTableFullName, "WHERE \":ID\"=" + idToDelete);
+ }
+ conn.commit();
+ int numRows = countRows(conn, indexTableFullName);
+ int numDeleted = numTestRows - numRows;
+
+ // run scrutiny with batch size of 10
+ List<Job> completedJobs = runScrutiny(schemaName, dataTableName, indexTableName, 10L);
+ Job job = completedJobs.get(0);
+ assertTrue(job.isSuccessful());
+ Counters counters = job.getCounters();
+ assertEquals(numTestRows - numDeleted, getCounterValue(counters, VALID_ROW_COUNT));
+ assertEquals(numDeleted, getCounterValue(counters, INVALID_ROW_COUNT));
+ assertEquals(numTestRows / 10 + numTestRows % 10,
+ getCounterValue(counters, BATCHES_PROCESSED_COUNT));
+ }
- /**
- * Tests when there are more data table rows than index table rows Scrutiny should report the
- * number of incorrect rows
- */
- @Test public void testMoreDataRows() throws Exception {
- upsertRow(dataTableUpsertStmt, 1, "name-1", 95123);
- conn.commit();
- disableIndex();
- // these rows won't have a corresponding index row
- upsertRow(dataTableUpsertStmt, 2, "name-2", 95124);
- upsertRow(dataTableUpsertStmt, 3, "name-3", 95125);
- conn.commit();
-
- List<Job> completedJobs = runScrutiny(schemaName, dataTableName, indexTableName);
- Job job = completedJobs.get(0);
- assertTrue(job.isSuccessful());
- Counters counters = job.getCounters();
- assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
- assertEquals(2, getCounterValue(counters, INVALID_ROW_COUNT));
- }
+ /**
+ * Tests when there are more data table rows than index table rows Scrutiny should report the
+ * number of incorrect rows
+ */
+ @Test public void testMoreDataRows() throws Exception {
+ upsertRow(dataTableUpsertStmt, 1, "name-1", 95123);
+ conn.commit();
+ disableIndex();
+ // these rows won't have a corresponding index row
+ upsertRow(dataTableUpsertStmt, 2, "name-2", 95124);
+ upsertRow(dataTableUpsertStmt, 3, "name-3", 95125);
+ conn.commit();
+
+ List<Job> completedJobs = runScrutiny(schemaName, dataTableName, indexTableName);
+ Job job = completedJobs.get(0);
+ assertTrue(job.isSuccessful());
+ Counters counters = job.getCounters();
+ assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
+ assertEquals(2, getCounterValue(counters, INVALID_ROW_COUNT));
+ }
+
+ /**
+ * Tests when there are more index table rows than data table rows Scrutiny should report the
+ * number of incorrect rows when run with the index as the source table
+ */
+ @Test public void testMoreIndexRows() throws Exception {
+ upsertRow(dataTableUpsertStmt, 1, "name-1", 95123);
+ conn.commit();
+ disableIndex();
+ // these index rows won't have a corresponding data row
+ upsertIndexRow("name-2", 2, 95124);
+ upsertIndexRow("name-3", 3, 95125);
+ conn.commit();
+
+ List<Job>
+ completedJobs =
+ runScrutiny(schemaName, dataTableName, indexTableName, 10L, SourceTable.INDEX_TABLE_SOURCE);
+ Job job = completedJobs.get(0);
+ assertTrue(job.isSuccessful());
+ Counters counters = job.getCounters();
+ assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
+ assertEquals(2, getCounterValue(counters, INVALID_ROW_COUNT));
+ }
- /**
- * Tests when there are more index table rows than data table rows Scrutiny should report the
- * number of incorrect rows when run with the index as the source table
- */
- @Test public void testMoreIndexRows() throws Exception {
- upsertRow(dataTableUpsertStmt, 1, "name-1", 95123);
- conn.commit();
- disableIndex();
- // these index rows won't have a corresponding data row
- upsertIndexRow("name-2", 2, 95124);
- upsertIndexRow("name-3", 3, 95125);
- conn.commit();
-
- List<Job>
- completedJobs =
- runScrutiny(schemaName, dataTableName, indexTableName, 10L, SourceTable.INDEX_TABLE_SOURCE);
- Job job = completedJobs.get(0);
+ /**
+ * Tests running with both the index and data tables as the source table If we have an
+ * incorrectly indexed row, it should be reported in each direction
+ */
+ @Test public void testBothDataAndIndexAsSource() throws Exception {
+ // insert one valid row
+ upsertRow(dataTableUpsertStmt, 1, "name-1", 94010);
+ conn.commit();
+
+ // disable the index and insert another row which is not indexed
+ disableIndex();
+ upsertRow(dataTableUpsertStmt, 2, "name-2", 95123);
+ conn.commit();
+
+ // insert a bad row into the index
+ upsertIndexRow("badName", 2, 9999);
+ conn.commit();
+
+ List<Job>
+ completedJobs =
+ runScrutiny(schemaName, dataTableName, indexTableName, 10L, SourceTable.BOTH);
+ assertEquals(2, completedJobs.size());
+ for (Job job : completedJobs) {
assertTrue(job.isSuccessful());
Counters counters = job.getCounters();
assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
- assertEquals(2, getCounterValue(counters, INVALID_ROW_COUNT));
- }
-
- /**
- * Tests running with both the index and data tables as the source table If we have an
- * incorrectly indexed row, it should be reported in each direction
- */
- @Test public void testBothDataAndIndexAsSource() throws Exception {
- // insert one valid row
- upsertRow(dataTableUpsertStmt, 1, "name-1", 94010);
- conn.commit();
-
- // disable the index and insert another row which is not indexed
- disableIndex();
- upsertRow(dataTableUpsertStmt, 2, "name-2", 95123);
- conn.commit();
-
- // insert a bad row into the index
- upsertIndexRow("badName", 2, 9999);
- conn.commit();
-
- List<Job>
- completedJobs =
- runScrutiny(schemaName, dataTableName, indexTableName, 10L, SourceTable.BOTH);
- assertEquals(2, completedJobs.size());
- for (Job job : completedJobs) {
- assertTrue(job.isSuccessful());
- Counters counters = job.getCounters();
- assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
- assertEquals(1, getCounterValue(counters, INVALID_ROW_COUNT));
- }
+ assertEquals(1, getCounterValue(counters, INVALID_ROW_COUNT));
}
+ }
- /**
- * Tests that with the output to file option set, the scrutiny tool outputs invalid rows to file
- */
- @Test public void testOutputInvalidRowsToFile() throws Exception {
- insertOneValid_OneBadVal_OneMissingTarget();
-
- String[]
- argValues =
- getArgValues(schemaName, dataTableName, indexTableName, 10L, SourceTable.DATA_TABLE_SOURCE, true, OutputFormat.FILE, null);
- runScrutiny(argValues);
-
- // check the output files
- Path outputPath = CsvBulkImportUtil.getOutputPath(new Path(outputDir), dataTableFullName);
- DistributedFileSystem fs = getUtility().getDFSCluster().getFileSystem();
- List<Path> paths = Lists.newArrayList();
- Path firstPart = null;
- for (FileStatus outputFile : fs.listStatus(outputPath)) {
- if (outputFile.getPath().getName().startsWith("part")) {
- if (firstPart == null) {
- firstPart = outputFile.getPath();
- } else {
- paths.add(outputFile.getPath());
- }
+ /**
+ * Tests that with the output to file option set, the scrutiny tool outputs invalid rows to file
+ */
+ @Test public void testOutputInvalidRowsToFile() throws Exception {
+ insertOneValid_OneBadVal_OneMissingTarget();
+
+ String[]
+ argValues =
+ getArgValues(schemaName, dataTableName, indexTableName, 10L, SourceTable.DATA_TABLE_SOURCE, true, OutputFormat.FILE, null);
+ runScrutiny(argValues);
+
+ // check the output files
+ Path outputPath = CsvBulkImportUtil.getOutputPath(new Path(outputDir), dataTableFullName);
+ DistributedFileSystem fs = getUtility().getDFSCluster().getFileSystem();
+ List<Path> paths = Lists.newArrayList();
+ Path firstPart = null;
+ for (FileStatus outputFile : fs.listStatus(outputPath)) {
+ if (outputFile.getPath().getName().startsWith("part")) {
+ if (firstPart == null) {
+ firstPart = outputFile.getPath();
+ } else {
+ paths.add(outputFile.getPath());
}
}
- if (dataTableDdl.contains("SALT_BUCKETS")) {
- fs.concat(firstPart, paths.toArray(new Path[0]));
- }
- Path outputFilePath = firstPart;
- assertTrue(fs.exists(outputFilePath));
- FSDataInputStream fsDataInputStream = fs.open(outputFilePath);
- BufferedReader reader = new BufferedReader(new InputStreamReader(fsDataInputStream));
- TreeSet<String> lines = Sets.newTreeSet();
- try {
- String line = null;
- while ((line = reader.readLine()) != null) {
- lines.add(line);
- }
- } finally {
- IOUtils.closeQuietly(reader);
- IOUtils.closeQuietly(fsDataInputStream);
+ }
+ if (dataTableDdl.contains("SALT_BUCKETS")) {
+ fs.concat(firstPart, paths.toArray(new Path[0]));
+ }
+ Path outputFilePath = firstPart;
+ assertTrue(fs.exists(outputFilePath));
+ FSDataInputStream fsDataInputStream = fs.open(outputFilePath);
+ BufferedReader reader = new BufferedReader(new InputStreamReader(fsDataInputStream));
+ TreeSet<String> lines = Sets.newTreeSet();
+ try {
+ String line = null;
+ while ((line = reader.readLine()) != null) {
+ lines.add(line);
}
- Iterator<String> lineIterator = lines.iterator();
- assertEquals(
- "[2, name-2, " + new Timestamp(testTime).toString() + ", 95123]\t[2, name-2, " + new Timestamp(testTime).toString() + ", 9999]", lineIterator.next());
- assertEquals("[3, name-3, " + new Timestamp(testTime).toString() + ", 95123]\tTarget row not found",
- lineIterator.next());
+ } finally {
+ IOUtils.closeQuietly(reader);
+ IOUtils.closeQuietly(fsDataInputStream);
+ }
+ Iterator<String> lineIterator = lines.iterator();
+ assertEquals("[2, name-2, " + new Timestamp(testTime).toString() + ", 95123]\t[2, name-2, "
+ + new Timestamp(testTime).toString() + ", 9999]", lineIterator.next());
+ assertEquals("[3, name-3, " + new Timestamp(testTime).toString() + ", 95123]\tTarget row not found",
+ lineIterator.next());
+ }
- }
+ /**
+ * Tests writing of results to the output table
+ */
+ @Test public void testOutputInvalidRowsToTable() throws Exception {
+ insertOneValid_OneBadVal_OneMissingTarget();
+ String[]
+ argValues =
+ getArgValues(schemaName, dataTableName, indexTableName, 10L, SourceTable.DATA_TABLE_SOURCE, true, OutputFormat.TABLE, null);
+ List<Job> completedJobs = runScrutiny(argValues);
+
+ // check that the output table contains the invalid rows
+ long
+ scrutinyTimeMillis =
+ PhoenixConfigurationUtil.getScrutinyExecuteTimestamp(completedJobs.get(0).getConfiguration());
+ String
+ invalidRowsQuery =
+ IndexScrutinyTableOutput
+ .getSqlQueryAllInvalidRows(conn, getColNames(), scrutinyTimeMillis);
+ ResultSet rs = conn.createStatement().executeQuery(invalidRowsQuery + " ORDER BY ID asc");
+ assertTrue(rs.next());
+ assertEquals(dataTableFullName, rs.getString(IndexScrutinyTableOutput.SOURCE_TABLE_COL_NAME));
+ assertEquals(indexTableFullName, rs.getString(IndexScrutinyTableOutput.TARGET_TABLE_COL_NAME));
+ assertTrue(rs.getBoolean("HAS_TARGET_ROW"));
+ assertEquals(2, rs.getInt("ID"));
+ assertEquals(2, rs.getInt(":ID"));
+ assertEquals(95123, rs.getInt("ZIP"));
+ assertEquals(9999, rs.getInt("0:ZIP")); // covered col zip incorrect
+ assertTrue(rs.next());
+ assertEquals(dataTableFullName, rs.getString(IndexScrutinyTableOutput.SOURCE_TABLE_COL_NAME));
+ assertEquals(indexTableFullName, rs.getString(IndexScrutinyTableOutput.TARGET_TABLE_COL_NAME));
+ assertFalse(rs.getBoolean("HAS_TARGET_ROW"));
+ assertEquals(3, rs.getInt("ID"));
+ assertEquals(null, rs.getObject(":ID")); // null for missing target row
+ assertFalse(rs.next());
+
+ // check that the job results were written correctly to the metadata table
+ assertMetadataTableValues(argValues, scrutinyTimeMillis, invalidRowsQuery);
+ }
- /**
- * Tests writing of results to the output table
- */
- @Test public void testOutputInvalidRowsToTable() throws Exception {
- insertOneValid_OneBadVal_OneMissingTarget();
- String[]
- argValues =
- getArgValues(schemaName, dataTableName, indexTableName, 10L, SourceTable.DATA_TABLE_SOURCE, true, OutputFormat.TABLE, null);
- List<Job> completedJobs = runScrutiny(argValues);
-
- // check that the output table contains the invalid rows
- long
- scrutinyTimeMillis =
- PhoenixConfigurationUtil.getScrutinyExecuteTimestamp(completedJobs.get(0).getConfiguration());
- String
- invalidRowsQuery =
- IndexScrutinyTableOutput
- .getSqlQueryAllInvalidRows(conn, getColNames(), scrutinyTimeMillis);
- ResultSet rs = conn.createStatement().executeQuery(invalidRowsQuery + " ORDER BY ID asc");
- assertTrue(rs.next());
- assertEquals(dataTableFullName, rs.getString(IndexScrutinyTableOutput.SOURCE_TABLE_COL_NAME));
- assertEquals(indexTableFullName, rs.getString(IndexScrutinyTableOutput.TARGET_TABLE_COL_NAME));
- assertTrue(rs.getBoolean("HAS_TARGET_ROW"));
- assertEquals(2, rs.getInt("ID"));
- assertEquals(2, rs.getInt(":ID"));
- assertEquals(95123, rs.getInt("ZIP"));
- assertEquals(9999, rs.getInt("0:ZIP")); // covered col zip incorrect
+ /**
+ * Tests that the config for max number of output rows is observed
+ */
+ @Test public void testMaxOutputRows() throws Exception {
+ insertOneValid_OneBadVal_OneMissingTarget();
+ // set max to 1. There are two bad rows, but only 1 should get written to output table
+ String[]
+ argValues =
+ getArgValues(schemaName, dataTableName, indexTableName, 10L, SourceTable.DATA_TABLE_SOURCE, true, OutputFormat.TABLE, new Long(1));
+ List<Job> completedJobs = runScrutiny(argValues);
+ long
+ scrutinyTimeMillis =
+ PhoenixConfigurationUtil.getScrutinyExecuteTimestamp(completedJobs.get(0).getConfiguration());
+ String
+ invalidRowsQuery =
+ IndexScrutinyTableOutput
+ .getSqlQueryAllInvalidRows(conn, getColNames(), scrutinyTimeMillis);
+ ResultSet rs = conn.createStatement().executeQuery(invalidRowsQuery);
+ assertTrue(rs.next());
+ if (dataTableDdl.contains("SALT_BUCKETS")) {
assertTrue(rs.next());
- assertEquals(dataTableFullName, rs.getString(IndexScrutinyTableOutput.SOURCE_TABLE_COL_NAME));
- assertEquals(indexTableFullName, rs.getString(IndexScrutinyTableOutput.TARGET_TABLE_COL_NAME));
- assertFalse(rs.getBoolean("HAS_TARGET_ROW"));
- assertEquals(3, rs.getInt("ID"));
- assertEquals(null, rs.getObject(":ID")); // null for missing target row
assertFalse(rs.next());
-
- // check that the job results were written correctly to the metadata table
- assertMetadataTableValues(argValues, scrutinyTimeMillis, invalidRowsQuery);
- }
-
- /**
- * Tests that the config for max number of output rows is observed
- */
- @Test public void testMaxOutputRows() throws Exception {
- insertOneValid_OneBadVal_OneMissingTarget();
- // set max to 1. There are two bad rows, but only 1 should get written to output table
- String[]
- argValues =
- getArgValues(schemaName, dataTableName, indexTableName, 10L, SourceTable.DATA_TABLE_SOURCE, true, OutputFormat.TABLE, new Long(1));
- List<Job> completedJobs = runScrutiny(argValues);
- long
- scrutinyTimeMillis =
- PhoenixConfigurationUtil.getScrutinyExecuteTimestamp(completedJobs.get(0).getConfiguration());
- String
- invalidRowsQuery =
- IndexScrutinyTableOutput
- .getSqlQueryAllInvalidRows(conn, getColNames(), scrutinyTimeMillis);
- ResultSet rs = conn.createStatement().executeQuery(invalidRowsQuery);
- assertTrue(rs.next());
- if (dataTableDdl.contains("SALT_BUCKETS")) {
- assertTrue(rs.next());
- assertFalse(rs.next());
- } else {
- assertFalse(rs.next());
- }
+ } else {
+ assertFalse(rs.next());
}
+ }
- private SourceTargetColumnNames getColNames() throws SQLException {
- PTable pdataTable = PhoenixRuntime.getTable(conn, dataTableFullName);
- PTable pindexTable = PhoenixRuntime.getTable(conn, indexTableFullName);
- SourceTargetColumnNames
- columnNames =
- new SourceTargetColumnNames.DataSourceColNames(pdataTable, pindexTable);
- return columnNames;
- }
+ private SourceTargetColumnNames getColNames() throws SQLException {
+ PTable pdataTable = PhoenixRuntime.getTable(conn, dataTableFullName);
+ PTable pindexTable = PhoenixRuntime.getTable(conn, indexTableFullName);
+ SourceTargetColumnNames
+ columnNames =
+ new SourceTargetColumnNames.DataSourceColNames(pdataTable, pindexTable);
+ return columnNames;
+ }
- // inserts one valid data/index row, one data row with a missing index row,
- // and one data row with an index row that has a bad covered col val
- private void insertOneValid_OneBadVal_OneMissingTarget() throws SQLException {
- // insert one valid row
- upsertRow(dataTableUpsertStmt, 1, "name-1", 94010);
- conn.commit();
-
- // disable the index and insert another row which is not indexed
- disableIndex();
- upsertRow(dataTableUpsertStmt, 2, "name-2", 95123);
- upsertRow(dataTableUpsertStmt, 3, "name-3", 95123);
- conn.commit();
-
- // insert a bad index row for one of the above data rows
- upsertIndexRow("name-2", 2, 9999);
- conn.commit();
- }
+ // inserts one valid data/index row, one data row with a missing index row,
+ // and one data row with an index row that has a bad covered col val
+ private void insertOneValid_OneBadVal_OneMissingTarget() throws SQLException {
+ // insert one valid row
+ upsertRow(dataTableUpsertStmt, 1, "name-1", 94010);
+ conn.commit();
+
+ // disable the index and insert another row which is not indexed
+ disableIndex();
+ upsertRow(dataTableUpsertStmt, 2, "name-2", 95123);
+ upsertRow(dataTableUpsertStmt, 3, "name-3", 95123);
+ conn.commit();
+
+ // insert a bad index row for one of the above data rows
+ upsertIndexRow("name-2", 2, 9999);
+ conn.commit();
+ }
- private void assertMetadataTableValues(String[] argValues, long scrutinyTimeMillis, String invalidRowsQuery) throws SQLException {
- ResultSet rs;
- ResultSet
- metadataRs =
- IndexScrutinyTableOutput
- .queryAllMetadata(conn, dataTableFullName, indexTableFullName,
- scrutinyTimeMillis);
- assertTrue(metadataRs.next());
- List<? extends Object>
- expected =
+ private void assertMetadataTableValues(String[] argValues, long scrutinyTimeMillis, String invalidRowsQuery) throws SQLException {
+ ResultSet rs;
+ ResultSet
+ metadataRs =
+ IndexScrutinyTableOutput
+ .queryAllMetadata(conn, dataTableFullName, indexTableFullName,
+ scrutinyTimeMillis);
+ assertTrue(metadataRs.next());
+ List<? extends Object>
+ expected =
+ Arrays.asList(dataTableFullName, indexTableFullName, scrutinyTimeMillis, SourceTable.DATA_TABLE_SOURCE.name(), Arrays.toString(argValues), 3L, 0L,
+ 1L, 2L, 1L, 1L,
+ "[\"ID\" INTEGER, \"NAME\" VARCHAR, \"EMPLOY_DATE\" TIMESTAMP, \"ZIP\" INTEGER]",
+ "[\":ID\" INTEGER, \"0:NAME\" VARCHAR, \"0:EMPLOY_DATE\" DECIMAL, \"0:ZIP\" INTEGER]",
+ invalidRowsQuery);
+ if (dataTableDdl.contains("SALT_BUCKETS")) {
+ expected =
Arrays.asList(dataTableFullName, indexTableFullName, scrutinyTimeMillis,
SourceTable.DATA_TABLE_SOURCE.name(), Arrays.toString(argValues), 3L, 0L, 1L,
- 2L, 1L, 1L,
+ 2L, 1L, 2L,
"[\"ID\" INTEGER, \"NAME\" VARCHAR, \"EMPLOY_DATE\" TIMESTAMP, \"ZIP\" INTEGER]",
"[\":ID\" INTEGER, \"0:NAME\" VARCHAR, \"0:EMPLOY_DATE\" DECIMAL, \"0:ZIP\" INTEGER]",
invalidRowsQuery);
- if (dataTableDdl.contains("SALT_BUCKETS")) {
- expected =
- Arrays.asList(dataTableFullName, indexTableFullName, scrutinyTimeMillis,
- SourceTable.DATA_TABLE_SOURCE.name(), Arrays.toString(argValues), 3L, 0L, 1L,
- 2L, 1L, 2L,
- "[\"ID\" INTEGER, \"NAME\" VARCHAR, \"EMPLOY_DATE\" TIMESTAMP, \"ZIP\" INTEGER]",
- "[\":ID\" INTEGER, \"0:NAME\" VARCHAR, \"0:EMPLOY_DATE\" DECIMAL, \"0:ZIP\" INTEGER]",
- invalidRowsQuery);
- }
-
- assertRsValues(metadataRs, expected);
- String missingTargetQuery = metadataRs.getString("INVALID_ROWS_QUERY_MISSING_TARGET");
- rs = conn.createStatement().executeQuery(missingTargetQuery);
- assertTrue(rs.next());
- assertEquals(3, rs.getInt("ID"));
- assertFalse(rs.next());
- String badCoveredColQuery = metadataRs.getString("INVALID_ROWS_QUERY_BAD_COVERED_COL_VAL");
- rs = conn.createStatement().executeQuery(badCoveredColQuery);
- assertTrue(rs.next());
- assertEquals(2, rs.getInt("ID"));
- assertFalse(rs.next());
- }
-
- // assert the result set contains the expected values in the given order
- private void assertRsValues(ResultSet rs, List<? extends Object> expected)
- throws SQLException {
- for (int i = 0; i < expected.size(); i++) {
- assertEquals(expected.get(i), rs.getObject(i + 1));
- }
- }
-
- private void generateUniqueTableNames() {
- schemaName = generateUniqueName();
- dataTableName = generateUniqueName();
- dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
- indexTableName = generateUniqueName();
- indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
- }
-
- private void upsertIndexRow(String name, int id, int zip) throws SQLException {
- indexTableUpsertStmt.setString(1, name);
- indexTableUpsertStmt.setInt(2, id); // id
- indexTableUpsertStmt.setInt(3, zip); // bad zip
- indexTableUpsertStmt.setTimestamp(4, new Timestamp(testTime));
- indexTableUpsertStmt.executeUpdate();
}
- private void disableIndex() throws SQLException {
- conn.createStatement().execute(
- String.format("ALTER INDEX %s ON %S disable", indexTableName, dataTableFullName));
- conn.commit();
- }
-
- private String[] getArgValues(String schemaName, String dataTable, String indxTable, Long batchSize,
- SourceTable sourceTable, boolean outputInvalidRows, OutputFormat outputFormat, Long maxOutputRows) {
- return getArgValues(schemaName, dataTable, indxTable, batchSize, sourceTable,
- outputInvalidRows, outputFormat, maxOutputRows, null, Long.MAX_VALUE);
- }
-
- private List<Job> runScrutinyCurrentSCN(String schemaName, String dataTableName, String indexTableName, Long scrutinyTS) throws Exception {
- return runScrutiny(
- getArgValues(schemaName, dataTableName, indexTableName, null, SourceTable.BOTH,
- false, null, null, null, scrutinyTS));
- }
-
- private List<Job> runScrutiny(String schemaName, String dataTableName, String indexTableName) throws Exception {
- return runScrutiny(schemaName, dataTableName, indexTableName, null, null);
- }
-
- private List<Job> runScrutiny(String schemaName, String dataTableName, String indexTableName,
- Long batchSize) throws Exception {
- return runScrutiny(schemaName, dataTableName, indexTableName, batchSize, null);
- }
+ assertRsValues(metadataRs, expected);
+ String missingTargetQuery = metadataRs.getString("INVALID_ROWS_QUERY_MISSING_TARGET");
+ rs = conn.createStatement().executeQuery(missingTargetQuery);
+ assertTrue(rs.next());
+ assertEquals(3, rs.getInt("ID"));
+ assertFalse(rs.next());
+ String badCoveredColQuery = metadataRs.getString("INVALID_ROWS_QUERY_BAD_COVERED_COL_VAL");
+ rs = conn.createStatement().executeQuery(badCoveredColQuery);
+ assertTrue(rs.next());
+ assertEquals(2, rs.getInt("ID"));
+ assertFalse(rs.next());
+ }
- private List<Job> runScrutiny(String schemaName, String dataTableName, String indexTableName,
- Long batchSize, SourceTable sourceTable) throws Exception {
- final String[]
- cmdArgs =
- getArgValues(schemaName, dataTableName, indexTableName, batchSize, sourceTable,
- false, null, null, null, Long.MAX_VALUE);
- return runScrutiny(cmdArgs);
+ // assert the result set contains the expected values in the given order
+ private void assertRsValues(ResultSet rs, List<? extends Object> expected) throws SQLException {
+ for (int i = 0; i < expected.size(); i++) {
+ assertEquals(expected.get(i), rs.getObject(i + 1));
}
+ }
- private void upsertRow(PreparedStatement stmt, int id, String name, int zip)
- throws SQLException {
- int index = 1;
- // insert row
- stmt.setInt(index++, id);
- stmt.setString(index++, name);
- stmt.setInt(index++, zip);
- stmt.setTimestamp(index++, new Timestamp(testTime));
- stmt.executeUpdate();
- }
+    /**
+     * Assigns fresh unique names to the schema, data-table and index fields and derives the
+     * qualified data/index table names from them via {@code SchemaUtil.getTableName}.
+     */
+    private void generateUniqueTableNames() {
+        // Names are drawn in a fixed order: schema, data table, index.
+        schemaName = generateUniqueName();
+        dataTableName = generateUniqueName();
+        indexTableName = generateUniqueName();
+        dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+        indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
+    }
- private int deleteRow(String fullTableName, String whereCondition) throws SQLException {
- String deleteSql = String.format(DELETE_SQL, indexTableFullName) + whereCondition;
- PreparedStatement deleteStmt = conn.prepareStatement(deleteSql);
- return deleteStmt.executeUpdate();
- }
+    /**
+     * Binds name, id, zip and a timestamp of {@code testTime} (parameters 1-4, in that order)
+     * to {@code indexTableUpsertStmt} and executes it.
+     * NOTE(review): assumes indexTableUpsertStmt was prepared against the index table.
+     */
+    private void upsertIndexRow(String name, int id, int zip) throws SQLException {
+        int param = 1;
+        indexTableUpsertStmt.setString(param++, name);
+        indexTableUpsertStmt.setInt(param++, id);
+        indexTableUpsertStmt.setInt(param++, zip);
+        indexTableUpsertStmt.setTimestamp(param, new Timestamp(testTime));
+        indexTableUpsertStmt.executeUpdate();
 }
- public static class IndexScrutinyToolTenantIT extends SharedIndexToolIT {
- private Connection connGlobal = null;
- private Connection connTenant = null;
-
- private String tenantId;
- private String tenantViewName;
- private String indexNameTenant;
- private String multiTenantTable;
- private String viewIndexTableName;
-
- private final String createViewStr = "CREATE VIEW %s AS SELECT * FROM %s";
- private final String
- upsertQueryStr =
- "UPSERT INTO %s (COL1, ID, NAME) VALUES('%s' , %d, '%s')";
- private final String createIndexStr = "CREATE INDEX %s ON %s (NAME) ";
-
- /**
- * Create the test data
- */
- @Before public void setup() throws SQLException {
- tenantId = generateUniqueName();
- tenantViewName = generateUniqueName();
- indexNameTenant = generateUniqueName();
- multiTenantTable = generateUniqueName();
- viewIndexTableName = "_IDX_" + multiTenantTable;
-
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- connGlobal = DriverManager.getConnection(getUrl(), props);
-
- props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
- connTenant = DriverManager.getConnection(getUrl(), props);
- String
- createTblStr =
- "CREATE TABLE %s (COL1 VARCHAR(15) NOT NULL,ID INTEGER NOT NULL" + ", NAME VARCHAR, CONSTRAINT PK_1 PRIMARY KEY (COL1, ID)) MULTI_TENANT=true";
-
- createTestTable(getUrl(), String.format(createTblStr, multiTenantTable));
-
- connTenant.createStatement().execute(
- String.format(createViewStr, tenantViewName, multiTenantTable));
-
- String idxStmtTenant = String.format(createIndexStr, indexNameTenant, tenantViewName);
- connTenant.createStatement().execute(idxStmtTenant);
- }
+    /**
+     * Disables {@code indexTableName} on {@code dataTableFullName} with ALTER INDEX and commits.
+     */
+    private void disableIndex() throws SQLException {
+        // Use %s for both names: %S would upper-case the table name, which breaks
+        // case-sensitive (quoted) identifiers and differs from how the index name is formatted.
+        String alterSql =
+                String.format("ALTER INDEX %s ON %s disable", indexTableName, dataTableFullName);
+        conn.createStatement().execute(alterSql);
+        conn.commit();
+    }
- @After public void teardown() throws SQLException {
- if (connGlobal != null) {
- connGlobal.close();
- }
- if (connTenant != null) {
- connTenant.close();
- }
- }
+    /**
+     * Convenience overload that passes every option through to the full {@code getArgValues}
+     * overload, with no tenant id ({@code null}) and a scrutiny timestamp of {@code Long.MAX_VALUE}.
+     */
+    private String[] getArgValues(String schemaName, String dataTable, String indxTable, Long batchSize,
+            SourceTable sourceTable, boolean outputInvalidRows, OutputFormat outputFormat, Long maxOutputRows) {
+        final String noTenant = null;
+        final long scrutinyTs = Long.MAX_VALUE;
+        return getArgValues(schemaName, dataTable, indxTable, batchSize, sourceTable,
+                outputInvalidRows, outputFormat, maxOutputRows, noTenant, scrutinyTs);
+    }
- /**
- * Tests that the config for max number of output rows is observed
- */
- @Test public void testTenantViewAndIndexEqual() throws Exception {
- connTenant.createStatement()
- .execute(String.format(upsertQueryStr, tenantViewName, tenantId, 1, "x"));
- connTenant.commit();
-
- String[] argValues =
- getArgValues("", tenantViewName, indexNameTenant, 10L,
- SourceTable.INDEX_TABLE_SOURCE, false, null, null, tenantId,
- EnvironmentEdgeManager.currentTimeMillis());
-
- List<Job> completedJobs = runScrutiny(argValues);
- // Sunny case, both index and view are equal. 1 row
- assertEquals(1, completedJobs.size());
- for (Job job : completedJobs) {
- assertTrue(job.isSuccessful());
- Counters counters = job.getCounters();
- assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
- assertEquals(0, getCounterValue(counters, INVALID_ROW_COUNT));
- }
- }
+    /**
+     * Runs the scrutiny tool with {@code SourceTable.BOTH}, outputInvalidRows=false, no batch
+     * size, output format, max output rows or tenant, at the given scrutiny timestamp.
+     */
+    private List<Job> runScrutinyCurrentSCN(String schemaName, String dataTableName, String indexTableName, Long scrutinyTS) throws Exception {
+        String[] args = getArgValues(schemaName, dataTableName, indexTableName, null,
+                SourceTable.BOTH, false, null, null, null, scrutinyTS);
+        return runScrutiny(args);
+    }
- /**
- * Tests global view on multi-tenant table should work too
- **/
- @Test public void testGlobalViewOnMultiTenantTable() throws Exception {
- String globalViewName = generateUniqueName();
- String indexNameGlobal = generateUniqueName();
-
- connGlobal.createStatement().execute(
- String.format(createViewStr, globalViewName, multiTenantTable));
-
- String idxStmtGlobal = String.format(createIndexStr, indexNameGlobal, globalViewName);
- connGlobal.createStatement().execute(idxStmtGlobal);
- connGlobal.createStatement()
- .execute(String.format(upsertQueryStr, globalViewName, "global", 5, "x"));
- connGlobal.commit();
- String[] argValues =
- getArgValues("", globalViewName, indexNameGlobal, 10L,
- SourceTable.INDEX_TABLE_SOURCE, false, null, null, null,
- EnvironmentEdgeManager.currentTimeMillis());
- List<Job> completedJobs = runScrutiny(argValues);
- // Sunny case, both index and view are equal. 1 row
- assertEquals(1, completedJobs.size());
- for (Job job : completedJobs) {
- assertTrue(job.isSuccessful());
- Counters counters = job.getCounters();
- assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
- assertEquals(0, getCounterValue(counters, INVALID_ROW_COUNT));
- }
- }
+    /** Runs the scrutiny tool with no batch size and no source table specified (both null). */
+    private List<Job> runScrutiny(String schemaName, String dataTableName, String indexTableName)
+            throws Exception {
+        Long noBatchSize = null;
+        return runScrutiny(schemaName, dataTableName, indexTableName, noBatchSize);
+    }
- /**
- * Use Both as source. Add 1 row to tenant view and disable index.
- * Add 1 more to view and add a wrong row to index.
- * Both have 1 invalid row, 1 valid row.
- **/
- @Test
- public void testOneValidOneInvalidUsingBothAsSource() throws Exception {
- connTenant.createStatement()
- .execute(String.format(upsertQueryStr, tenantViewName, tenantId, 1, "x"));
- connTenant.commit();
- connTenant.createStatement().execute(
- String.format("ALTER INDEX %s ON %S disable", indexNameTenant, tenantViewName));
-
- connTenant.createStatement()
- .execute(String.format(upsertQueryStr, tenantViewName, tenantId, 2, "x2"));
-
- connTenant.createStatement().execute(String.format("UPSERT INTO %s (\":ID\", \"0:NAME\") values (%d, '%s')",
- indexNameTenant, 5555, "wrongName"));
- connTenant.commit();
-
- String[]
- argValues =
- getArgValues("", tenantViewName, indexNameTenant, 10L, SourceTable.BOTH, false,
- null, null, tenantId, EnvironmentEdgeManager.currentTimeMillis());
- List<Job> completedJobs = runScrutiny(argValues);
-
- assertEquals(2, completedJobs.size());
- for (Job job : completedJobs) {
- assertTrue(job.isSuccessful());
- Counters counters = job.getCounters();
- assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
- assertEquals(1, getCounterValue(counters, INVALID_ROW_COUNT));
- }
- }
+    /** Runs the scrutiny tool with the given batch size and no source table specified (null). */
+    private List<Job> runScrutiny(String schemaName, String dataTableName, String indexTableName,
+            Long batchSize) throws Exception {
+        SourceTable unspecifiedSource = null;
+        return runScrutiny(schemaName, dataTableName, indexTableName, batchSize, unspecifiedSource);
+    }
- /**
- * Add 3 rows to Tenant view.
- * Empty index table and observe they are not equal.
- * Use data table as source and output to file.
- **/
- @Test public void testWithEmptyIndexTableOutputToFile() throws Exception{
- testWithOutput(OutputFormat.FILE);
- }
+    /**
+     * Runs the scrutiny tool with the given batch size and source table, outputInvalidRows=false,
+     * no output format, max output rows or tenant, and a scrutiny timestamp of Long.MAX_VALUE.
+     */
+    private List<Job> runScrutiny(String schemaName, String dataTableName, String indexTableName,
+            Long batchSize, SourceTable sourceTable) throws Exception {
+        return runScrutiny(getArgValues(schemaName, dataTableName, indexTableName, batchSize,
+                sourceTable, false, null, null, null, Long.MAX_VALUE));
+    }
- @Test public void testWithEmptyIndexTableOutputToTable() throws Exception{
- testWithOutput(OutputFormat.TABLE);
- assertEquals(3, countRows(connGlobal, OUTPUT_TABLE_NAME));
- }
+    /**
+     * Binds id, name, zip and a timestamp of {@code testTime} to parameters 1-4 of
+     * {@code stmt} and executes it.
+     */
+    private void upsertRow(PreparedStatement stmt, int id, String name, int zip) throws SQLException {
+        stmt.setInt(1, id);
+        stmt.setString(2, name);
+        stmt.setInt(3, zip);
+        stmt.setTimestamp(4, new Timestamp(testTime));
+        stmt.executeUpdate();
+    }
- private void testWithOutput(OutputFormat outputFormat) throws Exception {
- connTenant.createStatement()
- .execute(String.format(upsertQueryStr, tenantViewName, tenantId, 1, "x"));
- connTenant.createStatement()
- .execute(String.format(upsertQueryStr, tenantViewName, tenantId, 2, "x2"));
- connTenant.createStatement()
- .execute(String.format(upsertQueryStr, tenantViewName, tenantId, 3, "x3"));
- connTenant.createStatement().execute(String.format("UPSERT INTO %s (\":ID\", \"0:NAME\") values (%d, '%s')",
- indexNameTenant, 5555, "wrongName"));
- connTenant.commit();
-
- ConnectionQueryServices queryServices = connGlobal.unwrap(PhoenixConnection.class).getQueryServices();
- Admin admin = queryServices.getAdmin();
- TableName tableName = TableName.valueOf(viewIndexTableName);
- admin.disableTable(tableName);
- admin.truncateTable(tableName, false);
-
- String[]
- argValues =
- getArgValues("", tenantViewName, indexNameTenant, 10L, SourceTable.DATA_TABLE_SOURCE, true, outputFormat, null,
- tenantId, EnvironmentEdgeManager.currentTimeMillis());
- List<Job> completedJobs = runScrutiny(argValues);
-
- assertEquals(1, completedJobs.size());
- for (Job job : completedJobs) {
- assertTrue(job.isSuccessful());
- Counters counters = job.getCounters();
- assertEquals(0, getCounterValue(counters, VALID_ROW_COUNT));
- assertEquals(3, getCounterValue(counters, INVALID_ROW_COUNT));
- }
- }
+    /**
+     * Deletes the rows of {@code fullTableName} selected by {@code whereCondition}.
+     *
+     * @param fullTableName table name substituted into {@code DELETE_SQL}
+     * @param whereCondition SQL fragment appended verbatim after the formatted statement;
+     *        test-only input, not escaped
+     * @return the update count reported by {@code executeUpdate()}
+     */
+    private int deleteRow(String fullTableName, String whereCondition) throws SQLException {
+        // Target the caller-supplied table instead of always the index table.
+        String deleteSql = String.format(DELETE_SQL, fullTableName) + whereCondition;
+        // Close the statement once executed so it does not leak for the connection's lifetime.
+        try (PreparedStatement deleteStmt = conn.prepareStatement(deleteSql)) {
+            return deleteStmt.executeUpdate();
+        }
 }
+
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/IndexColumnNames.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/IndexColumnNames.java
index df8c7ab..ae11b3d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/IndexColumnNames.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/IndexColumnNames.java
@@ -61,9 +61,10 @@ public class IndexColumnNames {
if (pindexTable.getViewIndexId() != null) {
offset++;
}
- if (pindexTable.isMultiTenant()) {
+ if (pindexTable.isMultiTenant() && pindexTable.getViewIndexId() != null) {
offset++;
}
+
if (offset > 0) {
pindexCols = pindexCols.subList(offset, pindexCols.size());
pkColumns = pkColumns.subList(offset, pkColumns.size());