Posted to commits@hbase.apache.org by sy...@apache.org on 2017/03/16 17:12:31 UTC

[03/14] hbase git commit: HBASE-15597 Clean up configuration keys used in hbase-spark module (Yi Liang)

HBASE-15597 Clean up configuration keys used in hbase-spark module (Yi Liang)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/35d7a0cd
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/35d7a0cd
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/35d7a0cd

Branch: refs/heads/hbase-12439
Commit: 35d7a0cd0798cabe7df5766fcc993512eca6c92e
Parents: fee67bc
Author: Jerry He <je...@apache.org>
Authored: Mon Mar 13 12:02:07 2017 -0700
Committer: Jerry He <je...@apache.org>
Committed: Mon Mar 13 12:02:07 2017 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/spark/DefaultSource.scala      | 28 ++++-----
 .../hbase/spark/HBaseConnectionCache.scala      |  2 +-
 .../spark/datasources/HBaseSparkConf.scala      | 62 ++++++++++++--------
 .../hadoop/hbase/spark/DefaultSourceSuite.scala | 16 ++---
 .../spark/DynamicLogicExpressionSuite.scala     |  2 +-
 .../hadoop/hbase/spark/HBaseTestSource.scala    | 13 ++--
 .../hbase/spark/PartitionFilterSuite.scala      |  6 +-
 7 files changed, 69 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/35d7a0cd/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
index a8b2ab8..b2b646a 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
@@ -97,36 +97,36 @@ case class HBaseRelation (
   )(@transient val sqlContext: SQLContext)
   extends BaseRelation with PrunedFilteredScan  with InsertableRelation  with Logging {
   val timestamp = parameters.get(HBaseSparkConf.TIMESTAMP).map(_.toLong)
-  val minTimestamp = parameters.get(HBaseSparkConf.MIN_TIMESTAMP).map(_.toLong)
-  val maxTimestamp = parameters.get(HBaseSparkConf.MAX_TIMESTAMP).map(_.toLong)
+  val minTimestamp = parameters.get(HBaseSparkConf.TIMERANGE_START).map(_.toLong)
+  val maxTimestamp = parameters.get(HBaseSparkConf.TIMERANGE_END).map(_.toLong)
   val maxVersions = parameters.get(HBaseSparkConf.MAX_VERSIONS).map(_.toInt)
-  val encoderClsName = parameters.get(HBaseSparkConf.ENCODER).getOrElse(HBaseSparkConf.defaultEncoder)
+  val encoderClsName = parameters.get(HBaseSparkConf.QUERY_ENCODER).getOrElse(HBaseSparkConf.DEFAULT_QUERY_ENCODER)
 
   @transient val encoder = JavaBytesEncoder.create(encoderClsName)
 
   val catalog = HBaseTableCatalog(parameters)
   def tableName = catalog.name
-  val configResources = parameters.getOrElse(HBaseSparkConf.HBASE_CONFIG_RESOURCES_LOCATIONS, "")
-  val useHBaseContext =  parameters.get(HBaseSparkConf.USE_HBASE_CONTEXT).map(_.toBoolean).getOrElse(true)
-  val usePushDownColumnFilter = parameters.get(HBaseSparkConf.PUSH_DOWN_COLUMN_FILTER)
-    .map(_.toBoolean).getOrElse(true)
+  val configResources = parameters.getOrElse(HBaseSparkConf.HBASE_CONFIG_LOCATION, "")
+  val useHBaseContext =  parameters.get(HBaseSparkConf.USE_HBASECONTEXT).map(_.toBoolean).getOrElse(HBaseSparkConf.DEFAULT_USE_HBASECONTEXT)
+  val usePushDownColumnFilter = parameters.get(HBaseSparkConf.PUSHDOWN_COLUMN_FILTER)
+    .map(_.toBoolean).getOrElse(HBaseSparkConf.DEFAULT_PUSHDOWN_COLUMN_FILTER)
 
   // The user supplied per table parameter will overwrite global ones in SparkConf
-  val blockCacheEnable = parameters.get(HBaseSparkConf.BLOCK_CACHE_ENABLE).map(_.toBoolean)
+  val blockCacheEnable = parameters.get(HBaseSparkConf.QUERY_CACHEBLOCKS).map(_.toBoolean)
     .getOrElse(
       sqlContext.sparkContext.getConf.getBoolean(
-        HBaseSparkConf.BLOCK_CACHE_ENABLE, HBaseSparkConf.defaultBlockCacheEnable))
-  val cacheSize = parameters.get(HBaseSparkConf.CACHE_SIZE).map(_.toInt)
+        HBaseSparkConf.QUERY_CACHEBLOCKS, HBaseSparkConf.DEFAULT_QUERY_CACHEBLOCKS))
+  val cacheSize = parameters.get(HBaseSparkConf.QUERY_CACHEDROWS).map(_.toInt)
     .getOrElse(
       sqlContext.sparkContext.getConf.getInt(
-      HBaseSparkConf.CACHE_SIZE, HBaseSparkConf.defaultCachingSize))
-  val batchNum = parameters.get(HBaseSparkConf.BATCH_NUM).map(_.toInt)
+      HBaseSparkConf.QUERY_CACHEDROWS, -1))
+  val batchNum = parameters.get(HBaseSparkConf.QUERY_BATCHSIZE).map(_.toInt)
     .getOrElse(sqlContext.sparkContext.getConf.getInt(
-    HBaseSparkConf.BATCH_NUM,  HBaseSparkConf.defaultBatchNum))
+    HBaseSparkConf.QUERY_BATCHSIZE,  -1))
 
   val bulkGetSize =  parameters.get(HBaseSparkConf.BULKGET_SIZE).map(_.toInt)
     .getOrElse(sqlContext.sparkContext.getConf.getInt(
-    HBaseSparkConf.BULKGET_SIZE,  HBaseSparkConf.defaultBulkGetSize))
+    HBaseSparkConf.BULKGET_SIZE,  HBaseSparkConf.DEFAULT_BULKGET_SIZE))
 
   //create or get latest HBaseContext
   val hbaseContext:HBaseContext = if (useHBaseContext) {

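For usage context, here is a minimal sketch (not part of this commit) of how the renamed per-table options reach HBaseRelation through the Spark SQL reader. The catalog JSON and "person" table layout are hypothetical, and the package location of HBaseTableCatalog is assumed from the module layout; only the HBaseSparkConf key names come from this patch. As the code above shows, per-table options override the same keys set globally in SparkConf.

import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.hadoop.hbase.spark.datasources.{HBaseSparkConf, HBaseTableCatalog}

// Hypothetical catalog for a "person" table with a single column family "cf".
val catalog =
  """{
    |  "table": {"namespace": "default", "name": "person"},
    |  "rowkey": "key",
    |  "columns": {
    |    "id":   {"cf": "rowkey", "col": "key",  "type": "string"},
    |    "name": {"cf": "cf",     "col": "name", "type": "string"}
    |  }
    |}""".stripMargin

def readPerson(sqlContext: SQLContext): DataFrame =
  sqlContext.read
    .options(Map(
      HBaseTableCatalog.tableCatalog -> catalog,
      // Per-table settings; these override the same keys set globally in SparkConf.
      HBaseSparkConf.QUERY_CACHEBLOCKS -> "false",
      HBaseSparkConf.QUERY_CACHEDROWS -> "1000",
      HBaseSparkConf.PUSHDOWN_COLUMN_FILTER -> "true"))
    .format("org.apache.hadoop.hbase.spark")
    .load()
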
http://git-wip-us.apache.org/repos/asf/hbase/blob/35d7a0cd/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCache.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCache.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCache.scala
index fb5833e..2858da8 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCache.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCache.scala
@@ -37,7 +37,7 @@ private[spark] object HBaseConnectionCache extends Logging {
   val cacheStat = HBaseConnectionCacheStat(0, 0, 0)
 
   // in milliseconds
-  private final val DEFAULT_TIME_OUT: Long = HBaseSparkConf.connectionCloseDelay
+  private final val DEFAULT_TIME_OUT: Long = HBaseSparkConf.DEFAULT_CONNECTION_CLOSE_DELAY
   private var timeout = DEFAULT_TIME_OUT
   private var closed: Boolean = false
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/35d7a0cd/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala
index 0f20d1d..8c1cb35 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala
@@ -20,35 +20,45 @@ package org.apache.hadoop.hbase.spark.datasources
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 
+/**
+ * These are the HBase configuration settings. Users can either set them in SparkConf, where
+ * they take effect globally, or configure them per table, which overrides the values
+ * set in SparkConf. If not set, the default values take effect.
+ */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 object HBaseSparkConf{
-  // This is the hbase configuration. User can either set them in SparkConf, which
-  // will take effect globally, or configure it per table, which will overwrite the value
-  // set in SparkConf. If not setted, the default value will take effect.
-  val BLOCK_CACHE_ENABLE = "spark.hbase.blockcache.enable"
-  // default block cache is set to true by default following hbase convention, but note that
-  // this potentially may slow down the system
-  val defaultBlockCacheEnable = true
-  val CACHE_SIZE = "spark.hbase.cacheSize"
-  val defaultCachingSize = 1000
-  val BATCH_NUM = "spark.hbase.batchNum"
-  val defaultBatchNum = 1000
-  val BULKGET_SIZE = "spark.hbase.bulkGetSize"
-  val defaultBulkGetSize = 1000
-
-  val HBASE_CONFIG_RESOURCES_LOCATIONS = "hbase.config.resources"
-  val USE_HBASE_CONTEXT = "hbase.use.hbase.context"
-  val PUSH_DOWN_COLUMN_FILTER = "hbase.pushdown.column.filter"
-  val defaultPushDownColumnFilter = true
-
+  /** Set to false to disable server-side caching of blocks for this scan;
+   *  false by default, since full table scans generate too much block cache churn.
+   */
+  val QUERY_CACHEBLOCKS = "hbase.spark.query.cacheblocks"
+  val DEFAULT_QUERY_CACHEBLOCKS = false
+  /** The number of rows for caching that will be passed to scan. */
+  val QUERY_CACHEDROWS = "hbase.spark.query.cachedrows"
+  /** Set the maximum number of values to return for each call to next() in scan. */
+  val QUERY_BATCHSIZE = "hbase.spark.query.batchsize"
+  /** The number of BulkGets sent to HBase. */
+  val BULKGET_SIZE = "hbase.spark.bulkget.size"
+  val DEFAULT_BULKGET_SIZE = 1000
+  /** Set to specify the location of the hbase configuration file. */
+  val HBASE_CONFIG_LOCATION = "hbase.spark.config.location"
+  /** Set to specify whether to create a new HBaseContext or use the latest cached one. */
+  val USE_HBASECONTEXT = "hbase.spark.use.hbasecontext"
+  val DEFAULT_USE_HBASECONTEXT = true
+  /** Push down filters to the data source engine to improve query performance. */
+  val PUSHDOWN_COLUMN_FILTER = "hbase.spark.pushdown.columnfilter"
+  val DEFAULT_PUSHDOWN_COLUMN_FILTER = true
+  /** Class name of the encoder, which encodes data types from Spark to HBase bytes. */
+  val QUERY_ENCODER = "hbase.spark.query.encoder"
+  val DEFAULT_QUERY_ENCODER = classOf[NaiveEncoder].getCanonicalName
+  /** The timestamp used to filter columns with a specific timestamp. */
   val TIMESTAMP = "hbase.spark.query.timestamp"
-  val MIN_TIMESTAMP = "hbase.spark.query.minTimestamp"
-  val MAX_TIMESTAMP = "hbase.spark.query.maxTimestamp"
+  /** The starting timestamp used to filter columns with a specific range of versions. */
+  val TIMERANGE_START = "hbase.spark.query.timerange.start"
+  /** The ending timestamp used to filter columns with a specific range of versions. */
+  val TIMERANGE_END =  "hbase.spark.query.timerange.end"
+  /** The maximum number of versions to return. */
   val MAX_VERSIONS = "hbase.spark.query.maxVersions"
-  val ENCODER = "hbase.spark.query.encoder"
-  val defaultEncoder = classOf[NaiveEncoder].getCanonicalName
-
-  // in milliseconds
-  val connectionCloseDelay = 10 * 60 * 1000
+  /** Delay, in milliseconds, before closing an hbase-spark connection once nothing references it. */
+  val DEFAULT_CONNECTION_CLOSE_DELAY = 10 * 60 * 1000
 }

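For reference (not part of this commit), a minimal sketch of setting the renamed keys globally in SparkConf, mirroring the test setup further below; any per-table options passed to the reader then override these global values:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.hbase.spark.datasources.HBaseSparkConf

// Global defaults for all hbase-spark reads in this application.
val sparkConf = new SparkConf()
  .set(HBaseSparkConf.QUERY_CACHEBLOCKS, "true")
  .set(HBaseSparkConf.QUERY_BATCHSIZE, "100")
  .set(HBaseSparkConf.QUERY_CACHEDROWS, "100")
  .set(HBaseSparkConf.BULKGET_SIZE, "1000")

// Same construction style as the test suites below; master and app name are illustrative.
val sc = new SparkContext("local", "example", sparkConf)
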
http://git-wip-us.apache.org/repos/asf/hbase/blob/35d7a0cd/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
index 7b8b844..3bce041 100644
--- a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
+++ b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
@@ -116,9 +116,9 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
     TEST_UTIL.createTable(TableName.valueOf(t2TableName), Bytes.toBytes(columnFamily))
     logInfo(" - created table")
     val sparkConf = new SparkConf
-    sparkConf.set(HBaseSparkConf.BLOCK_CACHE_ENABLE, "true")
-    sparkConf.set(HBaseSparkConf.BATCH_NUM, "100")
-    sparkConf.set(HBaseSparkConf.CACHE_SIZE, "100")
+    sparkConf.set(HBaseSparkConf.QUERY_CACHEBLOCKS, "true")
+    sparkConf.set(HBaseSparkConf.QUERY_BATCHSIZE, "100")
+    sparkConf.set(HBaseSparkConf.QUERY_CACHEDROWS, "100")
 
     sc  = new SparkContext("local", "test", sparkConf)
 
@@ -791,7 +791,7 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
           |}""".stripMargin
     df = sqlContext.load("org.apache.hadoop.hbase.spark",
       Map(HBaseTableCatalog.tableCatalog->catalog,
-        HBaseSparkConf.PUSH_DOWN_COLUMN_FILTER -> "false"))
+        HBaseSparkConf.PUSHDOWN_COLUMN_FILTER -> "false"))
 
     df.registerTempTable("hbaseNoPushDownTmp")
 
@@ -913,8 +913,8 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
 
     // Test Getting old stuff -- Full Scan, TimeRange
     val oldRange = sqlContext.read
-      .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.MIN_TIMESTAMP -> "0",
-        HBaseSparkConf.MAX_TIMESTAMP -> (oldMs + 100).toString))
+      .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.TIMERANGE_START -> "0",
+        HBaseSparkConf.TIMERANGE_END -> (oldMs + 100).toString))
       .format("org.apache.hadoop.hbase.spark")
       .load()
     assert(oldRange.count() == 101)
@@ -924,8 +924,8 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
 
     // Test Getting middle stuff -- Full Scan, TimeRange
     val middleRange = sqlContext.read
-      .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.MIN_TIMESTAMP -> "0",
-        HBaseSparkConf.MAX_TIMESTAMP -> (startMs + 100).toString))
+      .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.TIMERANGE_START -> "0",
+        HBaseSparkConf.TIMERANGE_END -> (startMs + 100).toString))
       .format("org.apache.hadoop.hbase.spark")
       .load()
     assert(middleRange.count() == 256)

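The hunks above exercise TIMERANGE_START and TIMERANGE_END. As a usage note (not part of this commit), a point-in-time read would presumably combine the TIMESTAMP and MAX_VERSIONS keys in the same way; a sketch reusing this suite's sqlContext, writeCatalog, and oldMs values, with the HBaseTableCatalog package location assumed:

import org.apache.hadoop.hbase.spark.datasources.{HBaseSparkConf, HBaseTableCatalog}

// Hypothetical point-in-time read: only cells carrying exactly the oldMs timestamp,
// returning up to 3 versions per column.
val snapshot = sqlContext.read
  .options(Map(
    HBaseTableCatalog.tableCatalog -> writeCatalog,
    HBaseSparkConf.TIMESTAMP -> oldMs.toString,
    HBaseSparkConf.MAX_VERSIONS -> "3"))
  .format("org.apache.hadoop.hbase.spark")
  .load()
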
http://git-wip-us.apache.org/repos/asf/hbase/blob/35d7a0cd/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpressionSuite.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpressionSuite.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpressionSuite.scala
index b9c15ce..bc833e8 100644
--- a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpressionSuite.scala
+++ b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpressionSuite.scala
@@ -28,7 +28,7 @@ import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
 class DynamicLogicExpressionSuite  extends FunSuite with
 BeforeAndAfterEach with BeforeAndAfterAll with Logging {
 
-  val encoder = JavaBytesEncoder.create(HBaseSparkConf.defaultEncoder)
+  val encoder = JavaBytesEncoder.create(HBaseSparkConf.DEFAULT_QUERY_ENCODER)
 
   test("Basic And Test") {
     val leftLogic = new LessThanLogicExpression("Col1", 0)

http://git-wip-us.apache.org/repos/asf/hbase/blob/35d7a0cd/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseTestSource.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseTestSource.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseTestSource.scala
index 83465d9..ccb4625 100644
--- a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseTestSource.scala
+++ b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseTestSource.scala
@@ -49,13 +49,12 @@ case class DummyScan(
   override def buildScan(): RDD[Row] = sqlContext.sparkContext.parallelize(0 until rowNum)
     .map(Row(_))
     .map{ x =>
-      if (sparkConf.getInt(HBaseSparkConf.BATCH_NUM,
-        HBaseSparkConf.defaultBatchNum) != batchNum ||
-        sparkConf.getInt(HBaseSparkConf.CACHE_SIZE,
-          HBaseSparkConf.defaultCachingSize) != cacheSize ||
-        sparkConf.getBoolean(HBaseSparkConf.BLOCK_CACHE_ENABLE,
-          HBaseSparkConf.defaultBlockCacheEnable)
-          != blockCachingEnable) {
+      if (sparkConf.getInt(HBaseSparkConf.QUERY_BATCHSIZE,
+          -1) != batchNum ||
+        sparkConf.getInt(HBaseSparkConf.QUERY_CACHEDROWS,
+          -1) != cacheSize ||
+        sparkConf.getBoolean(HBaseSparkConf.QUERY_CACHEBLOCKS,
+          false) != blockCachingEnable) {
         throw new Exception("HBase Spark configuration cannot be set properly")
       }
       x

http://git-wip-us.apache.org/repos/asf/hbase/blob/35d7a0cd/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/PartitionFilterSuite.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/PartitionFilterSuite.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/PartitionFilterSuite.scala
index d33ced9..f47a319 100644
--- a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/PartitionFilterSuite.scala
+++ b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/PartitionFilterSuite.scala
@@ -69,9 +69,9 @@ class PartitionFilterSuite extends FunSuite with
 
     TEST_UTIL.startMiniCluster
     val sparkConf = new SparkConf
-    sparkConf.set(HBaseSparkConf.BLOCK_CACHE_ENABLE, "true")
-    sparkConf.set(HBaseSparkConf.BATCH_NUM, "100")
-    sparkConf.set(HBaseSparkConf.CACHE_SIZE, "100")
+    sparkConf.set(HBaseSparkConf.QUERY_CACHEBLOCKS, "true")
+    sparkConf.set(HBaseSparkConf.QUERY_BATCHSIZE, "100")
+    sparkConf.set(HBaseSparkConf.QUERY_CACHEDROWS, "100")
 
     sc = new SparkContext("local", "test", sparkConf)
     new HBaseContext(sc, TEST_UTIL.getConfiguration)