You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ra...@apache.org on 2016/02/10 22:14:54 UTC

phoenix git commit: PHOENIX-2334 CSV Bulk load fails on local indexes (Rajeshbabu)

Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-0.98 7498d7ba0 -> bcacdc128


PHOENIX-2334 CSV Bulk load fails on local indexes (Rajeshbabu)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/bcacdc12
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/bcacdc12
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/bcacdc12

Branch: refs/heads/4.x-HBase-0.98
Commit: bcacdc128ecc45b66f67778e0c99376d0c88c1bf
Parents: 7498d7b
Author: Rajeshbabu Chintaguntla <ra...@apache.org>
Authored: Thu Feb 11 02:47:24 2016 +0530
Committer: Rajeshbabu Chintaguntla <ra...@apache.org>
Committed: Thu Feb 11 02:47:24 2016 +0530

----------------------------------------------------------------------
 .../phoenix/end2end/CsvBulkLoadToolIT.java      | 27 +++++++++++---------
 .../phoenix/mapreduce/AbstractBulkLoadTool.java | 16 ++++++++----
 .../mapreduce/bulkload/TargetTableRef.java      |  2 +-
 .../phoenix/query/ConnectionQueryServices.java  |  1 +
 .../query/ConnectionQueryServicesImpl.java      | 27 ++++++++++++++++++++
 .../query/ConnectionlessQueryServicesImpl.java  | 15 +++++++++++
 .../query/DelegateConnectionQueryServices.java  |  6 +++++
 .../java/org/apache/phoenix/util/IndexUtil.java | 10 +++++++-
 8 files changed, 85 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/bcacdc12/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
index 26ec889..96042c5 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
@@ -218,7 +218,7 @@ public class CsvBulkLoadToolIT extends BaseOwnClusterHBaseManagedTimeIT {
 
         Statement stmt = conn.createStatement();
         stmt.execute("CREATE TABLE TABLE6 (ID INTEGER NOT NULL PRIMARY KEY, " +
-                "FIRST_NAME VARCHAR, LAST_NAME VARCHAR)");
+                "FIRST_NAME VARCHAR, LAST_NAME VARCHAR) SPLIt ON (1,2)");
         String ddl = "CREATE LOCAL INDEX TABLE6_IDX ON TABLE6 "
                 + " (FIRST_NAME ASC)";
         stmt.execute(ddl);
@@ -234,16 +234,19 @@ public class CsvBulkLoadToolIT extends BaseOwnClusterHBaseManagedTimeIT {
 
         CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool();
         csvBulkLoadTool.setConf(getUtility().getConfiguration());
-        try {
-            csvBulkLoadTool.run(new String[] {
-                    "--input", "/tmp/input3.csv",
-                    "--table", "table6",
-                    "--zookeeper", zkQuorum});
-            fail("Csv bulk load currently has issues with local indexes.");
-        } catch( UnsupportedOperationException ise) {
-            assertEquals("Local indexes not supported by Bulk Loader",ise.getMessage());
-        }
-        
+        int exitCode = csvBulkLoadTool.run(new String[] {
+                "--input", "/tmp/input3.csv",
+                "--table", "table6",
+                "--zookeeper", zkQuorum});
+        assertEquals(0, exitCode);
+
+        ResultSet rs = stmt.executeQuery("SELECT id, FIRST_NAME FROM TABLE6 where first_name='FirstName 2'");
+        assertTrue(rs.next());
+        assertEquals(2, rs.getInt(1));
+        assertEquals("FirstName 2", rs.getString(2));
+
+        rs.close();
+        stmt.close();
     }
 
     @Test
@@ -251,7 +254,7 @@ public class CsvBulkLoadToolIT extends BaseOwnClusterHBaseManagedTimeIT {
         testImportOneIndexTable("TABLE4", false);
     }
 
