Posted to commits@hbase.apache.org by zh...@apache.org on 2018/04/23 08:11:59 UTC
[08/50] [abbrv] hbase git commit: HBASE-20421 HBasecontext creates a connection but does not close it (Yu Wang)
HBASE-20421 HBasecontext creates a connection but does not close it (Yu Wang)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f4f2b682
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f4f2b682
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f4f2b682
Branch: refs/heads/HBASE-19064
Commit: f4f2b68238a094d7b1931dc8b7939742ccbb2b57
Parents: fd2cec7
Author: tedyu <yu...@gmail.com>
Authored: Tue Apr 17 19:45:53 2018 -0700
Committer: tedyu <yu...@gmail.com>
Committed: Tue Apr 17 19:45:53 2018 -0700
----------------------------------------------------------------------
.../hadoop/hbase/spark/HBaseContext.scala | 356 ++++++++++---------
1 file changed, 181 insertions(+), 175 deletions(-)
----------------------------------------------------------------------
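For readers skimming the diff below, this is the resource-management pattern the commit applies: the connection obtained from HBaseConnectionCache is now used inside a try block and closed in a finally block, so it is released even when the bulk-load body throws. The following is a minimal, illustrative sketch only; the object and method names (ConnectionCloseSketch, bulkLoadSafely) are hypothetical and not part of the patch, and it assumes the hbase-spark module's HBaseConnectionCache and the HBase client classes are on the classpath.

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.hbase.TableName
    import org.apache.hadoop.hbase.spark.HBaseConnectionCache

    object ConnectionCloseSketch {
      // Hypothetical helper mirroring the shape of the patched bulkLoad methods:
      // acquire the connection, do all the work inside try, always close in finally.
      def bulkLoadSafely(config: Configuration, tableName: TableName): Unit = {
        val conn = HBaseConnectionCache.getConnection(config)
        try {
          val regionLocator = conn.getRegionLocator(tableName)
          val startKeys = regionLocator.getStartKeys
          if (startKeys.length == 0) {
            // HBaseContext logs "Table ... was not found" at this point
          }
          // ... the flatMap / repartitionAndSortWithinPartitions / HFile-writing
          // work from HBaseContext.bulkLoad would run here, inside the try ...
        } finally {
          // Runs on both success and failure, closing the connection that
          // HBASE-20421 reports as leaked.
          if (null != conn) conn.close()
        }
      }
    }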
http://git-wip-us.apache.org/repos/asf/hbase/blob/f4f2b682/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
index 656b8c2..e50a3e8 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
@@ -628,87 +628,90 @@ class HBaseContext(@transient val sc: SparkContext,
throw new FileAlreadyExistsException("Path " + stagingDir + " already exists")
}
val conn = HBaseConnectionCache.getConnection(config)
- val regionLocator = conn.getRegionLocator(tableName)
- val startKeys = regionLocator.getStartKeys
- if (startKeys.length == 0) {
- logInfo("Table " + tableName.toString + " was not found")
- }
- val defaultCompressionStr = config.get("hfile.compression",
- Compression.Algorithm.NONE.getName)
- val hfileCompression = HFileWriterImpl
- .compressionByName(defaultCompressionStr)
- val nowTimeStamp = System.currentTimeMillis()
- val tableRawName = tableName.getName
+ try {
+ val regionLocator = conn.getRegionLocator(tableName)
+ val startKeys = regionLocator.getStartKeys
+ if (startKeys.length == 0) {
+ logInfo("Table " + tableName.toString + " was not found")
+ }
+ val defaultCompressionStr = config.get("hfile.compression",
+ Compression.Algorithm.NONE.getName)
+ val hfileCompression = HFileWriterImpl
+ .compressionByName(defaultCompressionStr)
+ val nowTimeStamp = System.currentTimeMillis()
+ val tableRawName = tableName.getName
- val familyHFileWriteOptionsMapInternal =
- new util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions]
+ val familyHFileWriteOptionsMapInternal =
+ new util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions]
- val entrySetIt = familyHFileWriteOptionsMap.entrySet().iterator()
+ val entrySetIt = familyHFileWriteOptionsMap.entrySet().iterator()
- while (entrySetIt.hasNext) {
- val entry = entrySetIt.next()
- familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(entry.getKey), entry.getValue)
- }
+ while (entrySetIt.hasNext) {
+ val entry = entrySetIt.next()
+ familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(entry.getKey), entry.getValue)
+ }
- val regionSplitPartitioner =
- new BulkLoadPartitioner(startKeys)
-
- //This is where all the magic happens
- //Here we are going to do the following things
- // 1. FlapMap every row in the RDD into key column value tuples
- // 2. Then we are going to repartition sort and shuffle
- // 3. Finally we are going to write out our HFiles
- rdd.flatMap( r => flatMap(r)).
- repartitionAndSortWithinPartitions(regionSplitPartitioner).
- hbaseForeachPartition(this, (it, conn) => {
-
- val conf = broadcastedConf.value.value
- val fs = FileSystem.get(conf)
- val writerMap = new mutable.HashMap[ByteArrayWrapper, WriterLength]
- var previousRow:Array[Byte] = HConstants.EMPTY_BYTE_ARRAY
- var rollOverRequested = false
- val localTableName = TableName.valueOf(tableRawName)
-
- //Here is where we finally iterate through the data in this partition of the
- //RDD that has been sorted and partitioned
- it.foreach{ case (keyFamilyQualifier, cellValue:Array[Byte]) =>
-
- val wl = writeValueToHFile(keyFamilyQualifier.rowKey,
- keyFamilyQualifier.family,
- keyFamilyQualifier.qualifier,
- cellValue,
- nowTimeStamp,
- fs,
- conn,
- localTableName,
- conf,
- familyHFileWriteOptionsMapInternal,
- hfileCompression,
- writerMap,
- stagingDir)
+ val regionSplitPartitioner =
+ new BulkLoadPartitioner(startKeys)
+
+ //This is where all the magic happens
+ //Here we are going to do the following things
+ // 1. FlapMap every row in the RDD into key column value tuples
+ // 2. Then we are going to repartition sort and shuffle
+ // 3. Finally we are going to write out our HFiles
+ rdd.flatMap( r => flatMap(r)).
+ repartitionAndSortWithinPartitions(regionSplitPartitioner).
+ hbaseForeachPartition(this, (it, conn) => {
+
+ val conf = broadcastedConf.value.value
+ val fs = FileSystem.get(conf)
+ val writerMap = new mutable.HashMap[ByteArrayWrapper, WriterLength]
+ var previousRow:Array[Byte] = HConstants.EMPTY_BYTE_ARRAY
+ var rollOverRequested = false
+ val localTableName = TableName.valueOf(tableRawName)
+
+ //Here is where we finally iterate through the data in this partition of the
+ //RDD that has been sorted and partitioned
+ it.foreach{ case (keyFamilyQualifier, cellValue:Array[Byte]) =>
+
+ val wl = writeValueToHFile(keyFamilyQualifier.rowKey,
+ keyFamilyQualifier.family,
+ keyFamilyQualifier.qualifier,
+ cellValue,
+ nowTimeStamp,
+ fs,
+ conn,
+ localTableName,
+ conf,
+ familyHFileWriteOptionsMapInternal,
+ hfileCompression,
+ writerMap,
+ stagingDir)
- rollOverRequested = rollOverRequested || wl.written > maxSize
+ rollOverRequested = rollOverRequested || wl.written > maxSize
+
+ //This will only roll if we have at least one column family file that is
+ //bigger then maxSize and we have finished a given row key
+ if (rollOverRequested && Bytes.compareTo(previousRow, keyFamilyQualifier.rowKey) != 0) {
+ rollWriters(fs, writerMap,
+ regionSplitPartitioner,
+ previousRow,
+ compactionExclude)
+ rollOverRequested = false
+ }
- //This will only roll if we have at least one column family file that is
- //bigger then maxSize and we have finished a given row key
- if (rollOverRequested && Bytes.compareTo(previousRow, keyFamilyQualifier.rowKey) != 0) {
+ previousRow = keyFamilyQualifier.rowKey
+ }
+ //We have finished all the data so lets close up the writers
rollWriters(fs, writerMap,
regionSplitPartitioner,
previousRow,
compactionExclude)
rollOverRequested = false
- }
-
- previousRow = keyFamilyQualifier.rowKey
- }
- //We have finished all the data so lets close up the writers
- rollWriters(fs, writerMap,
- regionSplitPartitioner,
- previousRow,
- compactionExclude)
- rollOverRequested = false
- })
- if(null != conn) conn.close()
+ })
+ } finally {
+ if(null != conn) conn.close()
+ }
}
/**
@@ -760,118 +763,121 @@ class HBaseContext(@transient val sc: SparkContext,
throw new FileAlreadyExistsException("Path " + stagingDir + " already exists")
}
val conn = HBaseConnectionCache.getConnection(config)
- val regionLocator = conn.getRegionLocator(tableName)
- val startKeys = regionLocator.getStartKeys
- if (startKeys.length == 0) {
- logInfo("Table " + tableName.toString + " was not found")
- }
- val defaultCompressionStr = config.get("hfile.compression",
- Compression.Algorithm.NONE.getName)
- val defaultCompression = HFileWriterImpl
- .compressionByName(defaultCompressionStr)
- val nowTimeStamp = System.currentTimeMillis()
- val tableRawName = tableName.getName
-
- val familyHFileWriteOptionsMapInternal =
- new util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions]
-
- val entrySetIt = familyHFileWriteOptionsMap.entrySet().iterator()
-
- while (entrySetIt.hasNext) {
- val entry = entrySetIt.next()
- familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(entry.getKey), entry.getValue)
- }
-
- val regionSplitPartitioner =
- new BulkLoadPartitioner(startKeys)
-
- //This is where all the magic happens
- //Here we are going to do the following things
- // 1. FlapMap every row in the RDD into key column value tuples
- // 2. Then we are going to repartition sort and shuffle
- // 3. Finally we are going to write out our HFiles
- rdd.map( r => mapFunction(r)).
- repartitionAndSortWithinPartitions(regionSplitPartitioner).
- hbaseForeachPartition(this, (it, conn) => {
-
- val conf = broadcastedConf.value.value
- val fs = FileSystem.get(conf)
- val writerMap = new mutable.HashMap[ByteArrayWrapper, WriterLength]
- var previousRow:Array[Byte] = HConstants.EMPTY_BYTE_ARRAY
- var rollOverRequested = false
- val localTableName = TableName.valueOf(tableRawName)
-
- //Here is where we finally iterate through the data in this partition of the
- //RDD that has been sorted and partitioned
- it.foreach{ case (rowKey:ByteArrayWrapper,
- familiesQualifiersValues:FamiliesQualifiersValues) =>
-
-
- if (Bytes.compareTo(previousRow, rowKey.value) == 0) {
- throw new KeyAlreadyExistsException("The following key was sent to the " +
- "HFile load more then one: " + Bytes.toString(previousRow))
- }
-
- //The family map is a tree map so the families will be sorted
- val familyIt = familiesQualifiersValues.familyMap.entrySet().iterator()
- while (familyIt.hasNext) {
- val familyEntry = familyIt.next()
-
- val family = familyEntry.getKey.value
-
- val qualifierIt = familyEntry.getValue.entrySet().iterator()
-
- //The qualifier map is a tree map so the families will be sorted
- while (qualifierIt.hasNext) {
+ try {
+ val regionLocator = conn.getRegionLocator(tableName)
+ val startKeys = regionLocator.getStartKeys
+ if (startKeys.length == 0) {
+ logInfo("Table " + tableName.toString + " was not found")
+ }
+ val defaultCompressionStr = config.get("hfile.compression",
+ Compression.Algorithm.NONE.getName)
+ val defaultCompression = HFileWriterImpl
+ .compressionByName(defaultCompressionStr)
+ val nowTimeStamp = System.currentTimeMillis()
+ val tableRawName = tableName.getName
- val qualifierEntry = qualifierIt.next()
- val qualifier = qualifierEntry.getKey
- val cellValue = qualifierEntry.getValue
+ val familyHFileWriteOptionsMapInternal =
+ new util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions]
- writeValueToHFile(rowKey.value,
- family,
- qualifier.value, // qualifier
- cellValue, // value
- nowTimeStamp,
- fs,
- conn,
- localTableName,
- conf,
- familyHFileWriteOptionsMapInternal,
- defaultCompression,
- writerMap,
- stagingDir)
+ val entrySetIt = familyHFileWriteOptionsMap.entrySet().iterator()
- previousRow = rowKey.value
- }
+ while (entrySetIt.hasNext) {
+ val entry = entrySetIt.next()
+ familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(entry.getKey), entry.getValue)
+ }
- writerMap.values.foreach( wl => {
- rollOverRequested = rollOverRequested || wl.written > maxSize
+ val regionSplitPartitioner =
+ new BulkLoadPartitioner(startKeys)
+
+ //This is where all the magic happens
+ //Here we are going to do the following things
+ // 1. FlapMap every row in the RDD into key column value tuples
+ // 2. Then we are going to repartition sort and shuffle
+ // 3. Finally we are going to write out our HFiles
+ rdd.map( r => mapFunction(r)).
+ repartitionAndSortWithinPartitions(regionSplitPartitioner).
+ hbaseForeachPartition(this, (it, conn) => {
+
+ val conf = broadcastedConf.value.value
+ val fs = FileSystem.get(conf)
+ val writerMap = new mutable.HashMap[ByteArrayWrapper, WriterLength]
+ var previousRow:Array[Byte] = HConstants.EMPTY_BYTE_ARRAY
+ var rollOverRequested = false
+ val localTableName = TableName.valueOf(tableRawName)
+
+ //Here is where we finally iterate through the data in this partition of the
+ //RDD that has been sorted and partitioned
+ it.foreach{ case (rowKey:ByteArrayWrapper,
+ familiesQualifiersValues:FamiliesQualifiersValues) =>
+
+
+ if (Bytes.compareTo(previousRow, rowKey.value) == 0) {
+ throw new KeyAlreadyExistsException("The following key was sent to the " +
+ "HFile load more then one: " + Bytes.toString(previousRow))
+ }
- //This will only roll if we have at least one column family file that is
- //bigger then maxSize and we have finished a given row key
- if (rollOverRequested) {
- rollWriters(fs, writerMap,
- regionSplitPartitioner,
- previousRow,
- compactionExclude)
- rollOverRequested = false
+ //The family map is a tree map so the families will be sorted
+ val familyIt = familiesQualifiersValues.familyMap.entrySet().iterator()
+ while (familyIt.hasNext) {
+ val familyEntry = familyIt.next()
+
+ val family = familyEntry.getKey.value
+
+ val qualifierIt = familyEntry.getValue.entrySet().iterator()
+
+ //The qualifier map is a tree map so the families will be sorted
+ while (qualifierIt.hasNext) {
+
+ val qualifierEntry = qualifierIt.next()
+ val qualifier = qualifierEntry.getKey
+ val cellValue = qualifierEntry.getValue
+
+ writeValueToHFile(rowKey.value,
+ family,
+ qualifier.value, // qualifier
+ cellValue, // value
+ nowTimeStamp,
+ fs,
+ conn,
+ localTableName,
+ conf,
+ familyHFileWriteOptionsMapInternal,
+ defaultCompression,
+ writerMap,
+ stagingDir)
+
+ previousRow = rowKey.value
+ }
+
+ writerMap.values.foreach( wl => {
+ rollOverRequested = rollOverRequested || wl.written > maxSize
+
+ //This will only roll if we have at least one column family file that is
+ //bigger then maxSize and we have finished a given row key
+ if (rollOverRequested) {
+ rollWriters(fs, writerMap,
+ regionSplitPartitioner,
+ previousRow,
+ compactionExclude)
+ rollOverRequested = false
+ }
+ })
}
- })
- }
- }
+ }
- //This will get a writer for the column family
- //If there is no writer for a given column family then
- //it will get created here.
- //We have finished all the data so lets close up the writers
- rollWriters(fs, writerMap,
- regionSplitPartitioner,
- previousRow,
- compactionExclude)
- rollOverRequested = false
- })
- if(null != conn) conn.close()
+ //This will get a writer for the column family
+ //If there is no writer for a given column family then
+ //it will get created here.
+ //We have finished all the data so lets close up the writers
+ rollWriters(fs, writerMap,
+ regionSplitPartitioner,
+ previousRow,
+ compactionExclude)
+ rollOverRequested = false
+ })
+ } finally {
+ if(null != conn) conn.close()
+ }
}
/**