You are viewing a plain-text version of this content; the hyperlink to the canonical version was lost in this plain-text conversion.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2017/09/29 21:04:34 UTC
phoenix git commit: PHOENIX-4238 MR IndexScrutinyTool break with salted tables and indexes on views
Repository: phoenix
Updated Branches:
refs/heads/master c1ef11289 -> 62defe432
PHOENIX-4238 MR IndexScrutinyTool break with salted tables and indexes on views
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/62defe43
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/62defe43
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/62defe43
Branch: refs/heads/master
Commit: 62defe43212f83c2f8152af162554ea49046bd62
Parents: c1ef112
Author: James Taylor <jt...@salesforce.com>
Authored: Fri Sep 29 14:04:35 2017 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Fri Sep 29 14:04:35 2017 -0700
----------------------------------------------------------------------
.../phoenix/end2end/IndexScrutinyToolIT.java | 93 ++++++++++++++++----
.../mapreduce/util/IndexColumnNames.java | 16 +++-
2 files changed, 88 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/62defe43/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java
index f868cef..f2384ec 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java
@@ -28,14 +28,20 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
+import java.util.TreeSet;
import java.util.UUID;
+import com.google.common.collect.Sets;
+import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.mapreduce.Counters;
@@ -62,21 +68,23 @@ import org.junit.experimental.categories.Category;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
/**
* Tests for the {@link IndexScrutinyTool}
*/
@Category(NeedsOwnMiniClusterTest.class)
+@RunWith(Parameterized.class)
public class IndexScrutinyToolIT extends BaseTest {
- private static final String DATA_TABLE_DDL =
- "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER, EMPLOY_DATE TIMESTAMP, EMPLOYER VARCHAR)";
-
- private static final String INDEX_TABLE_DDL = "CREATE INDEX %s ON %s (NAME, EMPLOY_DATE) INCLUDE (ZIP)";
+ private String dataTableDdl;
+ private String indexTableDdl;
private static final String UPSERT_SQL = "UPSERT INTO %s VALUES(?,?,?,?)";
- private static final String INDEX_UPSERT_SQL = "UPSERT INTO %s (\"0:NAME\", \":ID\", \"0:ZIP\", \"0:EMPLOY_DATE\") values (?,?,?,?)";
+ private static final String INDEX_UPSERT_SQL =
+ "UPSERT INTO %s (\"0:NAME\", \":ID\", \"0:ZIP\", \"0:EMPLOY_DATE\") values (?,?,?,?)";
private static final String DELETE_SQL = "DELETE FROM %s ";
@@ -95,6 +103,18 @@ public class IndexScrutinyToolIT extends BaseTest {
private long testTime;
+ @Parameterized.Parameters
+ public static Collection<Object[]> data() {
+ return Arrays.asList(new Object[][] {
+ { "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER, EMPLOY_DATE TIMESTAMP, EMPLOYER VARCHAR)", "CREATE LOCAL INDEX %s ON %s (NAME, EMPLOY_DATE) INCLUDE (ZIP)" },
+ { "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER, EMPLOY_DATE TIMESTAMP, EMPLOYER VARCHAR) SALT_BUCKETS=2", "CREATE INDEX %s ON %s (NAME, EMPLOY_DATE) INCLUDE (ZIP)" } });
+ }
+
+ public IndexScrutinyToolIT(String dataTableDdl, String indexTableDdl) {
+ this.dataTableDdl = dataTableDdl;
+ this.indexTableDdl = indexTableDdl;
+ }
+
@BeforeClass
public static void doSetup() throws Exception {
Map<String, String> props = Maps.newHashMap();
@@ -107,9 +127,9 @@ public class IndexScrutinyToolIT extends BaseTest {
@Before
public void setup() throws SQLException {
generateUniqueTableNames();
- createTestTable(getUrl(), String.format(DATA_TABLE_DDL, dataTableFullName));
+ createTestTable(getUrl(), String.format(dataTableDdl, dataTableFullName));
createTestTable(getUrl(),
- String.format(INDEX_TABLE_DDL, indexTableName, dataTableFullName));
+ String.format(indexTableDdl, indexTableName, dataTableFullName));
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
conn = DriverManager.getConnection(getUrl(), props);
String dataTableUpsert = String.format(UPSERT_SQL, dataTableFullName);
@@ -340,17 +360,41 @@ public class IndexScrutinyToolIT extends BaseTest {
// check the output files
Path outputPath = CsvBulkImportUtil.getOutputPath(new Path(outputDir), dataTableFullName);
DistributedFileSystem fs = getUtility().getDFSCluster().getFileSystem();
- Path outputFilePath = new Path(outputPath, "part-m-00000");
+ List<Path> paths = Lists.newArrayList();
+ Path firstPart = null;
+ for (FileStatus outputFile : fs.listStatus(outputPath)) {
+ if (outputFile.getPath().getName().startsWith("part")) {
+ if (firstPart == null) {
+ firstPart = outputFile.getPath();
+ } else {
+ paths.add(outputFile.getPath());
+ }
+ }
+ }
+ if (dataTableDdl.contains("SALT_BUCKETS")) {
+ fs.concat(firstPart, paths.toArray(new Path[0]));
+ }
+ Path outputFilePath = firstPart;
assertTrue(fs.exists(outputFilePath));
FSDataInputStream fsDataInputStream = fs.open(outputFilePath);
BufferedReader reader = new BufferedReader(new InputStreamReader(fsDataInputStream));
+ TreeSet<String> lines = Sets.newTreeSet();
try {
- assertEquals("[2, name-2, " + new Timestamp(testTime).toString() + ", 95123]\t[2, name-2, " + new Timestamp(testTime).toString() + ", 9999]", reader.readLine());
- assertEquals("[3, name-3, " + new Timestamp(testTime).toString() + ", 95123]\tTarget row not found", reader.readLine());
+ String line = null;
+ while ((line = reader.readLine()) != null) {
+ lines.add(line);
+ }
} finally {
- reader.close();
- fsDataInputStream.close();
+ IOUtils.closeQuietly(reader);
+ IOUtils.closeQuietly(fsDataInputStream);
}
+ Iterator<String> lineIterator = lines.iterator();
+ assertEquals(
+ "[2, name-2, " + new Timestamp(testTime).toString() + ", 95123]\t[2, name-2, " + new Timestamp(testTime)
+ .toString() + ", 9999]", lineIterator.next());
+ assertEquals("[3, name-3, " + new Timestamp(testTime).toString() + ", 95123]\tTarget row not found",
+ lineIterator.next());
+
}
/**
@@ -415,7 +459,12 @@ public class IndexScrutinyToolIT extends BaseTest {
scrutinyTimeMillis);
ResultSet rs = conn.createStatement().executeQuery(invalidRowsQuery);
assertTrue(rs.next());
- assertFalse(rs.next());
+ if (dataTableDdl.contains("SALT_BUCKETS")) {
+ assertTrue(rs.next());
+ assertFalse(rs.next());
+ } else {
+ assertFalse(rs.next());
+ }
}
private SourceTargetColumnNames getColNames() throws SQLException {
@@ -452,10 +501,17 @@ public class IndexScrutinyToolIT extends BaseTest {
indexTableFullName, scrutinyTimeMillis);
assertTrue(metadataRs.next());
List<? extends Object> expected =
- Arrays.asList(dataTableFullName, indexTableFullName, scrutinyTimeMillis,
- SourceTable.DATA_TABLE_SOURCE.name(), Arrays.toString(argValues), 3L, 0L, 1L,
- 2L, 1L, 1L, "[\"ID\" INTEGER, \"NAME\" VARCHAR, \"EMPLOY_DATE\" TIMESTAMP, \"ZIP\" INTEGER]",
- "[\":ID\" INTEGER, \"0:NAME\" VARCHAR, \"0:EMPLOY_DATE\" DECIMAL, \"0:ZIP\" INTEGER]", invalidRowsQuery);
+ Arrays.asList(dataTableFullName, indexTableFullName, scrutinyTimeMillis,
+ SourceTable.DATA_TABLE_SOURCE.name(), Arrays.toString(argValues), 3L, 0L, 1L,
+ 2L, 1L, 1L, "[\"ID\" INTEGER, \"NAME\" VARCHAR, \"EMPLOY_DATE\" TIMESTAMP, \"ZIP\" INTEGER]",
+ "[\":ID\" INTEGER, \"0:NAME\" VARCHAR, \"0:EMPLOY_DATE\" DECIMAL, \"0:ZIP\" INTEGER]", invalidRowsQuery);
+ if (dataTableDdl.contains("SALT_BUCKETS")) {
+ expected = Arrays.asList(dataTableFullName, indexTableFullName, scrutinyTimeMillis,
+ SourceTable.DATA_TABLE_SOURCE.name(), Arrays.toString(argValues), 3L, 0L, 1L,
+ 2L, 1L, 2L, "[\"ID\" INTEGER, \"NAME\" VARCHAR, \"EMPLOY_DATE\" TIMESTAMP, \"ZIP\" INTEGER]",
+ "[\":ID\" INTEGER, \"0:NAME\" VARCHAR, \"0:EMPLOY_DATE\" DECIMAL, \"0:ZIP\" INTEGER]", invalidRowsQuery);
+ }
+
assertRsValues(metadataRs, expected);
String missingTargetQuery = metadataRs.getString("INVALID_ROWS_QUERY_MISSING_TARGET");
rs = conn.createStatement().executeQuery(missingTargetQuery);
@@ -485,8 +541,7 @@ public class IndexScrutinyToolIT extends BaseTest {
}
private int countRows(String tableFullName) throws SQLException {
- ResultSet count =
- conn.createStatement().executeQuery("select count(*) from " + tableFullName);
+ ResultSet count = conn.createStatement().executeQuery("select count(*) from " + tableFullName);
count.next();
int numRows = count.getInt(1);
return numRows;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/62defe43/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/IndexColumnNames.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/IndexColumnNames.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/IndexColumnNames.java
index 5daa1ed..6f2959f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/IndexColumnNames.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/IndexColumnNames.java
@@ -52,7 +52,19 @@ public class IndexColumnNames {
this.pdataTable = pdataTable;
this.pindexTable = pindexTable;
List<PColumn> pindexCols = pindexTable.getColumns();
+ List<PColumn> pkColumns = pindexTable.getPKColumns();
Set<String> indexColsAdded = new HashSet<String>();
+ int offset = 0;
+ if (pindexTable.getBucketNum() != null) {
+ offset++;
+ }
+ if (pindexTable.getViewIndexId() != null) {
+ offset++;
+ }
+ if (offset > 0) {
+ pindexCols = pindexCols.subList(offset, pindexCols.size());
+ pkColumns = pkColumns.subList(offset, pkColumns.size());
+ }
// first add the data pk columns
for (PColumn indexCol : pindexCols) {
@@ -68,7 +80,7 @@ public class IndexColumnNames {
}
// then the rest of the index pk columns
- for (PColumn indexPkCol : pindexTable.getPKColumns()) {
+ for (PColumn indexPkCol : pkColumns) {
String indexColName = indexPkCol.getName().getString();
if (!indexColsAdded.contains(indexColName)) {
indexPkColNames.add(indexColName);
@@ -81,7 +93,7 @@ public class IndexColumnNames {
}
// then the covered columns (rest of the columns)
- for (PColumn indexCol : pindexTable.getColumns()) {
+ for (PColumn indexCol : pindexCols) {
String indexColName = indexCol.getName().getString();
if (!indexColsAdded.contains(indexColName)) {
indexNonPkColNames.add(indexColName);