You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/10/10 22:10:53 UTC
incubator-gearpump git commit: GEARPUMP-215 Gearpump Redis
Integration - RedisSink
Repository: incubator-gearpump
Updated Branches:
refs/heads/master 9b879456d -> 1b9889121
GEARPUMP-215 Gearpump Redis Integration - RedisSink
[Gearpump Redis Integration - RedisStorage](https://issues.apache.org/jira/browse/GEARPUMP-215)
Author: darionyaphet <da...@gmail.com>
Closes #93 from darionyaphet/GEARPUMP-215.
Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/1b988912
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/1b988912
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/1b988912
Branch: refs/heads/master
Commit: 1b9889121c0d776c9423544b76e27292838d1300
Parents: 9b87945
Author: darionyaphet <da...@gmail.com>
Authored: Tue Oct 11 06:10:33 2016 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Tue Oct 11 06:10:33 2016 +0800
----------------------------------------------------------------------
.../apache/gearpump/redis/RedisMessage.scala | 456 +++++++++++++++++++
.../org/apache/gearpump/redis/RedisSink.scala | 119 +++++
project/Build.scala | 12 +
3 files changed, 587 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1b988912/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisMessage.scala
----------------------------------------------------------------------
diff --git a/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisMessage.scala b/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisMessage.scala
new file mode 100644
index 0000000..84dec70
--- /dev/null
+++ b/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisMessage.scala
@@ -0,0 +1,456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.redis
+
+import java.nio.charset.Charset
+
+object RedisMessage {
+
+ private def toBytes(strings: List[String]): List[Array[Byte]] =
+ strings.map(string => string.getBytes(Charset.forName("UTF8")))
+
+ private def toBytes(string: String): Array[Byte] =
+ string.getBytes(Charset.forName("UTF8"))
+
+ object Connection {
+
+ /**
+ * Change the selected database for the current connection
+ *
+ * @param index
+ */
+ case class SELECT(index: Int)
+
+ }
+
+ object Geo {
+
+ /**
+ * Add one geospatial item in the geospatial index represented using a sorted set
+ *
+ * @param key
+ * @param longitude
+ * @param latitude
+ * @param member
+ */
+ case class GEOADD(key: Array[Byte], longitude: Double,
+ latitude: Double, member: Array[Byte]) {
+ def this(key: String, longitude: Double,
+ latitude: Double, member: String) =
+ this(toBytes(key), longitude, latitude, toBytes(member))
+ }
+
+ }
+
+ object Hashes {
+
+ /**
+ * Delete a hash field
+ *
+ * @param key
+ * @param field
+ */
+ case class HDEL(key: Array[Byte], field: Array[Byte]) {
+ def this(key: String, field: String) = this(toBytes(key), toBytes(field))
+ }
+
+ /**
+ * Increment the integer value of a hash field by the given number
+ *
+ * @param key
+ * @param field
+ * @param increment
+ */
+ case class HINCRBY(key: Array[Byte], field: Array[Byte], increment: Long) {
+ def this(key: String, field: String, increment: Long) =
+ this(toBytes(key), toBytes(field), increment)
+ }
+
+ /**
+ * Increment the float value of a hash field by the given amount
+ *
+ * @param key
+ * @param field
+ * @param increment
+ */
+ case class HINCRBYFLOAT(key: Array[Byte], field: Array[Byte], increment: Float) {
+ def this(key: String, field: String, increment: Float) =
+ this(toBytes(key), toBytes(field), increment)
+ }
+
+
+ /**
+ * Set the string value of a hash field
+ *
+ * @param key
+ * @param field
+ * @param value
+ */
+ case class HSET(key: Array[Byte], field: Array[Byte], value: Array[Byte]) {
+ def this(key: String, field: String, value: String) =
+ this(toBytes(key), toBytes(field), toBytes(value))
+ }
+
+ /**
+ * Set the value of a hash field, only if the field does not exist
+ *
+ * @param key
+ * @param field
+ * @param value
+ */
+ case class HSETNX(key: Array[Byte], field: Array[Byte], value: Array[Byte]) {
+ def this(key: String, field: String, value: String) =
+ this(toBytes(key), toBytes(field), toBytes(value))
+ }
+
+ }
+
+ object HyperLogLog {
+
+ /**
+ * Adds the specified elements to the specified HyperLogLog
+ *
+ * @param key
+ * @param element
+ */
+ case class PFADD(key: String, element: String)
+
+ }
+
+ object Lists {
+
+
+ /**
+ * Prepend one or multiple values to a list
+ *
+ * @param key
+ * @param value
+ */
+ case class LPUSH(key: Array[Byte], value: Array[Byte]) {
+
+ def this(key: String, value: String) = this(key, toBytes(value))
+ }
+
+ /**
+ * Prepend a value to a list, only if the list exists
+ *
+ * @param key
+ * @param value
+ */
+ case class LPUSHX(key: Array[Byte], value: Array[Byte]) {
+ def this(key: String, value: String) = this(toBytes(key), toBytes(value))
+ }
+
+ /**
+ * Set the value of an element in a list by its index
+ *
+ * @param key
+ * @param index
+ * @param value
+ */
+ case class LSET(key: Array[Byte], index: Long, value: Array[Byte]) {
+ def this(key: String, index: Long, value: String) = this(toBytes(key), index, toBytes(value))
+ }
+
+ /**
+ * Append one or multiple values to a list
+ *
+ * @param key
+ * @param value
+ */
+ case class RPUSH(key: Array[Byte], value: Array[Byte]) {
+
+ def this(key: String, value: String) = this(key, toBytes(value))
+ }
+
+ /**
+ * Append a value to a list, only if the list exists
+ *
+ * @param key
+ * @param value
+ */
+ case class RPUSHX(key: Array[Byte], value: Array[Byte]) {
+ def this(key: String, value: String) = this(toBytes(key), toBytes(value))
+ }
+
+ }
+
+ object Keys {
+
+ /**
+ * Delete a key
+ *
+ * @param message
+ */
+ case class DEL(message: Array[Byte]) {
+
+ def this(message: String) = this(toBytes(message))
+ }
+
+ /**
+ * Set a key's time to live in seconds
+ *
+ * @param key
+ */
+ case class EXPIRE(key: Array[Byte], seconds: Int) {
+ def this(key: String, seconds: Int) = this(toBytes(key), seconds)
+ }
+
+ /**
+ * Set the expiration for a key as a UNIX timestamp
+ *
+ * @param key
+ * @param timestamp
+ */
+ case class EXPIREAT(key: Array[Byte], timestamp: Long) {
+ def this(key: String, timestamp: Long) = this(toBytes(key), timestamp)
+ }
+
+ /**
+ * Atomically transfer a key from a Redis instance to another one.
+ *
+ * @param host
+ * @param port
+ * @param key
+ * @param database
+ * @param timeout
+ */
+ case class MIGRATE(host: Array[Byte], port: Int, key: Array[Byte], database: Int, timeout: Int) {
+ def this(host: String, port: Int, key: String, database: Int, timeout: Int) =
+ this(toBytes(host), port, toBytes(key), database, timeout)
+ }
+
+ /**
+ * Move a key to another database
+ *
+ * @param key
+ * @param db
+ */
+ case class MOVE(key: Array[Byte], db: Int) {
+ def this(key: String, db: Int) = this(toBytes(key), db)
+ }
+
+ /**
+ * Remove the expiration from a key
+ *
+ * @param key
+ */
+ case class PERSIST(key: Array[Byte]) {
+ def this(key: String) = this(toBytes(key))
+ }
+
+ /**
+ * Set a key's time to live in milliseconds
+ *
+ * @param key
+ * @param milliseconds
+ */
+ case class PEXPIRE(key: Array[Byte], milliseconds: Long) {
+ def this(key: String, milliseconds: Long) = this(toBytes(key), milliseconds)
+ }
+
+ /**
+ * Set the expiration for a key as a UNIX timestamp specified in milliseconds
+ *
+ * @param key
+ * @param timestamp
+ */
+ case class PEXPIREAT(key: Array[Byte], timestamp: Long) {
+ def this(key: String, milliseconds: Long) = this(toBytes(key), milliseconds)
+ }
+
+ /**
+ * Rename a key
+ *
+ * @param key
+ * @param newKey
+ */
+ case class RENAME(key: Array[Byte], newKey: Array[Byte]) {
+ def this(key: String, newKey: String) = this(toBytes(key), toBytes(newKey))
+ }
+
+ /**
+ * Rename a key, only if the new key does not exist
+ *
+ * @param key
+ * @param newKey
+ */
+ case class RENAMENX(key: Array[Byte], newKey: Array[Byte]) {
+ def this(key: String, newKey: String) = this(toBytes(key), toBytes(newKey))
+ }
+
+ }
+
+
+ object Sets {
+
+ /**
+ * Add one or more members to a set
+ *
+ * @param key
+ * @param members
+ */
+ case class SADD(key: Array[Byte], members: Array[Byte]) {
+
+ def this(key: String, members: String) = this(key, toBytes(members))
+ }
+
+
+ /**
+ * Move a member from one set to another
+ *
+ * @param source
+ * @param destination
+ * @param member
+ */
+ case class SMOVE(source: Array[Byte], destination: Array[Byte], member: Array[Byte]) {
+ def this(source: String, destination: String, member: String) =
+ this(toBytes(source), toBytes(destination), toBytes(member))
+ }
+
+
+ /**
+ * Remove one or more members from a set
+ *
+ * @param key
+ * @param member
+ */
+ case class SREM(key: Array[Byte], member: Array[Byte]) {
+
+ def this(key: String, member: String) = this(key, toBytes(member))
+ }
+
+ }
+
+ object String {
+
+ /**
+ * Append a value to a key
+ *
+ * @param key
+ * @param value
+ */
+ case class APPEND(key: Array[Byte], value: Array[Byte]) {
+ def this(key: String, value: String) = this(toBytes(key), toBytes(value))
+ }
+
+ /**
+ * Decrement the integer value of a key by one
+ *
+ * @param key
+ */
+ case class DECR(key: Array[Byte]) {
+ def this(key: String) = this(toBytes(key))
+ }
+
+ /**
+ * Decrement the integer value of a key by the given number
+ *
+ * @param key
+ * @param decrement
+ */
+ case class DECRBY(key: Array[Byte], decrement: Int) {
+ def this(key: String, decrement: Int) = this(toBytes(key), decrement)
+ }
+
+ /**
+ * Increment the integer value of a key by one
+ *
+ * @param key
+ */
+ case class INCR(key: Array[Byte]) {
+ def this(key: String) = this(toBytes(key))
+ }
+
+ /**
+ * Increment the integer value of a key by the given amount
+ *
+ * @param key
+ * @param increment
+ */
+ case class INCRBY(key: Array[Byte], increment: Int) {
+ def this(key: String, increment: Int) = this(toBytes(key), increment)
+ }
+
+ /**
+ * Increment the float value of a key by the given amount
+ *
+ * @param key
+ * @param increment
+ */
+ case class INCRBYFLOAT(key: Array[Byte], increment: Double) {
+ def this(key: String, increment: Number) = this(toBytes(key), increment)
+ }
+
+
+ /**
+ * Set the string value of a key
+ *
+ * @param key
+ * @param value
+ */
+ case class SET(key: Array[Byte], value: Array[Byte]) {
+ def this(key: String, value: String) = this(toBytes(key), toBytes(value))
+ }
+
+ /**
+ * Sets or clears the bit at offset in the string value stored at key
+ *
+ * @param key
+ * @param offset
+ * @param value
+ */
+ case class SETBIT(key: Array[Byte], offset: Long, value: Array[Byte]) {
+ def this(key: String, offset: Long, value: String) = this(toBytes(key), offset, toBytes(value))
+ }
+
+ /**
+ * Set the value and expiration of a key
+ *
+ * @param key
+ * @param seconds
+ * @param value
+ */
+ case class SETEX(key: Array[Byte], seconds: Int, value: Array[Byte]) {
+ def this(key: String, seconds: Int, value: String) = this(toBytes(key), seconds, toBytes(value))
+ }
+
+ /**
+ * Set the value of a key, only if the key does not exist
+ *
+ * @param key
+ * @param value
+ */
+ case class SETNX(key: Array[Byte], value: Array[Byte]) {
+ def this(key: String, value: String) = this(toBytes(key), toBytes(value))
+ }
+
+ /**
+ * Overwrite part of a string at key starting at the specified offset
+ *
+ * @param key
+ * @param offset
+ * @param value
+ */
+ case class SETRANGE(key: Array[Byte], offset: Int, value: Array[Byte]) {
+ def this(key: String, offset: Int, value: String) = this(toBytes(key), offset, toBytes(value))
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1b988912/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala
----------------------------------------------------------------------
diff --git a/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala b/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala
new file mode 100644
index 0000000..3f75949
--- /dev/null
+++ b/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.redis
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.redis.RedisMessage.Geo.GEOADD
+import org.apache.gearpump.redis.RedisMessage.Hashes._
+import org.apache.gearpump.redis.RedisMessage.HyperLogLog._
+import org.apache.gearpump.redis.RedisMessage.Keys._
+import org.apache.gearpump.redis.RedisMessage.Lists._
+import org.apache.gearpump.redis.RedisMessage.Sets._
+import org.apache.gearpump.redis.RedisMessage.String._
+import org.apache.gearpump.streaming.sink.DataSink
+import org.apache.gearpump.streaming.task.TaskContext
+import org.apache.gearpump.util.LogUtil
+import redis.clients.jedis.Jedis
+import redis.clients.jedis.Protocol.{DEFAULT_DATABASE, DEFAULT_HOST, DEFAULT_PORT, DEFAULT_TIMEOUT}
+
+/**
+ * Save message in Redis Instance
+ *
+ * @param host
+ * @param port
+ * @param timeout
+ * @param database
+ * @param password
+ */
+class RedisSink(
+ host: String = DEFAULT_HOST,
+ port: Int = DEFAULT_PORT,
+ timeout: Int = DEFAULT_TIMEOUT,
+ database: Int = DEFAULT_DATABASE,
+ password: String = "") extends DataSink {
+
+ private val LOG = LogUtil.getLogger(getClass)
+ @transient private lazy val client = new Jedis(host, port, timeout)
+
+ override def open(context: TaskContext): Unit = {
+ client.select(database)
+
+ if (password != null && password.length != 0) {
+ client.auth(password)
+ }
+ }
+
+ override def write(message: Message): Unit = {
+
+ message.msg match {
+ // GEO
+ case msg: GEOADD => client.geoadd(msg.key, msg.longitude, msg.latitude, msg.member)
+
+ // Hashes
+ case msg: HDEL => client.hdel(msg.key, msg.field)
+ case msg: HINCRBY => client.hincrBy(msg.key, msg.field, msg.increment)
+ case msg: HINCRBYFLOAT => client.hincrByFloat(msg.key, msg.field, msg.increment)
+ case msg: HSET => client.hset(msg.key, msg.field, msg.value)
+ case msg: HSETNX => client.hsetnx(msg.key, msg.field, msg.value)
+
+ // HyperLogLog
+ case msg: PFADD => client.pfadd(msg.key, msg.element)
+
+ // Lists
+ case msg: LPUSH => client.lpush(msg.key, msg.value)
+ case msg: LPUSHX => client.lpushx(msg.key, msg.value)
+ case msg: LSET => client.lset(msg.key, msg.index, msg.value)
+ case msg: RPUSH => client.rpush(msg.key, msg.value)
+ case msg: RPUSHX => client.rpushx(msg.key, msg.value)
+
+ // Keys
+ case msg: DEL => client.del(msg.message)
+ case msg: EXPIRE => client.expire(msg.key, msg.seconds)
+ case msg: EXPIREAT => client.expireAt(msg.key, msg.timestamp)
+ case msg: MIGRATE => client.migrate(msg.host, msg.port, msg.key, msg.database, msg.timeout)
+ case msg: MOVE => client.move(msg.key, msg.db)
+ case msg: PERSIST => client.persist(msg.key)
+ case msg: PEXPIRE => client.pexpire(msg.key, msg.milliseconds)
+ case msg: PEXPIREAT => client.pexpireAt(msg.key, msg.timestamp)
+ case msg: RENAME => client.rename(msg.key, msg.newKey)
+ case msg: RENAMENX => client.renamenx(msg.key, msg.newKey)
+
+ // Sets
+ case msg: SADD => client.sadd(msg.key, msg.members)
+ case msg: SMOVE => client.smove(msg.source, msg.destination, msg.member)
+ case msg: SREM => client.srem(msg.key, msg.member)
+
+ // String
+ case msg: APPEND => client.append(msg.key, msg.value)
+ case msg: DECR => client.decr(msg.key)
+ case msg: DECRBY => client.decrBy(msg.key, msg.decrement)
+ case msg: INCR => client.incr(msg.key)
+ case msg: INCRBY => client.incrBy(msg.key, msg.increment)
+ case msg: INCRBYFLOAT => client.incrByFloat(msg.key, msg.increment)
+ case msg: SET => client.set(msg.key, msg.value)
+ case msg: SETBIT => client.setbit(msg.key, msg.offset, msg.value)
+ case msg: SETEX => client.setex(msg.key, msg.seconds, msg.value)
+ case msg: SETNX => client.setnx(msg.key, msg.value)
+ case msg: SETRANGE => client.setrange(msg.key, msg.offset, msg.value)
+ }
+ }
+
+ override def close(): Unit = {
+ client.close()
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1b988912/project/Build.scala
----------------------------------------------------------------------
diff --git a/project/Build.scala b/project/Build.scala
index fe8ec61..34f0ae2 100644
--- a/project/Build.scala
+++ b/project/Build.scala
@@ -415,6 +415,18 @@ object Build extends sbt.Build {
))
.dependsOn(streaming % "test->test; provided", daemon % "test->test; provided")
+ lazy val redis = Project(
+ id = "gearpump-experiments-redis",
+ base = file("experiments/redis"),
+ settings = commonSettings ++ noPublish ++ myAssemblySettings ++
+ Seq(
+ libraryDependencies ++= Seq(
+ "redis.clients" % "jedis" % "2.9.0"
+ ),
+ mainClass in(Compile, packageBin) := Some("org.apache.gearpump.example.Test")
+ ))
+ .dependsOn(streaming % "test->test; provided", daemon % "test->test; provided")
+
lazy val storm = Project(
id = "gearpump-experiments-storm",
base = file("experiments/storm"),