You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2017/09/29 21:04:34 UTC

phoenix git commit: PHOENIX-4238 MR IndexScrutinyTool breaks with salted tables and indexes on views

Repository: phoenix
Updated Branches:
  refs/heads/master c1ef11289 -> 62defe432


PHOENIX-4238 MR IndexScrutinyTool breaks with salted tables and indexes on views


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/62defe43
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/62defe43
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/62defe43

Branch: refs/heads/master
Commit: 62defe43212f83c2f8152af162554ea49046bd62
Parents: c1ef112
Author: James Taylor <jt...@salesforce.com>
Authored: Fri Sep 29 14:04:35 2017 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Fri Sep 29 14:04:35 2017 -0700

----------------------------------------------------------------------
 .../phoenix/end2end/IndexScrutinyToolIT.java    | 93 ++++++++++++++++----
 .../mapreduce/util/IndexColumnNames.java        | 16 +++-
 2 files changed, 88 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/62defe43/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java
index f868cef..f2384ec 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java
@@ -28,14 +28,20 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Timestamp;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Random;
+import java.util.TreeSet;
 import java.util.UUID;
 
+import com.google.common.collect.Sets;
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.mapreduce.Counters;
@@ -62,21 +68,23 @@ import org.junit.experimental.categories.Category;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 /**
  * Tests for the {@link IndexScrutinyTool}
  */
 @Category(NeedsOwnMiniClusterTest.class)
