You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ri...@apache.org on 2021/07/19 08:07:36 UTC
[phoenix] branch 5.1 updated: PHOENIX-6405 Disallow bulk loading into non-empty tables with global secondary indexes
This is an automated email from the ASF dual-hosted git repository.
richardantal pushed a commit to branch 5.1
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/5.1 by this push:
new ddf355c PHOENIX-6405 Disallow bulk loading into non-empty tables with global secondary indexes
ddf355c is described below
commit ddf355cd49f2befb667a356f2b680295f9dc035a
Author: Richard Antal <an...@gmail.com>
AuthorDate: Tue Jul 13 17:15:59 2021 +0200
PHOENIX-6405 Disallow bulk loading into non-empty tables with global secondary indexes
Change-Id: I55e2f6138f69add7ffa028baab6d8ea80681acf0
---
.../apache/phoenix/end2end/CsvBulkLoadToolIT.java | 97 +++++++++++++++++++++-
.../phoenix/mapreduce/AbstractBulkLoadTool.java | 22 ++++-
.../java/org/apache/phoenix/util/SchemaUtil.java | 2 +-
3 files changed, 118 insertions(+), 3 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
index f79e844..0b8672b 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
@@ -113,6 +113,101 @@ public class CsvBulkLoadToolIT extends BaseOwnClusterIT {
rs.close();
stmt.close();
}
+
+ @Test
+ public void testImportWithGlobalIndex() throws Exception {
+
+ Statement stmt = conn.createStatement();
+ stmt.execute("CREATE TABLE S.TABLE1 (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, T DATE) SPLIT ON (1,2)");
+ stmt.execute("CREATE INDEX glob_idx ON S.TABLE1(ID, T)");
+ conn.commit();
+
+ FileSystem fs = FileSystem.get(getUtility().getConfiguration());
+ FSDataOutputStream outputStream = fs.create(new Path("/tmp/input1.csv"));
+ PrintWriter printWriter = new PrintWriter(outputStream);
+ printWriter.println("1,Name 1,1970/01/01");
+ printWriter.println("2,Name 2,1970/01/02");
+ printWriter.close();
+
+ fs = FileSystem.get(getUtility().getConfiguration());
+ outputStream = fs.create(new Path("/tmp/input2.csv"));
+ printWriter = new PrintWriter(outputStream);
+ printWriter.println("3,Name 3,1970/01/03");
+ printWriter.println("4,Name 4,1970/01/04");
+ printWriter.close();
+
+ CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool();
+ csvBulkLoadTool.setConf(new Configuration(getUtility().getConfiguration()));
+ csvBulkLoadTool.getConf().set(DATE_FORMAT_ATTRIB,"yyyy/MM/dd");
+ int exitCode = csvBulkLoadTool.run(new String[] {
+ "--input", "/tmp/input1.csv",
+ "--table", "table1",
+ "--schema", "s",
+ "--zookeeper", zkQuorum});
+ assertEquals(0, exitCode);
+
+ csvBulkLoadTool = new CsvBulkLoadTool();
+ csvBulkLoadTool.setConf(new Configuration(getUtility().getConfiguration()));
+ csvBulkLoadTool.getConf().set(DATE_FORMAT_ATTRIB,"yyyy/MM/dd");
+ try {
+ exitCode = csvBulkLoadTool.run(new String[] {
+ "--input", "/tmp/input2.csv",
+ "--table", "table1",
+ "--schema", "s",
+ "--zookeeper", zkQuorum});
+ fail("Bulk loading error should have happened earlier");
+ } catch (Exception e){
+ assertTrue(e.getMessage().contains("Bulk Loading error: Bulk loading is disabled for " +
+ "non empty tables with global indexes, because it will corrupt " +
+ "the global index table in most cases.\n" +
+ "Use the --corruptindexes option to override this check."));
+ }
+
+ ResultSet rs = stmt.executeQuery("SELECT id, name, t FROM s.table1 ORDER BY id");
+ assertTrue(rs.next());
+ assertEquals(1, rs.getInt(1));
+ assertEquals("Name 1", rs.getString(2));
+ assertEquals(DateUtil.parseDate("1970-01-01"), rs.getDate(3));
+ assertTrue(rs.next());
+ assertEquals(2, rs.getInt(1));
+ assertEquals("Name 2", rs.getString(2));
+ assertEquals(DateUtil.parseDate("1970-01-02"), rs.getDate(3));
+ assertFalse(rs.next());
+
+ csvBulkLoadTool = new CsvBulkLoadTool();
+ csvBulkLoadTool.setConf(new Configuration(getUtility().getConfiguration()));
+ csvBulkLoadTool.getConf().set(DATE_FORMAT_ATTRIB,"yyyy/MM/dd");
+ exitCode = csvBulkLoadTool.run(new String[] {
+ "--input", "/tmp/input2.csv",
+ "--table", "table1",
+ "--schema", "s",
+ "--zookeeper", zkQuorum,
+ "--corruptindexes"});
+ assertEquals(0, exitCode);
+
+ rs = stmt.executeQuery("SELECT id, name, t FROM s.table1 ORDER BY id");
+ assertTrue(rs.next());
+ assertEquals(1, rs.getInt(1));
+ assertEquals("Name 1", rs.getString(2));
+ assertEquals(DateUtil.parseDate("1970-01-01"), rs.getDate(3));
+ assertTrue(rs.next());
+ assertEquals(2, rs.getInt(1));
+ assertEquals("Name 2", rs.getString(2));
+ assertEquals(DateUtil.parseDate("1970-01-02"), rs.getDate(3));
+ assertTrue(rs.next());
+ assertEquals(3, rs.getInt(1));
+ assertEquals("Name 3", rs.getString(2));
+ assertEquals(DateUtil.parseDate("1970-01-03"), rs.getDate(3));
+ assertTrue(rs.next());
+ assertEquals(4, rs.getInt(1));
+ assertEquals("Name 4", rs.getString(2));
+ assertEquals(DateUtil.parseDate("1970-01-04"), rs.getDate(3));
+ assertFalse(rs.next());
+
+ rs.close();
+ stmt.close();
+ }
+
@Test
public void testImportWithRowTimestamp() throws Exception {
@@ -443,7 +538,7 @@ public class CsvBulkLoadToolIT extends BaseOwnClusterIT {
checkIndexTableIsVerified(indexTableName);
}
}
-
+
@Test
public void testInvalidArguments() {
String tableName = "TABLE8";
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
index b777fbc..05a4116 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
@@ -88,6 +88,7 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
static final Option IGNORE_ERRORS_OPT = new Option("g", "ignore-errors", false, "Ignore input errors");
static final Option HELP_OPT = new Option("h", "help", false, "Show this help and quit");
static final Option SKIP_HEADER_OPT = new Option("k", "skip-header", false, "Skip the first line of CSV files (the header)");
+ static final Option ENABLE_CORRUPT_INDEXES = new Option( "corruptindexes", "corruptindexes", false, "Allow bulk loading into non-empty tables with global secondary indexes");
/**
* Set configuration values based on parsed command line options.
@@ -112,6 +113,7 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
options.addOption(IGNORE_ERRORS_OPT);
options.addOption(HELP_OPT);
options.addOption(SKIP_HEADER_OPT);
+ options.addOption(ENABLE_CORRUPT_INDEXES);
return options;
}
@@ -226,6 +228,12 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
configureOptions(cmdLine, importColumns, conf);
String sName = SchemaUtil.normalizeIdentifier(schemaName);
String tName = SchemaUtil.normalizeIdentifier(tableName);
+
+ String tn = SchemaUtil.getEscapedTableName(sName, tName);
+ ResultSet rsempty = conn.createStatement().executeQuery("SELECT * FROM " + tn + " LIMIT 1");
+ boolean tableNotEmpty = rsempty.next();
+ rsempty.close();
+
try {
validateTable(conn, sName, tName);
} finally {
@@ -244,14 +252,26 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
PTable table = PhoenixRuntime.getTable(conn, qualifiedTableName);
tablesToBeLoaded.add(new TargetTableRef(qualifiedTableName, table.getPhysicalName().getString()));
boolean hasLocalIndexes = false;
+ boolean hasGlobalIndexes = false;
for(PTable index: table.getIndexes()) {
if (index.getIndexType() == IndexType.LOCAL) {
hasLocalIndexes =
qualifiedIndexTableName == null ? true : index.getTableName().getString()
.equals(qualifiedIndexTableName);
- if (hasLocalIndexes) break;
+ if (hasLocalIndexes && hasGlobalIndexes) break;
+ }
+ if (index.getIndexType() == IndexType.GLOBAL) {
+ hasGlobalIndexes = true;
+ if (hasLocalIndexes && hasGlobalIndexes) break;
}
}
+
+ if(hasGlobalIndexes && tableNotEmpty && !cmdLine.hasOption(ENABLE_CORRUPT_INDEXES.getOpt())){
+ throw new IllegalStateException("Bulk Loading error: Bulk loading is disabled for non" +
+ " empty tables with global indexes, because it will corrupt the global index table in most cases.\n" +
+ "Use the --corruptindexes option to override this check.");
+ }
+
// using conn after it's been closed... o.O
tablesToBeLoaded.addAll(getIndexTables(conn, qualifiedTableName));
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
index 445456d..c039310 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
@@ -669,7 +669,7 @@ public class SchemaUtil {
if (schemaName == null || schemaName.length() == 0) {
return "\"" + tableName + "\"";
}
- return "\"" + schemaName + "\"." + "\"" + tableName + "\"";
+ return "\"" + schemaName + "\"" + QueryConstants.NAME_SEPARATOR + "\"" + tableName + "\"";
}
protected static PhoenixConnection addMetaDataColumn(PhoenixConnection conn, long scn, String columnDef) throws SQLException {