You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2018/03/27 15:46:18 UTC

hive git commit: HIVE-17098 : Race condition in Hbase tables (Oleksiy Sayankin via Zoltan Haindrich)

Repository: hive
Updated Branches:
  refs/heads/master a2394c5bf -> 95a1538ae


HIVE-17098 : Race condition in Hbase tables (Oleksiy Sayankin via Zoltan Haindrich)

Signed-off-by: Ashutosh Chauhan <ha...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/95a1538a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/95a1538a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/95a1538a

Branch: refs/heads/master
Commit: 95a1538aed11d0f027c5fe8ac9d99bde6f69501a
Parents: a2394c5
Author: Oleksiy Sayankin <ol...@gmail.com>
Authored: Mon Jul 17 02:39:00 2017 -0700
Committer: Ashutosh Chauhan <ha...@apache.org>
Committed: Tue Mar 27 08:45:35 2018 -0700

----------------------------------------------------------------------
 .../hive/hbase/HiveHBaseTableInputFormat.java   | 161 ++++++++-----------
 1 file changed, 63 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/95a1538a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
index 80c6485..069c9b9 100644
--- a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
+++ b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
@@ -19,13 +19,13 @@
 package org.apache.hadoop.hive.hbase;
 
 import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
@@ -35,36 +35,18 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapred.TableMapReduceUtil;
 import org.apache.hadoop.hbase.mapreduce.TableInputFormatBase;
 import org.apache.hadoop.hbase.mapreduce.TableSplit;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hive.hbase.ColumnMappings.ColumnMapping;
-import org.apache.hadoop.hive.ql.exec.ExprNodeConstantEvaluator;
 import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
 import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer;
 import org.apache.hadoop.hive.ql.index.IndexSearchCondition;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 import org.apache.hadoop.hive.serde.serdeConstants;
-import org.apache.hadoop.hive.serde2.ByteStream;
 import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.io.ByteWritable;
