You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2015/03/04 23:40:42 UTC
[07/50] [abbrv] phoenix git commit: PHOENIX-1248 CsvBulkLoadTool is
failing with IAE when a local index is specified for the --index-table
parameter (Gabriel Reid)
PHOENIX-1248 CsvBulkLoadTool is failing with IAE when a local index is specified for the --index-table parameter (Gabriel Reid)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/d6e7846f
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/d6e7846f
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/d6e7846f
Branch: refs/heads/calcite
Commit: d6e7846f491e09780a2d663cc5c23bc21244e26c
Parents: 3f48938
Author: Rajeshbabu Chintaguntla <ra...@apache.org>
Authored: Sun Feb 1 09:28:29 2015 -0800
Committer: Rajeshbabu Chintaguntla <ra...@apache.org>
Committed: Sun Feb 1 09:28:29 2015 -0800
----------------------------------------------------------------------
.../phoenix/mapreduce/CsvBulkLoadToolIT.java | 79 ++++++++++++++----
.../phoenix/mapreduce/CsvBulkLoadTool.java | 85 ++++++++++++++------
.../phoenix/mapreduce/CsvToKeyValueMapper.java | 3 +-
3 files changed, 126 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d6e7846f/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java
index 4373f47..0501142 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java
@@ -17,12 +17,6 @@
*/
package org.apache.phoenix.mapreduce;
-import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
import java.io.PrintWriter;
import java.sql.Connection;
import java.sql.DriverManager;
@@ -42,6 +36,12 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
@Category(NeedsOwnMiniClusterTest.class)
public class CsvBulkLoadToolIT {
@@ -191,17 +191,62 @@ public class CsvBulkLoadToolIT {
rs.close();
stmt.close();
}
-
+
@Test
- public void testImportOneIndexTable() throws Exception {
+ public void testImportWithLocalIndex() throws Exception {
Statement stmt = conn.createStatement();
- stmt.execute("CREATE TABLE TABLE4 (ID INTEGER NOT NULL PRIMARY KEY, " +
- "FIRST_NAME VARCHAR, LAST_NAME VARCHAR)");
- String ddl = "CREATE INDEX TABLE4_IDX ON TABLE4 "
+ stmt.execute("CREATE TABLE TABLE6 (ID INTEGER NOT NULL PRIMARY KEY, " +
+ "FIRST_NAME VARCHAR, LAST_NAME VARCHAR)");
+ String ddl = "CREATE LOCAL INDEX TABLE6_IDX ON TABLE6 "
+ " (FIRST_NAME ASC)";
stmt.execute(ddl);
-
+
+ FileSystem fs = FileSystem.get(hbaseTestUtil.getConfiguration());
+ FSDataOutputStream outputStream = fs.create(new Path("/tmp/input3.csv"));
+ PrintWriter printWriter = new PrintWriter(outputStream);
+ printWriter.println("1,FirstName 1,LastName 1");
+ printWriter.println("2,FirstName 2,LastName 2");
+ printWriter.close();
+
+ CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool();
+ csvBulkLoadTool.setConf(hbaseTestUtil.getConfiguration());
+ int exitCode = csvBulkLoadTool.run(new String[] {
+ "--input", "/tmp/input3.csv",
+ "--table", "table6",
+ "--zookeeper", zkQuorum});
+ assertEquals(0, exitCode);
+
+ ResultSet rs = stmt.executeQuery("SELECT id, FIRST_NAME FROM TABLE6 where first_name='FirstName 2'");
+ assertTrue(rs.next());
+ assertEquals(2, rs.getInt(1));
+ assertEquals("FirstName 2", rs.getString(2));
+
+ rs.close();
+ stmt.close();
+ }
+
+ @Test
+ public void testImportOneIndexTable() throws Exception {
+ testImportOneIndexTable("TABLE4", false);
+ }
+
+ @Test
+ public void testImportOneLocalIndexTable() throws Exception {
+ testImportOneIndexTable("TABLE5", true);
+ }
+
+ public void testImportOneIndexTable(String tableName, boolean localIndex) throws Exception {
+
+ String indexTableName = String.format("%s_IDX", tableName);
+ Statement stmt = conn.createStatement();
+ stmt.execute("CREATE TABLE " + tableName + "(ID INTEGER NOT NULL PRIMARY KEY, "
+ + "FIRST_NAME VARCHAR, LAST_NAME VARCHAR)");
+ String ddl =
+ "CREATE " + (localIndex ? "LOCAL" : "") + " INDEX " + indexTableName + " ON "
+ + tableName + "(FIRST_NAME ASC)";
+ stmt.execute(ddl);
+
FileSystem fs = FileSystem.get(hbaseTestUtil.getConfiguration());
FSDataOutputStream outputStream = fs.create(new Path("/tmp/input4.csv"));
PrintWriter printWriter = new PrintWriter(outputStream);
@@ -213,14 +258,14 @@ public class CsvBulkLoadToolIT {
csvBulkLoadTool.setConf(hbaseTestUtil.getConfiguration());
int exitCode = csvBulkLoadTool.run(new String[] {
"--input", "/tmp/input4.csv",
- "--table", "table4",
- "--index-table", "TABLE4_IDX",
- "--zookeeper", zkQuorum});
+ "--table", tableName,
+ "--index-table", indexTableName,
+ "--zookeeper", zkQuorum });
assertEquals(0, exitCode);
- ResultSet rs = stmt.executeQuery("SELECT * FROM TABLE4");
+ ResultSet rs = stmt.executeQuery("SELECT * FROM " + tableName);
assertFalse(rs.next());
- rs = stmt.executeQuery("SELECT FIRST_NAME FROM TABLE4 where FIRST_NAME='FirstName 1'");
+ rs = stmt.executeQuery("SELECT FIRST_NAME FROM " + tableName + " where FIRST_NAME='FirstName 1'");
assertTrue(rs.next());
assertEquals("FirstName 1", rs.getString(1));
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d6e7846f/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java
index 54e3f2c..c92a3a3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java
@@ -61,8 +61,10 @@ import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.job.JobManager;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTable.IndexType;
import org.apache.phoenix.util.CSVCommonsLoader;
import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.StringUtil;
@@ -212,38 +214,39 @@ public class CsvBulkLoadTool extends Configured implements Tool {
outputPath = new Path("/tmp/" + UUID.randomUUID());
}
- List<String> tablesToBeLoaded = new ArrayList<String>();
- tablesToBeLoaded.add(qualifiedTableName);
+ List<TargetTableRef> tablesToBeLoaded = new ArrayList<TargetTableRef>();
+ tablesToBeLoaded.add(new TargetTableRef(qualifiedTableName));
tablesToBeLoaded.addAll(getIndexTables(conn, schemaName, qualifiedTableName));
// When loading a single index table, check index table name is correct
if(qualifedIndexTableName != null){
- boolean exists = false;
- for(String tmpTable : tablesToBeLoaded){
- if(tmpTable.compareToIgnoreCase(qualifedIndexTableName) == 0) {
- exists = true;
+ TargetTableRef targetIndexRef = null;
+ for (TargetTableRef tmpTable : tablesToBeLoaded){
+ if(tmpTable.getLogicalName().compareToIgnoreCase(qualifedIndexTableName) == 0) {
+ targetIndexRef = tmpTable;
break;
}
}
- if(!exists){
+ if(targetIndexRef == null){
throw new IllegalStateException("CSV Bulk Loader error: index table " +
qualifedIndexTableName + " doesn't exist");
}
tablesToBeLoaded.clear();
- tablesToBeLoaded.add(qualifedIndexTableName);
+ tablesToBeLoaded.add(targetIndexRef);
}
List<Future<Boolean>> runningJobs = new ArrayList<Future<Boolean>>();
ExecutorService executor = JobManager.createThreadPoolExec(Integer.MAX_VALUE, 5, 20);
try{
- for(String table : tablesToBeLoaded) {
- Path tablePath = new Path(outputPath, table);
+ for (TargetTableRef table : tablesToBeLoaded) {
+ Path tablePath = new Path(outputPath, table.getPhysicalName());
Configuration jobConf = new Configuration(conf);
jobConf.set(CsvToKeyValueMapper.TABLE_NAME_CONFKEY, qualifiedTableName);
- if(qualifiedTableName.compareToIgnoreCase(table) != 0) {
- jobConf.set(CsvToKeyValueMapper.INDEX_TABLE_NAME_CONFKEY, table);
+ if(qualifiedTableName.compareToIgnoreCase(table.getLogicalName()) != 0) {
+ jobConf.set(CsvToKeyValueMapper.INDEX_TABLE_NAME_CONFKEY, table.getPhysicalName());
}
- TableLoader tableLoader = new TableLoader(jobConf, table, inputPath, tablePath);
+ TableLoader tableLoader = new TableLoader(
+ jobConf, table.getPhysicalName(), inputPath, tablePath);
runningJobs.add(executor.submit(tableLoader));
}
} finally {
@@ -392,20 +395,56 @@ public class CsvBulkLoadTool extends Configured implements Tool {
}
/**
- * Get names of index tables of current data table
+ * Get the index tables of current data table
* @throws java.sql.SQLException
*/
- private List<String> getIndexTables(Connection conn, String schemaName, String tableName)
+ private List<TargetTableRef> getIndexTables(Connection conn, String schemaName, String qualifiedTableName)
throws SQLException {
- PTable table = PhoenixRuntime.getTable(conn, tableName);
- List<String> indexTables = new ArrayList<String>();
+ PTable table = PhoenixRuntime.getTable(conn, qualifiedTableName);
+ List<TargetTableRef> indexTables = new ArrayList<TargetTableRef>();
for(PTable indexTable : table.getIndexes()){
- indexTables.add(getQualifiedTableName(schemaName,
- indexTable.getTableName().getString()));
+ if (indexTable.getIndexType() == IndexType.LOCAL) {
+ indexTables.add(
+ new TargetTableRef(getQualifiedTableName(schemaName,
+ indexTable.getTableName().getString()),
+ MetaDataUtil.getLocalIndexTableName(qualifiedTableName)));
+ } else {
+ indexTables.add(new TargetTableRef(getQualifiedTableName(schemaName,
+ indexTable.getTableName().getString())));
+ }
}
return indexTables;
}
-
+
+ /**
+ * Represents the logical and physical name of a single table to which data is to be loaded.
+ *
+ * This class exists to allow for the difference between HBase physical table names and
+ * Phoenix logical table names.
+ */
+ private static class TargetTableRef {
+
+ private final String logicalName;
+ private final String physicalName;
+
+ private TargetTableRef(String name) {
+ this(name, name);
+ }
+
+ private TargetTableRef(String logicalName, String physicalName) {
+ this.logicalName = logicalName;
+ this.physicalName = physicalName;
+ }
+
+ public String getLogicalName() {
+ return logicalName;
+ }
+
+ public String getPhysicalName() {
+ return physicalName;
+ }
+ }
+
/**
* A runnable to load data into a single table
*
@@ -445,9 +484,9 @@ public class CsvBulkLoadTool extends Configured implements Tool {
// initialize credentials to possibily run in a secure env
TableMapReduceUtil.initCredentials(job);
-
- HTable htable = new HTable(conf, tableName);
-
+
+ HTable htable = new HTable(conf, tableName);
+
// Auto configure partitioner and reducer according to the Main Data table
HFileOutputFormat.configureIncrementalLoad(job, htable);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d6e7846f/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
index ead5067..6ff7ba3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
@@ -36,6 +36,7 @@ import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
+
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
@@ -165,7 +166,7 @@ public class CsvToKeyValueMapper extends Mapper<LongWritable,Text,ImmutableBytes
= PhoenixRuntime.getUncommittedDataIterator(conn, true);
while (uncommittedDataIterator.hasNext()) {
Pair<byte[], List<KeyValue>> kvPair = uncommittedDataIterator.next();
- if(Bytes.compareTo(tableName, kvPair.getFirst()) != 0) {
+ if (Bytes.compareTo(tableName, kvPair.getFirst()) != 0) {
// skip edits for other tables
continue;
}