Posted to commits@hive.apache.org by ha...@apache.org on 2018/03/27 15:46:18 UTC
hive git commit: HIVE-17098 : Race condition in Hbase tables (Oleksiy Sayankin via Zoltan Haindrich)
Repository: hive
Updated Branches:
refs/heads/master a2394c5bf -> 95a1538ae
HIVE-17098 : Race condition in Hbase tables (Oleksiy Sayankin via Zoltan Haindrich)
Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/95a1538a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/95a1538a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/95a1538a
Branch: refs/heads/master
Commit: 95a1538aed11d0f027c5fe8ac9d99bde6f69501a
Parents: a2394c5
Author: Oleksiy Sayankin <ol...@gmail.com>
Authored: Mon Jul 17 02:39:00 2017 -0700
Committer: Ashutosh Chauhan <ha...@apache.org>
Committed: Tue Mar 27 08:45:35 2018 -0700
----------------------------------------------------------------------
.../hive/hbase/HiveHBaseTableInputFormat.java | 161 ++++++++-----------
1 file changed, 63 insertions(+), 98 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/95a1538a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
index 80c6485..069c9b9 100644
--- a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
+++ b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
@@ -19,13 +19,13 @@
package org.apache.hadoop.hive.hbase;
import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
@@ -35,36 +35,18 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableInputFormatBase;
import org.apache.hadoop.hbase.mapreduce.TableSplit;
-import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hive.hbase.ColumnMappings.ColumnMapping;
-import org.apache.hadoop.hive.ql.exec.ExprNodeConstantEvaluator;
import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer;
import org.apache.hadoop.hive.ql.index.IndexSearchCondition;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
import org.apache.hadoop.hive.serde.serdeConstants;
-import org.apache.hadoop.hive.serde2.ByteStream;
import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.io.ByteWritable;
-import org.apache.hadoop.hive.serde2.io.DoubleWritable;
-import org.apache.hadoop.hive.serde2.io.ShortWritable;
-import org.apache.hadoop.hive.serde2.lazy.LazyUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.hive.shims.ShimLoader;
-import org.apache.hadoop.io.BooleanWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
@@ -87,71 +69,58 @@ public class HiveHBaseTableInputFormat extends TableInputFormatBase
implements InputFormat<ImmutableBytesWritable, ResultWritable> {
static final Logger LOG = LoggerFactory.getLogger(HiveHBaseTableInputFormat.class);
- private static final Object hbaseTableMonitor = new Object();
- private Connection conn = null;
+ private static final Object HBASE_TABLE_MONITOR = new Object();
- @Override
- public RecordReader<ImmutableBytesWritable, ResultWritable> getRecordReader(
- InputSplit split,
- JobConf jobConf,
- final Reporter reporter) throws IOException {
+ @Override public RecordReader<ImmutableBytesWritable, ResultWritable> getRecordReader(InputSplit split,
+ JobConf jobConf, final Reporter reporter) throws IOException {
HBaseSplit hbaseSplit = (HBaseSplit) split;
TableSplit tableSplit = hbaseSplit.getTableSplit();
- if (conn == null) {
- conn = ConnectionFactory.createConnection(HBaseConfiguration.create(jobConf));
- }
- initializeTable(conn, tableSplit.getTable());
-
- setScan(HiveHBaseInputFormatUtil.getScan(jobConf));
+ final org.apache.hadoop.mapreduce.RecordReader<ImmutableBytesWritable, Result> recordReader;
Job job = new Job(jobConf);
- TaskAttemptContext tac = ShimLoader.getHadoopShims().newTaskAttemptContext(
- job.getConfiguration(), reporter);
+ TaskAttemptContext tac = ShimLoader.getHadoopShims().newTaskAttemptContext(job.getConfiguration(), reporter);
- final org.apache.hadoop.mapreduce.RecordReader<ImmutableBytesWritable, Result>
- recordReader = createRecordReader(tableSplit, tac);
- try {
- recordReader.initialize(tableSplit, tac);
- } catch (InterruptedException e) {
- closeTable(); // Free up the HTable connections
- if (conn != null) {
+ final Connection conn;
+
+ synchronized (HBASE_TABLE_MONITOR) {
+ conn = ConnectionFactory.createConnection(HBaseConfiguration.create(jobConf));
+ initializeTable(conn, tableSplit.getTable());
+ setScan(HiveHBaseInputFormatUtil.getScan(jobConf));
+ recordReader = createRecordReader(tableSplit, tac);
+ try {
+ recordReader.initialize(tableSplit, tac);
+ } catch (InterruptedException e) {
+ closeTable(); // Free up the HTable connections
conn.close();
- conn = null;
+ throw new IOException("Failed to initialize RecordReader", e);
}
- throw new IOException("Failed to initialize RecordReader", e);
}
return new RecordReader<ImmutableBytesWritable, ResultWritable>() {
- @Override
- public void close() throws IOException {
- recordReader.close();
- closeTable();
- if (conn != null) {
+ @Override public void close() throws IOException {
+ synchronized (HBASE_TABLE_MONITOR) {
+ recordReader.close();
+ closeTable();
conn.close();
- conn = null;
}
}
- @Override
- public ImmutableBytesWritable createKey() {
+ @Override public ImmutableBytesWritable createKey() {
return new ImmutableBytesWritable();
}
- @Override
- public ResultWritable createValue() {
+ @Override public ResultWritable createValue() {
return new ResultWritable(new Result());
}
- @Override
- public long getPos() throws IOException {
+ @Override public long getPos() throws IOException {
return 0;
}
- @Override
- public float getProgress() throws IOException {
+ @Override public float getProgress() throws IOException {
float progress = 0.0F;
try {
@@ -163,8 +132,7 @@ public class HiveHBaseTableInputFormat extends TableInputFormatBase
return progress;
}
- @Override
- public boolean next(ImmutableBytesWritable rowKey, ResultWritable value) throws IOException {
+ @Override public boolean next(ImmutableBytesWritable rowKey, ResultWritable value) throws IOException {
boolean next = false;
@@ -195,8 +163,7 @@ public class HiveHBaseTableInputFormat extends TableInputFormatBase
*
* @return converted table split if any
*/
- private Scan createFilterScan(JobConf jobConf, int iKey, int iTimestamp, boolean isKeyBinary)
- throws IOException {
+ private Scan createFilterScan(JobConf jobConf, int iKey, int iTimestamp, boolean isKeyBinary) throws IOException {
// TODO: assert iKey is HBaseSerDe#HBASE_KEY_COL
@@ -212,21 +179,19 @@ public class HiveHBaseTableInputFormat extends TableInputFormatBase
return scan;
}
- ExprNodeGenericFuncDesc filterExpr =
- SerializationUtilities.deserializeExpression(filterExprSerialized);
+ ExprNodeGenericFuncDesc filterExpr = SerializationUtilities.deserializeExpression(filterExprSerialized);
String keyColName = jobConf.get(serdeConstants.LIST_COLUMNS).split(",")[iKey];
ArrayList<TypeInfo> cols = TypeInfoUtils.getTypeInfosFromTypeString(jobConf.get(serdeConstants.LIST_COLUMN_TYPES));
String colType = cols.get(iKey).getTypeName();
- boolean isKeyComparable = isKeyBinary || colType.equalsIgnoreCase("string");
+ boolean isKeyComparable = isKeyBinary || "string".equalsIgnoreCase(colType);
String tsColName = null;
if (iTimestamp >= 0) {
tsColName = jobConf.get(serdeConstants.LIST_COLUMNS).split(",")[iTimestamp];
}
- IndexPredicateAnalyzer analyzer =
- newIndexPredicateAnalyzer(keyColName, isKeyComparable, tsColName);
+ IndexPredicateAnalyzer analyzer = newIndexPredicateAnalyzer(keyColName, isKeyComparable, tsColName);
List<IndexSearchCondition> conditions = new ArrayList<IndexSearchCondition>();
ExprNodeDesc residualPredicate = analyzer.analyzePredicate(filterExpr, conditions);
@@ -263,8 +228,8 @@ public class HiveHBaseTableInputFormat extends TableInputFormatBase
*
* @return preconfigured predicate analyzer
*/
- static IndexPredicateAnalyzer newIndexPredicateAnalyzer(
- String keyColumnName, boolean isKeyComparable, String timestampColumn) {
+ static IndexPredicateAnalyzer newIndexPredicateAnalyzer(String keyColumnName, boolean isKeyComparable,
+ String timestampColumn) {
IndexPredicateAnalyzer analyzer = new IndexPredicateAnalyzer();
@@ -273,20 +238,17 @@ public class HiveHBaseTableInputFormat extends TableInputFormatBase
// We can do other comparisons only if storage format in hbase is either binary
// or we are dealing with string types since there lexicographic ordering will suffice.
if (isKeyComparable) {
- analyzer.addComparisonOp(keyColumnName,
- "org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual",
+ analyzer.addComparisonOp(keyColumnName, "org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual",
"org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan",
"org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan",
"org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan",
"org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan");
} else {
- analyzer.addComparisonOp(keyColumnName,
- "org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual");
+ analyzer.addComparisonOp(keyColumnName, "org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual");
}
if (timestampColumn != null) {
- analyzer.addComparisonOp(timestampColumn,
- "org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual",
+ analyzer.addComparisonOp(timestampColumn, "org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual",
"org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan",
"org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan",
"org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan",
@@ -296,10 +258,22 @@ public class HiveHBaseTableInputFormat extends TableInputFormatBase
return analyzer;
}
- @Override
- public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException {
- synchronized (hbaseTableMonitor) {
- return getSplitsInternal(jobConf, numSplits);
+ @Override public InputSplit[] getSplits(final JobConf jobConf, final int numSplits) throws IOException {
+ synchronized (HBASE_TABLE_MONITOR) {
+ final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+ if (ugi == null) {
+ return getSplitsInternal(jobConf, numSplits);
+ }
+
+ try {
+ return ugi.doAs(new PrivilegedExceptionAction<InputSplit[]>() {
+ @Override public InputSplit[] run() throws IOException {
+ return getSplitsInternal(jobConf, numSplits);
+ }
+ });
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
}
}
@@ -311,9 +285,9 @@ public class HiveHBaseTableInputFormat extends TableInputFormatBase
}
String hbaseTableName = jobConf.get(HBaseSerDe.HBASE_TABLE_NAME);
- if (conn == null) {
- conn = ConnectionFactory.createConnection(HBaseConfiguration.create(jobConf));
- }
+
+ Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create(jobConf));
+
TableName tableName = TableName.valueOf(hbaseTableName);
initializeTable(conn, tableName);
@@ -342,8 +316,8 @@ public class HiveHBaseTableInputFormat extends TableInputFormatBase
// split per region, the implementation actually takes the scan
// definition into account and excludes regions which don't satisfy
// the start/stop row conditions (HBASE-1829).
- Scan scan = createFilterScan(jobConf, iKey, iTimestamp,
- HiveHBaseInputFormatUtil.getStorageFormatOfKey(keyMapping.mappingSpec,
+ Scan scan = createFilterScan(jobConf, iKey, iTimestamp, HiveHBaseInputFormatUtil
+ .getStorageFormatOfKey(keyMapping.mappingSpec,
jobConf.get(HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE, "string")));
// The list of families that have been added to the scan
@@ -360,7 +334,7 @@ public class HiveHBaseTableInputFormat extends TableInputFormatBase
scan.addFamily(colMap.familyNameBytes);
addedFamilies.add(colMap.familyName);
} else {
- if(!addedFamilies.contains(colMap.familyName)){
+ if (!addedFamilies.contains(colMap.familyName)) {
// add the column only if the family has not already been added
scan.addColumn(colMap.familyNameBytes, colMap.qualifierNameBytes);
}
@@ -370,11 +344,10 @@ public class HiveHBaseTableInputFormat extends TableInputFormatBase
Job job = new Job(jobConf);
JobContext jobContext = ShimLoader.getHadoopShims().newJobContext(job);
- Path [] tablePaths = FileInputFormat.getInputPaths(jobContext);
+ Path[] tablePaths = FileInputFormat.getInputPaths(jobContext);
- List<org.apache.hadoop.mapreduce.InputSplit> splits =
- super.getSplits(jobContext);
- InputSplit [] results = new InputSplit[splits.size()];
+ List<org.apache.hadoop.mapreduce.InputSplit> splits = super.getSplits(jobContext);
+ InputSplit[] results = new InputSplit[splits.size()];
for (int i = 0; i < splits.size(); i++) {
results[i] = new HBaseSplit((TableSplit) splits.get(i), tablePaths[0]);
@@ -383,21 +356,13 @@ public class HiveHBaseTableInputFormat extends TableInputFormatBase
return results;
} finally {
closeTable();
- if (conn != null) {
- conn.close();
- conn = null;
- }
+ conn.close();
}
}
- @Override
- protected void finalize() throws Throwable {
+ @Override protected void finalize() throws Throwable {
try {
closeTable();
- if (conn != null) {
- conn.close();
- conn = null;
- }
} finally {
super.finalize();
}
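----------------------------------------------------------------------
A note on the pattern for readers skimming the diff: the race this patch
removes came from the old instance field "private Connection conn". Both
getRecordReader() and getSplitsInternal() performed an unsynchronized
"if (conn == null)" check before creating the connection, then called
initializeTable()/setScan() on the non-thread-safe state inherited from
TableInputFormatBase, so two concurrent tasks sharing one input format
could leak a connection or observe each other's half-initialized table and
scan. The fix creates a Connection per call, makes the reader that created
it responsible for closing it, and serializes every touch of the inherited
state on the static HBASE_TABLE_MONITOR; getSplits() additionally runs
under the calling user's UGI via PrivilegedExceptionAction so that split
computation talks to HBase with the submitting user's credentials. Below is
a minimal standalone sketch of the same ownership-plus-monitor pattern; the
class, the method names, and FakeConnection are illustrative stand-ins, not
part of the patch:

import java.io.Closeable;
import java.io.IOException;

public class MonitorGuardedReaderFactory {

  // One JVM-wide monitor, playing the role of HBASE_TABLE_MONITOR: all
  // threads serialize their use of the shared, non-thread-safe base state.
  private static final Object MONITOR = new Object();

  // Stand-in for the mutable state TableInputFormatBase keeps (table, scan).
  private String currentTable;

  /** Returns a reader that owns the connection created for it. */
  public Closeable openReader(final String table) throws IOException {
    final FakeConnection conn;
    synchronized (MONITOR) {
      // Per-call creation replaces the racy lazy-init field: there is no
      // shared 'conn' to double-create or to null out under a reader's feet.
      conn = new FakeConnection(table);
      currentTable = table;       // mutate shared state only under the lock
    }
    return new Closeable() {
      @Override public void close() {
        synchronized (MONITOR) {  // teardown is serialized the same way
          currentTable = null;
          conn.close();           // whoever created the connection closes it
        }
      }
    };
  }

  // Trivial stand-in for org.apache.hadoop.hbase.client.Connection.
  private static final class FakeConnection implements Closeable {
    FakeConnection(String table) { /* open resources for 'table' here */ }
    @Override public void close() { /* release them here */ }
  }
}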