You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ra...@apache.org on 2016/07/28 15:58:33 UTC

phoenix git commit: PHOENIX-2926 Skip loading data for table having local indexes when there is split during bulkload job - addendum (Rajeshbabu)

Repository: phoenix
Updated Branches:
  refs/heads/master 7d80fd85d -> ef7560f50


PHOENIX-2926 Skip loading data for table having local indexes when there is split during bulkload job - addendum (Rajeshbabu)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/ef7560f5
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/ef7560f5
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/ef7560f5

Branch: refs/heads/master
Commit: ef7560f501747e6a389bbf62b1c3c2b30b65309f
Parents: 7d80fd8
Author: Rajeshbabu Chintaguntla <ra...@apache.org>
Authored: Thu Jul 28 21:35:50 2016 +0530
Committer: Rajeshbabu Chintaguntla <ra...@apache.org>
Committed: Thu Jul 28 21:35:50 2016 +0530

----------------------------------------------------------------------
 .../phoenix/mapreduce/AbstractBulkLoadTool.java | 32 ++++++--------------
 .../phoenix/mapreduce/index/IndexTool.java      | 21 +++++++++++--
 .../java/org/apache/phoenix/util/IndexUtil.java | 13 ++++++++
 3 files changed, 42 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/ef7560f5/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
index faa0a6e..b32f9c6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.mapreduce;
 
+import java.io.IOException;
 import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
@@ -60,6 +61,7 @@ import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.QueryUtil;
@@ -312,33 +314,19 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
 
         if (success) {
             if (hasLocalIndexes) {
-                byte[][] splitKeysAfterJob = null;
                 try {
                     table = new HTable(job.getConfiguration(), qualifiedTableName);
-                    splitKeysAfterJob = table.getRegionLocator().getStartKeys();
+                    if(!IndexUtil.matchingSplitKeys(splitKeysBeforeJob, table.getRegionLocator().getStartKeys())) {
+                        LOG.error("The table "
+                                + qualifiedTableName
+                                + " has local indexes and there is split key mismatch before and"
+                                + " after running bulkload job. Please rerun the job otherwise"
+                                + " there may be inconsistencies between actual data and index data.");
+                        return -1;
+                    }
                 } finally {
                     if (table != null) table.close();
                 }
-                boolean matchingSplitKeys = true;
-                if (splitKeysBeforeJob != null && splitKeysAfterJob != null
-                        && splitKeysBeforeJob.length == splitKeysAfterJob.length) {
-                    for (int i = 0; i < splitKeysBeforeJob.length; i++) {
-                        if (Bytes.compareTo(splitKeysBeforeJob[i], splitKeysAfterJob[i]) != 0) {
-                            matchingSplitKeys = false;
-                            break;
-                        }
-                    }
-                } else {
-                    matchingSplitKeys = false;
-                }
-                if(!matchingSplitKeys) {
-                    LOG.error("The table "
-                            + qualifiedTableName
-                            + " has local indexes and there is split key mismatch before and"
-                            + " after running bulkload job. Please rerun the job otherwise"
-                            + " there may be inconsistencies between actual data and index data.");
-                    return -1;
-                }
             }
             LOG.info("Loading HFiles from {}", outputPath);
             completebulkload(conf,outputPath,tablesToBeLoaded);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ef7560f5/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index 34c9013..82b353c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -63,6 +63,7 @@ import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.QueryUtil;
@@ -205,8 +206,10 @@ public class IndexTool extends Configured implements Tool {
             // check if the index type is LOCAL, if so, derive and set the physicalIndexName that is
             // computed from the qDataTable name.
             String physicalIndexTable = pindexTable.getPhysicalName().getString();
+            boolean isLocalIndexBuild = false;
             if (IndexType.LOCAL.equals(pindexTable.getIndexType())) {
                 physicalIndexTable = qDataTable;
+                isLocalIndexBuild = true;
             }
 
             final PhoenixConnection pConnection = connection.unwrap(PhoenixConnection.class);
@@ -247,7 +250,7 @@ public class IndexTool extends Configured implements Tool {
                 configureSubmittableJobUsingDirectApi(job, outputPath,
                     cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt()));
             } else {
-                configureRunnableJobUsingBulkLoad(job, outputPath);
+                configureRunnableJobUsingBulkLoad(job, outputPath, isLocalIndexBuild);
                 // Without direct API, we need to update the index state to ACTIVE from client.
                 IndexToolUtil.updateIndexState(connection, qDataTable, indexTable,
                         PIndexState.ACTIVE);
@@ -276,7 +279,7 @@ public class IndexTool extends Configured implements Tool {
      * @return
      * @throws Exception
      */
-    private void configureRunnableJobUsingBulkLoad(Job job, Path outputPath) throws Exception {
+    private void configureRunnableJobUsingBulkLoad(Job job, Path outputPath, boolean isLocalIndexBuild) throws Exception {
         job.setMapperClass(PhoenixIndexImportMapper.class);
         job.setMapOutputKeyClass(ImmutableBytesWritable.class);
         job.setMapOutputValueClass(KeyValue.class);
@@ -285,11 +288,25 @@ public class IndexTool extends Configured implements Tool {
                 PhoenixConfigurationUtil.getPhysicalTableName(configuration);
         final HTable htable = new HTable(configuration, physicalIndexTable);
         HFileOutputFormat.configureIncrementalLoad(job, htable);
+        byte[][] splitKeysBeforeJob = null;
+        if(isLocalIndexBuild) {
+            splitKeysBeforeJob = htable.getRegionLocator().getStartKeys();
+        }
         boolean status = job.waitForCompletion(true);
         if (!status) {
             LOG.error("IndexTool job failed!");
             htable.close();
             throw new Exception("IndexTool job failed: " + job.toString());
+        } else {
+            if (isLocalIndexBuild
+                    && !IndexUtil.matchingSplitKeys(splitKeysBeforeJob, htable.getRegionLocator()
+                            .getStartKeys())) {
+                String errMsg = "The index to build is local index and the split keys are not matching"
+                        + " before and after running the job. Please rerun the job otherwise"
+                        + " there may be inconsistencies between actual data and index data";
+                LOG.error(errMsg);
+                throw new Exception(errMsg);
+            }
         }
 
         LOG.info("Loading HFiles from {}", outputPath);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ef7560f5/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
index b0abe36..9089b68 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
@@ -673,4 +673,17 @@ public class IndexUtil {
             HConstants.NO_NONCE, HConstants.NO_NONCE);
     }
 
+    /** Returns true only when both arrays are non-null, equally long, and compare equal key by key. */
+    public static boolean matchingSplitKeys(byte[][] splitKeys1, byte[][] splitKeys2) throws IOException {
+        if (splitKeys1 == null || splitKeys2 == null
+                || splitKeys1.length != splitKeys2.length) {
+            return false;
+        }
+        for (int idx = 0; idx < splitKeys1.length; idx++) {
+            if (Bytes.compareTo(splitKeys1[idx], splitKeys2[idx]) != 0) {
+                return false;
+            }
+        }
+        return true;
+    }
 }