Posted to commits@hbase.apache.org by bu...@apache.org on 2017/11/09 04:59:53 UTC
[7/9] hbase git commit: HBASE-18817 pull the hbase-spark module out of branch-2.
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpression.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpression.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpression.scala
deleted file mode 100644
index 283838f..0000000
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpression.scala
+++ /dev/null
@@ -1,260 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.spark
-
-import java.util
-
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.spark.datasources.{BytesEncoder, JavaBytesEncoder}
-import org.apache.hadoop.hbase.spark.datasources.JavaBytesEncoder.JavaBytesEncoder
-import org.apache.hadoop.hbase.util.Bytes
-import org.apache.spark.sql.datasources.hbase.{Field, Utils}
-import org.apache.spark.sql.types._
-/**
- * Dynamic logic for SQL push-down. There is an instance for the most
- * common operations and a pass-through for other operations not covered here.
- *
- * Logic can be nested with And or Or operators.
- *
- * A logic tree can be written out as a string and reconstructed from that string
- *
- */
-@InterfaceAudience.Private
-trait DynamicLogicExpression {
- def execute(columnToCurrentRowValueMap: util.HashMap[String, ByteArrayComparable],
- valueFromQueryValueArray:Array[Array[Byte]]): Boolean
- def toExpressionString: String = {
- val strBuilder = new StringBuilder
- appendToExpression(strBuilder)
- strBuilder.toString()
- }
- def filterOps: JavaBytesEncoder = JavaBytesEncoder.Unknown
-
- def appendToExpression(strBuilder:StringBuilder)
-
- var encoder: BytesEncoder = _
-
- def setEncoder(enc: BytesEncoder): DynamicLogicExpression = {
- encoder = enc
- this
- }
-}
-
-@InterfaceAudience.Private
-trait CompareTrait {
- self: DynamicLogicExpression =>
- def columnName: String
- def valueFromQueryIndex: Int
- def execute(columnToCurrentRowValueMap:
- util.HashMap[String, ByteArrayComparable],
- valueFromQueryValueArray:Array[Array[Byte]]): Boolean = {
- val currentRowValue = columnToCurrentRowValueMap.get(columnName)
- val valueFromQuery = valueFromQueryValueArray(valueFromQueryIndex)
- currentRowValue != null &&
- encoder.filter(currentRowValue.bytes, currentRowValue.offset, currentRowValue.length,
- valueFromQuery, 0, valueFromQuery.length, filterOps)
- }
-}
-
-@InterfaceAudience.Private
-class AndLogicExpression (val leftExpression:DynamicLogicExpression,
- val rightExpression:DynamicLogicExpression)
- extends DynamicLogicExpression{
- override def execute(columnToCurrentRowValueMap:
- util.HashMap[String, ByteArrayComparable],
- valueFromQueryValueArray:Array[Array[Byte]]): Boolean = {
- leftExpression.execute(columnToCurrentRowValueMap, valueFromQueryValueArray) &&
- rightExpression.execute(columnToCurrentRowValueMap, valueFromQueryValueArray)
- }
-
- override def appendToExpression(strBuilder: StringBuilder): Unit = {
- strBuilder.append("( ")
- strBuilder.append(leftExpression.toExpressionString)
- strBuilder.append(" AND ")
- strBuilder.append(rightExpression.toExpressionString)
- strBuilder.append(" )")
- }
-}
-
-@InterfaceAudience.Private
-class OrLogicExpression (val leftExpression:DynamicLogicExpression,
- val rightExpression:DynamicLogicExpression)
- extends DynamicLogicExpression{
- override def execute(columnToCurrentRowValueMap:
- util.HashMap[String, ByteArrayComparable],
- valueFromQueryValueArray:Array[Array[Byte]]): Boolean = {
- leftExpression.execute(columnToCurrentRowValueMap, valueFromQueryValueArray) ||
- rightExpression.execute(columnToCurrentRowValueMap, valueFromQueryValueArray)
- }
- override def appendToExpression(strBuilder: StringBuilder): Unit = {
- strBuilder.append("( ")
- strBuilder.append(leftExpression.toExpressionString)
- strBuilder.append(" OR ")
- strBuilder.append(rightExpression.toExpressionString)
- strBuilder.append(" )")
- }
-}
-
-@InterfaceAudience.Private
-class EqualLogicExpression (val columnName:String,
- val valueFromQueryIndex:Int,
- val isNot:Boolean) extends DynamicLogicExpression{
- override def execute(columnToCurrentRowValueMap:
- util.HashMap[String, ByteArrayComparable],
- valueFromQueryValueArray:Array[Array[Byte]]): Boolean = {
- val currentRowValue = columnToCurrentRowValueMap.get(columnName)
- val valueFromQuery = valueFromQueryValueArray(valueFromQueryIndex)
-
- currentRowValue != null &&
- Bytes.equals(valueFromQuery,
- 0, valueFromQuery.length, currentRowValue.bytes,
- currentRowValue.offset, currentRowValue.length) != isNot
- }
- override def appendToExpression(strBuilder: StringBuilder): Unit = {
- val command = if (isNot) "!=" else "=="
- strBuilder.append(columnName + " " + command + " " + valueFromQueryIndex)
- }
-}
-
-@InterfaceAudience.Private
-class IsNullLogicExpression (val columnName:String,
- val isNot:Boolean) extends DynamicLogicExpression{
- override def execute(columnToCurrentRowValueMap:
- util.HashMap[String, ByteArrayComparable],
- valueFromQueryValueArray:Array[Array[Byte]]): Boolean = {
- val currentRowValue = columnToCurrentRowValueMap.get(columnName)
-
- (currentRowValue == null) != isNot
- }
- override def appendToExpression(strBuilder: StringBuilder): Unit = {
- val command = if (isNot) "isNotNull" else "isNull"
- strBuilder.append(columnName + " " + command)
- }
-}
-
-@InterfaceAudience.Private
-class GreaterThanLogicExpression (override val columnName:String,
- override val valueFromQueryIndex:Int)
- extends DynamicLogicExpression with CompareTrait{
- override val filterOps = JavaBytesEncoder.Greater
- override def appendToExpression(strBuilder: StringBuilder): Unit = {
- strBuilder.append(columnName + " > " + valueFromQueryIndex)
- }
-}
-
-@InterfaceAudience.Private
-class GreaterThanOrEqualLogicExpression (override val columnName:String,
- override val valueFromQueryIndex:Int)
- extends DynamicLogicExpression with CompareTrait{
- override val filterOps = JavaBytesEncoder.GreaterEqual
- override def appendToExpression(strBuilder: StringBuilder): Unit = {
- strBuilder.append(columnName + " >= " + valueFromQueryIndex)
- }
-}
-
-@InterfaceAudience.Private
-class LessThanLogicExpression (override val columnName:String,
- override val valueFromQueryIndex:Int)
- extends DynamicLogicExpression with CompareTrait {
- override val filterOps = JavaBytesEncoder.Less
- override def appendToExpression(strBuilder: StringBuilder): Unit = {
- strBuilder.append(columnName + " < " + valueFromQueryIndex)
- }
-}
-
-@InterfaceAudience.Private
-class LessThanOrEqualLogicExpression (val columnName:String,
- val valueFromQueryIndex:Int)
- extends DynamicLogicExpression with CompareTrait{
- override val filterOps = JavaBytesEncoder.LessEqual
- override def appendToExpression(strBuilder: StringBuilder): Unit = {
- strBuilder.append(columnName + " <= " + valueFromQueryIndex)
- }
-}
-
-@InterfaceAudience.Private
-class PassThroughLogicExpression() extends DynamicLogicExpression {
- override def execute(columnToCurrentRowValueMap:
- util.HashMap[String, ByteArrayComparable],
- valueFromQueryValueArray: Array[Array[Byte]]): Boolean = true
-
- override def appendToExpression(strBuilder: StringBuilder): Unit = {
- // Fix the offset bug by adding a dummy token to avoid crashing the region server:
- // in the DynamicLogicExpressionBuilder.build function the command is always retrieved from offset + 1, i.e.
- // val command = expressionArray(offSet + 1)
- // so we have to pad the string so that `Pass` lands on the right offset.
- strBuilder.append("dummy Pass -1")
- }
-}
-
-@InterfaceAudience.Private
-object DynamicLogicExpressionBuilder {
- def build(expressionString: String, encoder: BytesEncoder): DynamicLogicExpression = {
-
- val expressionAndOffset = build(expressionString.split(' '), 0, encoder)
- expressionAndOffset._1
- }
-
- private def build(expressionArray:Array[String],
- offSet:Int, encoder: BytesEncoder): (DynamicLogicExpression, Int) = {
- val expr = {
- if (expressionArray(offSet).equals("(")) {
- val left = build(expressionArray, offSet + 1, encoder)
- val right = build(expressionArray, left._2 + 1, encoder)
- if (expressionArray(left._2).equals("AND")) {
- (new AndLogicExpression(left._1, right._1), right._2 + 1)
- } else if (expressionArray(left._2).equals("OR")) {
- (new OrLogicExpression(left._1, right._1), right._2 + 1)
- } else {
- throw new Throwable("Unknown gate:" + expressionArray(left._2))
- }
- } else {
- val command = expressionArray(offSet + 1)
- if (command.equals("<")) {
- (new LessThanLogicExpression(expressionArray(offSet),
- expressionArray(offSet + 2).toInt), offSet + 3)
- } else if (command.equals("<=")) {
- (new LessThanOrEqualLogicExpression(expressionArray(offSet),
- expressionArray(offSet + 2).toInt), offSet + 3)
- } else if (command.equals(">")) {
- (new GreaterThanLogicExpression(expressionArray(offSet),
- expressionArray(offSet + 2).toInt), offSet + 3)
- } else if (command.equals(">=")) {
- (new GreaterThanOrEqualLogicExpression(expressionArray(offSet),
- expressionArray(offSet + 2).toInt), offSet + 3)
- } else if (command.equals("==")) {
- (new EqualLogicExpression(expressionArray(offSet),
- expressionArray(offSet + 2).toInt, false), offSet + 3)
- } else if (command.equals("!=")) {
- (new EqualLogicExpression(expressionArray(offSet),
- expressionArray(offSet + 2).toInt, true), offSet + 3)
- } else if (command.equals("isNull")) {
- (new IsNullLogicExpression(expressionArray(offSet), false), offSet + 2)
- } else if (command.equals("isNotNull")) {
- (new IsNullLogicExpression(expressionArray(offSet), true), offSet + 2)
- } else if (command.equals("Pass")) {
- (new PassThroughLogicExpression, offSet + 3)
- } else {
- throw new Throwable("Unknown logic command:" + command)
- }
- }
- }
- expr._1.setEncoder(encoder)
- expr
- }
-}
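
For reference, a rough sketch of the string round trip described in the scaladoc above, using only the classes from this deleted file; the `encoder` value is assumed to be an already-constructed BytesEncoder (for example the module's NaiveEncoder), which is not shown in this diff:

    // Build a tree programmatically: Col0 > 0 AND Col0 < 1
    val expr = new AndLogicExpression(
      new GreaterThanLogicExpression("Col0", 0),
      new LessThanLogicExpression("Col0", 1))

    // Serialize to the space-delimited form, e.g. "( Col0 > 0 AND Col0 < 1 )"
    val exprString = expr.toExpressionString

    // Rebuild the tree from the string; build() attaches the encoder to every node
    val rebuilt = DynamicLogicExpressionBuilder.build(exprString, encoder)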
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/FamiliesQualifiersValues.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/FamiliesQualifiersValues.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/FamiliesQualifiersValues.scala
deleted file mode 100644
index 7a651e1..0000000
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/FamiliesQualifiersValues.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.spark
-
-import java.util
-
-import org.apache.yetus.audience.InterfaceAudience;
-
-/**
- * This object is a clean way to store and sort all cells that will be bulk
- * loaded into a single row
- */
-@InterfaceAudience.Public
-class FamiliesQualifiersValues extends Serializable {
- //Tree maps are used because we need the results to
- // be sorted when we read them
- val familyMap = new util.TreeMap[ByteArrayWrapper,
- util.TreeMap[ByteArrayWrapper, Array[Byte]]]()
-
- //Normally a row has more columns than
- //column families, so this wrapper is reused for
- //column family lookups
- val reusableWrapper = new ByteArrayWrapper(null)
-
- /**
- * Adds a new cell to an existing row
- * @param family HBase column family
- * @param qualifier HBase column qualifier
- * @param value HBase cell value
- */
- def += (family: Array[Byte], qualifier: Array[Byte], value: Array[Byte]): Unit = {
-
- reusableWrapper.value = family
-
- var qualifierValues = familyMap.get(reusableWrapper)
-
- if (qualifierValues == null) {
- qualifierValues = new util.TreeMap[ByteArrayWrapper, Array[Byte]]()
- familyMap.put(new ByteArrayWrapper(family), qualifierValues)
- }
-
- qualifierValues.put(new ByteArrayWrapper(qualifier), value)
- }
-
- /**
- * A wrapper for the "+=" method above; it can be used from Java
- * @param family HBase column family
- * @param qualifier HBase column qualifier
- * @param value HBase cell value
- */
- def add(family: Array[Byte], qualifier: Array[Byte], value: Array[Byte]): Unit = {
- this += (family, qualifier, value)
- }
-}
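
A rough usage sketch for the class above; the family, qualifier, and value bytes are made-up illustrations:

    import org.apache.hadoop.hbase.util.Bytes

    val row = new FamiliesQualifiersValues
    // Both calls land in the same TreeMap-backed structure, so reads come back sorted
    row += (Bytes.toBytes("cf"), Bytes.toBytes("q2"), Bytes.toBytes("v2"))
    row.add(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("v1"))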
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/FamilyHFileWriteOptions.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/FamilyHFileWriteOptions.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/FamilyHFileWriteOptions.scala
deleted file mode 100644
index 9ee9291..0000000
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/FamilyHFileWriteOptions.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.spark
-
-import java.io.Serializable
-
-import org.apache.yetus.audience.InterfaceAudience;
-
-/**
- * This object will hold optional data for how a given column family's
- * writer will work
- *
- * @param compression String to define the Compression to be used in the HFile
- * @param bloomType String to define the bloom type to be used in the HFile
- * @param blockSize The block size to be used in the HFile
- * @param dataBlockEncoding String to define the data block encoding to be used
- * in the HFile
- */
-@InterfaceAudience.Public
-class FamilyHFileWriteOptions( val compression:String,
- val bloomType: String,
- val blockSize: Int,
- val dataBlockEncoding: String) extends Serializable
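
A construction sketch for the options class above; the codec, bloom, and encoding names are illustrative string values that are parsed further down in this diff into Compression.Algorithm, BloomType, and DataBlockEncoding:

    val familyOptions = new FamilyHFileWriteOptions(
      compression = "GZ",            // a Compression.Algorithm name
      bloomType = "ROW",             // a BloomType name
      blockSize = 65536,             // HFile block size in bytes
      dataBlockEncoding = "PREFIX")  // a DataBlockEncoding name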
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCache.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCache.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCache.scala
deleted file mode 100644
index 2858da8..0000000
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCache.scala
+++ /dev/null
@@ -1,265 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.spark
-
-import java.io.IOException
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.hbase.client.{Admin, Connection, ConnectionFactory, RegionLocator, Table}
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory
-import org.apache.hadoop.hbase.security.{User, UserProvider}
-import org.apache.hadoop.hbase.spark.datasources.HBaseSparkConf
-import org.apache.hadoop.hbase.{HConstants, TableName}
-import org.apache.spark.Logging
-
-import scala.collection.mutable
-
-private[spark] object HBaseConnectionCache extends Logging {
-
- // A hashmap of Spark-HBase connections. Key is HBaseConnectionKey.
- val connectionMap = new mutable.HashMap[HBaseConnectionKey, SmartConnection]()
-
- val cacheStat = HBaseConnectionCacheStat(0, 0, 0)
-
- // in milliseconds
- private final val DEFAULT_TIME_OUT: Long = HBaseSparkConf.DEFAULT_CONNECTION_CLOSE_DELAY
- private var timeout = DEFAULT_TIME_OUT
- private var closed: Boolean = false
-
- var housekeepingThread = new Thread(new Runnable {
- override def run() {
- while (true) {
- try {
- Thread.sleep(timeout)
- } catch {
- case e: InterruptedException =>
- // setTimeout() and close() may interrupt the sleep and it's safe
- // to ignore the exception
- }
- if (closed)
- return
- performHousekeeping(false)
- }
- }
- })
- housekeepingThread.setDaemon(true)
- housekeepingThread.start()
-
- def getStat: HBaseConnectionCacheStat = {
- connectionMap.synchronized {
- cacheStat.numActiveConnections = connectionMap.size
- cacheStat.copy()
- }
- }
-
- def close(): Unit = {
- try {
- connectionMap.synchronized {
- if (closed)
- return
- closed = true
- housekeepingThread.interrupt()
- housekeepingThread = null
- HBaseConnectionCache.performHousekeeping(true)
- }
- } catch {
- case e: Exception => logWarning("Error in finalHouseKeeping", e)
- }
- }
-
- def performHousekeeping(forceClean: Boolean) = {
- val tsNow: Long = System.currentTimeMillis()
- connectionMap.synchronized {
- connectionMap.foreach {
- x => {
- if(x._2.refCount < 0) {
- logError(s"Bug to be fixed: negative refCount of connection ${x._2}")
- }
-
- if(forceClean || ((x._2.refCount <= 0) && (tsNow - x._2.timestamp > timeout))) {
- try{
- x._2.connection.close()
- } catch {
- case e: IOException => logWarning(s"Fail to close connection ${x._2}", e)
- }
- connectionMap.remove(x._1)
- }
- }
- }
- }
- }
-
- // For testing purpose only
- def getConnection(key: HBaseConnectionKey, conn: => Connection): SmartConnection = {
- connectionMap.synchronized {
- if (closed)
- return null
- cacheStat.numTotalRequests += 1
- val sc = connectionMap.getOrElseUpdate(key, {cacheStat.numActualConnectionsCreated += 1
- new SmartConnection(conn)})
- sc.refCount += 1
- sc
- }
- }
-
- def getConnection(conf: Configuration): SmartConnection =
- getConnection(new HBaseConnectionKey(conf), ConnectionFactory.createConnection(conf))
-
- // For testing purpose only
- def setTimeout(to: Long): Unit = {
- connectionMap.synchronized {
- if (closed)
- return
- timeout = to
- housekeepingThread.interrupt()
- }
- }
-}
-
-private[hbase] case class SmartConnection (
- connection: Connection, var refCount: Int = 0, var timestamp: Long = 0) {
- def getTable(tableName: TableName): Table = connection.getTable(tableName)
- def getRegionLocator(tableName: TableName): RegionLocator = connection.getRegionLocator(tableName)
- def isClosed: Boolean = connection.isClosed
- def getAdmin: Admin = connection.getAdmin
- def close() = {
- HBaseConnectionCache.connectionMap.synchronized {
- refCount -= 1
- if(refCount <= 0)
- timestamp = System.currentTimeMillis()
- }
- }
-}
-
-/**
- * Denotes a unique key to an HBase Connection instance.
- * Please refer to 'org.apache.hadoop.hbase.client.HConnectionKey'.
- *
- * In essence, this class captures the properties in Configuration
- * that may be used in the process of establishing a connection.
- *
- */
-class HBaseConnectionKey(c: Configuration) extends Logging {
- val conf: Configuration = c
- val CONNECTION_PROPERTIES: Array[String] = Array[String](
- HConstants.ZOOKEEPER_QUORUM,
- HConstants.ZOOKEEPER_ZNODE_PARENT,
- HConstants.ZOOKEEPER_CLIENT_PORT,
- HConstants.ZOOKEEPER_RECOVERABLE_WAITTIME,
- HConstants.HBASE_CLIENT_PAUSE,
- HConstants.HBASE_CLIENT_RETRIES_NUMBER,
- HConstants.HBASE_RPC_TIMEOUT_KEY,
- HConstants.HBASE_META_SCANNER_CACHING,
- HConstants.HBASE_CLIENT_INSTANCE_ID,
- HConstants.RPC_CODEC_CONF_KEY,
- HConstants.USE_META_REPLICAS,
- RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY)
-
- var username: String = _
- var m_properties = mutable.HashMap.empty[String, String]
- if (conf != null) {
- for (property <- CONNECTION_PROPERTIES) {
- val value: String = conf.get(property)
- if (value != null) {
- m_properties.+=((property, value))
- }
- }
- try {
- val provider: UserProvider = UserProvider.instantiate(conf)
- val currentUser: User = provider.getCurrent
- if (currentUser != null) {
- username = currentUser.getName
- }
- }
- catch {
- case e: IOException => {
- logWarning("Error obtaining current user, skipping username in HBaseConnectionKey", e)
- }
- }
- }
-
- // make 'properties' immutable
- val properties = m_properties.toMap
-
- override def hashCode: Int = {
- val prime: Int = 31
- var result: Int = 1
- if (username != null) {
- result = username.hashCode
- }
- for (property <- CONNECTION_PROPERTIES) {
- val value: Option[String] = properties.get(property)
- if (value.isDefined) {
- result = prime * result + value.hashCode
- }
- }
- result
- }
-
- override def equals(obj: Any): Boolean = {
- if (obj == null) return false
- if (getClass ne obj.getClass) return false
- val that: HBaseConnectionKey = obj.asInstanceOf[HBaseConnectionKey]
- if (this.username != null && !(this.username == that.username)) {
- return false
- }
- else if (this.username == null && that.username != null) {
- return false
- }
- if (this.properties == null) {
- if (that.properties != null) {
- return false
- }
- }
- else {
- if (that.properties == null) {
- return false
- }
- var flag: Boolean = true
- for (property <- CONNECTION_PROPERTIES) {
- val thisValue: Option[String] = this.properties.get(property)
- val thatValue: Option[String] = that.properties.get(property)
- flag = true
- if (thisValue eq thatValue) {
- flag = false //continue, so make flag be false
- }
- if (flag && (thisValue == null || !(thisValue == thatValue))) {
- return false
- }
- }
- }
- true
- }
-
- override def toString: String = {
- "HBaseConnectionKey{" + "properties=" + properties + ", username='" + username + '\'' + '}'
- }
-}
-
-/**
- * To log the state of 'HBaseConnectionCache'
- *
- * @param numTotalRequests number of total connection requests to the cache
- * @param numActualConnectionsCreated number of actual HBase connections the cache ever created
- * @param numActiveConnections number of currently alive HBase connections the cache is holding
- */
-case class HBaseConnectionCacheStat(var numTotalRequests: Long,
- var numActualConnectionsCreated: Long,
- var numActiveConnections: Long)
-
-
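
A usage sketch for the cache above: callers obtain a SmartConnection, use it, and close it, which only decrements the reference count; the daemon housekeeping thread closes the underlying HBase Connection once it is unreferenced and the timeout has elapsed. The table name is illustrative:

    import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}

    val conf = HBaseConfiguration.create()   // assumed to carry the cluster's hbase-site.xml settings
    val smartConn = HBaseConnectionCache.getConnection(conf)
    try {
      val table = smartConn.getTable(TableName.valueOf("my_table"))
      // ... use the table ...
      table.close()
    } finally {
      smartConn.close()  // only decrements refCount; housekeeping closes the real Connection later
    }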
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
deleted file mode 100644
index eb0d683..0000000
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
+++ /dev/null
@@ -1,1115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.spark
-
-import java.net.InetSocketAddress
-import java.util
-import java.util.UUID
-import javax.management.openmbean.KeyAlreadyExistsException
-
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.fs.HFileSystem
-import org.apache.hadoop.hbase._
-import org.apache.hadoop.hbase.io.compress.Compression
-import org.apache.hadoop.hbase.io.compress.Compression.Algorithm
-import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding
-import org.apache.hadoop.hbase.io.hfile.{HFile, CacheConfig, HFileContextBuilder, HFileWriterImpl}
-import org.apache.hadoop.hbase.regionserver.{HStore, HStoreFile, StoreFileWriter, BloomType}
-import org.apache.hadoop.hbase.util.Bytes
-import org.apache.hadoop.mapred.JobConf
-import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.rdd.RDD
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
-import org.apache.hadoop.hbase.client._
-import scala.reflect.ClassTag
-import org.apache.spark.{Logging, SerializableWritable, SparkContext}
-import org.apache.hadoop.hbase.mapreduce.{TableMapReduceUtil,
-TableInputFormat, IdentityTableMapper}
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable
-import org.apache.hadoop.mapreduce.Job
-import org.apache.spark.streaming.dstream.DStream
-import java.io._
-import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod
-import org.apache.hadoop.fs.{Path, FileAlreadyExistsException, FileSystem}
-import scala.collection.mutable
-
-/**
- * HBaseContext is a façade for HBase operations
- * like bulk put, get, increment, delete, and scan
- *
- * HBaseContext takes responsibility for
- * disseminating the configuration information
- * to the workers and for managing the life cycle of Connections.
- */
-@InterfaceAudience.Public
-class HBaseContext(@transient sc: SparkContext,
- @transient val config: Configuration,
- val tmpHdfsConfgFile: String = null)
- extends Serializable with Logging {
-
- @transient var credentials = SparkHadoopUtil.get.getCurrentUserCredentials()
- @transient var tmpHdfsConfiguration:Configuration = config
- @transient var appliedCredentials = false
- @transient val job = Job.getInstance(config)
- TableMapReduceUtil.initCredentials(job)
- val broadcastedConf = sc.broadcast(new SerializableWritable(config))
- val credentialsConf = sc.broadcast(new SerializableWritable(job.getCredentials))
-
- LatestHBaseContextCache.latest = this
-
- if (tmpHdfsConfgFile != null && config != null) {
- val fs = FileSystem.newInstance(config)
- val tmpPath = new Path(tmpHdfsConfgFile)
- if (!fs.exists(tmpPath)) {
- val outputStream = fs.create(tmpPath)
- config.write(outputStream)
- outputStream.close()
- } else {
- logWarning("tmpHdfsConfigDir " + tmpHdfsConfgFile + " exist!!")
- }
- }
-
- /**
- * A simple enrichment of the traditional Spark RDD foreachPartition.
- * This function differs from the original in that it offers the
- * developer access to an already connected Connection object
- *
- * Note: Do not close the Connection object. All Connection
- * management is handled outside this method
- *
- * @param rdd Original RDD with data to iterate over
- * @param f Function to be given an iterator to iterate through
- * the RDD values and a Connection object to interact
- * with HBase
- */
- def foreachPartition[T](rdd: RDD[T],
- f: (Iterator[T], Connection) => Unit):Unit = {
- rdd.foreachPartition(
- it => hbaseForeachPartition(broadcastedConf, it, f))
- }
-
- /**
- * A simple enrichment of the traditional Spark Streaming dStream foreach
- * This function differs from the original in that it offers the
- * developer access to an already connected Connection object
- *
- * Note: Do not close the Connection object. All Connection
- * management is handled outside this method
- *
- * @param dstream Original DStream with data to iterate over
- * @param f Function to be given an iterator to iterate through
- * the DStream values and a Connection object to
- * interact with HBase
- */
- def foreachPartition[T](dstream: DStream[T],
- f: (Iterator[T], Connection) => Unit):Unit = {
- dstream.foreachRDD((rdd, time) => {
- foreachPartition(rdd, f)
- })
- }
-
- /**
- * A simple enrichment of the traditional Spark RDD mapPartition.
- * This function differs from the original in that it offers the
- * developer access to an already connected Connection object
- *
- * Note: Do not close the Connection object. All Connection
- * management is handled outside this method
- *
- * @param rdd Original RDD with data to iterate over
- * @param mp Function to be given an iterator to iterate through
- * the RDD values and a Connection object to interact
- * with HBase
- * @return Returns a new RDD generated by the user-defined
- * function, just like a normal mapPartition
- */
- def mapPartitions[T, R: ClassTag](rdd: RDD[T],
- mp: (Iterator[T], Connection) => Iterator[R]): RDD[R] = {
-
- rdd.mapPartitions[R](it => hbaseMapPartition[T, R](broadcastedConf,
- it,
- mp))
-
- }
-
- /**
- * A simple enrichment of the traditional Spark Streaming DStream
- * foreachPartition.
- *
- * This function differs from the original in that it offers the
- * developer access to an already connected Connection object
- *
- * Note: Do not close the Connection object. All Connection
- * management is handled outside this method
- *
- * Note: Make sure to partition correctly to avoid memory issues when
- * getting data from HBase
- *
- * @param dstream Original DStream with data to iterate over
- * @param f Function to be given an iterator to iterate through
- * the DStream values and a Connection object to
- * interact with HBase
- * @return Returns a new DStream generated by the user-defined
- * function, just like a normal mapPartition
- */
- def streamForeachPartition[T](dstream: DStream[T],
- f: (Iterator[T], Connection) => Unit): Unit = {
-
- dstream.foreachRDD(rdd => this.foreachPartition(rdd, f))
- }
-
- /**
- * A simple enrichment of the traditional Spark Streaming DStream
- * mapPartition.
- *
- * This function differs from the original in that it offers the
- * developer access to an already connected Connection object
- *
- * Note: Do not close the Connection object. All Connection
- * management is handled outside this method
- *
- * Note: Make sure to partition correctly to avoid memory issues when
- * getting data from HBase
- *
- * @param dstream Original DStream with data to iterate over
- * @param f Function to be given an iterator to iterate through
- * the DStream values and a Connection object to
- * interact with HBase
- * @return Returns a new DStream generated by the user-defined
- * function, just like a normal mapPartition
- */
- def streamMapPartitions[T, U: ClassTag](dstream: DStream[T],
- f: (Iterator[T], Connection) => Iterator[U]):
- DStream[U] = {
- dstream.mapPartitions(it => hbaseMapPartition[T, U](
- broadcastedConf,
- it,
- f))
- }
-
- /**
- * A simple abstraction over the HBaseContext.foreachPartition method.
- *
- * It allows a user to take an RDD,
- * generate Puts, and send them to HBase.
- * The complexity of managing the Connection is
- * removed from the developer
- *
- * @param rdd Original RDD with data to iterate over
- * @param tableName The name of the table to put into
- * @param f Function to convert a value in the RDD to a HBase Put
- */
- def bulkPut[T](rdd: RDD[T], tableName: TableName, f: (T) => Put) {
-
- val tName = tableName.getName
- rdd.foreachPartition(
- it => hbaseForeachPartition[T](
- broadcastedConf,
- it,
- (iterator, connection) => {
- val m = connection.getBufferedMutator(TableName.valueOf(tName))
- iterator.foreach(T => m.mutate(f(T)))
- m.flush()
- m.close()
- }))
- }
-
- def applyCreds[T] (){
- credentials = SparkHadoopUtil.get.getCurrentUserCredentials()
-
- logDebug("appliedCredentials:" + appliedCredentials + ",credentials:" + credentials)
-
- if (!appliedCredentials && credentials != null) {
- appliedCredentials = true
-
- @transient val ugi = UserGroupInformation.getCurrentUser
- ugi.addCredentials(credentials)
- // specify that this is a proxy user
- ugi.setAuthenticationMethod(AuthenticationMethod.PROXY)
-
- ugi.addCredentials(credentialsConf.value.value)
- }
- }
-
- /**
- * A simple abstraction over the HBaseContext.streamMapPartition method.
- *
- * It allows a user to take a DStream,
- * generate Puts, and send them to HBase.
- *
- * The complexity of managing the Connection is
- * removed from the developer
- *
- * @param dstream Original DStream with data to iterate over
- * @param tableName The name of the table to put into
- * @param f Function to convert a value in
- * the DStream to a HBase Put
- */
- def streamBulkPut[T](dstream: DStream[T],
- tableName: TableName,
- f: (T) => Put) = {
- val tName = tableName.getName
- dstream.foreachRDD((rdd, time) => {
- bulkPut(rdd, TableName.valueOf(tName), f)
- })
- }
-
- /**
- * A simple abstraction over the HBaseContext.foreachPartition method.
- *
- * It allows a user to take an RDD, generate Deletes,
- * and send them to HBase. The complexity of managing the Connection is
- * removed from the developer
- *
- * @param rdd Original RDD with data to iterate over
- * @param tableName The name of the table to delete from
- * @param f Function to convert a value in the RDD to a
- * HBase Delete
- * @param batchSize The number of deletes to batch before sending to HBase
- */
- def bulkDelete[T](rdd: RDD[T], tableName: TableName,
- f: (T) => Delete, batchSize: Integer) {
- bulkMutation(rdd, tableName, f, batchSize)
- }
-
- /**
- * A simple abstraction over the HBaseContext.streamBulkMutation method.
- *
- * It allows a user to take a DStream,
- * generate Deletes, and send them to HBase.
- *
- * The complexity of managing the Connection is
- * removed from the developer
- *
- * @param dstream Original DStream with data to iterate over
- * @param tableName The name of the table to delete from
- * @param f function to convert a value in the DStream to a
- * HBase Delete
- * @param batchSize The number of deletes to batch before sending to HBase
- */
- def streamBulkDelete[T](dstream: DStream[T],
- tableName: TableName,
- f: (T) => Delete,
- batchSize: Integer) = {
- streamBulkMutation(dstream, tableName, f, batchSize)
- }
-
- /**
- * Underlying function to support all bulk mutations
- *
- * May be opened up if requested
- */
- private def bulkMutation[T](rdd: RDD[T], tableName: TableName,
- f: (T) => Mutation, batchSize: Integer) {
-
- val tName = tableName.getName
- rdd.foreachPartition(
- it => hbaseForeachPartition[T](
- broadcastedConf,
- it,
- (iterator, connection) => {
- val table = connection.getTable(TableName.valueOf(tName))
- val mutationList = new java.util.ArrayList[Mutation]
- iterator.foreach(T => {
- mutationList.add(f(T))
- if (mutationList.size >= batchSize) {
- table.batch(mutationList, null)
- mutationList.clear()
- }
- })
- if (mutationList.size() > 0) {
- table.batch(mutationList, null)
- mutationList.clear()
- }
- table.close()
- }))
- }
-
- /**
- * Underlying function to support all bulk streaming mutations
- *
- * May be opened up if requested
- */
- private def streamBulkMutation[T](dstream: DStream[T],
- tableName: TableName,
- f: (T) => Mutation,
- batchSize: Integer) = {
- val tName = tableName.getName
- dstream.foreachRDD((rdd, time) => {
- bulkMutation(rdd, TableName.valueOf(tName), f, batchSize)
- })
- }
-
- /**
- * A simple abstraction over the HBaseContext.mapPartition method.
- *
- * It allows a user to take an RDD and generate a
- * new RDD based on Gets and the results they bring back from HBase
- *
- * @param rdd Original RDD with data to iterate over
- * @param tableName The name of the table to get from
- * @param makeGet function to convert a value in the RDD to a
- * HBase Get
- * @param convertResult This will convert the HBase Result object to
- * whatever the user wants to put in the resulting
- * RDD
- * @return new RDD that is created by the Get to HBase
- */
- def bulkGet[T, U: ClassTag](tableName: TableName,
- batchSize: Integer,
- rdd: RDD[T],
- makeGet: (T) => Get,
- convertResult: (Result) => U): RDD[U] = {
-
- val getMapPartition = new GetMapPartition(tableName,
- batchSize,
- makeGet,
- convertResult)
-
- rdd.mapPartitions[U](it =>
- hbaseMapPartition[T, U](
- broadcastedConf,
- it,
- getMapPartition.run))
- }
-
- /**
- * A simple abstraction over the HBaseContext.streamMap method.
- *
- * It allows a user to take a DStream and
- * generate a new DStream based on Gets and the results
- * they bring back from HBase
- *
- * @param tableName The name of the table to get from
- * @param batchSize The number of Gets to be sent in a single batch
- * @param dStream Original DStream with data to iterate over
- * @param makeGet Function to convert a value in the DStream to a
- * HBase Get
- * @param convertResult This will convert the HBase Result object to
- * whatever the user wants to put in the resulting
- * DStream
- * @return A new DStream that is created by the Get to HBase
- */
- def streamBulkGet[T, U: ClassTag](tableName: TableName,
- batchSize: Integer,
- dStream: DStream[T],
- makeGet: (T) => Get,
- convertResult: (Result) => U): DStream[U] = {
-
- val getMapPartition = new GetMapPartition(tableName,
- batchSize,
- makeGet,
- convertResult)
-
- dStream.mapPartitions[U](it => hbaseMapPartition[T, U](
- broadcastedConf,
- it,
- getMapPartition.run))
- }
-
- /**
- * This function will use the native HBase TableInputFormat with the
- * given scan object to generate a new RDD
- *
- * @param tableName the name of the table to scan
- * @param scan the HBase scan object to use to read data from HBase
- * @param f function to convert a Result object from HBase into
- * what the user wants in the final generated RDD
- * @return new RDD with results from scan
- */
- def hbaseRDD[U: ClassTag](tableName: TableName, scan: Scan,
- f: ((ImmutableBytesWritable, Result)) => U): RDD[U] = {
-
- val job: Job = Job.getInstance(getConf(broadcastedConf))
-
- TableMapReduceUtil.initCredentials(job)
- TableMapReduceUtil.initTableMapperJob(tableName, scan,
- classOf[IdentityTableMapper], null, null, job)
-
- val jconf = new JobConf(job.getConfiguration)
- SparkHadoopUtil.get.addCredentials(jconf)
- new NewHBaseRDD(sc,
- classOf[TableInputFormat],
- classOf[ImmutableBytesWritable],
- classOf[Result],
- job.getConfiguration,
- this).map(f)
- }
-
- /**
- * An overloaded version of HBaseContext hbaseRDD that defines the
- * type of the resulting RDD
- *
- * @param tableName the name of the table to scan
- * @param scans the HBase scan object to use to read data from HBase
- * @return New RDD with results from scan
- *
- */
- def hbaseRDD(tableName: TableName, scans: Scan):
- RDD[(ImmutableBytesWritable, Result)] = {
-
- hbaseRDD[(ImmutableBytesWritable, Result)](
- tableName,
- scans,
- (r: (ImmutableBytesWritable, Result)) => r)
- }
-
- /**
- * Underlying wrapper for all foreach functions in HBaseContext
- */
- private def hbaseForeachPartition[T](configBroadcast:
- Broadcast[SerializableWritable[Configuration]],
- it: Iterator[T],
- f: (Iterator[T], Connection) => Unit) = {
-
- val config = getConf(configBroadcast)
-
- applyCreds
- // specify that this is a proxy user
- val smartConn = HBaseConnectionCache.getConnection(config)
- f(it, smartConn.connection)
- smartConn.close()
- }
-
- private def getConf(configBroadcast: Broadcast[SerializableWritable[Configuration]]):
- Configuration = {
-
- if (tmpHdfsConfiguration == null && tmpHdfsConfgFile != null) {
- val fs = FileSystem.newInstance(SparkHadoopUtil.get.conf)
- val inputStream = fs.open(new Path(tmpHdfsConfgFile))
- tmpHdfsConfiguration = new Configuration(false)
- tmpHdfsConfiguration.readFields(inputStream)
- inputStream.close()
- }
-
- if (tmpHdfsConfiguration == null) {
- try {
- tmpHdfsConfiguration = configBroadcast.value.value
- } catch {
- case ex: Exception => logError("Unable to getConfig from broadcast", ex)
- }
- }
- tmpHdfsConfiguration
- }
-
- /**
- * Underlying wrapper for all mapPartition functions in HBaseContext
- *
- */
- private def hbaseMapPartition[K, U](
- configBroadcast:
- Broadcast[SerializableWritable[Configuration]],
- it: Iterator[K],
- mp: (Iterator[K], Connection) =>
- Iterator[U]): Iterator[U] = {
-
- val config = getConf(configBroadcast)
- applyCreds
-
- val smartConn = HBaseConnectionCache.getConnection(config)
- val res = mp(it, smartConn.connection)
- smartConn.close()
- res
- }
-
- /**
- * Underlying wrapper for all get mapPartition functions in HBaseContext
- */
- private class GetMapPartition[T, U](tableName: TableName,
- batchSize: Integer,
- makeGet: (T) => Get,
- convertResult: (Result) => U)
- extends Serializable {
-
- val tName = tableName.getName
-
- def run(iterator: Iterator[T], connection: Connection): Iterator[U] = {
- val table = connection.getTable(TableName.valueOf(tName))
-
- val gets = new java.util.ArrayList[Get]()
- var res = List[U]()
-
- while (iterator.hasNext) {
- gets.add(makeGet(iterator.next()))
-
- if (gets.size() == batchSize) {
- val results = table.get(gets)
- res = res ++ results.map(convertResult)
- gets.clear()
- }
- }
- if (gets.size() > 0) {
- val results = table.get(gets)
- res = res ++ results.map(convertResult)
- gets.clear()
- }
- table.close()
- res.iterator
- }
- }
-
- /**
- * Produces a ClassTag[T], which is actually just a casted ClassTag[AnyRef].
- *
- * This method is used to keep ClassTags out of the external Java API, as
- * the Java compiler cannot produce them automatically. While this
- * ClassTag-faking does please the compiler, it can cause problems at runtime
- * if the Scala API relies on ClassTags for correctness.
- *
- * Often, though, a ClassTag[AnyRef] will not lead to incorrect behavior,
- * just worse performance or security issues.
- * For instance, an Array of AnyRef can hold any type T, but may lose primitive
- * specialization.
- */
- private[spark]
- def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]]
-
- /**
- * Spark Implementation of HBase Bulk load for wide rows or when
- * values are not already combined at the time of the map process
- *
- * This will take the content from an existing RDD then sort and shuffle
- * it with respect to region splits. The result of that sort and shuffle
- * will be written to HFiles.
- *
- * After this function is executed the user will have to call
- * LoadIncrementalHFiles.doBulkLoad(...) to move the files into HBase
- *
- * Also note this version of bulk load is different from past versions in
- * that it includes the qualifier as part of the sort process. The
- * reason for this is to be able to support rows with a very large number
- * of columns.
- *
- * @param rdd The RDD we are bulk loading from
- * @param tableName The HBase table we are loading into
- * @param flatMap A flatMap function that will turn every
- * row in the RDD
- * into N cells for the bulk load
- * @param stagingDir The location on the FileSystem to bulk load into
- * @param familyHFileWriteOptionsMap Options that will define how the HFile for a
- * column family is written
- * @param compactionExclude Compaction excluded for the HFiles
- * @param maxSize Max size for the HFiles before they roll
- * @tparam T The Type of values in the original RDD
- */
- def bulkLoad[T](rdd:RDD[T],
- tableName: TableName,
- flatMap: (T) => Iterator[(KeyFamilyQualifier, Array[Byte])],
- stagingDir:String,
- familyHFileWriteOptionsMap:
- util.Map[Array[Byte], FamilyHFileWriteOptions] =
- new util.HashMap[Array[Byte], FamilyHFileWriteOptions],
- compactionExclude: Boolean = false,
- maxSize:Long = HConstants.DEFAULT_MAX_FILE_SIZE):
- Unit = {
- val stagingPath = new Path(stagingDir)
- val fs = stagingPath.getFileSystem(config)
- if (fs.exists(stagingPath)) {
- throw new FileAlreadyExistsException("Path " + stagingDir + " already exists")
- }
- val conn = HBaseConnectionCache.getConnection(config)
- val regionLocator = conn.getRegionLocator(tableName)
- val startKeys = regionLocator.getStartKeys
- if (startKeys.length == 0) {
- logInfo("Table " + tableName.toString + " was not found")
- }
- val defaultCompressionStr = config.get("hfile.compression",
- Compression.Algorithm.NONE.getName)
- val hfileCompression = HFileWriterImpl
- .compressionByName(defaultCompressionStr)
- val nowTimeStamp = System.currentTimeMillis()
- val tableRawName = tableName.getName
-
- val familyHFileWriteOptionsMapInternal =
- new util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions]
-
- val entrySetIt = familyHFileWriteOptionsMap.entrySet().iterator()
-
- while (entrySetIt.hasNext) {
- val entry = entrySetIt.next()
- familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(entry.getKey), entry.getValue)
- }
-
- val regionSplitPartitioner =
- new BulkLoadPartitioner(startKeys)
-
- //This is where all the magic happens
- //Here we are going to do the following things
- // 1. FlatMap every row in the RDD into key column value tuples
- // 2. Then we are going to repartition sort and shuffle
- // 3. Finally we are going to write out our HFiles
- rdd.flatMap( r => flatMap(r)).
- repartitionAndSortWithinPartitions(regionSplitPartitioner).
- hbaseForeachPartition(this, (it, conn) => {
-
- val conf = broadcastedConf.value.value
- val fs = FileSystem.get(conf)
- val writerMap = new mutable.HashMap[ByteArrayWrapper, WriterLength]
- var previousRow:Array[Byte] = HConstants.EMPTY_BYTE_ARRAY
- var rollOverRequested = false
- val localTableName = TableName.valueOf(tableRawName)
-
- //Here is where we finally iterate through the data in this partition of the
- //RDD that has been sorted and partitioned
- it.foreach{ case (keyFamilyQualifier, cellValue:Array[Byte]) =>
-
- val wl = writeValueToHFile(keyFamilyQualifier.rowKey,
- keyFamilyQualifier.family,
- keyFamilyQualifier.qualifier,
- cellValue,
- nowTimeStamp,
- fs,
- conn,
- localTableName,
- conf,
- familyHFileWriteOptionsMapInternal,
- hfileCompression,
- writerMap,
- stagingDir)
-
- rollOverRequested = rollOverRequested || wl.written > maxSize
-
- //This will only roll if we have at least one column family file that is
- //bigger than maxSize and we have finished a given row key
- if (rollOverRequested && Bytes.compareTo(previousRow, keyFamilyQualifier.rowKey) != 0) {
- rollWriters(fs, writerMap,
- regionSplitPartitioner,
- previousRow,
- compactionExclude)
- rollOverRequested = false
- }
-
- previousRow = keyFamilyQualifier.rowKey
- }
- //We have finished all the data, so let's close up the writers
- rollWriters(fs, writerMap,
- regionSplitPartitioner,
- previousRow,
- compactionExclude)
- rollOverRequested = false
- })
- }
-
- /**
- * Spark Implementation of HBase Bulk load for short rows, somewhere under
- * 1000 columns. This bulk load should be faster for tables with thinner
- * rows than the other Spark implementation of bulk load that puts only one
- * value into a record going into a shuffle
- *
- * This will take the content from an existing RDD then sort and shuffle
- * it with respect to region splits. The result of that sort and shuffle
- * will be written to HFiles.
- *
- * After this function is executed the user will have to call
- * LoadIncrementalHFiles.doBulkLoad(...) to move the files into HBase
- *
- * In this implementation, only the rowKey is given to the shuffle as the key
- * and all the columns are already linked to the RowKey before the shuffle
- * stage. The sorting of the qualifiers is done in memory outside of the
- * shuffle stage
- *
- * Also make sure that incoming RDDs only have one record for every row key.
- *
- * @param rdd The RDD we are bulk loading from
- * @param tableName The HBase table we are loading into
- * @param mapFunction A function that will convert the RDD records to
- * the key value format used for the shuffle to prep
- * for writing to the bulk loaded HFiles
- * @param stagingDir The location on the FileSystem to bulk load into
- * @param familyHFileWriteOptionsMap Options that will define how the HFile for a
- * column family is written
- * @param compactionExclude Compaction excluded for the HFiles
- * @param maxSize Max size for the HFiles before they roll
- * @tparam T The Type of values in the original RDD
- */
- def bulkLoadThinRows[T](rdd:RDD[T],
- tableName: TableName,
- mapFunction: (T) =>
- (ByteArrayWrapper, FamiliesQualifiersValues),
- stagingDir:String,
- familyHFileWriteOptionsMap:
- util.Map[Array[Byte], FamilyHFileWriteOptions] =
- new util.HashMap[Array[Byte], FamilyHFileWriteOptions],
- compactionExclude: Boolean = false,
- maxSize:Long = HConstants.DEFAULT_MAX_FILE_SIZE):
- Unit = {
- val stagingPath = new Path(stagingDir)
- val fs = stagingPath.getFileSystem(config)
- if (fs.exists(stagingPath)) {
- throw new FileAlreadyExistsException("Path " + stagingDir + " already exists")
- }
- val conn = HBaseConnectionCache.getConnection(config)
- val regionLocator = conn.getRegionLocator(tableName)
- val startKeys = regionLocator.getStartKeys
- if (startKeys.length == 0) {
- logInfo("Table " + tableName.toString + " was not found")
- }
- val defaultCompressionStr = config.get("hfile.compression",
- Compression.Algorithm.NONE.getName)
- val defaultCompression = HFileWriterImpl
- .compressionByName(defaultCompressionStr)
- val nowTimeStamp = System.currentTimeMillis()
- val tableRawName = tableName.getName
-
- val familyHFileWriteOptionsMapInternal =
- new util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions]
-
- val entrySetIt = familyHFileWriteOptionsMap.entrySet().iterator()
-
- while (entrySetIt.hasNext) {
- val entry = entrySetIt.next()
- familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(entry.getKey), entry.getValue)
- }
-
- val regionSplitPartitioner =
- new BulkLoadPartitioner(startKeys)
-
- //This is where all the magic happens
- //Here we are going to do the following things
- // 1. Map every row in the RDD into key column value tuples
- // 2. Then we are going to repartition sort and shuffle
- // 3. Finally we are going to write out our HFiles
- rdd.map( r => mapFunction(r)).
- repartitionAndSortWithinPartitions(regionSplitPartitioner).
- hbaseForeachPartition(this, (it, conn) => {
-
- val conf = broadcastedConf.value.value
- val fs = FileSystem.get(conf)
- val writerMap = new mutable.HashMap[ByteArrayWrapper, WriterLength]
- var previousRow:Array[Byte] = HConstants.EMPTY_BYTE_ARRAY
- var rollOverRequested = false
- val localTableName = TableName.valueOf(tableRawName)
-
- //Here is where we finally iterate through the data in this partition of the
- //RDD that has been sorted and partitioned
- it.foreach{ case (rowKey:ByteArrayWrapper,
- familiesQualifiersValues:FamiliesQualifiersValues) =>
-
-
- if (Bytes.compareTo(previousRow, rowKey.value) == 0) {
- throw new KeyAlreadyExistsException("The following key was sent to the " +
- "HFile load more then one: " + Bytes.toString(previousRow))
- }
-
- //The family map is a tree map so the families will be sorted
- val familyIt = familiesQualifiersValues.familyMap.entrySet().iterator()
- while (familyIt.hasNext) {
- val familyEntry = familyIt.next()
-
- val family = familyEntry.getKey.value
-
- val qualifierIt = familyEntry.getValue.entrySet().iterator()
-
- //The qualifier map is a tree map so the qualifiers will be sorted
- while (qualifierIt.hasNext) {
-
- val qualifierEntry = qualifierIt.next()
- val qualifier = qualifierEntry.getKey
- val cellValue = qualifierEntry.getValue
-
- writeValueToHFile(rowKey.value,
- family,
- qualifier.value, // qualifier
- cellValue, // value
- nowTimeStamp,
- fs,
- conn,
- localTableName,
- conf,
- familyHFileWriteOptionsMapInternal,
- defaultCompression,
- writerMap,
- stagingDir)
-
- previousRow = rowKey.value
- }
-
- writerMap.values.foreach( wl => {
- rollOverRequested = rollOverRequested || wl.written > maxSize
-
- //This will only roll if we have at least one column family file that is
- //bigger than maxSize and we have finished a given row key
- if (rollOverRequested) {
- rollWriters(fs, writerMap,
- regionSplitPartitioner,
- previousRow,
- compactionExclude)
- rollOverRequested = false
- }
- })
- }
- }
-
- //This will get a writer for the column family
- //If there is no writer for a given column family then
- //it will get created here.
- //We have finished all the data, so let's close up the writers
- rollWriters(fs, writerMap,
- regionSplitPartitioner,
- previousRow,
- compactionExclude)
- rollOverRequested = false
- })
- }
-
- /**
- * This will return a new HFile writer when requested
- *
- * @param family column family
- * @param conf configuration to connect to HBase
- * @param favoredNodes nodes that we would like to write to
- * @param fs FileSystem object where we will be writing the HFiles to
- * @return WriterLength object
- */
- private def getNewHFileWriter(family: Array[Byte], conf: Configuration,
- favoredNodes: Array[InetSocketAddress],
- fs:FileSystem,
- familydir:Path,
- familyHFileWriteOptionsMapInternal:
- util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions],
- defaultCompression:Compression.Algorithm): WriterLength = {
-
-
- var familyOptions = familyHFileWriteOptionsMapInternal.get(new ByteArrayWrapper(family))
-
- if (familyOptions == null) {
- familyOptions = new FamilyHFileWriteOptions(defaultCompression.toString,
- BloomType.NONE.toString, HConstants.DEFAULT_BLOCKSIZE, DataBlockEncoding.NONE.toString)
- familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(family), familyOptions)
- }
-
- val tempConf = new Configuration(conf)
- tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f)
- val contextBuilder = new HFileContextBuilder()
- .withCompression(Algorithm.valueOf(familyOptions.compression))
- .withChecksumType(HStore.getChecksumType(conf))
- .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
- .withBlockSize(familyOptions.blockSize)
-
- if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
- contextBuilder.withIncludesTags(true)
- }
-
- contextBuilder.withDataBlockEncoding(DataBlockEncoding.
- valueOf(familyOptions.dataBlockEncoding))
- val hFileContext = contextBuilder.build()
-
- //Add a '_' to the file name because this is an unfinished file. A rename will happen
- // to remove the '_' when the file is closed.
- new WriterLength(0,
- new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), new HFileSystem(fs))
- .withBloomType(BloomType.valueOf(familyOptions.bloomType))
- .withComparator(CellComparator.getInstance()).withFileContext(hFileContext)
- .withFilePath(new Path(familydir, "_" + UUID.randomUUID.toString.replaceAll("-", "")))
- .withFavoredNodes(favoredNodes).build())
-
- }
-
- /**
- * Encompasses the logic to write a value to an HFile
- *
- * @param rowKey The RowKey for the record
- * @param family HBase column family for the record
- * @param qualifier HBase column qualifier for the record
- * @param cellValue HBase cell value
- * @param nowTimeStamp The cell time stamp
- * @param fs The FileSystem where the HFile will be written
- * @param conn Connection to HBase
- * @param tableName HBase TableName object
- * @param conf Configuration to be used when making a new HFile
- * @param familyHFileWriteOptionsMapInternal Extra configs for the HFile
- * @param hfileCompression The compression codec for the new HFile
- * @param writerMap HashMap of existing writers and their offsets
- * @param stagingDir The staging directory on the FileSystem to store
- * the HFiles
- * @return The writer for the given HFile that was written
- * to
- */
- private def writeValueToHFile(rowKey: Array[Byte],
- family: Array[Byte],
- qualifier: Array[Byte],
- cellValue:Array[Byte],
- nowTimeStamp: Long,
- fs: FileSystem,
- conn: Connection,
- tableName: TableName,
- conf: Configuration,
- familyHFileWriteOptionsMapInternal:
- util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions],
- hfileCompression:Compression.Algorithm,
- writerMap:mutable.HashMap[ByteArrayWrapper, WriterLength],
- stagingDir: String
- ): WriterLength = {
-
- val wl = writerMap.getOrElseUpdate(new ByteArrayWrapper(family), {
- val familyDir = new Path(stagingDir, Bytes.toString(family))
-
- fs.mkdirs(familyDir)
-
- val loc:HRegionLocation = {
- try {
- val locator =
- conn.getRegionLocator(tableName)
- locator.getRegionLocation(rowKey)
- } catch {
- case e: Throwable =>
- logWarning("failed to locate region for rowkey: " +
- Bytes.toString(rowKey), e)
- null
- }
- }
- if (null == loc) {
- if (log.isTraceEnabled) {
- logTrace("failed to get region location, so use default writer: " +
- Bytes.toString(rowKey))
- }
- getNewHFileWriter(family = family,
- conf = conf,
- favoredNodes = null,
- fs = fs,
- familydir = familyDir,
- familyHFileWriteOptionsMapInternal,
- hfileCompression)
- } else {
- if (log.isDebugEnabled) {
- logDebug("first rowkey: [" + Bytes.toString(rowKey) + "]")
- }
- val initialIsa =
- new InetSocketAddress(loc.getHostname, loc.getPort)
- if (initialIsa.isUnresolved) {
- if (log.isTraceEnabled) {
- logTrace("failed to resolve bind address: " + loc.getHostname + ":"
- + loc.getPort + ", so use default writer")
- }
- getNewHFileWriter(family,
- conf,
- null,
- fs,
- familyDir,
- familyHFileWriteOptionsMapInternal,
- hfileCompression)
- } else {
- if(log.isDebugEnabled) {
- logDebug("use favored nodes writer: " + initialIsa.getHostString)
- }
- getNewHFileWriter(family,
- conf,
- Array[InetSocketAddress](initialIsa),
- fs,
- familyDir,
- familyHFileWriteOptionsMapInternal,
- hfileCompression)
- }
- }
- })
-
- val keyValue = new KeyValue(rowKey,
- family,
- qualifier,
- nowTimeStamp, cellValue)
-
- wl.writer.append(keyValue)
- wl.written += keyValue.getLength
-
- wl
- }
-
- /**
- * This will roll all Writers
- * @param fs Hadoop FileSystem object
- * @param writerMap HashMap that contains all the writers
- * @param regionSplitPartitioner The partitioner with knowledge of how the
- * Regions are split by row key
- * @param previousRow The last row to fill the HFile ending range metadata
- * @param compactionExclude The exclude compaction metadata flag for the HFile
- */
- private def rollWriters(fs:FileSystem,
- writerMap:mutable.HashMap[ByteArrayWrapper, WriterLength],
- regionSplitPartitioner: BulkLoadPartitioner,
- previousRow: Array[Byte],
- compactionExclude: Boolean): Unit = {
- writerMap.values.foreach( wl => {
- if (wl.writer != null) {
- logDebug("Writer=" + wl.writer.getPath +
- (if (wl.written == 0) "" else ", wrote=" + wl.written))
- closeHFileWriter(fs, wl.writer,
- regionSplitPartitioner,
- previousRow,
- compactionExclude)
- }
- })
- writerMap.clear()
-
- }
-
- /**
- * Function to close an HFile
- * @param fs Hadoop FileSystem object
- * @param w HFile Writer
- * @param regionSplitPartitioner The partitioner with knowledge of how the
- * Regions are split by row key
- * @param previousRow The last row to fill the HFile ending range metadata
- * @param compactionExclude The exclude compaction metadata flag for the HFile
- */
- private def closeHFileWriter(fs:FileSystem,
- w: StoreFileWriter,
- regionSplitPartitioner: BulkLoadPartitioner,
- previousRow: Array[Byte],
- compactionExclude: Boolean): Unit = {
- if (w != null) {
- w.appendFileInfo(HStoreFile.BULKLOAD_TIME_KEY,
- Bytes.toBytes(System.currentTimeMillis()))
- w.appendFileInfo(HStoreFile.BULKLOAD_TASK_KEY,
- Bytes.toBytes(regionSplitPartitioner.getPartition(previousRow)))
- w.appendFileInfo(HStoreFile.MAJOR_COMPACTION_KEY,
- Bytes.toBytes(true))
- w.appendFileInfo(HStoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY,
- Bytes.toBytes(compactionExclude))
- w.appendTrackedTimestampsToMetadata()
- w.close()
-
- val srcPath = w.getPath
-
- //The new path uses substring to remove the leading '_' from the HFile name.
- // A leading '_' tells HBase that the file shouldn't be included in the bulk
- // load, which protects against unfinished HFiles being submitted to HBase.
- val newPath = new Path(w.getPath.getParent, w.getPath.getName.substring(1))
- if (!fs.rename(srcPath, newPath)) {
- throw new IOException("Unable to rename '" + srcPath +
- "' to " + newPath)
- }
- }
- }
-
- /**
- * This is a wrapper class around StoreFileWriter. The reason for the
- * wrapper is to keep the length of the file alongside the writer
- *
- * @param written The number of bytes written to the writer
- * @param writer The writer to be wrapped
- */
- class WriterLength(var written: Long, val writer: StoreFileWriter)
-}
-
-object LatestHBaseContextCache {
- var latest:HBaseContext = null
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseDStreamFunctions.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseDStreamFunctions.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseDStreamFunctions.scala
deleted file mode 100644
index 4edde44..0000000
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseDStreamFunctions.scala
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.spark
-
-import org.apache.hadoop.hbase.TableName
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.client._
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable
-import org.apache.spark.streaming.dstream.DStream
-
-import scala.reflect.ClassTag
-
-/**
- * HBaseDStreamFunctions contains a set of implicit functions that can be
- * applied to a Spark DStream so that we can easily interact with HBase
- */
-@InterfaceAudience.Public
-object HBaseDStreamFunctions {
-
- /**
- * These are implicit methods for a DStream that contains any type of
- * data.
- *
- * @param dStream This is for dStreams of any type
- * @tparam T Type T
- */
- implicit class GenericHBaseDStreamFunctions[T](val dStream: DStream[T]) {
-
- /**
- * Implicit method that gives easy access to HBaseContext's bulk
- * put. This will not return a new DStream. Think of it like a foreach
- *
- * @param hc The hbaseContext object to identify which
- * HBase cluster connection to use
- * @param tableName The tableName that the put will be sent to
- * @param f The function that will turn the DStream values
- * into HBase Put objects.
- */
- def hbaseBulkPut(hc: HBaseContext,
- tableName: TableName,
- f: (T) => Put): Unit = {
- hc.streamBulkPut(dStream, tableName, f)
- }
-
- /**
- * Implicit method that gives easy access to HBaseContext's bulk
- * get. This will return a new DStream. Think of it as a DStream map
- * function, in that every DStream value will get a new value out of
- * HBase. That new value will populate the newly generated DStream.
- *
- * @param hc The hbaseContext object to identify which
- * HBase cluster connection to use
- * @param tableName The tableName that the gets will be sent to
- * @param batchSize How many gets to execute in a single batch
- * @param f The function that will turn the DStream values
- * into HBase Get objects
- * @param convertResult The function that will convert an HBase
- * Result object into a value that will go
- * into the resulting DStream
- * @tparam R The type of Object that will be coming
- * out of the resulting DStream
- * @return A resulting DStream with type R objects
- */
- def hbaseBulkGet[R: ClassTag](hc: HBaseContext,
- tableName: TableName,
- batchSize:Int, f: (T) => Get, convertResult: (Result) => R):
- DStream[R] = {
- hc.streamBulkGet[T, R](tableName, batchSize, dStream, f, convertResult)
- }
-
- /**
- * Implicit method that gives easy access to HBaseContext's bulk
- * get. This will return a new DStream. Think of it as a DStream map
- * function, in that every DStream value will get a new value out of
- * HBase. That new value will populate the newly generated DStream.
- *
- * @param hc The hbaseContext object to identify which
- * HBase cluster connection to use
- * @param tableName The tableName that the gets will be sent to
- * @param batchSize How many gets to execute in a single batch
- * @param f The function that will turn the DStream values
- * into HBase Get objects
- * @return A resulting DStream of (ImmutableBytesWritable, Result) tuples
- */
- def hbaseBulkGet(hc: HBaseContext,
- tableName: TableName, batchSize:Int,
- f: (T) => Get): DStream[(ImmutableBytesWritable, Result)] = {
- hc.streamBulkGet[T, (ImmutableBytesWritable, Result)](
- tableName, batchSize, dStream, f,
- result => (new ImmutableBytesWritable(result.getRow), result))
- }
-
- /**
- * Implicit method that gives easy access to HBaseContext's bulk
- * Delete. This will not return a new DStream.
- *
- * @param hc The hbaseContext object to identify which HBase
- * cluster connection to use
- * @param tableName The tableName that the deletes will be sent to
- * @param f The function that will convert the DStream value into
- * an HBase Delete object
- * @param batchSize The number of Deletes to be sent in a single batch
- */
- def hbaseBulkDelete(hc: HBaseContext,
- tableName: TableName,
- f:(T) => Delete, batchSize:Int): Unit = {
- hc.streamBulkDelete(dStream, tableName, f, batchSize)
- }
-
- /**
- * Implicit method that gives easy access to HBaseContext's
- * foreachPartition method. This will act very much like a normal DStream
- * foreach method, except that you will have an HBase connection available
- * while iterating through the values.
- *
- * @param hc The hbaseContext object to identify which HBase
- * cluster connection to use
- * @param f This function will get an iterator for a Partition of a
- * DStream along with a connection object to HBase
- */
- def hbaseForeachPartition(hc: HBaseContext,
- f: (Iterator[T], Connection) => Unit): Unit = {
- hc.streamForeachPartition(dStream, f)
- }
-
- /**
- * Implicit method that gives easy access to HBaseContext's
- * mapPartitions method. This will act very much like a normal DStream
- * mapPartitions method, except that you will have an HBase connection
- * available while iterating through the values
- *
- * @param hc The hbaseContext object to identify which HBase
- * cluster connection to use
- * @param f This function will get an iterator for a Partition of a
- * DStream along with a connection object to HBase
- * @tparam R This is the type of objects that will go into the resulting
- * DStream
- * @return A resulting DStream of type R
- */
- def hbaseMapPartitions[R: ClassTag](hc: HBaseContext,
- f: (Iterator[T], Connection) => Iterator[R]):
- DStream[R] = {
- hc.streamMapPartitions(dStream, f)
- }
- }
-}
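
For anyone still building against a branch or external connector that ships hbase-spark, the implicits removed above are used by importing HBaseDStreamFunctions._ so that any DStream picks up hbaseBulkPut, hbaseBulkGet, hbaseBulkDelete, hbaseForeachPartition, and hbaseMapPartitions. A hedged usage sketch of hbaseBulkPut follows; the table name, column family, qualifier, record type, and the way the HBaseContext and DStream are obtained are assumptions made for illustration, and only the hbaseBulkPut signature comes from the code above.

import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.hadoop.hbase.spark.HBaseDStreamFunctions._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.streaming.dstream.DStream

object DStreamBulkPutSketch {
  // hbaseContext and dStream are assumed to be built elsewhere from a
  // SparkContext, an HBase Configuration, and a StreamingContext.
  def writeStream(hbaseContext: HBaseContext,
                  dStream: DStream[(Array[Byte], Array[Byte])]): Unit = {
    dStream.hbaseBulkPut(hbaseContext, TableName.valueOf("t1"), {
      case (rowKey, value) =>
        // Each stream record becomes a single-cell Put; "cf" and "q" are
        // placeholder family and qualifier names.
        new Put(rowKey).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), value)
    })
  }
}
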