Posted to commits@hbase.apache.org by zh...@apache.org on 2018/04/23 08:11:59 UTC

[08/50] [abbrv] hbase git commit: HBASE-20421 HBaseContext creates a connection but does not close it (Yu Wang)

HBASE-20421 HBaseContext creates a connection but does not close it (Yu Wang)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f4f2b682
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f4f2b682
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f4f2b682

Branch: refs/heads/HBASE-19064
Commit: f4f2b68238a094d7b1931dc8b7939742ccbb2b57
Parents: fd2cec7
Author: tedyu <yu...@gmail.com>
Authored: Tue Apr 17 19:45:53 2018 -0700
Committer: tedyu <yu...@gmail.com>
Committed: Tue Apr 17 19:45:53 2018 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/spark/HBaseContext.scala       | 356 ++++++++++---------
 1 file changed, 181 insertions(+), 175 deletions(-)
----------------------------------------------------------------------
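For readers skimming the diff below: the fix wraps everything after HBaseConnectionCache.getConnection(config) in try/finally, so the connection is released even when the bulk-load work throws. The following is a minimal, hypothetical sketch of that resource-handling pattern using the public HBase client API; the withConnection helper and the use of ConnectionFactory are assumptions made here for illustration only, while the patch itself keeps using the module-internal HBaseConnectionCache.

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory}

object ConnectionScope {
  // Hypothetical helper, not part of HBaseContext: run `body` against a freshly
  // created Connection and always close it in finally, mirroring the try/finally
  // this commit adds around bulkLoad and bulkLoadThinRows.
  def withConnection[T](body: Connection => T): T = {
    val conn: Connection = ConnectionFactory.createConnection(HBaseConfiguration.create())
    try {
      body(conn)
    } finally {
      if (conn != null) conn.close()
    }
  }

  // Example use: fetch region start keys inside the managed scope, as the
  // patched bulkLoad now does inside its try block.
  def startKeysOf(table: String): Array[Array[Byte]] =
    withConnection(conn => conn.getRegionLocator(TableName.valueOf(table)).getStartKeys)
}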


http://git-wip-us.apache.org/repos/asf/hbase/blob/f4f2b682/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
index 656b8c2..e50a3e8 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
@@ -628,87 +628,90 @@ class HBaseContext(@transient val sc: SparkContext,
       throw new FileAlreadyExistsException("Path " + stagingDir + " already exists")
     }
     val conn = HBaseConnectionCache.getConnection(config)
-    val regionLocator = conn.getRegionLocator(tableName)
-    val startKeys = regionLocator.getStartKeys
-    if (startKeys.length == 0) {
-      logInfo("Table " + tableName.toString + " was not found")
-    }
-    val defaultCompressionStr = config.get("hfile.compression",
-      Compression.Algorithm.NONE.getName)
-    val hfileCompression = HFileWriterImpl
-      .compressionByName(defaultCompressionStr)
-    val nowTimeStamp = System.currentTimeMillis()
-    val tableRawName = tableName.getName
+    try {
+      val regionLocator = conn.getRegionLocator(tableName)
+      val startKeys = regionLocator.getStartKeys
+      if (startKeys.length == 0) {
+        logInfo("Table " + tableName.toString + " was not found")
+      }
+      val defaultCompressionStr = config.get("hfile.compression",
+        Compression.Algorithm.NONE.getName)
+      val hfileCompression = HFileWriterImpl
+        .compressionByName(defaultCompressionStr)
+      val nowTimeStamp = System.currentTimeMillis()
+      val tableRawName = tableName.getName
 
-    val familyHFileWriteOptionsMapInternal =
-      new util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions]
+      val familyHFileWriteOptionsMapInternal =
+        new util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions]
 
-    val entrySetIt = familyHFileWriteOptionsMap.entrySet().iterator()
+      val entrySetIt = familyHFileWriteOptionsMap.entrySet().iterator()
 
-    while (entrySetIt.hasNext) {
-      val entry = entrySetIt.next()
-      familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(entry.getKey), entry.getValue)
-    }
+      while (entrySetIt.hasNext) {
+        val entry = entrySetIt.next()
+        familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(entry.getKey), entry.getValue)
+      }
 
-    val regionSplitPartitioner =
-      new BulkLoadPartitioner(startKeys)
-
-    //This is where all the magic happens
-    //Here we are going to do the following things
-    // 1. FlapMap every row in the RDD into key column value tuples
-    // 2. Then we are going to repartition sort and shuffle
-    // 3. Finally we are going to write out our HFiles
-    rdd.flatMap( r => flatMap(r)).
-      repartitionAndSortWithinPartitions(regionSplitPartitioner).
-      hbaseForeachPartition(this, (it, conn) => {
-
-      val conf = broadcastedConf.value.value
-      val fs = FileSystem.get(conf)
-      val writerMap = new mutable.HashMap[ByteArrayWrapper, WriterLength]
-      var previousRow:Array[Byte] = HConstants.EMPTY_BYTE_ARRAY
-      var rollOverRequested = false
-      val localTableName = TableName.valueOf(tableRawName)
-
-      //Here is where we finally iterate through the data in this partition of the
-      //RDD that has been sorted and partitioned
-      it.foreach{ case (keyFamilyQualifier, cellValue:Array[Byte]) =>
-
-        val wl = writeValueToHFile(keyFamilyQualifier.rowKey,
-          keyFamilyQualifier.family,
-          keyFamilyQualifier.qualifier,
-          cellValue,
-          nowTimeStamp,
-          fs,
-          conn,
-          localTableName,
-          conf,
-          familyHFileWriteOptionsMapInternal,
-          hfileCompression,
-          writerMap,
-          stagingDir)
+      val regionSplitPartitioner =
+        new BulkLoadPartitioner(startKeys)
+
+      //This is where all the magic happens
+      //Here we are going to do the following things
+      // 1. FlapMap every row in the RDD into key column value tuples
+      // 2. Then we are going to repartition sort and shuffle
+      // 3. Finally we are going to write out our HFiles
+      rdd.flatMap( r => flatMap(r)).
+        repartitionAndSortWithinPartitions(regionSplitPartitioner).
+        hbaseForeachPartition(this, (it, conn) => {
+
+          val conf = broadcastedConf.value.value
+          val fs = FileSystem.get(conf)
+          val writerMap = new mutable.HashMap[ByteArrayWrapper, WriterLength]
+          var previousRow:Array[Byte] = HConstants.EMPTY_BYTE_ARRAY
+          var rollOverRequested = false
+          val localTableName = TableName.valueOf(tableRawName)
+
+          //Here is where we finally iterate through the data in this partition of the
+          //RDD that has been sorted and partitioned
+          it.foreach{ case (keyFamilyQualifier, cellValue:Array[Byte]) =>
+
+            val wl = writeValueToHFile(keyFamilyQualifier.rowKey,
+              keyFamilyQualifier.family,
+              keyFamilyQualifier.qualifier,
+              cellValue,
+              nowTimeStamp,
+              fs,
+              conn,
+              localTableName,
+              conf,
+              familyHFileWriteOptionsMapInternal,
+              hfileCompression,
+              writerMap,
+              stagingDir)
 
-        rollOverRequested = rollOverRequested || wl.written > maxSize
+            rollOverRequested = rollOverRequested || wl.written > maxSize
+
+            //This will only roll if we have at least one column family file that is
+            //bigger then maxSize and we have finished a given row key
+            if (rollOverRequested && Bytes.compareTo(previousRow, keyFamilyQualifier.rowKey) != 0) {
+              rollWriters(fs, writerMap,
+                regionSplitPartitioner,
+                previousRow,
+                compactionExclude)
+              rollOverRequested = false
+            }
 
-        //This will only roll if we have at least one column family file that is
-        //bigger then maxSize and we have finished a given row key
-        if (rollOverRequested && Bytes.compareTo(previousRow, keyFamilyQualifier.rowKey) != 0) {
+            previousRow = keyFamilyQualifier.rowKey
+          }
+          //We have finished all the data so lets close up the writers
           rollWriters(fs, writerMap,
             regionSplitPartitioner,
             previousRow,
             compactionExclude)
           rollOverRequested = false
-        }
-
-        previousRow = keyFamilyQualifier.rowKey
-      }
-      //We have finished all the data so lets close up the writers
-      rollWriters(fs, writerMap,
-        regionSplitPartitioner,
-        previousRow,
-        compactionExclude)
-      rollOverRequested = false
-    })
-    if(null != conn) conn.close()
+        })
+    } finally {
+      if(null != conn) conn.close()
+    }
   }
 
   /**
@@ -760,118 +763,121 @@ class HBaseContext(@transient val sc: SparkContext,
       throw new FileAlreadyExistsException("Path " + stagingDir + " already exists")
     }
     val conn = HBaseConnectionCache.getConnection(config)
-    val regionLocator = conn.getRegionLocator(tableName)
-    val startKeys = regionLocator.getStartKeys
-    if (startKeys.length == 0) {
-      logInfo("Table " + tableName.toString + " was not found")
-    }
-    val defaultCompressionStr = config.get("hfile.compression",
-      Compression.Algorithm.NONE.getName)
-    val defaultCompression = HFileWriterImpl
-      .compressionByName(defaultCompressionStr)
-    val nowTimeStamp = System.currentTimeMillis()
-    val tableRawName = tableName.getName
-
-    val familyHFileWriteOptionsMapInternal =
-      new util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions]
-
-    val entrySetIt = familyHFileWriteOptionsMap.entrySet().iterator()
-
-    while (entrySetIt.hasNext) {
-      val entry = entrySetIt.next()
-      familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(entry.getKey), entry.getValue)
-    }
-
-    val regionSplitPartitioner =
-      new BulkLoadPartitioner(startKeys)
-
-    //This is where all the magic happens
-    //Here we are going to do the following things
-    // 1. FlapMap every row in the RDD into key column value tuples
-    // 2. Then we are going to repartition sort and shuffle
-    // 3. Finally we are going to write out our HFiles
-    rdd.map( r => mapFunction(r)).
-      repartitionAndSortWithinPartitions(regionSplitPartitioner).
-      hbaseForeachPartition(this, (it, conn) => {
-
-      val conf = broadcastedConf.value.value
-      val fs = FileSystem.get(conf)
-      val writerMap = new mutable.HashMap[ByteArrayWrapper, WriterLength]
-      var previousRow:Array[Byte] = HConstants.EMPTY_BYTE_ARRAY
-      var rollOverRequested = false
-      val localTableName = TableName.valueOf(tableRawName)
-
-      //Here is where we finally iterate through the data in this partition of the
-      //RDD that has been sorted and partitioned
-      it.foreach{ case (rowKey:ByteArrayWrapper,
-      familiesQualifiersValues:FamiliesQualifiersValues) =>
-
-
-        if (Bytes.compareTo(previousRow, rowKey.value) == 0) {
-          throw new KeyAlreadyExistsException("The following key was sent to the " +
-            "HFile load more then one: " + Bytes.toString(previousRow))
-        }
-
-        //The family map is a tree map so the families will be sorted
-        val familyIt = familiesQualifiersValues.familyMap.entrySet().iterator()
-        while (familyIt.hasNext) {
-          val familyEntry = familyIt.next()
-
-          val family = familyEntry.getKey.value
-
-          val qualifierIt = familyEntry.getValue.entrySet().iterator()
-
-          //The qualifier map is a tree map so the families will be sorted
-          while (qualifierIt.hasNext) {
+    try {
+      val regionLocator = conn.getRegionLocator(tableName)
+      val startKeys = regionLocator.getStartKeys
+      if (startKeys.length == 0) {
+        logInfo("Table " + tableName.toString + " was not found")
+      }
+      val defaultCompressionStr = config.get("hfile.compression",
+        Compression.Algorithm.NONE.getName)
+      val defaultCompression = HFileWriterImpl
+        .compressionByName(defaultCompressionStr)
+      val nowTimeStamp = System.currentTimeMillis()
+      val tableRawName = tableName.getName
 
-            val qualifierEntry = qualifierIt.next()
-            val qualifier = qualifierEntry.getKey
-            val cellValue = qualifierEntry.getValue
+      val familyHFileWriteOptionsMapInternal =
+        new util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions]
 
-            writeValueToHFile(rowKey.value,
-              family,
-              qualifier.value, // qualifier
-              cellValue, // value
-              nowTimeStamp,
-              fs,
-              conn,
-              localTableName,
-              conf,
-              familyHFileWriteOptionsMapInternal,
-              defaultCompression,
-              writerMap,
-              stagingDir)
+      val entrySetIt = familyHFileWriteOptionsMap.entrySet().iterator()
 
-            previousRow = rowKey.value
-          }
+      while (entrySetIt.hasNext) {
+        val entry = entrySetIt.next()
+        familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(entry.getKey), entry.getValue)
+      }
 
-          writerMap.values.foreach( wl => {
-            rollOverRequested = rollOverRequested || wl.written > maxSize
+      val regionSplitPartitioner =
+        new BulkLoadPartitioner(startKeys)
+
+      //This is where all the magic happens
+      //Here we are going to do the following things
+      // 1. FlapMap every row in the RDD into key column value tuples
+      // 2. Then we are going to repartition sort and shuffle
+      // 3. Finally we are going to write out our HFiles
+      rdd.map( r => mapFunction(r)).
+        repartitionAndSortWithinPartitions(regionSplitPartitioner).
+        hbaseForeachPartition(this, (it, conn) => {
+
+          val conf = broadcastedConf.value.value
+          val fs = FileSystem.get(conf)
+          val writerMap = new mutable.HashMap[ByteArrayWrapper, WriterLength]
+          var previousRow:Array[Byte] = HConstants.EMPTY_BYTE_ARRAY
+          var rollOverRequested = false
+          val localTableName = TableName.valueOf(tableRawName)
+
+          //Here is where we finally iterate through the data in this partition of the
+          //RDD that has been sorted and partitioned
+          it.foreach{ case (rowKey:ByteArrayWrapper,
+          familiesQualifiersValues:FamiliesQualifiersValues) =>
+
+
+            if (Bytes.compareTo(previousRow, rowKey.value) == 0) {
+              throw new KeyAlreadyExistsException("The following key was sent to the " +
+                "HFile load more then one: " + Bytes.toString(previousRow))
+            }
 
-            //This will only roll if we have at least one column family file that is
-            //bigger then maxSize and we have finished a given row key
-            if (rollOverRequested) {
-              rollWriters(fs, writerMap,
-                regionSplitPartitioner,
-                previousRow,
-                compactionExclude)
-              rollOverRequested = false
+            //The family map is a tree map so the families will be sorted
+            val familyIt = familiesQualifiersValues.familyMap.entrySet().iterator()
+            while (familyIt.hasNext) {
+              val familyEntry = familyIt.next()
+
+              val family = familyEntry.getKey.value
+
+              val qualifierIt = familyEntry.getValue.entrySet().iterator()
+
+              //The qualifier map is a tree map so the families will be sorted
+              while (qualifierIt.hasNext) {
+
+                val qualifierEntry = qualifierIt.next()
+                val qualifier = qualifierEntry.getKey
+                val cellValue = qualifierEntry.getValue
+
+                writeValueToHFile(rowKey.value,
+                  family,
+                  qualifier.value, // qualifier
+                  cellValue, // value
+                  nowTimeStamp,
+                  fs,
+                  conn,
+                  localTableName,
+                  conf,
+                  familyHFileWriteOptionsMapInternal,
+                  defaultCompression,
+                  writerMap,
+                  stagingDir)
+
+                previousRow = rowKey.value
+              }
+
+              writerMap.values.foreach( wl => {
+                rollOverRequested = rollOverRequested || wl.written > maxSize
+
+                //This will only roll if we have at least one column family file that is
+                //bigger then maxSize and we have finished a given row key
+                if (rollOverRequested) {
+                  rollWriters(fs, writerMap,
+                    regionSplitPartitioner,
+                    previousRow,
+                    compactionExclude)
+                  rollOverRequested = false
+                }
+              })
             }
-          })
-        }
-      }
+          }
 
-      //This will get a writer for the column family
-      //If there is no writer for a given column family then
-      //it will get created here.
-      //We have finished all the data so lets close up the writers
-      rollWriters(fs, writerMap,
-        regionSplitPartitioner,
-        previousRow,
-        compactionExclude)
-      rollOverRequested = false
-    })
-    if(null != conn) conn.close()
+          //This will get a writer for the column family
+          //If there is no writer for a given column family then
+          //it will get created here.
+          //We have finished all the data so lets close up the writers
+          rollWriters(fs, writerMap,
+            regionSplitPartitioner,
+            previousRow,
+            compactionExclude)
+          rollOverRequested = false
+        })
+    } finally {
+      if(null != conn) conn.close()
+    }
   }
 
   /**