-import org.apache.hadoop.hive.serde2.io.DoubleWritable;
-import org.apache.hadoop.hive.serde2.io.ShortWritable;
-import org.apache.hadoop.hive.serde2.lazy.LazyUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.hive.shims.ShimLoader;
-import org.apache.hadoop.io.BooleanWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
@@ -87,71 +69,58 @@ public class HiveHBaseTableInputFormat extends TableInputFormatBase
     implements InputFormat<ImmutableBytesWritable, ResultWritable> {
 
   static final Logger LOG = LoggerFactory.getLogger(HiveHBaseTableInputFormat.class);
-  private static final Object hbaseTableMonitor = new Object();
-  private Connection conn = null;
+  private static final Object HBASE_TABLE_MONITOR = new Object();
 
-  @Override
-  public RecordReader<ImmutableBytesWritable, ResultWritable> getRecordReader(
-    InputSplit split,
-    JobConf jobConf,
-    final Reporter reporter) throws IOException {
+  @Override public RecordReader<ImmutableBytesWritable, ResultWritable> getRecordReader(InputSplit split,
+      JobConf jobConf, final Reporter reporter) throws IOException {
 
     HBaseSplit hbaseSplit = (HBaseSplit) split;
     TableSplit tableSplit = hbaseSplit.getTableSplit();
 
-    if (conn == null) {
-      conn = ConnectionFactory.createConnection(HBaseConfiguration.create(jobConf));
-    }
-    initializeTable(conn, tableSplit.getTable());
-
-    setScan(HiveHBaseInputFormatUtil.getScan(jobConf));
+    final org.apache.hadoop.mapreduce.RecordReader<ImmutableBytesWritable, Result> recordReader;
 
     Job job = new Job(jobConf);
-    TaskAttemptContext tac = ShimLoader.getHadoopShims().newTaskAttemptContext(
-        job.getConfiguration(), reporter);
+    TaskAttemptContext tac = ShimLoader.getHadoopShims().newTaskAttemptContext(job.getConfiguration(), reporter);
 
-    final org.apache.hadoop.mapreduce.RecordReader<ImmutableBytesWritable, Result>
-    recordReader = createRecordReader(tableSplit, tac);
-    try {
-      recordReader.initialize(tableSplit, tac);
-    } catch (InterruptedException e) {
-      closeTable(); // Free up the HTable connections
-      if (conn != null) {
+    final Connection conn;
+
+    synchronized (HBASE_TABLE_MONITOR) {
+      conn = ConnectionFactory.createConnection(HBaseConfiguration.create(jobConf));
+      initializeTable(conn, tableSplit.getTable());
+      setScan(HiveHBaseInputFormatUtil.getScan(jobConf));
+      recordReader = createRecordReader(tableSplit, tac);
+      try {
+        recordReader.initialize(tableSplit, tac);
+      } catch (InterruptedException e) {
+        closeTable(); // Free up the HTable connections
         conn.close();
-        conn = null;
+        throw new IOException("Failed to initialize RecordReader", e);
       }
-      throw new IOException("Failed to initialize RecordReader", e);
     }
 
     return new RecordReader<ImmutableBytesWritable, ResultWritable>() {
 
-      @Override
-      public void close() throws IOException {
-        recordReader.close();
-        closeTable();
-        if (conn != null) {
+      @Override public void close() throws IOException {
+        synchronized (HBASE_TABLE_MONITOR) {
+          recordReader.close();
+          closeTable();
           conn.close();
-          conn = null;
         }
       }
 
-      @Override
-      public ImmutableBytesWritable createKey() {
+      @Override public ImmutableBytesWritable createKey() {
         return new ImmutableBytesWritable();
       }
 
-      @Override
-      public ResultWritable createValue() {
+      @Override public ResultWritable createValue() {
         return new ResultWritable(new Result());
       }
 
-      @Override
-      public long getPos() throws IOException {
+      @Override public long getPos() throws IOException {
         return 0;
       }
 
-      @Override
-      public float getProgress() throws IOException {
+      @Override public float getProgress() throws IOException {
         float progress = 0.0F;
 
         try {
@@ -163,8 +132,7 @@ public class HiveHBaseTableInputFormat extends TableInputFormatBase
         return progress;
       }
 
-      @Override
-      public boolean next(ImmutableBytesWritable rowKey, ResultWritable value) throws IOException {
+      @Override public boolean next(ImmutableBytesWritable rowKey, ResultWritable value) throws IOException {
 
         boolean next = false;
 
@@ -195,8 +163,7 @@ public class HiveHBaseTableInputFormat extends TableInputFormatBase
    *
    * @return converted table split if any
    */
-  private Scan createFilterScan(JobConf jobConf, int iKey, int iTimestamp, boolean isKeyBinary)
-      throws IOException {
+  private Scan createFilterScan(JobConf jobConf, int iKey, int iTimestamp, boolean isKeyBinary) throws IOException {
 
     // TODO: assert iKey is HBaseSerDe#HBASE_KEY_COL
 
@@ -212,21 +179,19 @@ public class HiveHBaseTableInputFormat extends TableInputFormatBase
       return scan;
     }
 
-    ExprNodeGenericFuncDesc filterExpr =
-        SerializationUtilities.deserializeExpression(filterExprSerialized);
+    ExprNodeGenericFuncDesc filterExpr = SerializationUtilities.deserializeExpression(filterExprSerialized);
 
     String keyColName = jobConf.get(serdeConstants.LIST_COLUMNS).split(",")[iKey];
     ArrayList<TypeInfo> cols = TypeInfoUtils.getTypeInfosFromTypeString(jobConf.get(serdeConstants.LIST_COLUMN_TYPES));
     String colType = cols.get(iKey).getTypeName();
-    boolean isKeyComparable = isKeyBinary || colType.equalsIgnoreCase("string");
+    boolean isKeyComparable = isKeyBinary || "string".equalsIgnoreCase(colType);
 
     String tsColName = null;
     if (iTimestamp >= 0) {
       tsColName = jobConf.get(serdeConstants.LIST_COLUMNS).split(",")[iTimestamp];
     }
 
-    IndexPredicateAnalyzer analyzer =
-        newIndexPredicateAnalyzer(keyColName, isKeyComparable, tsColName);
+    IndexPredicateAnalyzer analyzer = newIndexPredicateAnalyzer(keyColName, isKeyComparable, tsColName);
 
     List<IndexSearchCondition> conditions = new ArrayList<IndexSearchCondition>();
     ExprNodeDesc residualPredicate = analyzer.analyzePredicate(filterExpr, conditions);
@@ -263,8 +228,8 @@ public class HiveHBaseTableInputFormat extends TableInputFormatBase
    *
    * @return preconfigured predicate analyzer
    */
-  static IndexPredicateAnalyzer newIndexPredicateAnalyzer(
-      String keyColumnName, boolean isKeyComparable, String timestampColumn) {
+  static IndexPredicateAnalyzer newIndexPredicateAnalyzer(String keyColumnName, boolean isKeyComparable,
+      String timestampColumn) {
 
     IndexPredicateAnalyzer analyzer = new IndexPredicateAnalyzer();
 
@@ -273,20 +238,17 @@ public class HiveHBaseTableInputFormat extends TableInputFormatBase
     // We can do other comparisons only if storage format in hbase is either binary
     // or we are dealing with string types since there lexicographic ordering will suffice.
     if (isKeyComparable) {
-      analyzer.addComparisonOp(keyColumnName,
-          "org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual",
+      analyzer.addComparisonOp(keyColumnName, "org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual",
           "org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan",
           "org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan",
           "org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan",
           "org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan");
     } else {
-      analyzer.addComparisonOp(keyColumnName,
-          "org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual");
+      analyzer.addComparisonOp(keyColumnName, "org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual");
     }
 
     if (timestampColumn != null) {
-      analyzer.addComparisonOp(timestampColumn,
-          "org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual",
+      analyzer.addComparisonOp(timestampColumn, "org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual",
           "org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan",
           "org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan",
           "org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan",
@@ -296,10 +258,22 @@ public class HiveHBaseTableInputFormat extends TableInputFormatBase
     return analyzer;
   }
 
-  @Override
-  public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException {
-    synchronized (hbaseTableMonitor) {
-      return getSplitsInternal(jobConf, numSplits);
+  @Override public InputSplit[] getSplits(final JobConf jobConf, final int numSplits) throws IOException {
+    synchronized (HBASE_TABLE_MONITOR) {
+      final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+      if (ugi == null) {
+        return getSplitsInternal(jobConf, numSplits);
+      }
+
+      try {
+        return ugi.doAs(new PrivilegedExceptionAction<InputSplit[]>() {
+          @Override public InputSplit[] run() throws IOException {
+            return getSplitsInternal(jobConf, numSplits);
+          }
+        });
+      } catch (InterruptedException e) {
+        throw new IOException(e);
+      }
     }
   }
 
@@ -311,9 +285,9 @@ public class HiveHBaseTableInputFormat extends TableInputFormatBase
     }
 
     String hbaseTableName = jobConf.get(HBaseSerDe.HBASE_TABLE_NAME);
-    if (conn == null) {
-      conn = ConnectionFactory.createConnection(HBaseConfiguration.create(jobConf));
-    }
+
+    Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create(jobConf));
+
     TableName tableName = TableName.valueOf(hbaseTableName);
     initializeTable(conn, tableName);
 
@@ -342,8 +316,8 @@ public class HiveHBaseTableInputFormat extends TableInputFormatBase
       // split per region, the implementation actually takes the scan
       // definition into account and excludes regions which don't satisfy
       // the start/stop row conditions (HBASE-1829).
-      Scan scan = createFilterScan(jobConf, iKey, iTimestamp,
-          HiveHBaseInputFormatUtil.getStorageFormatOfKey(keyMapping.mappingSpec,
+      Scan scan = createFilterScan(jobConf, iKey, iTimestamp, HiveHBaseInputFormatUtil
+          .getStorageFormatOfKey(keyMapping.mappingSpec,
               jobConf.get(HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE, "string")));
 
       // The list of families that have been added to the scan
@@ -360,7 +334,7 @@ public class HiveHBaseTableInputFormat extends TableInputFormatBase
           scan.addFamily(colMap.familyNameBytes);
           addedFamilies.add(colMap.familyName);
         } else {
-          if(!addedFamilies.contains(colMap.familyName)){
+          if (!addedFamilies.contains(colMap.familyName)) {
             // add the column only if the family has not already been added
             scan.addColumn(colMap.familyNameBytes, colMap.qualifierNameBytes);
           }
@@ -370,11 +344,10 @@ public class HiveHBaseTableInputFormat extends TableInputFormatBase
 
       Job job = new Job(jobConf);
       JobContext jobContext = ShimLoader.getHadoopShims().newJobContext(job);
-      Path [] tablePaths = FileInputFormat.getInputPaths(jobContext);
+      Path[] tablePaths = FileInputFormat.getInputPaths(jobContext);
 
-      List<org.apache.hadoop.mapreduce.InputSplit> splits =
-        super.getSplits(jobContext);
-      InputSplit [] results = new InputSplit[splits.size()];
+      List<org.apache.hadoop.mapreduce.InputSplit> splits = super.getSplits(jobContext);
+      InputSplit[] results = new InputSplit[splits.size()];
 
       for (int i = 0; i < splits.size(); i++) {
         results[i] = new HBaseSplit((TableSplit) splits.get(i), tablePaths[0]);
@@ -383,21 +356,13 @@ public class HiveHBaseTableInputFormat extends TableInputFormatBase
       return results;
     } finally {
       closeTable();
-      if (conn != null) {
-        conn.close();
-        conn = null;
-      }
+      conn.close();
     }
   }
 
-  @Override
-  protected void finalize() throws Throwable {
+  @Override protected void finalize() throws Throwable {
     try {
       closeTable();
-      if (conn != null) {
-        conn.close();
-        conn = null;
-      }
     } finally {
       super.finalize();
     }