-    //@Test
+    @Test
     public void testImportOneLocalIndexTable() throws Exception {
         testImportOneIndexTable("TABLE5", true);
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/bcacdc12/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
index f6ba5f6..39ee4b1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
@@ -21,8 +21,10 @@ import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 
 import org.apache.commons.cli.CommandLine;
@@ -54,6 +56,7 @@ import org.apache.phoenix.mapreduce.bulkload.TargetTableRefFunctions;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.SchemaUtil;
@@ -295,7 +298,12 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
     }
 
     private void completebulkload(Configuration conf,Path outputPath , List<TargetTableRef> tablesToBeLoaded) throws Exception {
+        Set<String> tableNames = new HashSet<>(tablesToBeLoaded.size());
         for(TargetTableRef table : tablesToBeLoaded) {
+            if(tableNames.contains(table.getPhysicalName())){
+                continue;
+            }
+            tableNames.add(table.getPhysicalName());
             LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
             String tableName = table.getPhysicalName();
             Path tableOutputPath = new Path(outputPath,tableName);
@@ -382,11 +390,9 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
         List<TargetTableRef> indexTables = new ArrayList<TargetTableRef>();
         for(PTable indexTable : table.getIndexes()){
             if (indexTable.getIndexType() == PTable.IndexType.LOCAL) {
-                throw new UnsupportedOperationException("Local indexes not supported by Bulk Loader");
-                /*indexTables.add(
-                        new TargetTableRef(getQualifiedTableName(schemaName,
-                                indexTable.getTableName().getString()),
-                                MetaDataUtil.getLocalIndexTableName(qualifiedTableName))); */
+                indexTables.add(new TargetTableRef(getQualifiedTableName(schemaName, indexTable
+                        .getTableName().getString()), MetaDataUtil
+                        .getLocalIndexTableName(qualifiedTableName)));
             } else {
                 indexTables.add(new TargetTableRef(getQualifiedTableName(schemaName,
                         indexTable.getTableName().getString())));

http://git-wip-us.apache.org/repos/asf/phoenix/blob/bcacdc12/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRef.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRef.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRef.java
index 1a846f9..2c3069f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRef.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRef.java
@@ -46,7 +46,7 @@ public class TargetTableRef {
     }
 
     @JsonCreator
-    private TargetTableRef(@JsonProperty("logicalName") String logicalName,
+    public TargetTableRef(@JsonProperty("logicalName") String logicalName,
         @JsonProperty("physicalName") String physicalName) {
         this.logicalName = logicalName;
         this.physicalName = physicalName;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/bcacdc12/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
index d3b7de8..ae4b7ac 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
@@ -69,6 +69,7 @@ public interface ConnectionQueryServices extends QueryServices, MetaDataMutated
 
     public HTableDescriptor getTableDescriptor(byte[] tableName) throws SQLException;
 
+    public HRegionLocation getTableRegionLocation(byte[] tableName, byte[] row) throws SQLException;
     public List<HRegionLocation> getAllTableRegions(byte[] tableName) throws SQLException;
 
     public PhoenixConnection connect(String url, Properties info) throws SQLException;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/bcacdc12/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index a568dce..998b7f8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -3363,4 +3363,31 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
     public boolean isRenewingLeasesEnabled() {
         return supportsFeature(ConnectionQueryServices.Feature.RENEW_LEASE) && renewLeaseEnabled;
     }
+
+
+    @Override
+    public HRegionLocation getTableRegionLocation(byte[] tableName, byte[] row) throws SQLException {
+       /*
+        * Use HConnection.getRegionLocation as it uses the cache in HConnection, to get the region
+        * to which specified row belongs to.
+         */
+        int retryCount = 0, maxRetryCount = 1;
+        boolean reload =false;
+        while (true) {
+                try {
+                        return connection.getRegionLocation(TableName.valueOf(tableName), row, reload);
+                } catch (org.apache.hadoop.hbase.TableNotFoundException e) {
+                        String fullName = Bytes.toString(tableName);
+                        throw new TableNotFoundException(SchemaUtil.getSchemaNameFromFullName(fullName), SchemaUtil.getTableNameFromFullName(fullName));
+                } catch (IOException e) {
+                        if (retryCount++ < maxRetryCount) { // One retry, in case split occurs while navigating
+                                reload = true;
+                                continue;
+                        }
+                        throw new SQLExceptionInfo.Builder(SQLExceptionCode.GET_TABLE_REGIONS_FAIL)
+                        .setRootCause(e).build().buildException();
+                }
+        }
+     }
+
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/bcacdc12/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
index 6cfb382..b4bbe1f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
@@ -577,4 +577,19 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
     public boolean isRenewingLeasesEnabled() {
         return false;
     }
+
+    public HRegionLocation getTableRegionLocation(byte[] tableName, byte[] row) throws SQLException {
+       List<HRegionLocation> regions = tableSplits.get(Bytes.toString(tableName));
+       if (regions != null) {
+               for(HRegionLocation region: regions) {
+                       if (Bytes.compareTo(region.getRegionInfo().getStartKey(), row) <= 0
+                                       && Bytes.compareTo(region.getRegionInfo().getEndKey(), row) > 0) {
+                           return region;
+                       }
+               }
+       }
+       return new HRegionLocation(
+                       new HRegionInfo(TableName.valueOf(tableName), HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW),
+                       SERVER_NAME, -1);
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/bcacdc12/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
index 9b721f8..4c7446b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
@@ -304,4 +304,10 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple
     public boolean isRenewingLeasesEnabled() {
         return getDelegate().isRenewingLeasesEnabled();
     }
+
+    @Override
+    public HRegionLocation getTableRegionLocation(byte[] tableName, byte[] row)
+            throws SQLException {
+        return getDelegate().getTableRegionLocation(tableName, row);
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/bcacdc12/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
index 2c64529..7c24ad6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Delete;
@@ -281,7 +282,14 @@ public class IndexUtil {
                         }
                         
                     };
-                    indexMutations.add(maintainer.buildUpdateMutation(kvBuilder, valueGetter, ptr, ts, null, null));
+                    byte[] regionStartKey = null;
+                    byte[] regionEndkey = null;
+                    if(maintainer.isLocalIndex()) {
+                        HRegionLocation tableRegionLocation = connection.getQueryServices().getTableRegionLocation(table.getName().getBytes(), dataMutation.getRow());
+                        regionStartKey = tableRegionLocation.getRegionInfo().getStartKey();
+                        regionEndkey = tableRegionLocation.getRegionInfo().getEndKey();
+                    }
+                    indexMutations.add(maintainer.buildUpdateMutation(kvBuilder, valueGetter, ptr, ts, regionStartKey, regionEndkey));
                 }
             }
             return indexMutations;