You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by hu...@apache.org on 2016/09/28 04:39:33 UTC

incubator-gearpump git commit: [GEARPUMP-204]add unit test for external_hbase module

Repository: incubator-gearpump
Updated Branches:
  refs/heads/master 71a4e9b8e -> a034e742c


	[GEARPUMP-204]add unit test for external_hbase module

	[GEARPUMP-204]add unit test for external_hbase module

Author: roshanson <doyouta123>
Author: Roshanson <73...@qq.com>
Author: Roshanson <doyouta123>
Author: 736781877@qq.com <doyouta123>

Closes #86 from Roshanson/fix-addHBaseUT.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/a034e742
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/a034e742
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/a034e742

Branch: refs/heads/master
Commit: a034e742c4fcd0811bcef86c6eb7f133c0837a0d
Parents: 71a4e9b
Author: roshanson <doyouta123>
Authored: Wed Sep 28 12:20:28 2016 +0800
Committer: huafengw <fv...@gmail.com>
Committed: Wed Sep 28 12:20:28 2016 +0800

----------------------------------------------------------------------
 .../gearpump/external/hbase/HBaseSink.scala     | 38 ++++++++-------
 .../gearpump/external/hbase/HBaseSinkSpec.scala | 50 +++++++++++++++-----
 2 files changed, 59 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a034e742/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala
