Posted to commits@phoenix.apache.org by ri...@apache.org on 2021/07/19 08:07:36 UTC

[phoenix] branch 5.1 updated: PHOENIX-6405 Disallow bulk loading into non-empty tables with global secondary indexes

This is an automated email from the ASF dual-hosted git repository.

richardantal pushed a commit to branch 5.1
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/5.1 by this push:
     new ddf355c  PHOENIX-6405 Disallow bulk loading into non-empty tables with global secondary indexes
ddf355c is described below

commit ddf355cd49f2befb667a356f2b680295f9dc035a
Author: Richard Antal <an...@gmail.com>
AuthorDate: Tue Jul 13 17:15:59 2021 +0200

    PHOENIX-6405 Disallow bulk loading into non-empty tables with global secondary indexes
    
    Change-Id: I55e2f6138f69add7ffa028baab6d8ea80681acf0
---
 .../apache/phoenix/end2end/CsvBulkLoadToolIT.java  | 97 +++++++++++++++++++++-
 .../phoenix/mapreduce/AbstractBulkLoadTool.java    | 22 ++++-
 .../java/org/apache/phoenix/util/SchemaUtil.java   |  2 +-
 3 files changed, 118 insertions(+), 3 deletions(-)
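
In short: bulk loading pre-built HFiles into a table that already contains
rows bypasses the write path that keeps global secondary indexes in sync,
which is why the commit message warns it "will corrupt the global index
table in most cases". The change adds a pre-flight check plus a
--corruptindexes escape hatch. A minimal sketch of the override, assembled
from the test below (the input path, table, schema, and zkQuorum are the
test's own fixtures):

    CsvBulkLoadTool tool = new CsvBulkLoadTool();
    tool.setConf(new Configuration(getUtility().getConfiguration()));
    tool.getConf().set(DATE_FORMAT_ATTRIB, "yyyy/MM/dd");
    // Without --corruptindexes this run fails with an IllegalStateException
    // once the target table is non-empty and has a global index.
    int exitCode = tool.run(new String[] {
            "--input", "/tmp/input2.csv",
            "--table", "table1",
            "--schema", "s",
            "--zookeeper", zkQuorum,
            "--corruptindexes"});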

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
index f79e844..0b8672b 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
@@ -113,6 +113,101 @@ public class CsvBulkLoadToolIT extends BaseOwnClusterIT {
         rs.close();
         stmt.close();
     }
+
+    @Test
+    public void testImportWithGlobalIndex() throws Exception {
+
+        Statement stmt = conn.createStatement();
+        stmt.execute("CREATE TABLE S.TABLE1 (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, T DATE) SPLIT ON (1,2)");
+        stmt.execute("CREATE INDEX glob_idx ON S.TABLE1(ID, T)");
+        conn.commit();
+
+        FileSystem fs = FileSystem.get(getUtility().getConfiguration());
+        FSDataOutputStream outputStream = fs.create(new Path("/tmp/input1.csv"));
+        PrintWriter printWriter = new PrintWriter(outputStream);
+        printWriter.println("1,Name 1,1970/01/01");
+        printWriter.println("2,Name 2,1970/01/02");
+        printWriter.close();
+
+        fs = FileSystem.get(getUtility().getConfiguration());
+        outputStream = fs.create(new Path("/tmp/input2.csv"));
+        printWriter = new PrintWriter(outputStream);
+        printWriter.println("3,Name 3,1970/01/03");
+        printWriter.println("4,Name 4,1970/01/04");
+        printWriter.close();
+
+        CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool();
+        csvBulkLoadTool.setConf(new Configuration(getUtility().getConfiguration()));
+        csvBulkLoadTool.getConf().set(DATE_FORMAT_ATTRIB,"yyyy/MM/dd");
+        int exitCode = csvBulkLoadTool.run(new String[] {
+                "--input", "/tmp/input1.csv",
+                "--table", "table1",
+                "--schema", "s",
+                "--zookeeper", zkQuorum});
+        assertEquals(0, exitCode);
+
+        csvBulkLoadTool = new CsvBulkLoadTool();
+        csvBulkLoadTool.setConf(new Configuration(getUtility().getConfiguration()));
+        csvBulkLoadTool.getConf().set(DATE_FORMAT_ATTRIB,"yyyy/MM/dd");
+        try {
+            exitCode = csvBulkLoadTool.run(new String[] {
+                    "--input", "/tmp/input2.csv",
+                    "--table", "table1",
+                    "--schema", "s",
+                    "--zookeeper", zkQuorum});
+            fail("Bulk loading error should have happened earlier");
+        } catch (Exception e){
+            assertTrue(e.getMessage().contains("Bulk Loading error: Bulk loading is disabled for " +
+                    "non empty tables with global indexes, because it will corrupt " +
+                    "the global index table in most cases.\n" +
+                    "Use the --corruptindexes option to override this check."));
+        }
+
+        ResultSet rs = stmt.executeQuery("SELECT id, name, t FROM s.table1 ORDER BY id");
+        assertTrue(rs.next());
+        assertEquals(1, rs.getInt(1));
+        assertEquals("Name 1", rs.getString(2));
+        assertEquals(DateUtil.parseDate("1970-01-01"), rs.getDate(3));
+        assertTrue(rs.next());
+        assertEquals(2, rs.getInt(1));
+        assertEquals("Name 2", rs.getString(2));
+        assertEquals(DateUtil.parseDate("1970-01-02"), rs.getDate(3));
+        assertFalse(rs.next());
+
+        csvBulkLoadTool = new CsvBulkLoadTool();
+        csvBulkLoadTool.setConf(new Configuration(getUtility().getConfiguration()));
+        csvBulkLoadTool.getConf().set(DATE_FORMAT_ATTRIB,"yyyy/MM/dd");
+        exitCode = csvBulkLoadTool.run(new String[] {
+                "--input", "/tmp/input2.csv",
+                "--table", "table1",
+                "--schema", "s",
+                "--zookeeper", zkQuorum,
+                "--corruptindexes"});
+        assertEquals(0, exitCode);
+
+        rs = stmt.executeQuery("SELECT id, name, t FROM s.table1 ORDER BY id");
+        assertTrue(rs.next());
+        assertEquals(1, rs.getInt(1));
+        assertEquals("Name 1", rs.getString(2));
+        assertEquals(DateUtil.parseDate("1970-01-01"), rs.getDate(3));
+        assertTrue(rs.next());
+        assertEquals(2, rs.getInt(1));
+        assertEquals("Name 2", rs.getString(2));
+        assertEquals(DateUtil.parseDate("1970-01-02"), rs.getDate(3));
+        assertTrue(rs.next());
+        assertEquals(3, rs.getInt(1));
+        assertEquals("Name 3", rs.getString(2));
+        assertEquals(DateUtil.parseDate("1970-01-03"), rs.getDate(3));
+        assertTrue(rs.next());
+        assertEquals(4, rs.getInt(1));
+        assertEquals("Name 4", rs.getString(2));
+        assertEquals(DateUtil.parseDate("1970-01-04"), rs.getDate(3));
+        assertFalse(rs.next());
+
+        rs.close();
+        stmt.close();
+    }
+
     @Test
     public void testImportWithRowTimestamp() throws Exception {
 
@@ -443,7 +538,7 @@ public class CsvBulkLoadToolIT extends BaseOwnClusterIT {
             checkIndexTableIsVerified(indexTableName);
         }
     }
-    
+
     @Test
     public void testInvalidArguments() {
         String tableName = "TABLE8";
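
A side note on the test above: the CSV dates use slashes, so every tool run
first sets DATE_FORMAT_ATTRIB to match. A minimal sketch of that setup,
with the same names the test uses:

    Configuration conf = new Configuration(getUtility().getConfiguration());
    conf.set(DATE_FORMAT_ATTRIB, "yyyy/MM/dd"); // matches "1970/01/01" etc.
    CsvBulkLoadTool tool = new CsvBulkLoadTool();
    tool.setConf(conf);
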
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
index b777fbc..05a4116 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
@@ -88,6 +88,7 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
     static final Option IGNORE_ERRORS_OPT = new Option("g", "ignore-errors", false, "Ignore input errors");
     static final Option HELP_OPT = new Option("h", "help", false, "Show this help and quit");
     static final Option SKIP_HEADER_OPT = new Option("k", "skip-header", false, "Skip the first line of CSV files (the header)");
+    static final Option ENABLE_CORRUPT_INDEXES = new Option( "corruptindexes", "corruptindexes", false, "Allow bulk loading into non-empty tables with global secondary indexes");
 
     /**
      * Set configuration values based on parsed command line options.
@@ -112,6 +113,7 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
         options.addOption(IGNORE_ERRORS_OPT);
         options.addOption(HELP_OPT);
         options.addOption(SKIP_HEADER_OPT);
+        options.addOption(ENABLE_CORRUPT_INDEXES);
         return options;
     }
 
@@ -226,6 +228,12 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
         configureOptions(cmdLine, importColumns, conf);
         String sName = SchemaUtil.normalizeIdentifier(schemaName);
         String tName = SchemaUtil.normalizeIdentifier(tableName);
+
+        String tn = SchemaUtil.getEscapedTableName(sName, tName);
+        ResultSet rsempty = conn.createStatement().executeQuery("SELECT * FROM " + tn + " LIMIT 1");
+        boolean tableNotEmpty = rsempty.next();
+        rsempty.close();
+
         try {
             validateTable(conn, sName, tName);
         } finally {
@@ -244,14 +252,26 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
         PTable table = PhoenixRuntime.getTable(conn, qualifiedTableName);
         tablesToBeLoaded.add(new TargetTableRef(qualifiedTableName, table.getPhysicalName().getString()));
         boolean hasLocalIndexes = false;
+        boolean hasGlobalIndexes = false;
         for(PTable index: table.getIndexes()) {
             if (index.getIndexType() == IndexType.LOCAL) {
                 hasLocalIndexes =
                         qualifiedIndexTableName == null ? true : index.getTableName().getString()
                                 .equals(qualifiedIndexTableName);
-                if (hasLocalIndexes) break;
+                if (hasLocalIndexes && hasGlobalIndexes) break;
+            }
+            if (index.getIndexType() == IndexType.GLOBAL) {
+                hasGlobalIndexes = true;
+                if (hasLocalIndexes && hasGlobalIndexes) break;
             }
         }
+
+        if(hasGlobalIndexes && tableNotEmpty && !cmdLine.hasOption(ENABLE_CORRUPT_INDEXES.getOpt())){
+            throw new IllegalStateException("Bulk Loading error: Bulk loading is disabled for non" +
+                    " empty tables with global indexes, because it will corrupt the global index table in most cases.\n" +
+                    "Use the --corruptindexes option to override this check.");
+        }
+
         // using conn after it's been closed... o.O
         tablesToBeLoaded.addAll(getIndexTables(conn, qualifiedTableName));
 
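The emptiness probe added above is a plain LIMIT 1 query against the
escaped table name. A self-contained sketch of the same pattern (the JDBC
URL and table are placeholders; the query mirrors the diff):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;
    import org.apache.phoenix.util.SchemaUtil;

    try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
         Statement stmt = conn.createStatement();
         ResultSet rs = stmt.executeQuery("SELECT * FROM "
                 + SchemaUtil.getEscapedTableName("S", "TABLE1") + " LIMIT 1")) {
        // A single next() is enough: true means the table already has rows.
        boolean tableNotEmpty = rs.next();
    }
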
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
index 445456d..c039310 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
@@ -669,7 +669,7 @@ public class SchemaUtil {
         if (schemaName == null || schemaName.length() == 0) {
             return "\"" + tableName + "\"";
         }
-        return "\"" + schemaName + "\"." + "\"" + tableName + "\"";
+        return "\"" + schemaName + "\"" + QueryConstants.NAME_SEPARATOR + "\"" + tableName + "\"";
     }
 
     protected static PhoenixConnection addMetaDataColumn(PhoenixConnection conn, long scn, String columnDef) throws SQLException {
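
For context on this last hunk: QueryConstants.NAME_SEPARATOR is the "."
separator, so behavior is unchanged; the string literal is simply replaced
with the shared constant. Expected results, per the code above:

    SchemaUtil.getEscapedTableName("S", "TABLE1");  // -> "S"."TABLE1"
    SchemaUtil.getEscapedTableName(null, "TABLE1"); // -> "TABLE1"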