You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by xu...@apache.org on 2015/10/28 13:11:40 UTC

[47/55] [abbrv] hive git commit: HIVE-12250 Zookeeper connection leaks in Hive's HBaseHandler (Naveen, reviewed by Aihua and Szehon)

HIVE-12250 Zookeeper connection leaks in Hive's HBaseHandler (Naveen, reviewed by Aihua and Szehon)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d5e8544e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d5e8544e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d5e8544e

Branch: refs/heads/spark
Commit: d5e8544e7106ba0879b176c3524e369833bd844b
Parents: 601a481
Author: Szehon Ho <sz...@cloudera.com>
Authored: Tue Oct 27 11:09:07 2015 -0700
Committer: Szehon Ho <sz...@cloudera.com>
Committed: Tue Oct 27 11:09:07 2015 -0700

----------------------------------------------------------------------
 .../hive/hbase/HiveHBaseTableInputFormat.java   | 105 ++++++++++---------
 .../hive/hbase/HiveHBaseTableOutputFormat.java  |   9 ++
 2 files changed, 64 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/d5e8544e/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
index 8e72759..5f4a1e4 100644
--- a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
+++ b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
@@ -107,6 +107,7 @@ public class HiveHBaseTableInputFormat extends TableInputFormatBase
     try {
       recordReader.initialize(tableSplit, tac);
     } catch (InterruptedException e) {
+      closeTable(); // Free up the HTable connections
       throw new IOException("Failed to initialize RecordReader", e);
     }
 
@@ -445,65 +446,69 @@ public class HiveHBaseTableInputFormat extends TableInputFormatBase
     String hbaseColumnsMapping = jobConf.get(HBaseSerDe.HBASE_COLUMNS_MAPPING);
     boolean doColumnRegexMatching = jobConf.getBoolean(HBaseSerDe.HBASE_COLUMNS_REGEX_MATCHING, true);
 
-    if (hbaseColumnsMapping == null) {
-      throw new IOException(HBaseSerDe.HBASE_COLUMNS_MAPPING + " required for HBase Table.");
-    }
-
-    ColumnMappings columnMappings = null;
     try {
-      columnMappings = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping, doColumnRegexMatching);
-    } catch (SerDeException e) {
-      throw new IOException(e);
-    }
+      if (hbaseColumnsMapping == null) {
+        throw new IOException(HBaseSerDe.HBASE_COLUMNS_MAPPING + " required for HBase Table.");
+      }
 
-    int iKey = columnMappings.getKeyIndex();
-    int iTimestamp = columnMappings.getTimestampIndex();
-    ColumnMapping keyMapping = columnMappings.getKeyMapping();
-
-    // Take filter pushdown into account while calculating splits; this
-    // allows us to prune off regions immediately.  Note that although
-    // the Javadoc for the superclass getSplits says that it returns one
-    // split per region, the implementation actually takes the scan
-    // definition into account and excludes regions which don't satisfy
-    // the start/stop row conditions (HBASE-1829).
-    Scan scan = createFilterScan(jobConf, iKey, iTimestamp,
-        HiveHBaseInputFormatUtil.getStorageFormatOfKey(keyMapping.mappingSpec,
-            jobConf.get(HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE, "string")));
-
-    // The list of families that have been added to the scan
-    List<String> addedFamilies = new ArrayList<String>();
-
-    // REVIEW:  are we supposed to be applying the getReadColumnIDs
-    // same as in getRecordReader?
-    for (ColumnMapping colMap : columnMappings) {
-      if (colMap.hbaseRowKey || colMap.hbaseTimestamp) {
-        continue;
+      ColumnMappings columnMappings = null;
+      try {
+        columnMappings = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping, doColumnRegexMatching);
+      } catch (SerDeException e) {
+        throw new IOException(e);
       }
 
-      if (colMap.qualifierName == null) {
-        scan.addFamily(colMap.familyNameBytes);
-        addedFamilies.add(colMap.familyName);
-      } else {
-        if(!addedFamilies.contains(colMap.familyName)){
-          // add the column only if the family has not already been added
-          scan.addColumn(colMap.familyNameBytes, colMap.qualifierNameBytes);
+      int iKey = columnMappings.getKeyIndex();
+      int iTimestamp = columnMappings.getTimestampIndex();
+      ColumnMapping keyMapping = columnMappings.getKeyMapping();
+
+      // Take filter pushdown into account while calculating splits; this
+      // allows us to prune off regions immediately.  Note that although
+      // the Javadoc for the superclass getSplits says that it returns one
+      // split per region, the implementation actually takes the scan
+      // definition into account and excludes regions which don't satisfy
+      // the start/stop row conditions (HBASE-1829).
+      Scan scan = createFilterScan(jobConf, iKey, iTimestamp,
+          HiveHBaseInputFormatUtil.getStorageFormatOfKey(keyMapping.mappingSpec,
+              jobConf.get(HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE, "string")));
+
+      // The list of families that have been added to the scan
+      List<String> addedFamilies = new ArrayList<String>();
+
+      // REVIEW:  are we supposed to be applying the getReadColumnIDs
+      // same as in getRecordReader?
+      for (ColumnMapping colMap : columnMappings) {
+        if (colMap.hbaseRowKey || colMap.hbaseTimestamp) {
+          continue;
+        }
+
+        if (colMap.qualifierName == null) {
+          scan.addFamily(colMap.familyNameBytes);
+          addedFamilies.add(colMap.familyName);
+        } else {
+          if(!addedFamilies.contains(colMap.familyName)){
+            // add the column only if the family has not already been added
+            scan.addColumn(colMap.familyNameBytes, colMap.qualifierNameBytes);
+          }
         }
       }
-    }
-    setScan(scan);
+      setScan(scan);
 
-    Job job = new Job(jobConf);
-    JobContext jobContext = ShimLoader.getHadoopShims().newJobContext(job);
-    Path [] tablePaths = FileInputFormat.getInputPaths(jobContext);
+      Job job = new Job(jobConf);
+      JobContext jobContext = ShimLoader.getHadoopShims().newJobContext(job);
+      Path [] tablePaths = FileInputFormat.getInputPaths(jobContext);
 
-    List<org.apache.hadoop.mapreduce.InputSplit> splits =
-      super.getSplits(jobContext);
-    InputSplit [] results = new InputSplit[splits.size()];
+      List<org.apache.hadoop.mapreduce.InputSplit> splits =
+        super.getSplits(jobContext);
+      InputSplit [] results = new InputSplit[splits.size()];
 
-    for (int i = 0; i < splits.size(); i++) {
-      results[i] = new HBaseSplit((TableSplit) splits.get(i), tablePaths[0]);
-    }
+      for (int i = 0; i < splits.size(); i++) {
+        results[i] = new HBaseSplit((TableSplit) splits.get(i), tablePaths[0]);
+      }
 
-    return results;
+      return results;
+    } finally {
+      closeTable();
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/d5e8544e/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java
index 3100885..0715a51 100644
--- a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java
+++ b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java
@@ -145,5 +145,14 @@ public class HiveHBaseTableOutputFormat extends
       }
       m_table.put(put);
     }
+
+    @Override
+    protected void finalize() throws Throwable {
+      try {
+        m_table.close();
+      } finally {
+        super.finalize();
+      }
+    }
   }
 }