You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by hu...@apache.org on 2016/09/28 04:39:33 UTC
incubator-gearpump git commit: [GEARPUMP-204] add unit test for external_hbase module
Repository: incubator-gearpump
Updated Branches:
refs/heads/master 71a4e9b8e -> a034e742c
[GEARPUMP-204] add unit test for external_hbase module
[GEARPUMP-204] add unit test for external_hbase module
Author: roshanson <doyouta123>
Author: Roshanson <73...@qq.com>
Author: Roshanson <doyouta123>
Author: 736781877@qq.com <doyouta123>
Closes #86 from Roshanson/fix-addHBaseUT.
Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/a034e742
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/a034e742
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/a034e742
Branch: refs/heads/master
Commit: a034e742c4fcd0811bcef86c6eb7f133c0837a0d
Parents: 71a4e9b
Author: roshanson <doyouta123>
Authored: Wed Sep 28 12:20:28 2016 +0800
Committer: huafengw <fv...@gmail.com>
Committed: Wed Sep 28 12:20:28 2016 +0800
----------------------------------------------------------------------
.../gearpump/external/hbase/HBaseSink.scala | 38 ++++++++-------
.../gearpump/external/hbase/HBaseSinkSpec.scala | 50 +++++++++++++++-----
2 files changed, 59 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a034e742/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala
----------------------------------------------------------------------
diff --git a/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala b/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala
index e4d5633..f85c43b 100644
--- a/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala
+++ b/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala
@@ -19,27 +19,32 @@ package org.apache.gearpump.external.hbase
import java.io.{File, ObjectInputStream, ObjectOutputStream}
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.sink.DataSink
+import org.apache.gearpump.streaming.task.TaskContext
+import org.apache.gearpump.util.{Constants, FileUtils}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put}
+import org.apache.hadoop.hbase.security.UserProvider
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
-import org.apache.hadoop.hbase.security.{User, UserProvider}
import org.apache.hadoop.security.UserGroupInformation
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.sink.DataSink
-import org.apache.gearpump.streaming.task.TaskContext
-import org.apache.gearpump.util.{Constants, FileUtils}
+class HBaseSink(userconfig: UserConfig, tableName: String,
+ val conn: (UserConfig, Configuration)
+ => Connection, @transient var configuration: Configuration)
+ extends DataSink {
-class HBaseSink(
- userconfig: UserConfig, tableName: String, @transient var configuration: Configuration)
- extends DataSink{
- lazy val connection = HBaseSink.getConnection(userconfig, configuration)
+ lazy val connection = conn(userconfig, configuration)
lazy val table = connection.getTable(TableName.valueOf(tableName))
override def open(context: TaskContext): Unit = {}
+ def this(userconfig: UserConfig, tableName: String, configuration: Configuration) = {
+ this(userconfig, tableName, HBaseSink.getConnection, configuration)
+ }
+
def this(userconfig: UserConfig, tableName: String) = {
this(userconfig, tableName, HBaseConfiguration.create())
}
@@ -51,7 +56,7 @@ class HBaseSink(
def insert(
rowKey: Array[Byte], columnGroup: Array[Byte], columnName: Array[Byte], value: Array[Byte])
- : Unit = {
+ : Unit = {
val put = new Put(rowKey)
put.addColumn(columnGroup, columnName, value)
table.put(put)
@@ -115,15 +120,16 @@ object HBaseSink {
val COLUMN_NAME = "hbase.table.column.name"
val HBASE_USER = "hbase.user"
- def apply[T](userconfig: UserConfig, tableName: String): HBaseSink = {
- new HBaseSink(userconfig, tableName)
- }
-
def apply[T](userconfig: UserConfig, tableName: String, configuration: Configuration)
- : HBaseSink = {
+ : HBaseSink = {
new HBaseSink(userconfig, tableName, configuration)
}
+ def apply[T](userconfig: UserConfig, tableName: String)
+ : HBaseSink = {
+ new HBaseSink(userconfig, tableName)
+ }
+
private def getConnection(userConfig: UserConfig, configuration: Configuration): Connection = {
if (UserGroupInformation.isSecurityEnabled) {
val principal = userConfig.getString(Constants.GEARPUMP_KERBEROS_PRINCIPAL)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a034e742/external/hbase/src/test/scala/org/apache/gearpump/external/hbase/HBaseSinkSpec.scala
----------------------------------------------------------------------
diff --git a/external/hbase/src/test/scala/org/apache/gearpump/external/hbase/HBaseSinkSpec.scala b/external/hbase/src/test/scala/org/apache/gearpump/external/hbase/HBaseSinkSpec.scala
index 24b9646..62da2b1 100644
--- a/external/hbase/src/test/scala/org/apache/gearpump/external/hbase/HBaseSinkSpec.scala
+++ b/external/hbase/src/test/scala/org/apache/gearpump/external/hbase/HBaseSinkSpec.scala
@@ -17,25 +17,49 @@
*/
package org.apache.gearpump.external.hbase
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.MockUtil
+import org.apache.gearpump.streaming.task.TaskContext
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hbase.TableName
+import org.apache.hadoop.hbase.client._
+import org.apache.hadoop.hbase.util.Bytes
+import org.mockito.Mockito._
+import org.scalatest.mock.MockitoSugar
import org.scalatest.prop.PropertyChecks
import org.scalatest.{Matchers, PropSpec}
-class HBaseSinkSpec extends PropSpec with PropertyChecks with Matchers {
+class HBaseSinkSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
property("HBaseSink should insert a row successfully") {
- // import Mockito._
- // val htable = Mockito.mock(classOf[HTable])
- // val row = "row"
- // val group = "group"
- // val name = "name"
- // val value = "1.2"
- // val put = new Put(Bytes.toBytes(row))
- // put.add(Bytes.toBytes(group), Bytes.toBytes(name), Bytes.toBytes(value))
- // val hbaseSink = HBaseSink(htable)
- // hbaseSink.insert(put)
- // verify(htable).put(put)
+ val table = mock[Table]
+ val config = mock[Configuration]
+ val connection = mock[Connection]
+ val taskContext = mock[TaskContext]
+ val map = Map[String, String]("HBASESINK" -> "hbasesink", "TABLE_NAME" -> "hbase.table.name",
+ "COLUMN_FAMILY" -> "hbase.table.column.family", "COLUMN_NAME" -> "hbase.table.column.name",
+ "HBASE_USER" -> "hbase.user", "GEARPUMP_KERBEROS_PRINCIPAL" -> "gearpump.kerberos.principal",
+ "GEARPUMP_KEYTAB_FILE" -> "gearpump.keytab.file"
+ )
+ val userConfig = new UserConfig(map)
+ val tableName = "hbase"
+ val row = "row"
+ val group = "group"
+ val name = "name"
+ val value = "3.0"
+
+ when(connection.getTable(TableName.valueOf(tableName))).thenReturn(table)
+
+ val put = new Put(Bytes.toBytes(row))
+ put.addColumn(Bytes.toBytes(group), Bytes.toBytes(name), Bytes.toBytes(value))
+ val hbaseSink = new HBaseSink(userConfig, tableName, (userConfig, config)
+ => connection, config)
+ hbaseSink.open(taskContext)
+ hbaseSink.insert(Bytes.toBytes(row), Bytes.toBytes(group), Bytes.toBytes(name),
+ Bytes.toBytes(value))
+
+ verify(table).put(MockUtil.argMatch[Put](_.getRow sameElements Bytes.toBytes(row)))
}
}
-