You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by sz...@apache.org on 2015/10/27 19:10:10 UTC
hive git commit: HIVE-12250 Zookeeper connection leaks in Hive's HBaseHandler (Naveen, reviewed by Aihua and Szehon)
Repository: hive
Updated Branches:
refs/heads/master 601a48122 -> d5e8544e7
HIVE-12250 Zookeeper connection leaks in Hive's HBaseHandler (Naveen, reviewed by Aihua and Szehon)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d5e8544e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d5e8544e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d5e8544e
Branch: refs/heads/master
Commit: d5e8544e7106ba0879b176c3524e369833bd844b
Parents: 601a481
Author: Szehon Ho <sz...@cloudera.com>
Authored: Tue Oct 27 11:09:07 2015 -0700
Committer: Szehon Ho <sz...@cloudera.com>
Committed: Tue Oct 27 11:09:07 2015 -0700
----------------------------------------------------------------------
.../hive/hbase/HiveHBaseTableInputFormat.java | 105 ++++++++++---------
.../hive/hbase/HiveHBaseTableOutputFormat.java | 9 ++
2 files changed, 64 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/d5e8544e/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
index 8e72759..5f4a1e4 100644
--- a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
+++ b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
@@ -107,6 +107,7 @@ public class HiveHBaseTableInputFormat extends TableInputFormatBase
try {
recordReader.initialize(tableSplit, tac);
} catch (InterruptedException e) {
+ closeTable(); // Free up the HTable connections
throw new IOException("Failed to initialize RecordReader", e);
}
@@ -445,65 +446,69 @@ public class HiveHBaseTableInputFormat extends TableInputFormatBase
String hbaseColumnsMapping = jobConf.get(HBaseSerDe.HBASE_COLUMNS_MAPPING);
boolean doColumnRegexMatching = jobConf.getBoolean(HBaseSerDe.HBASE_COLUMNS_REGEX_MATCHING, true);
- if (hbaseColumnsMapping == null) {
- throw new IOException(HBaseSerDe.HBASE_COLUMNS_MAPPING + " required for HBase Table.");
- }
-
- ColumnMappings columnMappings = null;
try {
- columnMappings = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping, doColumnRegexMatching);
- } catch (SerDeException e) {
- throw new IOException(e);
- }
+ if (hbaseColumnsMapping == null) {
+ throw new IOException(HBaseSerDe.HBASE_COLUMNS_MAPPING + " required for HBase Table.");
+ }
- int iKey = columnMappings.getKeyIndex();
- int iTimestamp = columnMappings.getTimestampIndex();
- ColumnMapping keyMapping = columnMappings.getKeyMapping();
-
- // Take filter pushdown into account while calculating splits; this
- // allows us to prune off regions immediately. Note that although
- // the Javadoc for the superclass getSplits says that it returns one
- // split per region, the implementation actually takes the scan
- // definition into account and excludes regions which don't satisfy
- // the start/stop row conditions (HBASE-1829).
- Scan scan = createFilterScan(jobConf, iKey, iTimestamp,
- HiveHBaseInputFormatUtil.getStorageFormatOfKey(keyMapping.mappingSpec,
- jobConf.get(HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE, "string")));
-
- // The list of families that have been added to the scan
- List<String> addedFamilies = new ArrayList<String>();
-
- // REVIEW: are we supposed to be applying the getReadColumnIDs
- // same as in getRecordReader?
- for (ColumnMapping colMap : columnMappings) {
- if (colMap.hbaseRowKey || colMap.hbaseTimestamp) {
- continue;
+ ColumnMappings columnMappings = null;
+ try {
+ columnMappings = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping, doColumnRegexMatching);
+ } catch (SerDeException e) {
+ throw new IOException(e);
}
- if (colMap.qualifierName == null) {
- scan.addFamily(colMap.familyNameBytes);
- addedFamilies.add(colMap.familyName);
- } else {
- if(!addedFamilies.contains(colMap.familyName)){
- // add the column only if the family has not already been added
- scan.addColumn(colMap.familyNameBytes, colMap.qualifierNameBytes);
+ int iKey = columnMappings.getKeyIndex();
+ int iTimestamp = columnMappings.getTimestampIndex();
+ ColumnMapping keyMapping = columnMappings.getKeyMapping();
+
+ // Take filter pushdown into account while calculating splits; this
+ // allows us to prune off regions immediately. Note that although
+ // the Javadoc for the superclass getSplits says that it returns one
+ // split per region, the implementation actually takes the scan
+ // definition into account and excludes regions which don't satisfy
+ // the start/stop row conditions (HBASE-1829).
+ Scan scan = createFilterScan(jobConf, iKey, iTimestamp,
+ HiveHBaseInputFormatUtil.getStorageFormatOfKey(keyMapping.mappingSpec,
+ jobConf.get(HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE, "string")));
+
+ // The list of families that have been added to the scan
+ List<String> addedFamilies = new ArrayList<String>();
+
+ // REVIEW: are we supposed to be applying the getReadColumnIDs
+ // same as in getRecordReader?
+ for (ColumnMapping colMap : columnMappings) {
+ if (colMap.hbaseRowKey || colMap.hbaseTimestamp) {
+ continue;
+ }
+
+ if (colMap.qualifierName == null) {
+ scan.addFamily(colMap.familyNameBytes);
+ addedFamilies.add(colMap.familyName);
+ } else {
+ if(!addedFamilies.contains(colMap.familyName)){
+ // add the column only if the family has not already been added
+ scan.addColumn(colMap.familyNameBytes, colMap.qualifierNameBytes);
+ }
}
}
- }
- setScan(scan);
+ setScan(scan);
- Job job = new Job(jobConf);
- JobContext jobContext = ShimLoader.getHadoopShims().newJobContext(job);
- Path [] tablePaths = FileInputFormat.getInputPaths(jobContext);
+ Job job = new Job(jobConf);
+ JobContext jobContext = ShimLoader.getHadoopShims().newJobContext(job);
+ Path [] tablePaths = FileInputFormat.getInputPaths(jobContext);
- List<org.apache.hadoop.mapreduce.InputSplit> splits =
- super.getSplits(jobContext);
- InputSplit [] results = new InputSplit[splits.size()];
+ List<org.apache.hadoop.mapreduce.InputSplit> splits =
+ super.getSplits(jobContext);
+ InputSplit [] results = new InputSplit[splits.size()];
- for (int i = 0; i < splits.size(); i++) {
- results[i] = new HBaseSplit((TableSplit) splits.get(i), tablePaths[0]);
- }
+ for (int i = 0; i < splits.size(); i++) {
+ results[i] = new HBaseSplit((TableSplit) splits.get(i), tablePaths[0]);
+ }
- return results;
+ return results;
+ } finally {
+ closeTable();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/d5e8544e/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java
index 3100885..0715a51 100644
--- a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java
+++ b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java
@@ -145,5 +145,14 @@ public class HiveHBaseTableOutputFormat extends
}
m_table.put(put);
}
+
+ @Override
+ protected void finalize() throws Throwable {
+ try {
+ m_table.close();
+ } finally {
+ super.finalize();
+ }
+ }
}
}