Posted to commits@hbase.apache.org by bu...@apache.org on 2017/11/09 04:59:53 UTC

[7/9] hbase git commit: HBASE-18817 pull the hbase-spark module out of branch-2.

http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpression.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpression.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpression.scala
deleted file mode 100644
index 283838f..0000000
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpression.scala
+++ /dev/null
@@ -1,260 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.spark
-
-import java.util
-
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.spark.datasources.{BytesEncoder, JavaBytesEncoder}
-import org.apache.hadoop.hbase.spark.datasources.JavaBytesEncoder.JavaBytesEncoder
-import org.apache.hadoop.hbase.util.Bytes
-import org.apache.spark.sql.datasources.hbase.{Field, Utils}
-import org.apache.spark.sql.types._
-/**
- * Dynamic logic for SQL push-down. There is an instance for the most
- * common operations and a pass-through for operations not covered here.
- *
- * Logic can be nested with And or Or operators.
- *
- * A logic tree can be written out as a string and reconstructed from that string
- *
- */
-@InterfaceAudience.Private
-trait DynamicLogicExpression {
-  def execute(columnToCurrentRowValueMap: util.HashMap[String, ByteArrayComparable],
-              valueFromQueryValueArray:Array[Array[Byte]]): Boolean
-  def toExpressionString: String = {
-    val strBuilder = new StringBuilder
-    appendToExpression(strBuilder)
-    strBuilder.toString()
-  }
-  def filterOps: JavaBytesEncoder = JavaBytesEncoder.Unknown
-
-  def appendToExpression(strBuilder:StringBuilder)
-
-  var encoder: BytesEncoder = _
-
-  def setEncoder(enc: BytesEncoder): DynamicLogicExpression = {
-    encoder = enc
-    this
-  }
-}
-
-@InterfaceAudience.Private
-trait CompareTrait {
-  self: DynamicLogicExpression =>
-  def columnName: String
-  def valueFromQueryIndex: Int
-  def execute(columnToCurrentRowValueMap:
-              util.HashMap[String, ByteArrayComparable],
-              valueFromQueryValueArray:Array[Array[Byte]]): Boolean = {
-    val currentRowValue = columnToCurrentRowValueMap.get(columnName)
-    val valueFromQuery = valueFromQueryValueArray(valueFromQueryIndex)
-    currentRowValue != null &&
-      encoder.filter(currentRowValue.bytes, currentRowValue.offset, currentRowValue.length,
-        valueFromQuery, 0, valueFromQuery.length, filterOps)
-  }
-}
-
-@InterfaceAudience.Private
-class AndLogicExpression (val leftExpression:DynamicLogicExpression,
-                           val rightExpression:DynamicLogicExpression)
-  extends DynamicLogicExpression{
-  override def execute(columnToCurrentRowValueMap:
-                       util.HashMap[String, ByteArrayComparable],
-                       valueFromQueryValueArray:Array[Array[Byte]]): Boolean = {
-    leftExpression.execute(columnToCurrentRowValueMap, valueFromQueryValueArray) &&
-      rightExpression.execute(columnToCurrentRowValueMap, valueFromQueryValueArray)
-  }
-
-  override def appendToExpression(strBuilder: StringBuilder): Unit = {
-    strBuilder.append("( ")
-    strBuilder.append(leftExpression.toExpressionString)
-    strBuilder.append(" AND ")
-    strBuilder.append(rightExpression.toExpressionString)
-    strBuilder.append(" )")
-  }
-}
-
-@InterfaceAudience.Private
-class OrLogicExpression (val leftExpression:DynamicLogicExpression,
-                          val rightExpression:DynamicLogicExpression)
-  extends DynamicLogicExpression{
-  override def execute(columnToCurrentRowValueMap:
-                       util.HashMap[String, ByteArrayComparable],
-                       valueFromQueryValueArray:Array[Array[Byte]]): Boolean = {
-    leftExpression.execute(columnToCurrentRowValueMap, valueFromQueryValueArray) ||
-      rightExpression.execute(columnToCurrentRowValueMap, valueFromQueryValueArray)
-  }
-  override def appendToExpression(strBuilder: StringBuilder): Unit = {
-    strBuilder.append("( ")
-    strBuilder.append(leftExpression.toExpressionString)
-    strBuilder.append(" OR ")
-    strBuilder.append(rightExpression.toExpressionString)
-    strBuilder.append(" )")
-  }
-}
-
-@InterfaceAudience.Private
-class EqualLogicExpression (val columnName:String,
-                            val valueFromQueryIndex:Int,
-                            val isNot:Boolean) extends DynamicLogicExpression{
-  override def execute(columnToCurrentRowValueMap:
-                       util.HashMap[String, ByteArrayComparable],
-                       valueFromQueryValueArray:Array[Array[Byte]]): Boolean = {
-    val currentRowValue = columnToCurrentRowValueMap.get(columnName)
-    val valueFromQuery = valueFromQueryValueArray(valueFromQueryIndex)
-
-    currentRowValue != null &&
-      Bytes.equals(valueFromQuery,
-        0, valueFromQuery.length, currentRowValue.bytes,
-        currentRowValue.offset, currentRowValue.length) != isNot
-  }
-  override def appendToExpression(strBuilder: StringBuilder): Unit = {
-    val command = if (isNot) "!=" else "=="
-    strBuilder.append(columnName + " " + command + " " + valueFromQueryIndex)
-  }
-}
-
-@InterfaceAudience.Private
-class IsNullLogicExpression (val columnName:String,
-                             val isNot:Boolean) extends DynamicLogicExpression{
-  override def execute(columnToCurrentRowValueMap:
-                       util.HashMap[String, ByteArrayComparable],
-                       valueFromQueryValueArray:Array[Array[Byte]]): Boolean = {
-    val currentRowValue = columnToCurrentRowValueMap.get(columnName)
-
-    (currentRowValue == null) != isNot
-  }
-  override def appendToExpression(strBuilder: StringBuilder): Unit = {
-    val command = if (isNot) "isNotNull" else "isNull"
-    strBuilder.append(columnName + " " + command)
-  }
-}
-
-@InterfaceAudience.Private
-class GreaterThanLogicExpression (override val columnName:String,
-                                  override val valueFromQueryIndex:Int)
-  extends DynamicLogicExpression with CompareTrait{
-  override val filterOps = JavaBytesEncoder.Greater
-  override def appendToExpression(strBuilder: StringBuilder): Unit = {
-    strBuilder.append(columnName + " > " + valueFromQueryIndex)
-  }
-}
-
-@InterfaceAudience.Private
-class GreaterThanOrEqualLogicExpression (override val columnName:String,
-                                         override val valueFromQueryIndex:Int)
-  extends DynamicLogicExpression with CompareTrait{
-  override val filterOps = JavaBytesEncoder.GreaterEqual
-  override def appendToExpression(strBuilder: StringBuilder): Unit = {
-    strBuilder.append(columnName + " >= " + valueFromQueryIndex)
-  }
-}
-
-@InterfaceAudience.Private
-class LessThanLogicExpression (override val columnName:String,
-                               override val valueFromQueryIndex:Int)
-  extends DynamicLogicExpression with CompareTrait {
-  override val filterOps = JavaBytesEncoder.Less
-  override def appendToExpression(strBuilder: StringBuilder): Unit = {
-    strBuilder.append(columnName + " < " + valueFromQueryIndex)
-  }
-}
-
-@InterfaceAudience.Private
-class LessThanOrEqualLogicExpression (val columnName:String,
-                                      val valueFromQueryIndex:Int)
-  extends DynamicLogicExpression with CompareTrait{
-  override val filterOps = JavaBytesEncoder.LessEqual
-  override def appendToExpression(strBuilder: StringBuilder): Unit = {
-    strBuilder.append(columnName + " <= " + valueFromQueryIndex)
-  }
-}
-
-@InterfaceAudience.Private
-class PassThroughLogicExpression() extends DynamicLogicExpression {
-  override def execute(columnToCurrentRowValueMap:
-                       util.HashMap[String, ByteArrayComparable],
-                       valueFromQueryValueArray: Array[Array[Byte]]): Boolean = true
-
-  override def appendToExpression(strBuilder: StringBuilder): Unit = {
-    // Fix the offset bug by adding a dummy token so we do not crash the region server:
-    // in DynamicLogicExpressionBuilder.build the command is always read from offset + 1, i.e.
-    //   val command = expressionArray(offSet + 1)
-    // so we have to pad the output so that `Pass` lands on the right offset.
-    strBuilder.append("dummy Pass -1")
-  }
-}
-
-@InterfaceAudience.Private
-object DynamicLogicExpressionBuilder {
-  def build(expressionString: String, encoder: BytesEncoder): DynamicLogicExpression = {
-
-    val expressionAndOffset = build(expressionString.split(' '), 0, encoder)
-    expressionAndOffset._1
-  }
-
-  private def build(expressionArray:Array[String],
-                    offSet:Int, encoder: BytesEncoder): (DynamicLogicExpression, Int) = {
-    val expr = {
-      if (expressionArray(offSet).equals("(")) {
-        val left = build(expressionArray, offSet + 1, encoder)
-        val right = build(expressionArray, left._2 + 1, encoder)
-        if (expressionArray(left._2).equals("AND")) {
-          (new AndLogicExpression(left._1, right._1), right._2 + 1)
-        } else if (expressionArray(left._2).equals("OR")) {
-          (new OrLogicExpression(left._1, right._1), right._2 + 1)
-        } else {
-          throw new Throwable("Unknown gate:" + expressionArray(left._2))
-        }
-      } else {
-        val command = expressionArray(offSet + 1)
-        if (command.equals("<")) {
-          (new LessThanLogicExpression(expressionArray(offSet),
-            expressionArray(offSet + 2).toInt), offSet + 3)
-        } else if (command.equals("<=")) {
-          (new LessThanOrEqualLogicExpression(expressionArray(offSet),
-            expressionArray(offSet + 2).toInt), offSet + 3)
-        } else if (command.equals(">")) {
-          (new GreaterThanLogicExpression(expressionArray(offSet),
-            expressionArray(offSet + 2).toInt), offSet + 3)
-        } else if (command.equals(">=")) {
-          (new GreaterThanOrEqualLogicExpression(expressionArray(offSet),
-            expressionArray(offSet + 2).toInt), offSet + 3)
-        } else if (command.equals("==")) {
-          (new EqualLogicExpression(expressionArray(offSet),
-            expressionArray(offSet + 2).toInt, false), offSet + 3)
-        } else if (command.equals("!=")) {
-          (new EqualLogicExpression(expressionArray(offSet),
-            expressionArray(offSet + 2).toInt, true), offSet + 3)
-        } else if (command.equals("isNull")) {
-          (new IsNullLogicExpression(expressionArray(offSet), false), offSet + 2)
-        } else if (command.equals("isNotNull")) {
-          (new IsNullLogicExpression(expressionArray(offSet), true), offSet + 2)
-        } else if (command.equals("Pass")) {
-          (new PassThroughLogicExpression, offSet + 3)
-        } else {
-          throw new Throwable("Unknown logic command:" + command)
-        }
-      }
-    }
-    expr._1.setEncoder(encoder)
-    expr
-  }
-}

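For context, here is a minimal sketch of how the expression grammar removed above round-trips through the builder; the BytesEncoder instance is assumed to be supplied by the caller (the module ships a default encoder), and the column names are made up:

import org.apache.hadoop.hbase.spark.DynamicLogicExpressionBuilder
import org.apache.hadoop.hbase.spark.datasources.BytesEncoder

object ExpressionRoundTripSketch {
  // `encoder` is assumed to come from the caller, e.g. the module's default encoder.
  def roundTrip(encoder: BytesEncoder): Unit = {
    // Tokens are space separated; the trailing integers index into the
    // value-from-query array passed to execute().
    val exprString = "( RowKey < 0 AND ( Col1 == 1 OR Col2 isNull ) )"
    val expr = DynamicLogicExpressionBuilder.build(exprString, encoder)
    // The parsed tree serializes back to the same string it was built from.
    println(expr.toExpressionString)
  }
}
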
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/FamiliesQualifiersValues.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/FamiliesQualifiersValues.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/FamiliesQualifiersValues.scala
deleted file mode 100644
index 7a651e1..0000000
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/FamiliesQualifiersValues.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.spark
-
-import java.util
-
-import org.apache.yetus.audience.InterfaceAudience;
-
-/**
- * This object is a clean way to store and sort all cells that will be bulk
- * loaded into a single row
- */
-@InterfaceAudience.Public
-class FamiliesQualifiersValues extends Serializable {
-  //Tree maps are used because we need the results to
-  // be sorted when we read them
-  val familyMap = new util.TreeMap[ByteArrayWrapper,
-    util.TreeMap[ByteArrayWrapper, Array[Byte]]]()
-
-  // Normally a row has more columns than column
-  // families, so this wrapper is reused for column
-  // family look-ups
-  val reusableWrapper = new ByteArrayWrapper(null)
-
-  /**
-   * Adds a new cell to an existing row
-   * @param family    HBase column family
-   * @param qualifier HBase column qualifier
-   * @param value     HBase cell value
-   */
-  def += (family: Array[Byte], qualifier: Array[Byte], value: Array[Byte]): Unit = {
-
-    reusableWrapper.value = family
-
-    var qualifierValues = familyMap.get(reusableWrapper)
-
-    if (qualifierValues == null) {
-      qualifierValues = new util.TreeMap[ByteArrayWrapper, Array[Byte]]()
-      familyMap.put(new ByteArrayWrapper(family), qualifierValues)
-    }
-
-    qualifierValues.put(new ByteArrayWrapper(qualifier), value)
-  }
-
-  /**
-    * A wrapper for the "+=" method above that can be called from Java
-    * @param family    HBase column family
-    * @param qualifier HBase column qualifier
-    * @param value     HBase cell value
-    */
-  def add(family: Array[Byte], qualifier: Array[Byte], value: Array[Byte]): Unit = {
-    this += (family, qualifier, value)
-  }
-}

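As a brief illustration of the class removed above (a sketch, not part of the commit): cells can be added in any order and come back family- and qualifier-sorted, which is what the bulk-load write path relies on. The family, qualifier and value names are made up:

import org.apache.hadoop.hbase.spark.{ByteArrayWrapper, FamiliesQualifiersValues}
import org.apache.hadoop.hbase.util.Bytes

object SortedRowSketch {
  def main(args: Array[String]): Unit = {
    val row = new FamiliesQualifiersValues
    // add() is the Java-friendly wrapper around +=
    row.add(Bytes.toBytes("cf"), Bytes.toBytes("q2"), Bytes.toBytes("v2"))
    row.add(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("v1"))
    val qualifiers = row.familyMap.get(new ByteArrayWrapper(Bytes.toBytes("cf")))
    println(qualifiers.size())  // 2
    // The inner TreeMap iterates q1 before q2 regardless of insertion order.
  }
}
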
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/FamilyHFileWriteOptions.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/FamilyHFileWriteOptions.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/FamilyHFileWriteOptions.scala
deleted file mode 100644
index 9ee9291..0000000
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/FamilyHFileWriteOptions.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.spark
-
-import java.io.Serializable
-
-import org.apache.yetus.audience.InterfaceAudience;
-
-/**
- * This object will hold optional data for how a given column family's
- * writer will work
- *
- * @param compression       String to define the Compression to be used in the HFile
- * @param bloomType         String to define the bloom type to be used in the HFile
- * @param blockSize         The block size to be used in the HFile
- * @param dataBlockEncoding String to define the data block encoding to be used
- *                          in the HFile
- */
-@InterfaceAudience.Public
-class FamilyHFileWriteOptions( val compression:String,
-                               val bloomType: String,
-                               val blockSize: Int,
-                               val dataBlockEncoding: String) extends Serializable

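A short sketch of how these per-family options are typically assembled for the bulk-load APIs; the "GZ", "ROW" and "PREFIX" values are only examples and must be valid HBase Compression.Algorithm, BloomType and DataBlockEncoding names:

import java.util

import org.apache.hadoop.hbase.spark.FamilyHFileWriteOptions
import org.apache.hadoop.hbase.util.Bytes

object FamilyOptionsSketch {
  // One entry per column family; the map is later handed to HBaseContext.bulkLoad.
  def familyOptions(): util.Map[Array[Byte], FamilyHFileWriteOptions] = {
    val options = new util.HashMap[Array[Byte], FamilyHFileWriteOptions]()
    options.put(Bytes.toBytes("cf"),
      new FamilyHFileWriteOptions("GZ", "ROW", 65536, "PREFIX"))
    options
  }
}
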
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCache.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCache.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCache.scala
deleted file mode 100644
index 2858da8..0000000
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCache.scala
+++ /dev/null
@@ -1,265 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.spark
-
-import java.io.IOException
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.hbase.client.{Admin, Connection, ConnectionFactory, RegionLocator, Table}
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory
-import org.apache.hadoop.hbase.security.{User, UserProvider}
-import org.apache.hadoop.hbase.spark.datasources.HBaseSparkConf
-import org.apache.hadoop.hbase.{HConstants, TableName}
-import org.apache.spark.Logging
-
-import scala.collection.mutable
-
-private[spark] object HBaseConnectionCache extends Logging {
-
-  // A hashmap of Spark-HBase connections. Key is HBaseConnectionKey.
-  val connectionMap = new mutable.HashMap[HBaseConnectionKey, SmartConnection]()
-
-  val cacheStat = HBaseConnectionCacheStat(0, 0, 0)
-
-  // in milliseconds
-  private final val DEFAULT_TIME_OUT: Long = HBaseSparkConf.DEFAULT_CONNECTION_CLOSE_DELAY
-  private var timeout = DEFAULT_TIME_OUT
-  private var closed: Boolean = false
-
-  var housekeepingThread = new Thread(new Runnable {
-    override def run() {
-      while (true) {
-        try {
-          Thread.sleep(timeout)
-        } catch {
-          case e: InterruptedException =>
-            // setTimeout() and close() may interrupt the sleep and it's safe
-            // to ignore the exception
-        }
-        if (closed)
-          return
-        performHousekeeping(false)
-      }
-    }
-  })
-  housekeepingThread.setDaemon(true)
-  housekeepingThread.start()
-
-  def getStat: HBaseConnectionCacheStat = {
-    connectionMap.synchronized {
-      cacheStat.numActiveConnections = connectionMap.size
-      cacheStat.copy()
-    }
-  }
-
-  def close(): Unit = {
-    try {
-      connectionMap.synchronized {
-        if (closed)
-          return
-        closed = true
-        housekeepingThread.interrupt()
-        housekeepingThread = null
-        HBaseConnectionCache.performHousekeeping(true)
-      }
-    } catch {
-      case e: Exception => logWarning("Error in finalHouseKeeping", e)
-    }
-  }
-
-  def performHousekeeping(forceClean: Boolean) = {
-    val tsNow: Long = System.currentTimeMillis()
-    connectionMap.synchronized {
-      connectionMap.foreach {
-        x => {
-          if(x._2.refCount < 0) {
-            logError(s"Bug to be fixed: negative refCount of connection ${x._2}")
-          }
-
-          if(forceClean || ((x._2.refCount <= 0) && (tsNow - x._2.timestamp > timeout))) {
-            try{
-              x._2.connection.close()
-            } catch {
-              case e: IOException => logWarning(s"Fail to close connection ${x._2}", e)
-            }
-            connectionMap.remove(x._1)
-          }
-        }
-      }
-    }
-  }
-
-  // For testing purposes only
-  def getConnection(key: HBaseConnectionKey, conn: => Connection): SmartConnection = {
-    connectionMap.synchronized {
-      if (closed)
-        return null
-      cacheStat.numTotalRequests += 1
-      val sc = connectionMap.getOrElseUpdate(key, {cacheStat.numActualConnectionsCreated += 1
-        new SmartConnection(conn)})
-      sc.refCount += 1
-      sc
-    }
-  }
-
-  def getConnection(conf: Configuration): SmartConnection =
-    getConnection(new HBaseConnectionKey(conf), ConnectionFactory.createConnection(conf))
-
-  // For testing purposes only
-  def setTimeout(to: Long): Unit  = {
-    connectionMap.synchronized {
-      if (closed)
-        return
-      timeout = to
-      housekeepingThread.interrupt()
-    }
-  }
-}
-
-private[hbase] case class SmartConnection (
-    connection: Connection, var refCount: Int = 0, var timestamp: Long = 0) {
-  def getTable(tableName: TableName): Table = connection.getTable(tableName)
-  def getRegionLocator(tableName: TableName): RegionLocator = connection.getRegionLocator(tableName)
-  def isClosed: Boolean = connection.isClosed
-  def getAdmin: Admin = connection.getAdmin
-  def close() = {
-    HBaseConnectionCache.connectionMap.synchronized {
-      refCount -= 1
-      if(refCount <= 0)
-        timestamp = System.currentTimeMillis()
-    }
-  }
-}
-
-/**
- * Denotes a unique key to an HBase Connection instance.
- * Please refer to 'org.apache.hadoop.hbase.client.HConnectionKey'.
- *
- * In essence, this class captures the properties in Configuration
- * that may be used in the process of establishing a connection.
- *
- */
-class HBaseConnectionKey(c: Configuration) extends Logging {
-  val conf: Configuration = c
-  val CONNECTION_PROPERTIES: Array[String] = Array[String](
-    HConstants.ZOOKEEPER_QUORUM,
-    HConstants.ZOOKEEPER_ZNODE_PARENT,
-    HConstants.ZOOKEEPER_CLIENT_PORT,
-    HConstants.ZOOKEEPER_RECOVERABLE_WAITTIME,
-    HConstants.HBASE_CLIENT_PAUSE,
-    HConstants.HBASE_CLIENT_RETRIES_NUMBER,
-    HConstants.HBASE_RPC_TIMEOUT_KEY,
-    HConstants.HBASE_META_SCANNER_CACHING,
-    HConstants.HBASE_CLIENT_INSTANCE_ID,
-    HConstants.RPC_CODEC_CONF_KEY,
-    HConstants.USE_META_REPLICAS,
-    RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY)
-
-  var username: String = _
-  var m_properties = mutable.HashMap.empty[String, String]
-  if (conf != null) {
-    for (property <- CONNECTION_PROPERTIES) {
-      val value: String = conf.get(property)
-      if (value != null) {
-        m_properties.+=((property, value))
-      }
-    }
-    try {
-      val provider: UserProvider = UserProvider.instantiate(conf)
-      val currentUser: User = provider.getCurrent
-      if (currentUser != null) {
-        username = currentUser.getName
-      }
-    }
-    catch {
-      case e: IOException => {
-        logWarning("Error obtaining current user, skipping username in HBaseConnectionKey", e)
-      }
-    }
-  }
-
-  // make 'properties' immutable
-  val properties = m_properties.toMap
-
-  override def hashCode: Int = {
-    val prime: Int = 31
-    var result: Int = 1
-    if (username != null) {
-      result = username.hashCode
-    }
-    for (property <- CONNECTION_PROPERTIES) {
-      val value: Option[String] = properties.get(property)
-      if (value.isDefined) {
-        result = prime * result + value.hashCode
-      }
-    }
-    result
-  }
-
-  override def equals(obj: Any): Boolean = {
-    if (obj == null) return false
-    if (getClass ne obj.getClass) return false
-    val that: HBaseConnectionKey = obj.asInstanceOf[HBaseConnectionKey]
-    if (this.username != null && !(this.username == that.username)) {
-      return false
-    }
-    else if (this.username == null && that.username != null) {
-      return false
-    }
-    if (this.properties == null) {
-      if (that.properties != null) {
-        return false
-      }
-    }
-    else {
-      if (that.properties == null) {
-        return false
-      }
-      var flag: Boolean = true
-      for (property <- CONNECTION_PROPERTIES) {
-        val thisValue: Option[String] = this.properties.get(property)
-        val thatValue: Option[String] = that.properties.get(property)
-        flag = true
-        if (thisValue eq thatValue) {
-          flag = false // same reference (or both None): skip the value comparison below
-        }
-        if (flag && (thisValue == null || !(thisValue == thatValue))) {
-          return false
-        }
-      }
-    }
-    true
-  }
-
-  override def toString: String = {
-    "HBaseConnectionKey{" + "properties=" + properties + ", username='" + username + '\'' + '}'
-  }
-}
-
-/**
- * To log the state of 'HBaseConnectionCache'
- *
- * @param numTotalRequests number of total connection requests to the cache
- * @param numActualConnectionsCreated number of actual HBase connections the cache ever created
- * @param numActiveConnections number of currently alive HBase connections the cache is holding
- */
-case class HBaseConnectionCacheStat(var numTotalRequests: Long,
-                                    var numActualConnectionsCreated: Long,
-                                    var numActiveConnections: Long)
-
-

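A short sketch of the reference-counting contract of the cache removed above. Because HBaseConnectionCache is private[spark], the sketch assumes code compiled into the org.apache.hadoop.hbase.spark package; the table name is hypothetical:

package org.apache.hadoop.hbase.spark

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}

object ConnectionCacheSketch {
  def withCachedConnection(): Unit = {
    val conf = HBaseConfiguration.create()
    // Connections are shared: the same HBaseConnectionKey returns the same SmartConnection.
    val smartConn = HBaseConnectionCache.getConnection(conf)
    try {
      val table = smartConn.getTable(TableName.valueOf("example_table"))
      // ... read or write through the table ...
      table.close()
    } finally {
      // close() only decrements the reference count; the daemon housekeeping thread
      // closes the underlying Connection once it has been idle past the timeout.
      smartConn.close()
    }
  }
}
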
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
deleted file mode 100644
index eb0d683..0000000
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
+++ /dev/null
@@ -1,1115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.spark
-
-import java.net.InetSocketAddress
-import java.util
-import java.util.UUID
-import javax.management.openmbean.KeyAlreadyExistsException
-
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.fs.HFileSystem
-import org.apache.hadoop.hbase._
-import org.apache.hadoop.hbase.io.compress.Compression
-import org.apache.hadoop.hbase.io.compress.Compression.Algorithm
-import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding
-import org.apache.hadoop.hbase.io.hfile.{HFile, CacheConfig, HFileContextBuilder, HFileWriterImpl}
-import org.apache.hadoop.hbase.regionserver.{HStore, HStoreFile, StoreFileWriter, BloomType}
-import org.apache.hadoop.hbase.util.Bytes
-import org.apache.hadoop.mapred.JobConf
-import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.rdd.RDD
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
-import org.apache.hadoop.hbase.client._
-import scala.reflect.ClassTag
-import org.apache.spark.{Logging, SerializableWritable, SparkContext}
-import org.apache.hadoop.hbase.mapreduce.{TableMapReduceUtil,
-TableInputFormat, IdentityTableMapper}
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable
-import org.apache.hadoop.mapreduce.Job
-import org.apache.spark.streaming.dstream.DStream
-import java.io._
-import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod
-import org.apache.hadoop.fs.{Path, FileAlreadyExistsException, FileSystem}
-import scala.collection.mutable
-
-/**
-  * HBaseContext is a façade for HBase operations
-  * like bulk put, get, increment, delete, and scan
-  *
-  * HBaseContext takes on the responsibilities of
-  * disseminating the configuration information to
-  * the workers and managing the life cycle of Connections.
- */
-@InterfaceAudience.Public
-class HBaseContext(@transient sc: SparkContext,
-                   @transient val config: Configuration,
-                   val tmpHdfsConfgFile: String = null)
-  extends Serializable with Logging {
-
-  @transient var credentials = SparkHadoopUtil.get.getCurrentUserCredentials()
-  @transient var tmpHdfsConfiguration:Configuration = config
-  @transient var appliedCredentials = false
-  @transient val job = Job.getInstance(config)
-  TableMapReduceUtil.initCredentials(job)
-  val broadcastedConf = sc.broadcast(new SerializableWritable(config))
-  val credentialsConf = sc.broadcast(new SerializableWritable(job.getCredentials))
-
-  LatestHBaseContextCache.latest = this
-
-  if (tmpHdfsConfgFile != null && config != null) {
-    val fs = FileSystem.newInstance(config)
-    val tmpPath = new Path(tmpHdfsConfgFile)
-    if (!fs.exists(tmpPath)) {
-      val outputStream = fs.create(tmpPath)
-      config.write(outputStream)
-      outputStream.close()
-    } else {
-      logWarning("tmpHdfsConfigDir " + tmpHdfsConfgFile + " exist!!")
-    }
-  }
-
-  /**
-   * A simple enrichment of the traditional Spark RDD foreachPartition.
-   * This function differs from the original in that it offers the
-   * developer access to an already connected Connection object
-   *
-   * Note: Do not close the Connection object.  All Connection
-   * management is handled outside this method
-   *
-   * @param rdd  Original RDD with data to iterate over
-   * @param f    Function to be given an iterator to iterate through
-   *             the RDD values and a Connection object to interact
-   *             with HBase
-   */
-  def foreachPartition[T](rdd: RDD[T],
-                          f: (Iterator[T], Connection) => Unit):Unit = {
-    rdd.foreachPartition(
-      it => hbaseForeachPartition(broadcastedConf, it, f))
-  }
-
-  /**
-   * A simple enrichment of the traditional Spark Streaming dStream foreach
-   * This function differs from the original in that it offers the
-   * developer access to an already connected Connection object
-   *
-   * Note: Do not close the Connection object.  All Connection
-   * management is handled outside this method
-   *
-   * @param dstream  Original DStream with data to iterate over
-   * @param f        Function to be given an iterator to iterate through
-   *                 the DStream values and a Connection object to
-   *                 interact with HBase
-   */
-  def foreachPartition[T](dstream: DStream[T],
-                    f: (Iterator[T], Connection) => Unit):Unit = {
-    dstream.foreachRDD((rdd, time) => {
-      foreachPartition(rdd, f)
-    })
-  }
-
-  /**
-   * A simple enrichment of the traditional Spark RDD mapPartition.
-   * This function differs from the original in that it offers the
-   * developer access to an already connected Connection object
-   *
-   * Note: Do not close the Connection object.  All Connection
-   * management is handled outside this method
-   *
-   * @param rdd  Original RDD with data to iterate over
-   * @param mp   Function to be given an iterator to iterate through
-   *             the RDD values and a Connection object to interact
-   *             with HBase
-   * @return     Returns a new RDD generated by the user-defined
-   *             function, just like a normal mapPartition
-   */
-  def mapPartitions[T, R: ClassTag](rdd: RDD[T],
-                                   mp: (Iterator[T], Connection) => Iterator[R]): RDD[R] = {
-
-    rdd.mapPartitions[R](it => hbaseMapPartition[T, R](broadcastedConf,
-      it,
-      mp))
-
-  }
-
-  /**
-   * A simple enrichment of the traditional Spark Streaming DStream
-   * foreachPartition.
-   *
-   * This function differs from the original in that it offers the
-   * developer access to an already connected Connection object
-   *
-   * Note: Do not close the Connection object.  All Connection
-   * management is handled outside this method
-   *
-   * Note: Make sure to partition correctly to avoid memory issue when
-   *       getting data from HBase
-   *
-   * @param dstream  Original DStream with data to iterate over
-   * @param f       Function to be given an iterator to iterate through
-   *                 the DStream values and a Connection object to
-   *                 interact with HBase
-   * @return         Returns a new DStream generated by the user-defined
-   *                 function, just like a normal mapPartition
-   */
-  def streamForeachPartition[T](dstream: DStream[T],
-                                f: (Iterator[T], Connection) => Unit): Unit = {
-
-    dstream.foreachRDD(rdd => this.foreachPartition(rdd, f))
-  }
-
-  /**
-   * A simple enrichment of the traditional Spark Streaming DStream
-   * mapPartition.
-   *
-   * This function differs from the original in that it offers the
-   * developer access to an already connected Connection object
-   *
-   * Note: Do not close the Connection object.  All Connection
-   * management is handled outside this method
-   *
-   * Note: Make sure to partition correctly to avoid memory issue when
-   *       getting data from HBase
-   *
-   * @param dstream  Original DStream with data to iterate over
-   * @param f       Function to be given an iterator to iterate through
-   *                 the DStream values and a Connection object to
-   *                 interact with HBase
-   * @return         Returns a new DStream generated by the user-defined
-   *                 function, just like a normal mapPartition
-   */
-  def streamMapPartitions[T, U: ClassTag](dstream: DStream[T],
-                                f: (Iterator[T], Connection) => Iterator[U]):
-  DStream[U] = {
-    dstream.mapPartitions(it => hbaseMapPartition[T, U](
-      broadcastedConf,
-      it,
-      f))
-  }
-
-  /**
-   * A simple abstraction over the HBaseContext.foreachPartition method.
-   *
-   * It adds support for a user to take an RDD,
-   * generate Puts, and send them to HBase.
-   * The complexity of managing the Connection is
-   * removed from the developer.
-   *
-   * @param rdd       Original RDD with data to iterate over
-   * @param tableName The name of the table to put into
-   * @param f         Function to convert a value in the RDD to a HBase Put
-   */
-  def bulkPut[T](rdd: RDD[T], tableName: TableName, f: (T) => Put) {
-
-    val tName = tableName.getName
-    rdd.foreachPartition(
-      it => hbaseForeachPartition[T](
-        broadcastedConf,
-        it,
-        (iterator, connection) => {
-          val m = connection.getBufferedMutator(TableName.valueOf(tName))
-          iterator.foreach(T => m.mutate(f(T)))
-          m.flush()
-          m.close()
-        }))
-  }
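
// Illustrative sketch (not from the original file): a typical bulkPut call.
// Table, column family and qualifier names below are hypothetical.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkContext

object BulkPutSketch {
  def run(sc: SparkContext, config: Configuration): Unit = {
    val hbaseContext = new HBaseContext(sc, config)
    val rdd = sc.parallelize(Seq(("row1", "value1"), ("row2", "value2")))
    hbaseContext.bulkPut[(String, String)](rdd, TableName.valueOf("example_table"),
      { case (rowKey, value) =>
          new Put(Bytes.toBytes(rowKey))
            .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes(value))
      })
  }
}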
-
-  def applyCreds[T] (){
-    credentials = SparkHadoopUtil.get.getCurrentUserCredentials()
-
-    logDebug("appliedCredentials:" + appliedCredentials + ",credentials:" + credentials)
-
-    if (!appliedCredentials && credentials != null) {
-      appliedCredentials = true
-
-      @transient val ugi = UserGroupInformation.getCurrentUser
-      ugi.addCredentials(credentials)
-      // specify that this is a proxy user
-      ugi.setAuthenticationMethod(AuthenticationMethod.PROXY)
-
-      ugi.addCredentials(credentialsConf.value.value)
-    }
-  }
-
-  /**
-   * A simple abstraction over the HBaseContext.streamMapPartition method.
-   *
-   * It adds support for a user to take a DStream,
-   * generate Puts, and send them to HBase.
-   *
-   * The complexity of managing the Connection is
-   * removed from the developer
-   *
-   * @param dstream    Original DStream with data to iterate over
-   * @param tableName  The name of the table to put into
-   * @param f          Function to convert a value in
-   *                   the DStream to a HBase Put
-   */
-  def streamBulkPut[T](dstream: DStream[T],
-                       tableName: TableName,
-                       f: (T) => Put) = {
-    val tName = tableName.getName
-    dstream.foreachRDD((rdd, time) => {
-      bulkPut(rdd, TableName.valueOf(tName), f)
-    })
-  }
-
-  /**
-   * A simple abstraction over the HBaseContext.foreachPartition method.
-   *
-   * It adds support for a user to take an RDD, generate Deletes,
-   * and send them to HBase.  The complexity of managing the Connection is
-   * removed from the developer.
-   *
-   * @param rdd       Original RDD with data to iterate over
-   * @param tableName The name of the table to delete from
-   * @param f         Function to convert a value in the RDD to an
-   *                  HBase Delete
-   * @param batchSize       The number of delete to batch before sending to HBase
-   */
-  def bulkDelete[T](rdd: RDD[T], tableName: TableName,
-                    f: (T) => Delete, batchSize: Integer) {
-    bulkMutation(rdd, tableName, f, batchSize)
-  }
-
-  /**
-   * A simple abstraction over the HBaseContext.streamBulkMutation method.
-   *
-   * It adds support for a user to take a DStream,
-   * generate Deletes, and send them to HBase.
-   *
-   * The complexity of managing the Connection is
-   * removed from the developer
-   *
-   * @param dstream    Original DStream with data to iterate over
-   * @param tableName  The name of the table to delete from
-   * @param f          function to convert a value in the DStream to a
-   *                   HBase Delete
-   * @param batchSize        The number of deletes to batch before sending to HBase
-   */
-  def streamBulkDelete[T](dstream: DStream[T],
-                          tableName: TableName,
-                          f: (T) => Delete,
-                          batchSize: Integer) = {
-    streamBulkMutation(dstream, tableName, f, batchSize)
-  }
-
-  /**
-   *  Underlying function to support all bulk mutations
-   *
-   *  May be opened up if requested
-   */
-  private def bulkMutation[T](rdd: RDD[T], tableName: TableName,
-                              f: (T) => Mutation, batchSize: Integer) {
-
-    val tName = tableName.getName
-    rdd.foreachPartition(
-      it => hbaseForeachPartition[T](
-        broadcastedConf,
-        it,
-        (iterator, connection) => {
-          val table = connection.getTable(TableName.valueOf(tName))
-          val mutationList = new java.util.ArrayList[Mutation]
-          iterator.foreach(T => {
-            mutationList.add(f(T))
-            if (mutationList.size >= batchSize) {
-              table.batch(mutationList, null)
-              mutationList.clear()
-            }
-          })
-          if (mutationList.size() > 0) {
-            table.batch(mutationList, null)
-            mutationList.clear()
-          }
-          table.close()
-        }))
-  }
-
-  /**
-   *  Underlying function to support all bulk streaming mutations
-   *
-   *  May be opened up if requested
-   */
-  private def streamBulkMutation[T](dstream: DStream[T],
-                                    tableName: TableName,
-                                    f: (T) => Mutation,
-                                    batchSize: Integer) = {
-    val tName = tableName.getName
-    dstream.foreachRDD((rdd, time) => {
-      bulkMutation(rdd, TableName.valueOf(tName), f, batchSize)
-    })
-  }
-
-  /**
-   * A simple abstraction over the HBaseContext.mapPartition method.
-   *
-   * It adds support for a user to take an RDD and generate a
-   * new RDD based on Gets and the results they bring back from HBase
-   *
-   * @param rdd     Original RDD with data to iterate over
-   * @param tableName        The name of the table to get from
-   * @param makeGet    function to convert a value in the RDD to a
-   *                   HBase Get
-   * @param convertResult This will convert the HBase Result object to
-   *                   whatever the user wants to put in the resulting
-   *                   RDD
-   * @return           New RDD that is created by the Gets to HBase
-   */
-  def bulkGet[T, U: ClassTag](tableName: TableName,
-                    batchSize: Integer,
-                    rdd: RDD[T],
-                    makeGet: (T) => Get,
-                    convertResult: (Result) => U): RDD[U] = {
-
-    val getMapPartition = new GetMapPartition(tableName,
-      batchSize,
-      makeGet,
-      convertResult)
-
-    rdd.mapPartitions[U](it =>
-      hbaseMapPartition[T, U](
-        broadcastedConf,
-        it,
-        getMapPartition.run))
-  }
-
-  /**
-   * A simple abstraction over the HBaseContext.streamMap method.
-   *
-   * It adds support for a user to take a DStream and
-   * generate a new DStream based on Gets and the results
-   * they bring back from HBase
-   *
-   * @param tableName     The name of the table to get from
-   * @param batchSize     The number of Gets to be sent in a single batch
-   * @param dStream       Original DStream with data to iterate over
-   * @param makeGet       Function to convert a value in the DStream to a
-   *                      HBase Get
-   * @param convertResult This will convert the HBase Result object to
-   *                      whatever the user wants to put in the resulting
-   *                      DStream
-   * @return              A new DStream that is created by the Get to HBase
-   */
-  def streamBulkGet[T, U: ClassTag](tableName: TableName,
-                                    batchSize: Integer,
-                                    dStream: DStream[T],
-                                    makeGet: (T) => Get,
-                                    convertResult: (Result) => U): DStream[U] = {
-
-    val getMapPartition = new GetMapPartition(tableName,
-      batchSize,
-      makeGet,
-      convertResult)
-
-    dStream.mapPartitions[U](it => hbaseMapPartition[T, U](
-      broadcastedConf,
-      it,
-      getMapPartition.run))
-  }
-
-  /**
-   * This function will use the native HBase TableInputFormat with the
-   * given scan object to generate a new RDD
-   *
-   *  @param tableName the name of the table to scan
-   *  @param scan      the HBase scan object to use to read data from HBase
-   *  @param f         function to convert a Result object from HBase into
-   *                   what the user wants in the final generated RDD
-   *  @return          new RDD with results from scan
-   */
-  def hbaseRDD[U: ClassTag](tableName: TableName, scan: Scan,
-                            f: ((ImmutableBytesWritable, Result)) => U): RDD[U] = {
-
-    val job: Job = Job.getInstance(getConf(broadcastedConf))
-
-    TableMapReduceUtil.initCredentials(job)
-    TableMapReduceUtil.initTableMapperJob(tableName, scan,
-      classOf[IdentityTableMapper], null, null, job)
-
-    val jconf = new JobConf(job.getConfiguration)
-    SparkHadoopUtil.get.addCredentials(jconf)
-    new NewHBaseRDD(sc,
-      classOf[TableInputFormat],
-      classOf[ImmutableBytesWritable],
-      classOf[Result],
-      job.getConfiguration,
-      this).map(f)
-  }
-
-  /**
-   * An overloaded version of HBaseContext hbaseRDD that defines the
-   * type of the resulting RDD
-   *
-   *  @param tableName the name of the table to scan
-   *  @param scans     the HBase scan object to use to read data from HBase
-   *  @return          New RDD with results from scan
-   *
-   */
-  def hbaseRDD(tableName: TableName, scans: Scan):
-  RDD[(ImmutableBytesWritable, Result)] = {
-
-    hbaseRDD[(ImmutableBytesWritable, Result)](
-      tableName,
-      scans,
-      (r: (ImmutableBytesWritable, Result)) => r)
-  }
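
// Illustrative sketch (not from the original file): scanning a hypothetical table into an
// RDD of one column value per row; assumes the cf:q column is present on every row.
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD

object ScanSketch {
  def run(hbaseContext: HBaseContext): RDD[String] = {
    val scan = new Scan().addFamily(Bytes.toBytes("cf"))
    hbaseContext.hbaseRDD(TableName.valueOf("example_table"), scan)
      .map { case (_, result) =>
        Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("q")))
      }
  }
}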
-
-  /**
-   *  Underlying wrapper for all foreach functions in HBaseContext
-   */
-  private def hbaseForeachPartition[T](configBroadcast:
-                                       Broadcast[SerializableWritable[Configuration]],
-                                        it: Iterator[T],
-                                        f: (Iterator[T], Connection) => Unit) = {
-
-    val config = getConf(configBroadcast)
-
-    applyCreds
-    // specify that this is a proxy user
-    val smartConn = HBaseConnectionCache.getConnection(config)
-    f(it, smartConn.connection)
-    smartConn.close()
-  }
-
-  private def getConf(configBroadcast: Broadcast[SerializableWritable[Configuration]]):
-  Configuration = {
-
-    if (tmpHdfsConfiguration == null && tmpHdfsConfgFile != null) {
-      val fs = FileSystem.newInstance(SparkHadoopUtil.get.conf)
-      val inputStream = fs.open(new Path(tmpHdfsConfgFile))
-      tmpHdfsConfiguration = new Configuration(false)
-      tmpHdfsConfiguration.readFields(inputStream)
-      inputStream.close()
-    }
-
-    if (tmpHdfsConfiguration == null) {
-      try {
-        tmpHdfsConfiguration = configBroadcast.value.value
-      } catch {
-        case ex: Exception => logError("Unable to getConfig from broadcast", ex)
-      }
-    }
-    tmpHdfsConfiguration
-  }
-
-  /**
-   *  Underlying wrapper for all mapPartition functions in HBaseContext
-   *
-   */
-  private def hbaseMapPartition[K, U](
-                                       configBroadcast:
-                                       Broadcast[SerializableWritable[Configuration]],
-                                       it: Iterator[K],
-                                       mp: (Iterator[K], Connection) =>
-                                         Iterator[U]): Iterator[U] = {
-
-    val config = getConf(configBroadcast)
-    applyCreds
-
-    val smartConn = HBaseConnectionCache.getConnection(config)
-    val res = mp(it, smartConn.connection)
-    smartConn.close()
-    res
-  }
-
-  /**
-   *  Underlying wrapper for all get mapPartition functions in HBaseContext
-   */
-  private class GetMapPartition[T, U](tableName: TableName,
-                                      batchSize: Integer,
-                                      makeGet: (T) => Get,
-                                      convertResult: (Result) => U)
-    extends Serializable {
-
-    val tName = tableName.getName
-
-    def run(iterator: Iterator[T], connection: Connection): Iterator[U] = {
-      val table = connection.getTable(TableName.valueOf(tName))
-
-      val gets = new java.util.ArrayList[Get]()
-      var res = List[U]()
-
-      while (iterator.hasNext) {
-        gets.add(makeGet(iterator.next()))
-
-        if (gets.size() == batchSize) {
-          val results = table.get(gets)
-          res = res ++ results.map(convertResult)
-          gets.clear()
-        }
-      }
-      if (gets.size() > 0) {
-        val results = table.get(gets)
-        res = res ++ results.map(convertResult)
-        gets.clear()
-      }
-      table.close()
-      res.iterator
-    }
-  }
-
-  /**
-   * Produces a ClassTag[T], which is actually just a casted ClassTag[AnyRef].
-   *
-   * This method is used to keep ClassTags out of the external Java API, as
-   * the Java compiler cannot produce them automatically. While this
-   * ClassTag-faking does please the compiler, it can cause problems at runtime
-   * if the Scala API relies on ClassTags for correctness.
-   *
-   * Often, though, a ClassTag[AnyRef] will not lead to incorrect behavior,
-   * just worse performance or security issues.
-   * For instance, an Array of AnyRef can hold any type T, but may lose primitive
-   * specialization.
-   */
-  private[spark]
-  def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]]
-
-  /**
-   * Spark Implementation of HBase Bulk load for wide rows or when
-   * values are not already combined at the time of the map process
-   *
-   * This will take the content from an existing RDD then sort and shuffle
-   * it with respect to region splits.  The result of that sort and shuffle
-   * will be written to HFiles.
-   *
-   * After this function is executed the user will have to call
-   * LoadIncrementalHFiles.doBulkLoad(...) to move the files into HBase
-   *
-   * Also note this version of bulk load is different from past versions in
-   * that it includes the qualifier as part of the sort process. The
-   * reason for this is to be able to support rows with a very large number
-   * of columns.
-   *
-   * @param rdd                            The RDD we are bulk loading from
-   * @param tableName                      The HBase table we are loading into
-   * @param flatMap                        A flatMap function that will make every
-   *                                       row in the RDD
-   *                                       into N cells for the bulk load
-   * @param stagingDir                     The location on the FileSystem to bulk load into
-   * @param familyHFileWriteOptionsMap     Options that will define how the HFile for a
-   *                                       column family is written
-   * @param compactionExclude              Compaction excluded for the HFiles
-   * @param maxSize                        Max size for the HFiles before they roll
-   * @tparam T                             The Type of values in the original RDD
-   */
-  def bulkLoad[T](rdd:RDD[T],
-                  tableName: TableName,
-                  flatMap: (T) => Iterator[(KeyFamilyQualifier, Array[Byte])],
-                  stagingDir:String,
-                  familyHFileWriteOptionsMap:
-                  util.Map[Array[Byte], FamilyHFileWriteOptions] =
-                  new util.HashMap[Array[Byte], FamilyHFileWriteOptions],
-                  compactionExclude: Boolean = false,
-                  maxSize:Long = HConstants.DEFAULT_MAX_FILE_SIZE):
-  Unit = {
-    val stagingPath = new Path(stagingDir)
-    val fs = stagingPath.getFileSystem(config)
-    if (fs.exists(stagingPath)) {
-      throw new FileAlreadyExistsException("Path " + stagingDir + " already exists")
-    }
-    val conn = HBaseConnectionCache.getConnection(config)
-    val regionLocator = conn.getRegionLocator(tableName)
-    val startKeys = regionLocator.getStartKeys
-    if (startKeys.length == 0) {
-      logInfo("Table " + tableName.toString + " was not found")
-    }
-    val defaultCompressionStr = config.get("hfile.compression",
-      Compression.Algorithm.NONE.getName)
-    val hfileCompression = HFileWriterImpl
-      .compressionByName(defaultCompressionStr)
-    val nowTimeStamp = System.currentTimeMillis()
-    val tableRawName = tableName.getName
-
-    val familyHFileWriteOptionsMapInternal =
-      new util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions]
-
-    val entrySetIt = familyHFileWriteOptionsMap.entrySet().iterator()
-
-    while (entrySetIt.hasNext) {
-      val entry = entrySetIt.next()
-      familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(entry.getKey), entry.getValue)
-    }
-
-    val regionSplitPartitioner =
-      new BulkLoadPartitioner(startKeys)
-
-    //This is where all the magic happens
-    //Here we are going to do the following things
-    // 1. FlatMap every row in the RDD into (key, column, value) tuples
-    // 2. Then we are going to repartition, sort and shuffle
-    // 3. Finally we are going to write out our HFiles
-    rdd.flatMap( r => flatMap(r)).
-      repartitionAndSortWithinPartitions(regionSplitPartitioner).
-      hbaseForeachPartition(this, (it, conn) => {
-
-      val conf = broadcastedConf.value.value
-      val fs = FileSystem.get(conf)
-      val writerMap = new mutable.HashMap[ByteArrayWrapper, WriterLength]
-      var previousRow:Array[Byte] = HConstants.EMPTY_BYTE_ARRAY
-      var rollOverRequested = false
-      val localTableName = TableName.valueOf(tableRawName)
-
-      //Here is where we finally iterate through the data in this partition of the
-      //RDD that has been sorted and partitioned
-      it.foreach{ case (keyFamilyQualifier, cellValue:Array[Byte]) =>
-
-        val wl = writeValueToHFile(keyFamilyQualifier.rowKey,
-          keyFamilyQualifier.family,
-          keyFamilyQualifier.qualifier,
-          cellValue,
-          nowTimeStamp,
-          fs,
-          conn,
-          localTableName,
-          conf,
-          familyHFileWriteOptionsMapInternal,
-          hfileCompression,
-          writerMap,
-          stagingDir)
-
-        rollOverRequested = rollOverRequested || wl.written > maxSize
-
-        //This will only roll if we have at least one column family file that is
-        //bigger than maxSize and we have finished a given row key
-        if (rollOverRequested && Bytes.compareTo(previousRow, keyFamilyQualifier.rowKey) != 0) {
-          rollWriters(fs, writerMap,
-            regionSplitPartitioner,
-            previousRow,
-            compactionExclude)
-          rollOverRequested = false
-        }
-
-        previousRow = keyFamilyQualifier.rowKey
-      }
-      //We have finished all the data so lets close up the writers
-      rollWriters(fs, writerMap,
-        regionSplitPartitioner,
-        previousRow,
-        compactionExclude)
-      rollOverRequested = false
-    })
-  }
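
// Illustrative sketch (not from the original file): emitting one cell per input record
// through the flatMap. KeyFamilyQualifier is assumed to take (rowKey, family, qualifier)
// byte arrays; the table, family, qualifier and staging path are hypothetical.
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.spark.{HBaseContext, KeyFamilyQualifier}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD

object BulkLoadSketch {
  def run(hbaseContext: HBaseContext, rdd: RDD[(String, String)]): Unit = {
    hbaseContext.bulkLoad[(String, String)](rdd,
      TableName.valueOf("example_table"),
      { case (rowKey, value) =>
          Iterator((new KeyFamilyQualifier(
            Bytes.toBytes(rowKey), Bytes.toBytes("cf"), Bytes.toBytes("q")),
            Bytes.toBytes(value)))
      },
      "/tmp/example_bulkload_staging")
    // The staging directory is then handed to LoadIncrementalHFiles.doBulkLoad(...) to
    // move the HFiles into the table, as described in the scaladoc above.
  }
}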
-
-  /**
-   * Spark Implementation of HBase Bulk load for short rows, somewhere under
-   * 1000 columns.  This bulk load should be faster for tables with thinner
-   * rows than the other Spark implementation of bulk load, which puts only one
-   * value into a record going into a shuffle
-   *
-   * This will take the content from an existing RDD then sort and shuffle
-   * it with respect to region splits.  The result of that sort and shuffle
-   * will be written to HFiles.
-   *
-   * After this function is executed the user will have to call
-   * LoadIncrementalHFiles.doBulkLoad(...) to move the files into HBase
-   *
-   * In this implementation, only the rowKey is given to the shuffle as the key
-   * and all the columns are already linked to the RowKey before the shuffle
-   * stage.  The sorting of the qualifiers is done in memory outside of the
-   * shuffle stage
-   *
-   * Also make sure that incoming RDDs only have one record for every row key.
-   *
-   * @param rdd                            The RDD we are bulk loading from
-   * @param tableName                      The HBase table we are loading into
-   * @param mapFunction                    A function that will convert the RDD records to
-   *                                       the key value format used for the shuffle to prep
-   *                                       for writing to the bulk loaded HFiles
-   * @param stagingDir                     The location on the FileSystem to bulk load into
-   * @param familyHFileWriteOptionsMap     Options that will define how the HFile for a
-   *                                       column family is written
-   * @param compactionExclude              Compaction excluded for the HFiles
-   * @param maxSize                        Max size for the HFiles before they roll
-   * @tparam T                             The Type of values in the original RDD
-   */
-  def bulkLoadThinRows[T](rdd:RDD[T],
-                  tableName: TableName,
-                  mapFunction: (T) =>
-                    (ByteArrayWrapper, FamiliesQualifiersValues),
-                  stagingDir:String,
-                  familyHFileWriteOptionsMap:
-                  util.Map[Array[Byte], FamilyHFileWriteOptions] =
-                  new util.HashMap[Array[Byte], FamilyHFileWriteOptions],
-                  compactionExclude: Boolean = false,
-                  maxSize:Long = HConstants.DEFAULT_MAX_FILE_SIZE):
-  Unit = {
-    val stagingPath = new Path(stagingDir)
-    val fs = stagingPath.getFileSystem(config)
-    if (fs.exists(stagingPath)) {
-      throw new FileAlreadyExistsException("Path " + stagingDir + " already exists")
-    }
-    val conn = HBaseConnectionCache.getConnection(config)
-    val regionLocator = conn.getRegionLocator(tableName)
-    val startKeys = regionLocator.getStartKeys
-    if (startKeys.length == 0) {
-      logInfo("Table " + tableName.toString + " was not found")
-    }
-    val defaultCompressionStr = config.get("hfile.compression",
-      Compression.Algorithm.NONE.getName)
-    val defaultCompression = HFileWriterImpl
-      .compressionByName(defaultCompressionStr)
-    val nowTimeStamp = System.currentTimeMillis()
-    val tableRawName = tableName.getName
-
-    val familyHFileWriteOptionsMapInternal =
-      new util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions]
-
-    val entrySetIt = familyHFileWriteOptionsMap.entrySet().iterator()
-
-    while (entrySetIt.hasNext) {
-      val entry = entrySetIt.next()
-      familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(entry.getKey), entry.getValue)
-    }
-
-    val regionSplitPartitioner =
-      new BulkLoadPartitioner(startKeys)
-
-    //This is where all the magic happens
-    //Here we are going to do the following things
-    // 1. Map every row in the RDD into (row key, families/qualifiers/values) tuples
-    // 2. Then we are going to repartition, sort, and shuffle
-    // 3. Finally we are going to write out our HFiles
-    rdd.map( r => mapFunction(r)).
-      repartitionAndSortWithinPartitions(regionSplitPartitioner).
-      hbaseForeachPartition(this, (it, conn) => {
-
-      val conf = broadcastedConf.value.value
-      val fs = FileSystem.get(conf)
-      val writerMap = new mutable.HashMap[ByteArrayWrapper, WriterLength]
-      var previousRow:Array[Byte] = HConstants.EMPTY_BYTE_ARRAY
-      var rollOverRequested = false
-      val localTableName = TableName.valueOf(tableRawName)
-
-      //Here is where we finally iterate through the data in this partition of the
-      //RDD that has been sorted and partitioned
-      it.foreach{ case (rowKey:ByteArrayWrapper,
-      familiesQualifiersValues:FamiliesQualifiersValues) =>
-
-
-        if (Bytes.compareTo(previousRow, rowKey.value) == 0) {
-          throw new KeyAlreadyExistsException("The following key was sent to the " +
-            "HFile load more than once: " + Bytes.toString(previousRow))
-        }
-
-        //The family map is a tree map so the families will be sorted
-        val familyIt = familiesQualifiersValues.familyMap.entrySet().iterator()
-        while (familyIt.hasNext) {
-          val familyEntry = familyIt.next()
-
-          val family = familyEntry.getKey.value
-
-          val qualifierIt = familyEntry.getValue.entrySet().iterator()
-
-          //The qualifier map is a tree map so the qualifiers will be sorted
-          while (qualifierIt.hasNext) {
-
-            val qualifierEntry = qualifierIt.next()
-            val qualifier = qualifierEntry.getKey
-            val cellValue = qualifierEntry.getValue
-
-            writeValueToHFile(rowKey.value,
-              family,
-              qualifier.value, // qualifier
-              cellValue, // value
-              nowTimeStamp,
-              fs,
-              conn,
-              localTableName,
-              conf,
-              familyHFileWriteOptionsMapInternal,
-              defaultCompression,
-              writerMap,
-              stagingDir)
-
-            previousRow = rowKey.value
-          }
-
-          writerMap.values.foreach( wl => {
-            rollOverRequested = rollOverRequested || wl.written > maxSize
-
-            //This will only roll if we have at least one column family file that is
-            //bigger than maxSize and we have finished a given row key
-            if (rollOverRequested) {
-              rollWriters(fs, writerMap,
-                regionSplitPartitioner,
-                previousRow,
-                compactionExclude)
-              rollOverRequested = false
-            }
-          })
-        }
-      }
-
-      //We have finished all the data so let's close up the writers
-      rollWriters(fs, writerMap,
-        regionSplitPartitioner,
-        previousRow,
-        compactionExclude)
-      rollOverRequested = false
-    })
-  }
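
For orientation, here is a minimal, hedged usage sketch of bulkLoadThinRows. The SparkContext ("sc"), Configuration ("config"), table name, column family, and staging path are illustrative assumptions, not values taken from this patch.

  import org.apache.hadoop.hbase.TableName
  import org.apache.hadoop.hbase.spark.{ByteArrayWrapper, FamiliesQualifiersValues, HBaseContext}
  import org.apache.hadoop.hbase.util.Bytes

  // "sc" is an existing SparkContext; "config" is an HBase Configuration.
  val hbaseContext = new HBaseContext(sc, config)
  val rdd = sc.parallelize(Seq(
    ("row1", "a", "value1"),
    ("row2", "b", "value2")))

  hbaseContext.bulkLoadThinRows[(String, String, String)](rdd,
    TableName.valueOf("t1"),
    t => {
      // One record per row key: pack all the columns for the row into a single
      // FamiliesQualifiersValues before the shuffle.
      val familyQualifiersValues = new FamiliesQualifiersValues
      familyQualifiersValues += (Bytes.toBytes("cf"),
        Bytes.toBytes(t._2), Bytes.toBytes(t._3))
      (new ByteArrayWrapper(Bytes.toBytes(t._1)), familyQualifiersValues)
    },
    "/tmp/hbase-staging")

  // Afterwards, LoadIncrementalHFiles.doBulkLoad(...) moves the staged HFiles into the table.
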
-
-  /**
-   *  This will return a new HFile writer when requested
-   *
-   * @param family       column family
-   * @param conf         configuration to connect to HBase
-   * @param favoredNodes nodes that we would like to write to
-   * @param fs           FileSystem object where we will be writing the HFiles to
-   * @return WriterLength object
-   */
-  private def getNewHFileWriter(family: Array[Byte], conf: Configuration,
-                   favoredNodes: Array[InetSocketAddress],
-                   fs:FileSystem,
-                   familydir:Path,
-                   familyHFileWriteOptionsMapInternal:
-                   util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions],
-                   defaultCompression:Compression.Algorithm): WriterLength = {
-
-
-    var familyOptions = familyHFileWriteOptionsMapInternal.get(new ByteArrayWrapper(family))
-
-    if (familyOptions == null) {
-      familyOptions = new FamilyHFileWriteOptions(defaultCompression.toString,
-        BloomType.NONE.toString, HConstants.DEFAULT_BLOCKSIZE, DataBlockEncoding.NONE.toString)
-      familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(family), familyOptions)
-    }
-
-    val tempConf = new Configuration(conf)
-    tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f)
-    val contextBuilder = new HFileContextBuilder()
-      .withCompression(Algorithm.valueOf(familyOptions.compression))
-      .withChecksumType(HStore.getChecksumType(conf))
-      .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
-      .withBlockSize(familyOptions.blockSize)
-
-    if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
-      contextBuilder.withIncludesTags(true)
-    }
-
-    contextBuilder.withDataBlockEncoding(DataBlockEncoding.
-      valueOf(familyOptions.dataBlockEncoding))
-    val hFileContext = contextBuilder.build()
-
-    //Add a '_' to the file name because this is an unfinished file.  A rename will happen
-    // to remove the '_' when the file is closed.
-    new WriterLength(0,
-      new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), new HFileSystem(fs))
-        .withBloomType(BloomType.valueOf(familyOptions.bloomType))
-        .withComparator(CellComparator.getInstance()).withFileContext(hFileContext)
-        .withFilePath(new Path(familydir, "_" + UUID.randomUUID.toString.replaceAll("-", "")))
-        .withFavoredNodes(favoredNodes).build())
-
-  }
-
-  /**
-   * Encompasses the logic to write a value to an HFile
-   *
-   * @param rowKey                             The RowKey for the record
-   * @param family                             HBase column family for the record
-   * @param qualifier                          HBase column qualifier for the record
-   * @param cellValue                          HBase cell value
-   * @param nowTimeStamp                       The cell time stamp
-   * @param fs                                 Connection to the FileSystem for the HFile
-   * @param conn                               Connection to HBase
-   * @param tableName                          HBase TableName object
-   * @param conf                               Configuration to be used when making a new HFile
-   * @param familyHFileWriteOptionsMapInternal Extra configs for the HFile
-   * @param hfileCompression                   The compression codec for the new HFile
-   * @param writerMap                          HashMap of existing writers and their offsets
-   * @param stagingDir                         The staging directory on the FileSystem to store
-   *                                           the HFiles
-   * @return                                   The writer for the HFile that was
-   *                                           written to
-   */
-  private def writeValueToHFile(rowKey: Array[Byte],
-                        family: Array[Byte],
-                        qualifier: Array[Byte],
-                        cellValue:Array[Byte],
-                        nowTimeStamp: Long,
-                        fs: FileSystem,
-                        conn: Connection,
-                        tableName: TableName,
-                        conf: Configuration,
-                        familyHFileWriteOptionsMapInternal:
-                        util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions],
-                        hfileCompression:Compression.Algorithm,
-                        writerMap:mutable.HashMap[ByteArrayWrapper, WriterLength],
-                        stagingDir: String
-                         ): WriterLength = {
-
-    val wl = writerMap.getOrElseUpdate(new ByteArrayWrapper(family), {
-      val familyDir = new Path(stagingDir, Bytes.toString(family))
-
-      fs.mkdirs(familyDir)
-
-      val loc:HRegionLocation = {
-        try {
-          val locator =
-            conn.getRegionLocator(tableName)
-          locator.getRegionLocation(rowKey)
-        } catch {
-          case e: Throwable =>
-            logWarning("there's something wrong when locating rowkey: " +
-              Bytes.toString(rowKey))
-            null
-        }
-      }
-      if (null == loc) {
-        if (log.isTraceEnabled) {
-          logTrace("failed to get region location, so use default writer: " +
-            Bytes.toString(rowKey))
-        }
-        getNewHFileWriter(family = family,
-          conf = conf,
-          favoredNodes = null,
-          fs = fs,
-          familydir = familyDir,
-          familyHFileWriteOptionsMapInternal,
-          hfileCompression)
-      } else {
-        if (log.isDebugEnabled) {
-          logDebug("first rowkey: [" + Bytes.toString(rowKey) + "]")
-        }
-        val initialIsa =
-          new InetSocketAddress(loc.getHostname, loc.getPort)
-        if (initialIsa.isUnresolved) {
-          if (log.isTraceEnabled) {
-            logTrace("failed to resolve bind address: " + loc.getHostname + ":"
-              + loc.getPort + ", so use default writer")
-          }
-          getNewHFileWriter(family,
-            conf,
-            null,
-            fs,
-            familyDir,
-            familyHFileWriteOptionsMapInternal,
-            hfileCompression)
-        } else {
-          if(log.isDebugEnabled) {
-            logDebug("use favored nodes writer: " + initialIsa.getHostString)
-          }
-          getNewHFileWriter(family,
-            conf,
-            Array[InetSocketAddress](initialIsa),
-            fs,
-            familyDir,
-            familyHFileWriteOptionsMapInternal,
-            hfileCompression)
-        }
-      }
-    })
-
-    val keyValue = new KeyValue(rowKey,
-      family,
-      qualifier,
-      nowTimeStamp, cellValue)
-
-    wl.writer.append(keyValue)
-    wl.written += keyValue.getLength
-
-    wl
-  }
-
-  /**
-   * This will roll all Writers
-   * @param fs                     Hadoop FileSystem object
-   * @param writerMap              HashMap that contains all the writers
-   * @param regionSplitPartitioner The partitioner with knowledge of how the
-   *                               regions are split by row key
-   * @param previousRow            The last row written, used to fill the HFile ending range metadata
-   * @param compactionExclude      The exclude compaction metadata flag for the HFile
-   */
-  private def rollWriters(fs:FileSystem,
-                          writerMap:mutable.HashMap[ByteArrayWrapper, WriterLength],
-                  regionSplitPartitioner: BulkLoadPartitioner,
-                  previousRow: Array[Byte],
-                  compactionExclude: Boolean): Unit = {
-    writerMap.values.foreach( wl => {
-      if (wl.writer != null) {
-        logDebug("Writer=" + wl.writer.getPath +
-          (if (wl.written == 0) "" else ", wrote=" + wl.written))
-        closeHFileWriter(fs, wl.writer,
-          regionSplitPartitioner,
-          previousRow,
-          compactionExclude)
-      }
-    })
-    writerMap.clear()
-
-  }
-
-  /**
-   * Function to close an HFile
-   * @param fs                     Hadoop FileSystem object
-   * @param w                      HFile Writer
-   * @param regionSplitPartitioner The partitioner with knowledge of how the
-   *                               regions are split by row key
-   * @param previousRow            The last row written, used to fill the HFile ending range metadata
-   * @param compactionExclude      The exclude compaction metadata flag for the HFile
-   */
-  private def closeHFileWriter(fs:FileSystem,
-                               w: StoreFileWriter,
-                               regionSplitPartitioner: BulkLoadPartitioner,
-                               previousRow: Array[Byte],
-                               compactionExclude: Boolean): Unit = {
-    if (w != null) {
-      w.appendFileInfo(HStoreFile.BULKLOAD_TIME_KEY,
-        Bytes.toBytes(System.currentTimeMillis()))
-      w.appendFileInfo(HStoreFile.BULKLOAD_TASK_KEY,
-        Bytes.toBytes(regionSplitPartitioner.getPartition(previousRow)))
-      w.appendFileInfo(HStoreFile.MAJOR_COMPACTION_KEY,
-        Bytes.toBytes(true))
-      w.appendFileInfo(HStoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY,
-        Bytes.toBytes(compactionExclude))
-      w.appendTrackedTimestampsToMetadata()
-      w.close()
-
-      val srcPath = w.getPath
-
-      //In the new path you will see that we are using substring.  This is to
-      // remove the '_' character in front of the HFile name.  '_' is a character
-      // that tells HBase that this file shouldn't be included in the bulk load.
-      // This protects against unfinished HFiles being submitted to HBase.
-      val newPath = new Path(w.getPath.getParent, w.getPath.getName.substring(1))
-      if (!fs.rename(srcPath, newPath)) {
-        throw new IOException("Unable to rename '" + srcPath +
-          "' to " + newPath)
-      }
-    }
-  }
-
-  /**
-   * This is a wrapper class around StoreFileWriter.  The reason for the
-   * wrapper is to keep the length of the file alongside the writer
-   *
-   * @param written The number of bytes written to the writer
-   * @param writer  The wrapped StoreFileWriter
-   */
-  class WriterLength(var written:Long, val writer:StoreFileWriter)
-}
-
-object LatestHBaseContextCache {
-  var latest:HBaseContext = null
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseDStreamFunctions.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseDStreamFunctions.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseDStreamFunctions.scala
deleted file mode 100644
index 4edde44..0000000
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseDStreamFunctions.scala
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.spark
-
-import org.apache.hadoop.hbase.TableName
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.client._
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable
-import org.apache.spark.streaming.dstream.DStream
-
-import scala.reflect.ClassTag
-
-/**
- * HBaseDStreamFunctions contains a set of implicit functions that can be
- * applied to a Spark DStream so that we can easily interact with HBase
- */
-@InterfaceAudience.Public
-object HBaseDStreamFunctions {
-
-  /**
-   * These are implicit methods for a DStream that contains any type of
-   * data.
-   *
-   * @param dStream  This is for dStreams of any type
-   * @tparam T       Type T
-   */
-  implicit class GenericHBaseDStreamFunctions[T](val dStream: DStream[T]) {
-
-    /**
-     * Implicit method that gives easy access to HBaseContext's bulk
-     * put.  This will not return a new DStream.  Think of it as a foreach.
-     *
-     * @param hc         The hbaseContext object to identify which
-     *                   HBase cluster connection to use
-     * @param tableName  The tableName that the put will be sent to
-     * @param f          The function that will turn the DStream values
-     *                   into HBase Put objects.
-     */
-    def hbaseBulkPut(hc: HBaseContext,
-                     tableName: TableName,
-                     f: (T) => Put): Unit = {
-      hc.streamBulkPut(dStream, tableName, f)
-    }
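
A minimal usage sketch for the implicit above; the DStream of (rowKey, value) strings, the hbaseContext, the table name, and the column coordinates are assumptions made for illustration.

  import org.apache.hadoop.hbase.TableName
  import org.apache.hadoop.hbase.client.Put
  import org.apache.hadoop.hbase.spark.HBaseDStreamFunctions._
  import org.apache.hadoop.hbase.util.Bytes

  // "dStream" is an existing DStream[(String, String)] of (rowKey, value) pairs
  // and "hbaseContext" an already-constructed HBaseContext.
  dStream.hbaseBulkPut(hbaseContext, TableName.valueOf("t1"),
    (t: (String, String)) => {
      val put = new Put(Bytes.toBytes(t._1))
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes(t._2))
      put
    })
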
-
-    /**
-     * Implicit method that gives easy access to HBaseContext's bulk
-     * get.  This will return a new DStream.  Think of it as a DStream map
-     * function: every DStream value will get a new value out of HBase, and
-     * that new value will populate the newly generated DStream.
-     *
-     * @param hc             The hbaseContext object to identify which
-     *                       HBase cluster connection to use
-     * @param tableName      The tableName that the gets will be sent to
-     * @param batchSize      How many gets to execute in a single batch
-     * @param f              The function that will turn the DStream values
-     *                       into HBase Get objects
-     * @param convertResult  The function that will convert an HBase
-     *                       Result object into a value that will go
-     *                       into the resulting DStream
-     * @tparam R             The type of Object that will be coming
-     *                       out of the resulting DStream
-     * @return               A resulting DStream with type R objects
-     */
-    def hbaseBulkGet[R: ClassTag](hc: HBaseContext,
-                     tableName: TableName,
-                     batchSize:Int, f: (T) => Get, convertResult: (Result) => R):
-    DStream[R] = {
-      hc.streamBulkGet[T, R](tableName, batchSize, dStream, f, convertResult)
-    }
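
A hedged sketch of the variant above: it turns a DStream of row-key strings into a DStream of cell values. The stream, table, and cf:q column are placeholders, and it assumes every key has a value stored under that column.

  import org.apache.hadoop.hbase.TableName
  import org.apache.hadoop.hbase.client.{Get, Result}
  import org.apache.hadoop.hbase.spark.HBaseDStreamFunctions._
  import org.apache.hadoop.hbase.util.Bytes
  import org.apache.spark.streaming.dstream.DStream

  // "keyDStream" is an existing DStream[String] of row keys.
  val valueDStream: DStream[String] =
    keyDStream.hbaseBulkGet[String](hbaseContext, TableName.valueOf("t1"), 100,
      (key: String) => new Get(Bytes.toBytes(key)),
      (result: Result) =>
        Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("q"))))
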
-
-    /**
-     * Implicit method that gives easy access to HBaseContext's bulk
-     * get.  This will return a new DStream.  Think about it as a DStream map
-     * function.  In that every DStream value will get a new value out of
-     * HBase.  That new value will populate the newly generated DStream.
-     *
-     * @param hc             The hbaseContext object to identify which
-     *                       HBase cluster connection to use
-     * @param tableName      The tableName that the gets will be sent to
-     * @param batchSize      How many gets to execute in a single batch
-     * @param f              The function that will turn the DStream values
-     *                       into HBase Get objects
-     * @return               A resulting DStream of (ImmutableBytesWritable, Result) tuples
-     */
-    def hbaseBulkGet(hc: HBaseContext,
-                     tableName: TableName, batchSize:Int,
-                     f: (T) => Get): DStream[(ImmutableBytesWritable, Result)] = {
-        hc.streamBulkGet[T, (ImmutableBytesWritable, Result)](
-          tableName, batchSize, dStream, f,
-          result => (new ImmutableBytesWritable(result.getRow), result))
-    }
-
-    /**
-     * Implicit method that gives easy access to HBaseContext's bulk
-     * Delete.  This will not return a new DStream.
-     *
-     * @param hc         The hbaseContext object to identify which HBase
-     *                   cluster connection to use
-     * @param tableName  The tableName that the deletes will be sent to
-     * @param f          The function that will convert the DStream value into
-     *                   an HBase Delete object
-     * @param batchSize  The number of Deletes to be sent in a single batch
-     */
-    def hbaseBulkDelete(hc: HBaseContext,
-                        tableName: TableName,
-                        f:(T) => Delete, batchSize:Int): Unit = {
-      hc.streamBulkDelete(dStream, tableName, f, batchSize)
-    }
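
A short sketch of the delete variant; the stream of row keys, the table name, and the batch size of 100 are illustrative assumptions.

  import org.apache.hadoop.hbase.TableName
  import org.apache.hadoop.hbase.client.Delete
  import org.apache.hadoop.hbase.spark.HBaseDStreamFunctions._
  import org.apache.hadoop.hbase.util.Bytes

  // "keyDStream" is an existing DStream[String] of row keys to remove.
  keyDStream.hbaseBulkDelete(hbaseContext, TableName.valueOf("t1"),
    (key: String) => new Delete(Bytes.toBytes(key)), 100)
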
-
-    /**
-     * Implicit method that gives easy access to HBaseContext's
-     * foreachPartition method.  This will act very much like a normal DStream
-     * foreach method except that you will now have an HBase connection
-     * while iterating through the values.
-     *
-     * @param hc  The hbaseContext object to identify which HBase
-     *            cluster connection to use
-     * @param f   This function will get an iterator for a partition of the
-     *            DStream along with a connection object to HBase
-     */
-    def hbaseForeachPartition(hc: HBaseContext,
-                              f: (Iterator[T], Connection) => Unit): Unit = {
-      hc.streamForeachPartition(dStream, f)
-    }
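
A hedged sketch of a side-effecting partition pass that reuses the supplied Connection through a BufferedMutator; everything except the API call itself is a placeholder.

  import org.apache.hadoop.hbase.TableName
  import org.apache.hadoop.hbase.client.Put
  import org.apache.hadoop.hbase.spark.HBaseDStreamFunctions._
  import org.apache.hadoop.hbase.util.Bytes

  // "keyDStream" is an existing DStream[String] of row keys.
  keyDStream.hbaseForeachPartition(hbaseContext, (it, conn) => {
    val mutator = conn.getBufferedMutator(TableName.valueOf("t1"))
    it.foreach { key =>
      val put = new Put(Bytes.toBytes(key))
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("seen"), Bytes.toBytes(true))
      mutator.mutate(put)
    }
    mutator.flush()
    mutator.close()
  })
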
-
-    /**
-     * Implicit method that gives easy access to HBaseContext's
-     * mapPartitions method.  This will act very much like a normal DStream
-     * mapPartitions method except that you will now have an
-     * HBase connection while iterating through the values.
-     *
-     * @param hc  The hbaseContext object to identify which HBase
-     *            cluster connection to use
-     * @param f   This function will get an iterator for a partition of the
-     *            DStream along with a connection object to HBase
-     * @tparam R  This is the type of objects that will go into the resulting
-     *            DStream
-     * @return    A resulting DStream of type R
-     */
-    def hbaseMapPartitions[R: ClassTag](hc: HBaseContext,
-                                        f: (Iterator[T], Connection) => Iterator[R]):
-    DStream[R] = {
-      hc.streamMapPartitions(dStream, f)
-    }
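
Finally, a sketch of the mapPartitions flavor that enriches each streamed key with a value read through the shared Connection; identifiers other than the API names are assumptions, and real code would likely batch the gets.

  import org.apache.hadoop.hbase.TableName
  import org.apache.hadoop.hbase.client.Get
  import org.apache.hadoop.hbase.spark.HBaseDStreamFunctions._
  import org.apache.hadoop.hbase.util.Bytes
  import org.apache.spark.streaming.dstream.DStream

  // "keyDStream" is an existing DStream[String] of row keys.
  val enriched: DStream[(String, String)] =
    keyDStream.hbaseMapPartitions[(String, String)](hbaseContext, (it, conn) => {
      val table = conn.getTable(TableName.valueOf("t1"))
      it.map { key =>
        // One Get per key keeps the sketch simple; batching would be faster.
        val result = table.get(new Get(Bytes.toBytes(key)))
        (key, Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("q"))))
      }
    })
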
-  }
-}