You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/10/10 22:10:53 UTC

incubator-gearpump git commit: GEARPUMP-215 Gearpump Redis Integration - RedisSink

Repository: incubator-gearpump
Updated Branches:
  refs/heads/master 9b879456d -> 1b9889121


GEARPUMP-215 Gearpump Redis Integration - RedisSink

[Gearpump Redis Integration - RedisStorage](https://issues.apache.org/jira/browse/GEARPUMP-215)

Author: darionyaphet <da...@gmail.com>

Closes #93 from darionyaphet/GEARPUMP-215.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/1b988912
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/1b988912
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/1b988912

Branch: refs/heads/master
Commit: 1b9889121c0d776c9423544b76e27292838d1300
Parents: 9b87945
Author: darionyaphet <da...@gmail.com>
Authored: Tue Oct 11 06:10:33 2016 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Tue Oct 11 06:10:33 2016 +0800

----------------------------------------------------------------------
 .../apache/gearpump/redis/RedisMessage.scala    | 456 +++++++++++++++++++
 .../org/apache/gearpump/redis/RedisSink.scala   | 119 +++++
 project/Build.scala                             |  12 +
 3 files changed, 587 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1b988912/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisMessage.scala
----------------------------------------------------------------------
diff --git a/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisMessage.scala b/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisMessage.scala
new file mode 100644
index 0000000..84dec70
--- /dev/null
+++ b/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisMessage.scala
@@ -0,0 +1,456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.redis
+
+import java.nio.charset.Charset
+
+object RedisMessage {
+
+  private def toBytes(strings: List[String]): List[Array[Byte]] =
+    strings.map(string => string.getBytes(Charset.forName("UTF8")))
+
+  private def toBytes(string: String): Array[Byte] =
+    string.getBytes(Charset.forName("UTF8"))
+
+  object Connection {
+
+    /**
+     * Change the selected database for the current connection
+     *
+     * @param index
+     */
+    case class SELECT(index: Int)
+
+  }
+
+  object Geo {
+
+    /**
+     * Add one geospatial item in the geospatial index represented using a sorted set
+     *
+     * @param key
+     * @param longitude
+     * @param latitude
+     * @param member
+     */
+    case class GEOADD(key: Array[Byte], longitude: Double,
+                      latitude: Double, member: Array[Byte]) {
+      def this(key: String, longitude: Double,
+               latitude: Double, member: String) =
+        this(toBytes(key), longitude, latitude, toBytes(member))
+    }
+
+  }
+
+  object Hashes {
+
+    /**
+     * Delete a hash field
+     *
+     * @param key
+     * @param field
+     */
+    case class HDEL(key: Array[Byte], field: Array[Byte]) {
+      def this(key: String, field: String) = this(toBytes(key), toBytes(field))
+    }
+
+    /**
+     * Increment the integer value of a hash field by the given number
+     *
+     * @param key
+     * @param field
+     * @param increment
+     */
+    case class HINCRBY(key: Array[Byte], field: Array[Byte], increment: Long) {
+      def this(key: String, field: String, increment: Long) =
+        this(toBytes(key), toBytes(field), increment)
+    }
+
+    /**
+     * Increment the float value of a hash field by the given amount
+     *
+     * @param key
+     * @param field
+     * @param increment
+     */
+    case class HINCRBYFLOAT(key: Array[Byte], field: Array[Byte], increment: Float) {
+      def this(key: String, field: String, increment: Float) =
+        this(toBytes(key), toBytes(field), increment)
+    }
+
+
+    /**
+     * Set the string value of a hash field
+     *
+     * @param key
+     * @param field
+     * @param value
+     */
+    case class HSET(key: Array[Byte], field: Array[Byte], value: Array[Byte]) {
+      def this(key: String, field: String, value: String) =
+        this(toBytes(key), toBytes(field), toBytes(value))
+    }
+
+    /**
+     * Set the value of a hash field, only if the field does not exist
+     *
+     * @param key
+     * @param field
+     * @param value
+     */
+    case class HSETNX(key: Array[Byte], field: Array[Byte], value: Array[Byte]) {
+      def this(key: String, field: String, value: String) =
+        this(toBytes(key), toBytes(field), toBytes(value))
+    }
+
+  }
+
+  object HyperLogLog {
+
+    /**
+     * Adds the specified elements to the specified HyperLogLog
+     *
+     * @param key
+     * @param element
+     */
+    case class PFADD(key: String, element: String)
+
+  }
+
+  object Lists {
+
+
+    /**
+     * Prepend one or multiple values to a list
+     *
+     * @param key
+     * @param value
+     */
+    case class LPUSH(key: Array[Byte], value: Array[Byte]) {
+
+      def this(key: String, value: String) = this(key, toBytes(value))
+    }
+
+    /**
+     * Prepend a value to a list, only if the list exists
+     *
+     * @param key
+     * @param value
+     */
+    case class LPUSHX(key: Array[Byte], value: Array[Byte]) {
+      def this(key: String, value: String) = this(toBytes(key), toBytes(value))
+    }
+
+    /**
+     * Set the value of an element in a list by its index
+     *
+     * @param key
+     * @param index
+     * @param value
+     */
+    case class LSET(key: Array[Byte], index: Long, value: Array[Byte]) {
+      def this(key: String, index: Long, value: String) = this(toBytes(key), index, toBytes(value))
+    }
+
+    /**
+     * Append one or multiple values to a list
+     *
+     * @param key
+     * @param value
+     */
+    case class RPUSH(key: Array[Byte], value: Array[Byte]) {
+
+      def this(key: String, value: String) = this(key, toBytes(value))
+    }
+
+    /**
+     * Append a value to a list, only if the list exists
+     *
+     * @param key
+     * @param value
+     */
+    case class RPUSHX(key: Array[Byte], value: Array[Byte]) {
+      def this(key: String, value: String) = this(toBytes(key), toBytes(value))
+    }
+
+  }
+
+  object Keys {
+
+    /**
+     * Delete a key
+     *
+     * @param message
+     */
+    case class DEL(message: Array[Byte]) {
+
+      def this(message: String) = this(toBytes(message))
+    }
+
+    /**
+     * Set a key's time to live in seconds
+     *
+     * @param key
+     */
+    case class EXPIRE(key: Array[Byte], seconds: Int) {
+      def this(key: String, seconds: Int) = this(toBytes(key), seconds)
+    }
+
+    /**
+     * Set the expiration for a key as a UNIX timestamp
+     *
+     * @param key
+     * @param timestamp
+     */
+    case class EXPIREAT(key: Array[Byte], timestamp: Long) {
+      def this(key: String, timestamp: Long) = this(toBytes(key), timestamp)
+    }
+
+    /**
+     * Atomically transfer a key from a Redis instance to another one.
+     *
+     * @param host
+     * @param port
+     * @param key
+     * @param database
+     * @param timeout
+     */
+    case class MIGRATE(host: Array[Byte], port: Int, key: Array[Byte], database: Int, timeout: Int) {
+      def this(host: String, port: Int, key: String, database: Int, timeout: Int) =
+        this(toBytes(host), port, toBytes(key), database, timeout)
+    }
+
+    /**
+     * Move a key to another database
+     *
+     * @param key
+     * @param db
+     */
+    case class MOVE(key: Array[Byte], db: Int) {
+      def this(key: String, db: Int) = this(toBytes(key), db)
+    }
+
+    /**
+     * Remove the expiration from a key
+     *
+     * @param key
+     */
+    case class PERSIST(key: Array[Byte]) {
+      def this(key: String) = this(toBytes(key))
+    }
+
+    /**
+     * Set a key's time to live in milliseconds
+     *
+     * @param key
+     * @param milliseconds
+     */
+    case class PEXPIRE(key: Array[Byte], milliseconds: Long) {
+      def this(key: String, milliseconds: Long) = this(toBytes(key), milliseconds)
+    }
+
+    /**
+     * Set the expiration for a key as a UNIX timestamp specified in milliseconds
+     *
+     * @param key
+     * @param timestamp
+     */
+    case class PEXPIREAT(key: Array[Byte], timestamp: Long) {
+      def this(key: String, milliseconds: Long) = this(toBytes(key), milliseconds)
+    }
+
+    /**
+     * Rename a key
+     *
+     * @param key
+     * @param newKey
+     */
+    case class RENAME(key: Array[Byte], newKey: Array[Byte]) {
+      def this(key: String, newKey: String) = this(toBytes(key), toBytes(newKey))
+    }
+
+    /**
+     * Rename a key, only if the new key does not exist
+     *
+     * @param key
+     * @param newKey
+     */
+    case class RENAMENX(key: Array[Byte], newKey: Array[Byte]) {
+      def this(key: String, newKey: String) = this(toBytes(key), toBytes(newKey))
+    }
+
+  }
+
+
+  object Sets {
+
+    /**
+     * Add one or more members to a set
+     *
+     * @param key
+     * @param members
+     */
+    case class SADD(key: Array[Byte], members: Array[Byte]) {
+
+      def this(key: String, members: String) = this(key, toBytes(members))
+    }
+
+
+    /**
+     * Move a member from one set to another
+     *
+     * @param source
+     * @param destination
+     * @param member
+     */
+    case class SMOVE(source: Array[Byte], destination: Array[Byte], member: Array[Byte]) {
+      def this(source: String, destination: String, member: String) =
+        this(toBytes(source), toBytes(destination), toBytes(member))
+    }
+
+
+    /**
+     * Remove one or more members from a set
+     *
+     * @param key
+     * @param member
+     */
+    case class SREM(key: Array[Byte], member: Array[Byte]) {
+
+      def this(key: String, member: String) = this(key, toBytes(member))
+    }
+
+  }
+
+  object String {
+
+    /**
+     * Append a value to a key
+     *
+     * @param key
+     * @param value
+     */
+    case class APPEND(key: Array[Byte], value: Array[Byte]) {
+      def this(key: String, value: String) = this(toBytes(key), toBytes(value))
+    }
+
+    /**
+     * Decrement the integer value of a key by one
+     *
+     * @param key
+     */
+    case class DECR(key: Array[Byte]) {
+      def this(key: String) = this(toBytes(key))
+    }
+
+    /**
+     * Decrement the integer value of a key by the given number
+     *
+     * @param key
+     * @param decrement
+     */
+    case class DECRBY(key: Array[Byte], decrement: Int) {
+      def this(key: String, decrement: Int) = this(toBytes(key), decrement)
+    }
+
+    /**
+     * Increment the integer value of a key by one
+     *
+     * @param key
+     */
+    case class INCR(key: Array[Byte]) {
+      def this(key: String) = this(toBytes(key))
+    }
+
+    /**
+     * Increment the integer value of a key by the given amount
+     *
+     * @param key
+     * @param increment
+     */
+    case class INCRBY(key: Array[Byte], increment: Int) {
+      def this(key: String, increment: Int) = this(toBytes(key), increment)
+    }
+
+    /**
+     * Increment the float value of a key by the given amount
+     *
+     * @param key
+     * @param increment
+     */
+    case class INCRBYFLOAT(key: Array[Byte], increment: Double) {
+      def this(key: String, increment: Number) = this(toBytes(key), increment)
+    }
+
+
+    /**
+     * Set the string value of a key
+     *
+     * @param key
+     * @param value
+     */
+    case class SET(key: Array[Byte], value: Array[Byte]) {
+      def this(key: String, value: String) = this(toBytes(key), toBytes(value))
+    }
+
+    /**
+     * Sets or clears the bit at offset in the string value stored at key
+     *
+     * @param key
+     * @param offset
+     * @param value
+     */
+    case class SETBIT(key: Array[Byte], offset: Long, value: Array[Byte]) {
+      def this(key: String, offset: Long, value: String) = this(toBytes(key), offset, toBytes(value))
+    }
+
+    /**
+     * Set the value and expiration of a key
+     *
+     * @param key
+     * @param seconds
+     * @param value
+     */
+    case class SETEX(key: Array[Byte], seconds: Int, value: Array[Byte]) {
+      def this(key: String, seconds: Int, value: String) = this(toBytes(key), seconds, toBytes(value))
+    }
+
+    /**
+     * Set the value of a key, only if the key does not exist
+     *
+     * @param key
+     * @param value
+     */
+    case class SETNX(key: Array[Byte], value: Array[Byte]) {
+      def this(key: String, value: String) = this(toBytes(key), toBytes(value))
+    }
+
+    /**
+     * Overwrite part of a string at key starting at the specified offset
+     *
+     * @param key
+     * @param offset
+     * @param value
+     */
+    case class SETRANGE(key: Array[Byte], offset: Int, value: Array[Byte]) {
+      def this(key: String, offset: Int, value: String) = this(toBytes(key), offset, toBytes(value))
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1b988912/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala
----------------------------------------------------------------------
diff --git a/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala b/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala
new file mode 100644
index 0000000..3f75949
--- /dev/null
+++ b/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.redis
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.redis.RedisMessage.Geo.GEOADD
+import org.apache.gearpump.redis.RedisMessage.Hashes._
+import org.apache.gearpump.redis.RedisMessage.HyperLogLog._
+import org.apache.gearpump.redis.RedisMessage.Keys._
+import org.apache.gearpump.redis.RedisMessage.Lists._
+import org.apache.gearpump.redis.RedisMessage.Sets._
+import org.apache.gearpump.redis.RedisMessage.String._
+import org.apache.gearpump.streaming.sink.DataSink
+import org.apache.gearpump.streaming.task.TaskContext
+import org.apache.gearpump.util.LogUtil
+import redis.clients.jedis.Jedis
+import redis.clients.jedis.Protocol.{DEFAULT_DATABASE, DEFAULT_HOST, DEFAULT_PORT, DEFAULT_TIMEOUT}
+
+/**
+  * Save message in Redis Instance
+  *
+  * @param host
+  * @param port
+  * @param timeout
+  * @param database
+  * @param password
+  */
+class RedisSink(
+                    host: String = DEFAULT_HOST,
+                    port: Int = DEFAULT_PORT,
+                    timeout: Int = DEFAULT_TIMEOUT,
+                    database: Int = DEFAULT_DATABASE,
+                    password: String = "") extends DataSink {
+
+  private val LOG = LogUtil.getLogger(getClass)
+  @transient private lazy val client = new Jedis(host, port, timeout)
+
+  override def open(context: TaskContext): Unit = {
+    client.select(database)
+
+    if (password != null && password.length != 0) {
+      client.auth(password)
+    }
+  }
+
+  override def write(message: Message): Unit = {
+
+    message.msg match {
+      // GEO
+      case msg: GEOADD => client.geoadd(msg.key, msg.longitude, msg.latitude, msg.member)
+
+      // Hashes
+      case msg: HDEL => client.hdel(msg.key, msg.field)
+      case msg: HINCRBY => client.hincrBy(msg.key, msg.field, msg.increment)
+      case msg: HINCRBYFLOAT => client.hincrByFloat(msg.key, msg.field, msg.increment)
+      case msg: HSET => client.hset(msg.key, msg.field, msg.value)
+      case msg: HSETNX => client.hsetnx(msg.key, msg.field, msg.value)
+
+      // HyperLogLog
+      case msg: PFADD => client.pfadd(msg.key, msg.element)
+
+      // Lists
+      case msg: LPUSH => client.lpush(msg.key, msg.value)
+      case msg: LPUSHX => client.lpushx(msg.key, msg.value)
+      case msg: LSET => client.lset(msg.key, msg.index, msg.value)
+      case msg: RPUSH => client.rpush(msg.key, msg.value)
+      case msg: RPUSHX => client.rpushx(msg.key, msg.value)
+
+      // Keys
+      case msg: DEL => client.del(msg.message)
+      case msg: EXPIRE => client.expire(msg.key, msg.seconds)
+      case msg: EXPIREAT => client.expireAt(msg.key, msg.timestamp)
+      case msg: MIGRATE => client.migrate(msg.host, msg.port, msg.key, msg.database, msg.timeout)
+      case msg: MOVE => client.move(msg.key, msg.db)
+      case msg: PERSIST => client.persist(msg.key)
+      case msg: PEXPIRE => client.pexpire(msg.key, msg.milliseconds)
+      case msg: PEXPIREAT => client.pexpireAt(msg.key, msg.timestamp)
+      case msg: RENAME => client.rename(msg.key, msg.newKey)
+      case msg: RENAMENX => client.renamenx(msg.key, msg.newKey)
+
+      // Sets
+      case msg: SADD => client.sadd(msg.key, msg.members)
+      case msg: SMOVE => client.smove(msg.source, msg.destination, msg.member)
+      case msg: SREM => client.srem(msg.key, msg.member)
+
+      // String
+      case msg: APPEND => client.append(msg.key, msg.value)
+      case msg: DECR => client.decr(msg.key)
+      case msg: DECRBY => client.decrBy(msg.key, msg.decrement)
+      case msg: INCR => client.incr(msg.key)
+      case msg: INCRBY => client.incrBy(msg.key, msg.increment)
+      case msg: INCRBYFLOAT => client.incrByFloat(msg.key, msg.increment)
+      case msg: SET => client.set(msg.key, msg.value)
+      case msg: SETBIT => client.setbit(msg.key, msg.offset, msg.value)
+      case msg: SETEX => client.setex(msg.key, msg.seconds, msg.value)
+      case msg: SETNX => client.setnx(msg.key, msg.value)
+      case msg: SETRANGE => client.setrange(msg.key, msg.offset, msg.value)
+    }
+  }
+
+  override def close(): Unit = {
+    client.close()
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1b988912/project/Build.scala
----------------------------------------------------------------------
diff --git a/project/Build.scala b/project/Build.scala
index fe8ec61..34f0ae2 100644
--- a/project/Build.scala
+++ b/project/Build.scala
@@ -415,6 +415,18 @@ object Build extends sbt.Build {
       ))
       .dependsOn(streaming % "test->test; provided", daemon % "test->test; provided")
 
+  lazy val redis = Project(
+    id = "gearpump-experiments-redis",
+    base = file("experiments/redis"),
+    settings = commonSettings ++ noPublish ++ myAssemblySettings ++
+      Seq(
+        libraryDependencies ++= Seq(
+          "redis.clients" % "jedis" % "2.9.0"
+        ),
+        mainClass in(Compile, packageBin) := Some("org.apache.gearpump.example.Test")
+      ))
+    .dependsOn(streaming % "test->test; provided", daemon % "test->test; provided")
+
   lazy val storm = Project(
     id = "gearpump-experiments-storm",
     base = file("experiments/storm"),