You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by sa...@apache.org on 2016/08/05 21:03:28 UTC
[29/50] [abbrv] phoenix git commit: PHOENIX-2926 Skip loading data
for table having local indexes when there is split during bulkload
job - addendum (Rajeshbabu)
PHOENIX-2926 Skip loading data for table having local indexes when there is split during bulkload job - addendum (Rajeshbabu)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/2384f6b2
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/2384f6b2
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/2384f6b2
Branch: refs/heads/encodecolumns
Commit: 2384f6b2f3daab8d85e415b3574fb75667d80f6f
Parents: 5e2537a
Author: Rajeshbabu Chintaguntla <ra...@apache.org>
Authored: Fri Jul 29 00:20:08 2016 +0530
Committer: Rajeshbabu Chintaguntla <ra...@apache.org>
Committed: Fri Jul 29 00:20:08 2016 +0530
----------------------------------------------------------------------
.../phoenix/mapreduce/AbstractBulkLoadTool.java | 32 ++++++--------------
.../phoenix/mapreduce/index/IndexTool.java | 20 ++++++++++--
.../java/org/apache/phoenix/util/IndexUtil.java | 14 +++++++++
3 files changed, 42 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2384f6b2/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
index a41de8a..f7b7d22 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
@@ -17,6 +17,7 @@
*/
package org.apache.phoenix.mapreduce;
+import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
@@ -59,6 +60,7 @@ import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTable.IndexType;
import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.SchemaUtil;
@@ -309,33 +311,19 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
if (success) {
if (hasLocalIndexes) {
- byte[][] splitKeysAfterJob = null;
try {
table = new HTable(job.getConfiguration(), qualifiedTableName);
- splitKeysAfterJob = table.getStartKeys();
+ if(!IndexUtil.matchingSplitKeys(splitKeysBeforeJob, table.getStartKeys())) {
+ LOG.error("The table "
+ + qualifiedTableName
+ + " has local indexes and there is split key mismatch before and"
+ + " after running bulkload job. Please rerun the job otherwise"
+ + " there may be inconsistencies between actual data and index data.");
+ return -1;
+ }
} finally {
if (table != null) table.close();
}
- boolean matchingSplitKeys = true;
- if (splitKeysBeforeJob != null && splitKeysAfterJob != null
- && splitKeysBeforeJob.length == splitKeysAfterJob.length) {
- for (int i = 0; i < splitKeysBeforeJob.length; i++) {
- if (Bytes.compareTo(splitKeysBeforeJob[i], splitKeysAfterJob[i]) != 0) {
- matchingSplitKeys = false;
- break;
- }
- }
- } else {
- matchingSplitKeys = false;
- }
- if(!matchingSplitKeys) {
- LOG.error("The table "
- + qualifiedTableName
- + " has local indexes and there is split key mismatch before and"
- + " after running bulkload job. Please rerun the job otherwise"
- + " there may be inconsistencies between actual data and index data.");
- return -1;
- }
}
LOG.info("Loading HFiles from {}", outputPath);
completebulkload(conf,outputPath,tablesToBeLoaded);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2384f6b2/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index 34c9013..8488123 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -63,6 +63,7 @@ import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTable.IndexType;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.QueryUtil;
@@ -205,8 +206,10 @@ public class IndexTool extends Configured implements Tool {
// check if the index type is LOCAL, if so, derive and set the physicalIndexName that is
// computed from the qDataTable name.
String physicalIndexTable = pindexTable.getPhysicalName().getString();
+ boolean isLocalIndexBuild = false;
if (IndexType.LOCAL.equals(pindexTable.getIndexType())) {
physicalIndexTable = qDataTable;
+ isLocalIndexBuild = true;
}
final PhoenixConnection pConnection = connection.unwrap(PhoenixConnection.class);
@@ -247,7 +250,7 @@ public class IndexTool extends Configured implements Tool {
configureSubmittableJobUsingDirectApi(job, outputPath,
cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt()));
} else {
- configureRunnableJobUsingBulkLoad(job, outputPath);
+ configureRunnableJobUsingBulkLoad(job, outputPath, isLocalIndexBuild);
// Without direct API, we need to update the index state to ACTIVE from client.
IndexToolUtil.updateIndexState(connection, qDataTable, indexTable,
PIndexState.ACTIVE);
@@ -276,7 +279,7 @@ public class IndexTool extends Configured implements Tool {
* @return
* @throws Exception
*/
- private void configureRunnableJobUsingBulkLoad(Job job, Path outputPath) throws Exception {
+ private void configureRunnableJobUsingBulkLoad(Job job, Path outputPath, boolean isLocalIndexBuild) throws Exception {
job.setMapperClass(PhoenixIndexImportMapper.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(KeyValue.class);
@@ -285,11 +288,24 @@ public class IndexTool extends Configured implements Tool {
PhoenixConfigurationUtil.getPhysicalTableName(configuration);
final HTable htable = new HTable(configuration, physicalIndexTable);
HFileOutputFormat.configureIncrementalLoad(job, htable);
+ byte[][] splitKeysBeforeJob = null;
+ if(isLocalIndexBuild) {
+ splitKeysBeforeJob = htable.getStartKeys();
+ }
boolean status = job.waitForCompletion(true);
if (!status) {
LOG.error("IndexTool job failed!");
htable.close();
throw new Exception("IndexTool job failed: " + job.toString());
+ } else {
+ if (isLocalIndexBuild
+ && !IndexUtil.matchingSplitKeys(splitKeysBeforeJob, htable.getStartKeys())) {
+ String errMsg = "The index to build is local index and the split keys are not matching"
+ + " before and after running the job. Please rerun the job otherwise"
+ + " there may be inconsistencies between actual data and index data";
+ LOG.error(errMsg);
+ throw new Exception(errMsg);
+ }
}
LOG.info("Loading HFiles from {}", outputPath);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2384f6b2/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
index 13dcc05..7cd7eb3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
@@ -672,4 +672,18 @@ public class IndexUtil {
mutations.toArray(new Mutation[mutations.size()]),
HConstants.NO_NONCE, HConstants.NO_NONCE);
}
+
+ public static boolean matchingSplitKeys(byte[][] splitKeys1, byte[][] splitKeys2) throws IOException {
+ if (splitKeys1 != null && splitKeys2 != null
+ && splitKeys1.length == splitKeys2.length) {
+ for (int i = 0; i < splitKeys1.length; i++) {
+ if (Bytes.compareTo(splitKeys1[i], splitKeys2[i]) != 0) {
+ return false;
+ }
+ }
+ } else {
+ return false;
+ }
+ return true;
+ }
}