----------------------------------------------------------------------
diff --git a/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala b/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala
index e4d5633..f85c43b 100644
--- a/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala
+++ b/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala
@@ -19,27 +19,32 @@ package org.apache.gearpump.external.hbase
 
 import java.io.{File, ObjectInputStream, ObjectOutputStream}
 
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.sink.DataSink
+import org.apache.gearpump.streaming.task.TaskContext
+import org.apache.gearpump.util.{Constants, FileUtils}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put}
+import org.apache.hadoop.hbase.security.UserProvider
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
-import org.apache.hadoop.hbase.security.{User, UserProvider}
 import org.apache.hadoop.security.UserGroupInformation
 
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.sink.DataSink
-import org.apache.gearpump.streaming.task.TaskContext
-import org.apache.gearpump.util.{Constants, FileUtils}
+class HBaseSink(userconfig: UserConfig, tableName: String,
+    val conn: (UserConfig, Configuration)
+    => Connection, @transient var configuration: Configuration)
+  extends DataSink {
 
-class HBaseSink(
-    userconfig: UserConfig, tableName: String, @transient var configuration: Configuration)
-  extends DataSink{
-  lazy val connection = HBaseSink.getConnection(userconfig, configuration)
+  lazy val connection = conn(userconfig, configuration)
   lazy val table = connection.getTable(TableName.valueOf(tableName))
 
   override def open(context: TaskContext): Unit = {}
 
+  def this(userconfig: UserConfig, tableName: String, configuration: Configuration) = {
+    this(userconfig, tableName, HBaseSink.getConnection, configuration)
+  }
+
   def this(userconfig: UserConfig, tableName: String) = {
     this(userconfig, tableName, HBaseConfiguration.create())
   }
@@ -51,7 +56,7 @@ class HBaseSink(
 
   def insert(
       rowKey: Array[Byte], columnGroup: Array[Byte], columnName: Array[Byte], value: Array[Byte])
-    : Unit = {
+  : Unit = {
     val put = new Put(rowKey)
     put.addColumn(columnGroup, columnName, value)
     table.put(put)
@@ -115,15 +120,16 @@ object HBaseSink {
   val COLUMN_NAME = "hbase.table.column.name"
   val HBASE_USER = "hbase.user"
 
-  def apply[T](userconfig: UserConfig, tableName: String): HBaseSink = {
-    new HBaseSink(userconfig, tableName)
-  }
-
   def apply[T](userconfig: UserConfig, tableName: String, configuration: Configuration)
-    : HBaseSink = {
+  : HBaseSink = {
     new HBaseSink(userconfig, tableName, configuration)
   }
 
+  def apply[T](userconfig: UserConfig, tableName: String)
+  : HBaseSink = {
+    new HBaseSink(userconfig, tableName)
+  }
+
   private def getConnection(userConfig: UserConfig, configuration: Configuration): Connection = {
     if (UserGroupInformation.isSecurityEnabled) {
       val principal = userConfig.getString(Constants.GEARPUMP_KERBEROS_PRINCIPAL)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a034e742/external/hbase/src/test/scala/org/apache/gearpump/external/hbase/HBaseSinkSpec.scala
----------------------------------------------------------------------
diff --git a/external/hbase/src/test/scala/org/apache/gearpump/external/hbase/HBaseSinkSpec.scala b/external/hbase/src/test/scala/org/apache/gearpump/external/hbase/HBaseSinkSpec.scala
index 24b9646..62da2b1 100644
--- a/external/hbase/src/test/scala/org/apache/gearpump/external/hbase/HBaseSinkSpec.scala
+++ b/external/hbase/src/test/scala/org/apache/gearpump/external/hbase/HBaseSinkSpec.scala
@@ -17,25 +17,49 @@
  */
 package org.apache.gearpump.external.hbase
 
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.MockUtil
+import org.apache.gearpump.streaming.task.TaskContext
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hbase.TableName
+import org.apache.hadoop.hbase.client._
+import org.apache.hadoop.hbase.util.Bytes
+import org.mockito.Mockito._
+import org.scalatest.mock.MockitoSugar
 import org.scalatest.prop.PropertyChecks
 import org.scalatest.{Matchers, PropSpec}
 
-class HBaseSinkSpec extends PropSpec with PropertyChecks with Matchers {
+class HBaseSinkSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
 
   property("HBaseSink should insert a row successfully") {
 
-  //  import Mockito._
-  //  val htable = Mockito.mock(classOf[HTable])
-  //  val row = "row"
-  //  val group = "group"
-  //  val name = "name"
-  //  val value = "1.2"
-  //  val put = new Put(Bytes.toBytes(row))
-  //  put.add(Bytes.toBytes(group), Bytes.toBytes(name), Bytes.toBytes(value))
-  //  val hbaseSink = HBaseSink(htable)
-  //  hbaseSink.insert(put)
-  //  verify(htable).put(put)
+    val table = mock[Table]
+    val config = mock[Configuration]
+    val connection = mock[Connection]
+    val taskContext = mock[TaskContext]
 
+    val map = Map[String, String]("HBASESINK" -> "hbasesink", "TABLE_NAME" -> "hbase.table.name",
+      "COLUMN_FAMILY" -> "hbase.table.column.family", "COLUMN_NAME" -> "hbase.table.column.name",
+      "HBASE_USER" -> "hbase.user", "GEARPUMP_KERBEROS_PRINCIPAL" -> "gearpump.kerberos.principal",
+      "GEARPUMP_KEYTAB_FILE" -> "gearpump.keytab.file"
+    )
+    val userConfig = new UserConfig(map)
+    val tableName = "hbase"
+    val row = "row"
+    val group = "group"
+    val name = "name"
+    val value = "3.0"
+
+    when(connection.getTable(TableName.valueOf(tableName))).thenReturn(table)
+
+    val put = new Put(Bytes.toBytes(row))
+    put.addColumn(Bytes.toBytes(group), Bytes.toBytes(name), Bytes.toBytes(value))
+    val hbaseSink = new HBaseSink(userConfig, tableName, (userConfig, config)
+    => connection, config)
+    hbaseSink.open(taskContext)
+    hbaseSink.insert(Bytes.toBytes(row), Bytes.toBytes(group), Bytes.toBytes(name),
+      Bytes.toBytes(value))
+
+    verify(table).put(MockUtil.argMatch[Put](_.getRow sameElements Bytes.toBytes(row)))
   }
 }
-