You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by sy...@apache.org on 2017/03/16 17:12:31 UTC
[03/14] hbase git commit: HBASE-15597 Clean up configuration keys
used in hbase-spark module (Yi Liang)
HBASE-15597 Clean up configuration keys used in hbase-spark module (Yi Liang)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/35d7a0cd
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/35d7a0cd
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/35d7a0cd
Branch: refs/heads/hbase-12439
Commit: 35d7a0cd0798cabe7df5766fcc993512eca6c92e
Parents: fee67bc
Author: Jerry He <je...@apache.org>
Authored: Mon Mar 13 12:02:07 2017 -0700
Committer: Jerry He <je...@apache.org>
Committed: Mon Mar 13 12:02:07 2017 -0700
----------------------------------------------------------------------
.../hadoop/hbase/spark/DefaultSource.scala | 28 ++++-----
.../hbase/spark/HBaseConnectionCache.scala | 2 +-
.../spark/datasources/HBaseSparkConf.scala | 62 ++++++++++++--------
.../hadoop/hbase/spark/DefaultSourceSuite.scala | 16 ++---
.../spark/DynamicLogicExpressionSuite.scala | 2 +-
.../hadoop/hbase/spark/HBaseTestSource.scala | 13 ++--
.../hbase/spark/PartitionFilterSuite.scala | 6 +-
7 files changed, 69 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/35d7a0cd/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
index a8b2ab8..b2b646a 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
@@ -97,36 +97,36 @@ case class HBaseRelation (
)(@transient val sqlContext: SQLContext)
extends BaseRelation with PrunedFilteredScan with InsertableRelation with Logging {
val timestamp = parameters.get(HBaseSparkConf.TIMESTAMP).map(_.toLong)
- val minTimestamp = parameters.get(HBaseSparkConf.MIN_TIMESTAMP).map(_.toLong)
- val maxTimestamp = parameters.get(HBaseSparkConf.MAX_TIMESTAMP).map(_.toLong)
+ val minTimestamp = parameters.get(HBaseSparkConf.TIMERANGE_START).map(_.toLong)
+ val maxTimestamp = parameters.get(HBaseSparkConf.TIMERANGE_END).map(_.toLong)
val maxVersions = parameters.get(HBaseSparkConf.MAX_VERSIONS).map(_.toInt)
- val encoderClsName = parameters.get(HBaseSparkConf.ENCODER).getOrElse(HBaseSparkConf.defaultEncoder)
+ val encoderClsName = parameters.get(HBaseSparkConf.QUERY_ENCODER).getOrElse(HBaseSparkConf.DEFAULT_QUERY_ENCODER)
@transient val encoder = JavaBytesEncoder.create(encoderClsName)
val catalog = HBaseTableCatalog(parameters)
def tableName = catalog.name
- val configResources = parameters.getOrElse(HBaseSparkConf.HBASE_CONFIG_RESOURCES_LOCATIONS, "")
- val useHBaseContext = parameters.get(HBaseSparkConf.USE_HBASE_CONTEXT).map(_.toBoolean).getOrElse(true)
- val usePushDownColumnFilter = parameters.get(HBaseSparkConf.PUSH_DOWN_COLUMN_FILTER)
- .map(_.toBoolean).getOrElse(true)
+ val configResources = parameters.getOrElse(HBaseSparkConf.HBASE_CONFIG_LOCATION, "")
+ val useHBaseContext = parameters.get(HBaseSparkConf.USE_HBASECONTEXT).map(_.toBoolean).getOrElse(HBaseSparkConf.DEFAULT_USE_HBASECONTEXT)
+ val usePushDownColumnFilter = parameters.get(HBaseSparkConf.PUSHDOWN_COLUMN_FILTER)
+ .map(_.toBoolean).getOrElse(HBaseSparkConf.DEFAULT_PUSHDOWN_COLUMN_FILTER)
// The user supplied per table parameter will overwrite global ones in SparkConf
- val blockCacheEnable = parameters.get(HBaseSparkConf.BLOCK_CACHE_ENABLE).map(_.toBoolean)
+ val blockCacheEnable = parameters.get(HBaseSparkConf.QUERY_CACHEBLOCKS).map(_.toBoolean)
.getOrElse(
sqlContext.sparkContext.getConf.getBoolean(
- HBaseSparkConf.BLOCK_CACHE_ENABLE, HBaseSparkConf.defaultBlockCacheEnable))
- val cacheSize = parameters.get(HBaseSparkConf.CACHE_SIZE).map(_.toInt)
+ HBaseSparkConf.QUERY_CACHEBLOCKS, HBaseSparkConf.DEFAULT_QUERY_CACHEBLOCKS))
+ val cacheSize = parameters.get(HBaseSparkConf.QUERY_CACHEDROWS).map(_.toInt)
.getOrElse(
sqlContext.sparkContext.getConf.getInt(
- HBaseSparkConf.CACHE_SIZE, HBaseSparkConf.defaultCachingSize))
- val batchNum = parameters.get(HBaseSparkConf.BATCH_NUM).map(_.toInt)
+ HBaseSparkConf.QUERY_CACHEDROWS, -1))
+ val batchNum = parameters.get(HBaseSparkConf.QUERY_BATCHSIZE).map(_.toInt)
.getOrElse(sqlContext.sparkContext.getConf.getInt(
- HBaseSparkConf.BATCH_NUM, HBaseSparkConf.defaultBatchNum))
+ HBaseSparkConf.QUERY_BATCHSIZE, -1))
val bulkGetSize = parameters.get(HBaseSparkConf.BULKGET_SIZE).map(_.toInt)
.getOrElse(sqlContext.sparkContext.getConf.getInt(
- HBaseSparkConf.BULKGET_SIZE, HBaseSparkConf.defaultBulkGetSize))
+ HBaseSparkConf.BULKGET_SIZE, HBaseSparkConf.DEFAULT_BULKGET_SIZE))
//create or get latest HBaseContext
val hbaseContext:HBaseContext = if (useHBaseContext) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/35d7a0cd/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCache.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCache.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCache.scala
index fb5833e..2858da8 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCache.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCache.scala
@@ -37,7 +37,7 @@ private[spark] object HBaseConnectionCache extends Logging {
val cacheStat = HBaseConnectionCacheStat(0, 0, 0)
// in milliseconds
- private final val DEFAULT_TIME_OUT: Long = HBaseSparkConf.connectionCloseDelay
+ private final val DEFAULT_TIME_OUT: Long = HBaseSparkConf.DEFAULT_CONNECTION_CLOSE_DELAY
private var timeout = DEFAULT_TIME_OUT
private var closed: Boolean = false
http://git-wip-us.apache.org/repos/asf/hbase/blob/35d7a0cd/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala
index 0f20d1d..8c1cb35 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala
@@ -20,35 +20,45 @@ package org.apache.hadoop.hbase.spark.datasources
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
+/**
+ * This is the hbase configuration. User can either set them in SparkConf, which
+ * will take effect globally, or configure it per table, which will overwrite the value
+ * set in SparkConf. If not set, the default value will take effect.
+ */
@InterfaceAudience.Public
@InterfaceStability.Evolving
object HBaseSparkConf{
- // This is the hbase configuration. User can either set them in SparkConf, which
- // will take effect globally, or configure it per table, which will overwrite the value
- // set in SparkConf. If not setted, the default value will take effect.
- val BLOCK_CACHE_ENABLE = "spark.hbase.blockcache.enable"
- // default block cache is set to true by default following hbase convention, but note that
- // this potentially may slow down the system
- val defaultBlockCacheEnable = true
- val CACHE_SIZE = "spark.hbase.cacheSize"
- val defaultCachingSize = 1000
- val BATCH_NUM = "spark.hbase.batchNum"
- val defaultBatchNum = 1000
- val BULKGET_SIZE = "spark.hbase.bulkGetSize"
- val defaultBulkGetSize = 1000
-
- val HBASE_CONFIG_RESOURCES_LOCATIONS = "hbase.config.resources"
- val USE_HBASE_CONTEXT = "hbase.use.hbase.context"
- val PUSH_DOWN_COLUMN_FILTER = "hbase.pushdown.column.filter"
- val defaultPushDownColumnFilter = true
-
+ /** Set to false to disable server-side caching of blocks for this scan,
+ * false by default, since full table scans generate too much BC churn.
+ */
+ val QUERY_CACHEBLOCKS = "hbase.spark.query.cacheblocks"
+ val DEFAULT_QUERY_CACHEBLOCKS = false
+ /** The number of rows for caching that will be passed to scan. */
+ val QUERY_CACHEDROWS = "hbase.spark.query.cachedrows"
+ /** Set the maximum number of values to return for each call to next() in scan. */
+ val QUERY_BATCHSIZE = "hbase.spark.query.batchsize"
+ /** The number of BulkGets send to HBase. */
+ val BULKGET_SIZE = "hbase.spark.bulkget.size"
+ val DEFAULT_BULKGET_SIZE = 1000
+ /** Set to specify the location of hbase configuration file. */
+ val HBASE_CONFIG_LOCATION = "hbase.spark.config.location"
+ /** Set to specify whether create or use latest cached HBaseContext*/
+ val USE_HBASECONTEXT = "hbase.spark.use.hbasecontext"
+ val DEFAULT_USE_HBASECONTEXT = true
+ /** Pushdown the filter to data source engine to increase the performance of queries. */
+ val PUSHDOWN_COLUMN_FILTER = "hbase.spark.pushdown.columnfilter"
+ val DEFAULT_PUSHDOWN_COLUMN_FILTER= true
+ /** Class name of the encoder, which encode data types from Spark to HBase bytes. */
+ val QUERY_ENCODER = "hbase.spark.query.encoder"
+ val DEFAULT_QUERY_ENCODER = classOf[NaiveEncoder].getCanonicalName
+ /** The timestamp used to filter columns with a specific timestamp. */
val TIMESTAMP = "hbase.spark.query.timestamp"
- val MIN_TIMESTAMP = "hbase.spark.query.minTimestamp"
- val MAX_TIMESTAMP = "hbase.spark.query.maxTimestamp"
+ /** The starting timestamp used to filter columns with a specific range of versions. */
+ val TIMERANGE_START = "hbase.spark.query.timerange.start"
+ /** The ending timestamp used to filter columns with a specific range of versions. */
+ val TIMERANGE_END = "hbase.spark.query.timerange.end"
+ /** The maximum number of version to return. */
val MAX_VERSIONS = "hbase.spark.query.maxVersions"
- val ENCODER = "hbase.spark.query.encoder"
- val defaultEncoder = classOf[NaiveEncoder].getCanonicalName
-
- // in milliseconds
- val connectionCloseDelay = 10 * 60 * 1000
+ /** Delayed time to close hbase-spark connection when no reference to this connection, in milliseconds. */
+ val DEFAULT_CONNECTION_CLOSE_DELAY = 10 * 60 * 1000
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/35d7a0cd/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
index 7b8b844..3bce041 100644
--- a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
+++ b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
@@ -116,9 +116,9 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
TEST_UTIL.createTable(TableName.valueOf(t2TableName), Bytes.toBytes(columnFamily))
logInfo(" - created table")
val sparkConf = new SparkConf
- sparkConf.set(HBaseSparkConf.BLOCK_CACHE_ENABLE, "true")
- sparkConf.set(HBaseSparkConf.BATCH_NUM, "100")
- sparkConf.set(HBaseSparkConf.CACHE_SIZE, "100")
+ sparkConf.set(HBaseSparkConf.QUERY_CACHEBLOCKS, "true")
+ sparkConf.set(HBaseSparkConf.QUERY_BATCHSIZE, "100")
+ sparkConf.set(HBaseSparkConf.QUERY_CACHEDROWS, "100")
sc = new SparkContext("local", "test", sparkConf)
@@ -791,7 +791,7 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
|}""".stripMargin
df = sqlContext.load("org.apache.hadoop.hbase.spark",
Map(HBaseTableCatalog.tableCatalog->catalog,
- HBaseSparkConf.PUSH_DOWN_COLUMN_FILTER -> "false"))
+ HBaseSparkConf.PUSHDOWN_COLUMN_FILTER -> "false"))
df.registerTempTable("hbaseNoPushDownTmp")
@@ -913,8 +913,8 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
// Test Getting old stuff -- Full Scan, TimeRange
val oldRange = sqlContext.read
- .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.MIN_TIMESTAMP -> "0",
- HBaseSparkConf.MAX_TIMESTAMP -> (oldMs + 100).toString))
+ .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.TIMERANGE_START -> "0",
+ HBaseSparkConf.TIMERANGE_END -> (oldMs + 100).toString))
.format("org.apache.hadoop.hbase.spark")
.load()
assert(oldRange.count() == 101)
@@ -924,8 +924,8 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
// Test Getting middle stuff -- Full Scan, TimeRange
val middleRange = sqlContext.read
- .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.MIN_TIMESTAMP -> "0",
- HBaseSparkConf.MAX_TIMESTAMP -> (startMs + 100).toString))
+ .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.TIMERANGE_START -> "0",
+ HBaseSparkConf.TIMERANGE_END -> (startMs + 100).toString))
.format("org.apache.hadoop.hbase.spark")
.load()
assert(middleRange.count() == 256)
http://git-wip-us.apache.org/repos/asf/hbase/blob/35d7a0cd/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpressionSuite.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpressionSuite.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpressionSuite.scala
index b9c15ce..bc833e8 100644
--- a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpressionSuite.scala
+++ b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpressionSuite.scala
@@ -28,7 +28,7 @@ import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
class DynamicLogicExpressionSuite extends FunSuite with
BeforeAndAfterEach with BeforeAndAfterAll with Logging {
- val encoder = JavaBytesEncoder.create(HBaseSparkConf.defaultEncoder)
+ val encoder = JavaBytesEncoder.create(HBaseSparkConf.DEFAULT_QUERY_ENCODER)
test("Basic And Test") {
val leftLogic = new LessThanLogicExpression("Col1", 0)
http://git-wip-us.apache.org/repos/asf/hbase/blob/35d7a0cd/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseTestSource.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseTestSource.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseTestSource.scala
index 83465d9..ccb4625 100644
--- a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseTestSource.scala
+++ b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseTestSource.scala
@@ -49,13 +49,12 @@ case class DummyScan(
override def buildScan(): RDD[Row] = sqlContext.sparkContext.parallelize(0 until rowNum)
.map(Row(_))
.map{ x =>
- if (sparkConf.getInt(HBaseSparkConf.BATCH_NUM,
- HBaseSparkConf.defaultBatchNum) != batchNum ||
- sparkConf.getInt(HBaseSparkConf.CACHE_SIZE,
- HBaseSparkConf.defaultCachingSize) != cacheSize ||
- sparkConf.getBoolean(HBaseSparkConf.BLOCK_CACHE_ENABLE,
- HBaseSparkConf.defaultBlockCacheEnable)
- != blockCachingEnable) {
+ if (sparkConf.getInt(HBaseSparkConf.QUERY_BATCHSIZE,
+ -1) != batchNum ||
+ sparkConf.getInt(HBaseSparkConf.QUERY_CACHEDROWS,
+ -1) != cacheSize ||
+ sparkConf.getBoolean(HBaseSparkConf.QUERY_CACHEBLOCKS,
+ false) != blockCachingEnable) {
throw new Exception("HBase Spark configuration cannot be set properly")
}
x
http://git-wip-us.apache.org/repos/asf/hbase/blob/35d7a0cd/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/PartitionFilterSuite.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/PartitionFilterSuite.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/PartitionFilterSuite.scala
index d33ced9..f47a319 100644
--- a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/PartitionFilterSuite.scala
+++ b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/PartitionFilterSuite.scala
@@ -69,9 +69,9 @@ class PartitionFilterSuite extends FunSuite with
TEST_UTIL.startMiniCluster
val sparkConf = new SparkConf
- sparkConf.set(HBaseSparkConf.BLOCK_CACHE_ENABLE, "true")
- sparkConf.set(HBaseSparkConf.BATCH_NUM, "100")
- sparkConf.set(HBaseSparkConf.CACHE_SIZE, "100")
+ sparkConf.set(HBaseSparkConf.QUERY_CACHEBLOCKS, "true")
+ sparkConf.set(HBaseSparkConf.QUERY_BATCHSIZE, "100")
+ sparkConf.set(HBaseSparkConf.QUERY_CACHEDROWS, "100")
sc = new SparkContext("local", "test", sparkConf)
new HBaseContext(sc, TEST_UTIL.getConfiguration)