+@RunWith(Parameterized.class)
 public class IndexScrutinyToolIT extends BaseTest {
 
-    private static final String DATA_TABLE_DDL =
-            "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER, EMPLOY_DATE TIMESTAMP, EMPLOYER VARCHAR)";
-
-    private static final String INDEX_TABLE_DDL = "CREATE INDEX %s ON %s (NAME, EMPLOY_DATE) INCLUDE (ZIP)";
+    private String dataTableDdl;
+    private String indexTableDdl;
 
     private static final String UPSERT_SQL = "UPSERT INTO %s VALUES(?,?,?,?)";
 
-    private static final String INDEX_UPSERT_SQL = "UPSERT INTO %s (\"0:NAME\", \":ID\", \"0:ZIP\", \"0:EMPLOY_DATE\") values (?,?,?,?)";
+    private static final String INDEX_UPSERT_SQL =
+        "UPSERT INTO %s (\"0:NAME\", \":ID\", \"0:ZIP\", \"0:EMPLOY_DATE\") values (?,?,?,?)";
 
     private static final String DELETE_SQL = "DELETE FROM %s ";
 
@@ -95,6 +103,18 @@ public class IndexScrutinyToolIT extends BaseTest {
 
     private long testTime;
 
+    @Parameterized.Parameters
+    public static Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] {
+            { "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER, EMPLOY_DATE TIMESTAMP, EMPLOYER VARCHAR)", "CREATE LOCAL INDEX %s ON %s (NAME, EMPLOY_DATE) INCLUDE (ZIP)" },
+            { "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER, EMPLOY_DATE TIMESTAMP, EMPLOYER VARCHAR) SALT_BUCKETS=2", "CREATE INDEX %s ON %s (NAME, EMPLOY_DATE) INCLUDE (ZIP)" } });
+    }
+
+    public IndexScrutinyToolIT(String dataTableDdl, String indexTableDdl) {
+        this.dataTableDdl = dataTableDdl;
+        this.indexTableDdl = indexTableDdl;
+    }
+
     @BeforeClass
     public static void doSetup() throws Exception {
         Map<String, String> props = Maps.newHashMap();
@@ -107,9 +127,9 @@ public class IndexScrutinyToolIT extends BaseTest {
     @Before
     public void setup() throws SQLException {
         generateUniqueTableNames();
-        createTestTable(getUrl(), String.format(DATA_TABLE_DDL, dataTableFullName));
+        createTestTable(getUrl(), String.format(dataTableDdl, dataTableFullName));
         createTestTable(getUrl(),
-            String.format(INDEX_TABLE_DDL, indexTableName, dataTableFullName));
+            String.format(indexTableDdl, indexTableName, dataTableFullName));
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         conn = DriverManager.getConnection(getUrl(), props);
         String dataTableUpsert = String.format(UPSERT_SQL, dataTableFullName);
@@ -340,17 +360,41 @@ public class IndexScrutinyToolIT extends BaseTest {
         // check the output files
         Path outputPath = CsvBulkImportUtil.getOutputPath(new Path(outputDir), dataTableFullName);
         DistributedFileSystem fs = getUtility().getDFSCluster().getFileSystem();
-        Path outputFilePath = new Path(outputPath, "part-m-00000");
+        List<Path> paths = Lists.newArrayList();
+        Path firstPart = null;
+        for (FileStatus outputFile : fs.listStatus(outputPath)) {
+            if (outputFile.getPath().getName().startsWith("part")) {
+                if (firstPart == null) {
+                    firstPart = outputFile.getPath();
+                } else {
+                    paths.add(outputFile.getPath());
+                }
+            }
+        }
+        if (dataTableDdl.contains("SALT_BUCKETS")) {
+            fs.concat(firstPart, paths.toArray(new Path[0]));
+        }
+        Path outputFilePath = firstPart;
         assertTrue(fs.exists(outputFilePath));
         FSDataInputStream fsDataInputStream = fs.open(outputFilePath);
         BufferedReader reader = new BufferedReader(new InputStreamReader(fsDataInputStream));
+        TreeSet<String> lines = Sets.newTreeSet();
         try {
-            assertEquals("[2, name-2, " + new Timestamp(testTime).toString() + ", 95123]\t[2, name-2, " + new Timestamp(testTime).toString() + ", 9999]", reader.readLine());
-            assertEquals("[3, name-3, " + new Timestamp(testTime).toString() + ", 95123]\tTarget row not found", reader.readLine());
+            String line = null;
+            while ((line = reader.readLine()) != null) {
+                lines.add(line);
+            }
         } finally {
-            reader.close();
-            fsDataInputStream.close();
+            IOUtils.closeQuietly(reader);
+            IOUtils.closeQuietly(fsDataInputStream);
         }
+        Iterator<String> lineIterator = lines.iterator();
+        assertEquals(
+            "[2, name-2, " + new Timestamp(testTime).toString() + ", 95123]\t[2, name-2, " + new Timestamp(testTime)
+                .toString() + ", 9999]", lineIterator.next());
+        assertEquals("[3, name-3, " + new Timestamp(testTime).toString() + ", 95123]\tTarget row not found",
+            lineIterator.next());
+
     }
 
     /**
@@ -415,7 +459,12 @@ public class IndexScrutinyToolIT extends BaseTest {
                     scrutinyTimeMillis);
         ResultSet rs = conn.createStatement().executeQuery(invalidRowsQuery);
         assertTrue(rs.next());
-        assertFalse(rs.next());
+        if (dataTableDdl.contains("SALT_BUCKETS")) {
+            assertTrue(rs.next());
+            assertFalse(rs.next());
+        } else {
+            assertFalse(rs.next());
+        }
     }
 
     private SourceTargetColumnNames getColNames() throws SQLException {
@@ -452,10 +501,17 @@ public class IndexScrutinyToolIT extends BaseTest {
                     indexTableFullName, scrutinyTimeMillis);
         assertTrue(metadataRs.next());
         List<? extends Object> expected =
-                Arrays.asList(dataTableFullName, indexTableFullName, scrutinyTimeMillis,
-                    SourceTable.DATA_TABLE_SOURCE.name(), Arrays.toString(argValues), 3L, 0L, 1L,
-                    2L, 1L, 1L, "[\"ID\" INTEGER, \"NAME\" VARCHAR, \"EMPLOY_DATE\" TIMESTAMP, \"ZIP\" INTEGER]",
-                    "[\":ID\" INTEGER, \"0:NAME\" VARCHAR, \"0:EMPLOY_DATE\" DECIMAL, \"0:ZIP\" INTEGER]", invalidRowsQuery);
+            Arrays.asList(dataTableFullName, indexTableFullName, scrutinyTimeMillis,
+                SourceTable.DATA_TABLE_SOURCE.name(), Arrays.toString(argValues), 3L, 0L, 1L,
+                2L, 1L, 1L, "[\"ID\" INTEGER, \"NAME\" VARCHAR, \"EMPLOY_DATE\" TIMESTAMP, \"ZIP\" INTEGER]",
+                "[\":ID\" INTEGER, \"0:NAME\" VARCHAR, \"0:EMPLOY_DATE\" DECIMAL, \"0:ZIP\" INTEGER]", invalidRowsQuery);
+        if (dataTableDdl.contains("SALT_BUCKETS")) {
+            expected = Arrays.asList(dataTableFullName, indexTableFullName, scrutinyTimeMillis,
+                SourceTable.DATA_TABLE_SOURCE.name(), Arrays.toString(argValues), 3L, 0L, 1L,
+                2L, 1L, 2L, "[\"ID\" INTEGER, \"NAME\" VARCHAR, \"EMPLOY_DATE\" TIMESTAMP, \"ZIP\" INTEGER]",
+                "[\":ID\" INTEGER, \"0:NAME\" VARCHAR, \"0:EMPLOY_DATE\" DECIMAL, \"0:ZIP\" INTEGER]", invalidRowsQuery);
+        }
+
         assertRsValues(metadataRs, expected);
         String missingTargetQuery = metadataRs.getString("INVALID_ROWS_QUERY_MISSING_TARGET");
         rs = conn.createStatement().executeQuery(missingTargetQuery);
@@ -485,8 +541,7 @@ public class IndexScrutinyToolIT extends BaseTest {
     }
 
     private int countRows(String tableFullName) throws SQLException {
-        ResultSet count =
-                conn.createStatement().executeQuery("select count(*) from " + tableFullName);
+        ResultSet count = conn.createStatement().executeQuery("select count(*) from " + tableFullName);
         count.next();
         int numRows = count.getInt(1);
         return numRows;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/62defe43/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/IndexColumnNames.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/IndexColumnNames.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/IndexColumnNames.java
index 5daa1ed..6f2959f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/IndexColumnNames.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/IndexColumnNames.java
@@ -52,7 +52,19 @@ public class IndexColumnNames {
         this.pdataTable = pdataTable;
         this.pindexTable = pindexTable;
         List<PColumn> pindexCols = pindexTable.getColumns();
+        List<PColumn> pkColumns = pindexTable.getPKColumns();
         Set<String> indexColsAdded = new HashSet<String>();
+        int offset = 0;
+        if (pindexTable.getBucketNum() != null) {
+            offset++;
+        }
+        if (pindexTable.getViewIndexId() != null) {
+            offset++;
+        }
+        if (offset > 0) {
+            pindexCols = pindexCols.subList(offset, pindexCols.size());
+            pkColumns = pkColumns.subList(offset, pkColumns.size());
+        }
 
         // first add the data pk columns
         for (PColumn indexCol : pindexCols) {
@@ -68,7 +80,7 @@ public class IndexColumnNames {
         }
 
         // then the rest of the index pk columns
-        for (PColumn indexPkCol : pindexTable.getPKColumns()) {
+        for (PColumn indexPkCol : pkColumns) {
             String indexColName = indexPkCol.getName().getString();
             if (!indexColsAdded.contains(indexColName)) {
                 indexPkColNames.add(indexColName);
@@ -81,7 +93,7 @@ public class IndexColumnNames {
         }
 
         // then the covered columns (rest of the columns)
-        for (PColumn indexCol : pindexTable.getColumns()) {
+        for (PColumn indexCol : pindexCols) {
             String indexColName = indexCol.getName().getString();
             if (!indexColsAdded.contains(indexColName)) {
                 indexNonPkColNames.add(indexColName);