Posted to commits@phoenix.apache.org by ma...@apache.org on 2015/03/04 23:40:42 UTC

[07/50] [abbrv] phoenix git commit: PHOENIX-1248 CsvBulkLoadTool is failing with IAE when local index specified for --index-table parameter (Gabriel Reid)

PHOENIX-1248 CsvBulkLoadTool is failing with IAE when local index specified for --index-table parameter (Gabriel Reid)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/d6e7846f
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/d6e7846f
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/d6e7846f

Branch: refs/heads/calcite
Commit: d6e7846f491e09780a2d663cc5c23bc21244e26c
Parents: 3f48938
Author: Rajeshbabu Chintaguntla <ra...@apache.org>
Authored: Sun Feb 1 09:28:29 2015 -0800
Committer: Rajeshbabu Chintaguntla <ra...@apache.org>
Committed: Sun Feb 1 09:28:29 2015 -0800

----------------------------------------------------------------------
 .../phoenix/mapreduce/CsvBulkLoadToolIT.java    | 79 ++++++++++++++----
 .../phoenix/mapreduce/CsvBulkLoadTool.java      | 85 ++++++++++++++------
 .../phoenix/mapreduce/CsvToKeyValueMapper.java  |  3 +-
 3 files changed, 126 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
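
For context, the failure this commit fixes is triggered simply by passing a LOCAL index name to --index-table, exactly as the new integration test below does. A minimal sketch of such an invocation (only the CsvBulkLoadTool/Tool API calls are taken from the test code in this patch; the table names, CSV path, and ZooKeeper quorum are hypothetical placeholders):

    // Minimal sketch: bulk-loading into a local index via CsvBulkLoadTool.
    // MY_TABLE, MY_TABLE_IDX, the input path, and the quorum are placeholders.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.phoenix.mapreduce.CsvBulkLoadTool;

    public class LocalIndexBulkLoadSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration(); // assumes HBase/ZK settings are on the classpath
            CsvBulkLoadTool tool = new CsvBulkLoadTool();
            tool.setConf(conf);
            // Before this fix, the run below failed with an IllegalArgumentException,
            // because a local index does not have an HBase table of its own.
            int exitCode = tool.run(new String[] {
                    "--input", "/tmp/input.csv",
                    "--table", "MY_TABLE",
                    "--index-table", "MY_TABLE_IDX",
                    "--zookeeper", "localhost:2181" });
            System.exit(exitCode);
        }
    }
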


http://git-wip-us.apache.org/repos/asf/phoenix/blob/d6e7846f/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java
index 4373f47..0501142 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java
@@ -17,12 +17,6 @@
  */
 package org.apache.phoenix.mapreduce;
 
-import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
 import java.io.PrintWriter;
 import java.sql.Connection;
 import java.sql.DriverManager;
@@ -42,6 +36,12 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
 @Category(NeedsOwnMiniClusterTest.class)
 public class CsvBulkLoadToolIT {
 
@@ -191,17 +191,62 @@ public class CsvBulkLoadToolIT {
         rs.close();
         stmt.close();
     }
-    
+
     @Test
-    public void testImportOneIndexTable() throws Exception {
+    public void testImportWithLocalIndex() throws Exception {
 
         Statement stmt = conn.createStatement();
-        stmt.execute("CREATE TABLE TABLE4 (ID INTEGER NOT NULL PRIMARY KEY, " +
-            "FIRST_NAME VARCHAR, LAST_NAME VARCHAR)");
-        String ddl = "CREATE INDEX TABLE4_IDX ON TABLE4 "
+        stmt.execute("CREATE TABLE TABLE6 (ID INTEGER NOT NULL PRIMARY KEY, " +
+                "FIRST_NAME VARCHAR, LAST_NAME VARCHAR)");
+        String ddl = "CREATE LOCAL INDEX TABLE6_IDX ON TABLE6 "
                 + " (FIRST_NAME ASC)";
         stmt.execute(ddl);
-        
+
+        FileSystem fs = FileSystem.get(hbaseTestUtil.getConfiguration());
+        FSDataOutputStream outputStream = fs.create(new Path("/tmp/input3.csv"));
+        PrintWriter printWriter = new PrintWriter(outputStream);
+        printWriter.println("1,FirstName 1,LastName 1");
+        printWriter.println("2,FirstName 2,LastName 2");
+        printWriter.close();
+
+        CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool();
+        csvBulkLoadTool.setConf(hbaseTestUtil.getConfiguration());
+        int exitCode = csvBulkLoadTool.run(new String[] {
+                "--input", "/tmp/input3.csv",
+                "--table", "table6",
+                "--zookeeper", zkQuorum});
+        assertEquals(0, exitCode);
+
+        ResultSet rs = stmt.executeQuery("SELECT id, FIRST_NAME FROM TABLE6 where first_name='FirstName 2'");
+        assertTrue(rs.next());
+        assertEquals(2, rs.getInt(1));
+        assertEquals("FirstName 2", rs.getString(2));
+
+        rs.close();
+        stmt.close();
+    }
+
+    @Test
+    public void testImportOneIndexTable() throws Exception {
+        testImportOneIndexTable("TABLE4", false);
+    }
+
+    @Test
+    public void testImportOneLocalIndexTable() throws Exception {
+        testImportOneIndexTable("TABLE5", true);
+    }
+
+    public void testImportOneIndexTable(String tableName, boolean localIndex) throws Exception {
+
+        String indexTableName = String.format("%s_IDX", tableName);
+        Statement stmt = conn.createStatement();
+        stmt.execute("CREATE TABLE " + tableName + "(ID INTEGER NOT NULL PRIMARY KEY, "
+                + "FIRST_NAME VARCHAR, LAST_NAME VARCHAR)");
+        String ddl =
+                "CREATE " + (localIndex ? "LOCAL" : "") + " INDEX " + indexTableName + " ON "
+                        + tableName + "(FIRST_NAME ASC)";
+        stmt.execute(ddl);
+
         FileSystem fs = FileSystem.get(hbaseTestUtil.getConfiguration());
         FSDataOutputStream outputStream = fs.create(new Path("/tmp/input4.csv"));
         PrintWriter printWriter = new PrintWriter(outputStream);
@@ -213,14 +258,14 @@ public class CsvBulkLoadToolIT {
         csvBulkLoadTool.setConf(hbaseTestUtil.getConfiguration());
         int exitCode = csvBulkLoadTool.run(new String[] {
                 "--input", "/tmp/input4.csv",
-                "--table", "table4",
-                "--index-table", "TABLE4_IDX",
-                "--zookeeper", zkQuorum});
+                "--table", tableName,
+                "--index-table", indexTableName,
+                "--zookeeper", zkQuorum });
         assertEquals(0, exitCode);
 
-        ResultSet rs = stmt.executeQuery("SELECT * FROM TABLE4");
+        ResultSet rs = stmt.executeQuery("SELECT * FROM " + tableName);
         assertFalse(rs.next());
-        rs = stmt.executeQuery("SELECT FIRST_NAME FROM TABLE4 where FIRST_NAME='FirstName 1'");
+        rs = stmt.executeQuery("SELECT FIRST_NAME FROM " + tableName + " where FIRST_NAME='FirstName 1'");
         assertTrue(rs.next());
         assertEquals("FirstName 1", rs.getString(1));
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d6e7846f/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java
index 54e3f2c..c92a3a3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java
@@ -61,8 +61,10 @@ import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.job.JobManager;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.util.CSVCommonsLoader;
 import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.StringUtil;
@@ -212,38 +214,39 @@ public class CsvBulkLoadTool extends Configured implements Tool {
             outputPath = new Path("/tmp/" + UUID.randomUUID());
         }
         
-        List<String> tablesToBeLoaded = new ArrayList<String>();
-        tablesToBeLoaded.add(qualifiedTableName);
+        List<TargetTableRef> tablesToBeLoaded = new ArrayList<TargetTableRef>();
+        tablesToBeLoaded.add(new TargetTableRef(qualifiedTableName));
         tablesToBeLoaded.addAll(getIndexTables(conn, schemaName, qualifiedTableName));
         
         // When loading a single index table, check index table name is correct
         if(qualifedIndexTableName != null){
-        	boolean exists = false;
-        	for(String tmpTable : tablesToBeLoaded){
-        		if(tmpTable.compareToIgnoreCase(qualifedIndexTableName) == 0) {
-        			exists = true;
+            TargetTableRef targetIndexRef = null;
+        	for (TargetTableRef tmpTable : tablesToBeLoaded){
+        		if(tmpTable.getLogicalName().compareToIgnoreCase(qualifedIndexTableName) == 0) {
+                    targetIndexRef = tmpTable;
         			break;
         		}
         	}
-        	if(!exists){
+        	if(targetIndexRef == null){
                 throw new IllegalStateException("CSV Bulk Loader error: index table " +
                     qualifedIndexTableName + " doesn't exist");
         	}
         	tablesToBeLoaded.clear();
-        	tablesToBeLoaded.add(qualifedIndexTableName);
+        	tablesToBeLoaded.add(targetIndexRef);
         }
         
         List<Future<Boolean>> runningJobs = new ArrayList<Future<Boolean>>();
         ExecutorService executor =  JobManager.createThreadPoolExec(Integer.MAX_VALUE, 5, 20);
         try{
-	        for(String table : tablesToBeLoaded) {
-	        	Path tablePath = new Path(outputPath, table);
+	        for (TargetTableRef table : tablesToBeLoaded) {
+	        	Path tablePath = new Path(outputPath, table.getPhysicalName());
 	        	Configuration jobConf = new Configuration(conf);
 	        	jobConf.set(CsvToKeyValueMapper.TABLE_NAME_CONFKEY, qualifiedTableName);
-	        	if(qualifiedTableName.compareToIgnoreCase(table) != 0) {
-	        		jobConf.set(CsvToKeyValueMapper.INDEX_TABLE_NAME_CONFKEY, table);
+	        	if(qualifiedTableName.compareToIgnoreCase(table.getLogicalName()) != 0) {
+                    jobConf.set(CsvToKeyValueMapper.INDEX_TABLE_NAME_CONFKEY, table.getPhysicalName());
 	        	}
-	        	TableLoader tableLoader = new TableLoader(jobConf, table, inputPath, tablePath);
+	        	TableLoader tableLoader = new TableLoader(
+                        jobConf, table.getPhysicalName(), inputPath, tablePath);
 	        	runningJobs.add(executor.submit(tableLoader));
 	        }
         } finally {
@@ -392,20 +395,56 @@ public class CsvBulkLoadTool extends Configured implements Tool {
     }
     
     /**
-     * Get names of index tables of current data table
+     * Get the index tables of current data table
      * @throws java.sql.SQLException
      */
-    private List<String> getIndexTables(Connection conn, String schemaName, String tableName) 
+    private List<TargetTableRef> getIndexTables(Connection conn, String schemaName, String qualifiedTableName)
         throws SQLException {
-        PTable table = PhoenixRuntime.getTable(conn, tableName);
-        List<String> indexTables = new ArrayList<String>();
+        PTable table = PhoenixRuntime.getTable(conn, qualifiedTableName);
+        List<TargetTableRef> indexTables = new ArrayList<TargetTableRef>();
         for(PTable indexTable : table.getIndexes()){
-        	indexTables.add(getQualifiedTableName(schemaName, 
-                indexTable.getTableName().getString()));
+            if (indexTable.getIndexType() == IndexType.LOCAL) {
+                indexTables.add(
+                        new TargetTableRef(getQualifiedTableName(schemaName,
+                                indexTable.getTableName().getString()),
+                                MetaDataUtil.getLocalIndexTableName(qualifiedTableName)));
+            } else {
+                indexTables.add(new TargetTableRef(getQualifiedTableName(schemaName,
+                        indexTable.getTableName().getString())));
+            }
         }
         return indexTables;
     }
-    
+
+    /**
+     * Represents the logical and physical name of a single table to which data is to be loaded.
+     *
+     * This class exists to allow for the difference between HBase physical table names and
+     * Phoenix logical table names.
+     */
+    private static class TargetTableRef {
+
+        private final String logicalName;
+        private final String physicalName;
+
+        private TargetTableRef(String name) {
+            this(name, name);
+        }
+
+        private TargetTableRef(String logicalName, String physicalName) {
+            this.logicalName = logicalName;
+            this.physicalName = physicalName;
+        }
+
+        public String getLogicalName() {
+            return logicalName;
+        }
+
+        public String getPhysicalName() {
+            return physicalName;
+        }
+    }
+
     /**
      * A runnable to load data into a single table
      *
@@ -445,9 +484,9 @@ public class CsvBulkLoadTool extends Configured implements Tool {
 
 	            // initialize credentials to possibly run in a secure env
 	            TableMapReduceUtil.initCredentials(job);
-	            
-	            HTable htable = new HTable(conf, tableName);
-	
+
+                HTable htable = new HTable(conf, tableName);
+
 	            // Auto configure partitioner and reducer according to the Main Data table
 	            HFileOutputFormat.configureIncrementalLoad(job, htable);
 	

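The private TargetTableRef class added above exists because a local index does not map one-to-one to an HBase table: all local indexes of a data table share a single physical table derived from the data table's name, while a global index is an HBase table in its own right. A minimal sketch of that naming rule, assuming only what the patch itself uses (PTable.IndexType and MetaDataUtil.getLocalIndexTableName); the wrapper class and method names are hypothetical:

    // Sketch of the logical-to-physical name mapping that TargetTableRef carries.
    import org.apache.phoenix.schema.PTable;
    import org.apache.phoenix.schema.PTable.IndexType;
    import org.apache.phoenix.util.MetaDataUtil;

    class IndexNameMappingSketch {
        static String physicalNameFor(PTable index, String qualifiedDataTableName,
                                      String qualifiedIndexName) {
            if (index.getIndexType() == IndexType.LOCAL) {
                // All local indexes share one physical table derived from the
                // data table's name, so the HFiles must be written there.
                return MetaDataUtil.getLocalIndexTableName(qualifiedDataTableName);
            }
            // A global index is its own HBase table: physical name == logical name.
            return qualifiedIndexName;
        }
    }

This is also why the job setup above passes table.getPhysicalName() to both INDEX_TABLE_NAME_CONFKEY and the TableLoader: the HTable handed to HFileOutputFormat.configureIncrementalLoad must refer to a table that physically exists.
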
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d6e7846f/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
index ead5067..6ff7ba3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
@@ -36,6 +36,7 @@ import com.google.common.base.Splitter;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
+
 import org.apache.commons.csv.CSVFormat;
 import org.apache.commons.csv.CSVParser;
 import org.apache.commons.csv.CSVRecord;
@@ -165,7 +166,7 @@ public class CsvToKeyValueMapper extends Mapper<LongWritable,Text,ImmutableBytes
                     = PhoenixRuntime.getUncommittedDataIterator(conn, true);
             while (uncommittedDataIterator.hasNext()) {
                 Pair<byte[], List<KeyValue>> kvPair = uncommittedDataIterator.next();
-                if(Bytes.compareTo(tableName, kvPair.getFirst()) != 0) {
+                if (Bytes.compareTo(tableName, kvPair.getFirst()) != 0) {
                 	// skip edits for other tables
                 	continue;
                 }
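
For reference, the condition touched in this last hunk is the mapper's table filter: PhoenixRuntime.getUncommittedDataIterator(conn, true) yields uncommitted KeyValues for the data table and all of its indexes together, so each bulk-load job keeps only the pairs belonging to its own target table. A minimal sketch of the pattern, with a hypothetical connection and target table name:

    // Sketch: filtering uncommitted edits down to a single target table.
    import java.sql.Connection;
    import java.sql.SQLException;
    import java.util.Iterator;
    import java.util.List;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.hbase.util.Pair;
    import org.apache.phoenix.util.PhoenixRuntime;

    class TableFilterSketch {
        static void emitEditsFor(Connection conn, byte[] targetTable) throws SQLException {
            Iterator<Pair<byte[], List<KeyValue>>> it =
                    PhoenixRuntime.getUncommittedDataIterator(conn, true);
            while (it.hasNext()) {
                Pair<byte[], List<KeyValue>> kvPair = it.next();
                if (Bytes.compareTo(targetTable, kvPair.getFirst()) != 0) {
                    continue; // edits for another table (e.g. an index): skip
                }
                // kvPair.getSecond() now holds only this table's KeyValues,
                // which the real mapper writes to the MapReduce context.
            }
        }
    }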