You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/03/12 11:18:05 UTC

[pulsar] branch master updated: [pulsar-io] Add a Pulsar IO connector for Redis sink (#3700)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c5e3baa  [pulsar-io] Add a Pulsar IO connector for Redis sink (#3700)
c5e3baa is described below

commit c5e3baafdadd26790358a80d5cf375726a186409
Author: Fangbin Sun <su...@gmail.com>
AuthorDate: Tue Mar 12 19:17:58 2019 +0800

    [pulsar-io] Add a Pulsar IO connector for Redis sink (#3700)
    
    ### Motivation
    
    This PR provides a built-in Redis sink connector that caches Pulsar messages in Redis as key-value pairs. This effectively turns Redis into a caching system that other applications can query to get the latest value for a key.
    
    ### Modifications
    
    Add a new sub-module in the `pulsar-io` module.
    
    ### Verifying this change
    
    This change can be verified as follows:
    
    * deploy the Redis sink connector with a configuration file containing the following fields:
    
    ```
    configs:
        redisHosts: "localhost:6379"
        redisPassword: "redis@123"
        redisDatabase: "1"
        clientMode: "Standalone"
        operationTimeout: "3000"
        batchSize: "100"
    ```
    * start a Redis instance with authentication enabled
    * send messages with non-null keys and values to the topic declared when deploying the connector
    * check in Redis that the messages' key-value pairs have been stored in the database configured above
    
    ### Documentation
    ```
    # Submit a Redis Sink
    $ bin/pulsar-admin sink create --tenant public --namespace default --name redis-test-sink --sink-type redis --sink-config-file examples/redis-sink.yaml --inputs test_redis
    
    # List Sink
    $ bin/pulsar-admin sink list --tenant public --namespace default
    
    # Get Sink Info
    $ bin/pulsar-admin sink get --tenant public --namespace default --name redis-test-sink
    
    # Get Sink Status
    $ bin/pulsar-admin sink status --tenant public --namespace default --name redis-test-sink
    
    # Delete the Redis Sink
    $ bin/pulsar-admin sink delete --tenant public --namespace default --name redis-test-sink
    ```
---
 pulsar-io/pom.xml                                  |   1 +
 pulsar-io/redis/pom.xml                            | 100 ++++++++++++
 .../pulsar/io/redis/RedisAbstractConfig.java       | 124 +++++++++++++++
 .../org/apache/pulsar/io/redis/RedisSession.java   | 140 +++++++++++++++++
 .../org/apache/pulsar/io/redis/sink/RedisSink.java | 173 +++++++++++++++++++++
 .../pulsar/io/redis/sink/RedisSinkConfig.java      |  84 ++++++++++
 .../resources/META-INF/services/pulsar-io.yaml     |  21 +++
 .../apache/pulsar/io/redis/EmbeddedRedisUtils.java |  63 ++++++++
 .../pulsar/io/redis/sink/RedisSinkConfigTest.java  | 149 ++++++++++++++++++
 .../apache/pulsar/io/redis/sink/RedisSinkTest.java | 118 ++++++++++++++
 pulsar-io/redis/src/test/resources/sinkConfig.yaml |  29 ++++
 11 files changed, 1002 insertions(+)

diff --git a/pulsar-io/pom.xml b/pulsar-io/pom.xml
index ffd1e67..d45ba7b 100644
--- a/pulsar-io/pom.xml
+++ b/pulsar-io/pom.xml
@@ -53,6 +53,7 @@
     <module>hbase</module>
     <module>mongo</module>
     <module>flume</module>
+    <module>redis</module>
   </modules>
 
 </project>
diff --git a/pulsar-io/redis/pom.xml b/pulsar-io/redis/pom.xml
new file mode 100644
index 0000000..62af469
--- /dev/null
+++ b/pulsar-io/redis/pom.xml
@@ -0,0 +1,100 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <artifactId>pulsar-io</artifactId>
+        <groupId>org.apache.pulsar</groupId>
+        <version>2.4.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>pulsar-io-redis</artifactId>
+    <name>Pulsar IO :: Redis</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>${project.parent.groupId}</groupId>
+            <artifactId>pulsar-io-core</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>pulsar-functions-instance</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>pulsar-client-original</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.lettuce</groupId>
+            <artifactId>lettuce-core</artifactId>
+            <version>5.0.2.RELEASE</version>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.dataformat</groupId>
+            <artifactId>jackson-dataformat-yaml</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <version>3.4</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-collections</groupId>
+            <artifactId>commons-collections</artifactId>
+            <version>3.2.2</version>
+        </dependency>
+        <dependency>
+            <groupId>${project.parent.groupId}</groupId>
+            <artifactId>buildtools</artifactId>
+            <version>${project.parent.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.github.kstyrc</groupId>
+            <artifactId>embedded-redis</artifactId>
+            <version>0.6</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.nifi</groupId>
+                <artifactId>nifi-nar-maven-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/RedisAbstractConfig.java b/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/RedisAbstractConfig.java
new file mode 100644
index 0000000..519b398
--- /dev/null
+++ b/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/RedisAbstractConfig.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.redis;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.net.HostAndPort;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.experimental.Accessors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Configuration object for all Redis Sink components.
+ */
+@Data
+@Setter
+@Getter
+@EqualsAndHashCode
+@ToString
+@Accessors(chain = true)
+public class RedisAbstractConfig implements Serializable {
+
+    private static final long serialVersionUID = -7860917032537872317L;
+
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "A comma separated list of Redis hosts to connect to")
+    private String redisHosts;
+
+    @FieldDoc(
+        required = false,
+        defaultValue = "",
+        help = "The password used to connect to Redis")
+    private String redisPassword;
+
+    @FieldDoc(
+        required = true,
+        defaultValue = "0",
+        help = "The Redis database to connect to")
+    private int redisDatabase = 0;
+
+    @FieldDoc(
+        required = false,
+        defaultValue = "Standalone",
+        help = "The client mode to use when interacting with the Redis cluster. Possible values [Standalone, Cluster]")
+    private String clientMode = "Standalone";
+
+    @FieldDoc(
+        required = false,
+        defaultValue = "true",
+        help = "Flag to determine if the Redis client should automatically reconnect")
+    private boolean autoReconnect = true;
+
+    @FieldDoc(
+        required = false,
+        defaultValue = "2147483647",
+        help = "The maximum number of queued requests to Redis")
+    private int requestQueue = 2147483647;
+
+    // Passed to the Lettuce SocketOptions builder when the session is created.
+    @FieldDoc(
+        required = false,
+        defaultValue = "false",
+        help = "Flag to determine if TCP no delay should be used")
+    private boolean tcpNoDelay = false;
+
+    @FieldDoc(
+        required = false,
+        defaultValue = "false",
+        help = "Flag to enable a keepalive to Redis")
+    private boolean keepAlive = false;
+
+    @FieldDoc(
+        required = false,
+        defaultValue = "10000L",
+        help = "The amount of time in milliseconds to wait before timing out when connecting")
+    private long connectTimeout = 10000L;
+
+    /**
+     * Validates the settings that every Redis connector needs before a session is created.
+     *
+     * @throws NullPointerException if {@code redisHosts} or {@code clientMode} is not set
+     * @throws IllegalArgumentException if {@code redisDatabase} is negative
+     */
+    public void validate() {
+        Preconditions.checkNotNull(redisHosts, "redisHosts property not set.");
+        // redisDatabase is a primitive int, so a null check on it can never fail; check its range instead.
+        Preconditions.checkArgument(redisDatabase >= 0, "redisDatabase must not be negative.");
+        Preconditions.checkNotNull(clientMode, "clientMode property not set.");
+    }
+
+    /**
+     * Client modes accepted by the {@code clientMode} setting.
+     */
+    public enum ClientMode {
+        STANDALONE, CLUSTER
+    }
+
+    /**
+     * Parses the comma separated {@code redisHosts} setting into host/port pairs.
+     * Whitespace around an entry is dropped and blank entries are skipped, so
+     * {@code "host1:6379, host2:6379"} yields two entries.
+     *
+     * @return the configured hosts, in the order they are listed
+     * @throws NullPointerException if {@code redisHosts} is not set
+     */
+    public List<HostAndPort> getHostAndPorts() {
+        Preconditions.checkNotNull(redisHosts, "redisHosts property not set.");
+        List<HostAndPort> hostAndPorts = Lists.newArrayList();
+        for (String host : StringUtils.split(redisHosts, ",")) {
+            // A space after the comma would otherwise become part of the host name.
+            String trimmed = StringUtils.trim(host);
+            if (StringUtils.isEmpty(trimmed)) {
+                continue;
+            }
+            hostAndPorts.add(HostAndPort.fromString(trimmed));
+        }
+        return hostAndPorts;
+    }
+}
diff --git a/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/RedisSession.java b/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/RedisSession.java
new file mode 100644
index 0000000..a7d0228
--- /dev/null
+++ b/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/RedisSession.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.redis;
+
+import com.google.common.collect.Lists;
+import com.google.common.net.HostAndPort;
+import io.lettuce.core.AbstractRedisClient;
+import io.lettuce.core.ClientOptions;
+import io.lettuce.core.RedisClient;
+import io.lettuce.core.RedisURI;
+import io.lettuce.core.SocketOptions;
+import io.lettuce.core.api.StatefulConnection;
+import io.lettuce.core.api.StatefulRedisConnection;
+import io.lettuce.core.cluster.ClusterClientOptions;
+import io.lettuce.core.cluster.RedisClusterClient;
+import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
+import io.lettuce.core.cluster.api.async.RedisClusterAsyncCommands;
+import io.lettuce.core.codec.ByteArrayCodec;
+import io.lettuce.core.codec.RedisCodec;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.io.redis.RedisAbstractConfig.ClientMode;
+import org.apache.pulsar.io.redis.sink.RedisSinkConfig;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+
+public class RedisSession {
+
+    private final AbstractRedisClient client;
+    private final StatefulConnection connection;
+    private final RedisClusterAsyncCommands<byte[], byte[]> asyncCommands;
+
+    /**
+     * Bundles an already created client, its connection and the async command interface.
+     *
+     * @param client the Redis client that owns the connection
+     * @param connection the open connection
+     * @param asyncCommands the async commands bound to the connection
+     */
+    public RedisSession(AbstractRedisClient client, StatefulConnection connection, RedisClusterAsyncCommands<byte[], byte[]> asyncCommands) {
+        this.asyncCommands = asyncCommands;
+        this.connection = connection;
+        this.client = client;
+    }
+
+    /** @return the client this session was created with */
+    public AbstractRedisClient client() {
+        return client;
+    }
+
+    /** @return the connection this session was created with */
+    public StatefulConnection connection() {
+        return connection;
+    }
+
+    /** @return the async commands this session was created with */
+    public RedisClusterAsyncCommands<byte[], byte[]> asyncCommands() {
+        return asyncCommands;
+    }
+
+    /**
+     * Closes the connection and then shuts down the client.
+     * The client is shut down even when closing the connection throws, so it is never left running.
+     *
+     * @throws Exception if closing the connection or shutting down the client fails
+     */
+    public void close() throws Exception {
+        try {
+            if (null != this.connection) {
+                this.connection.close();
+            }
+        } finally {
+            if (null != this.client) {
+                this.client.shutdown();
+            }
+        }
+    }
+
+    /**
+     * Creates a connected session from the given configuration.
+     * <p>
+     * A standalone or cluster client is built depending on the configured client mode, the
+     * connection uses a byte-array codec, and the session exposes the connection's async commands.
+     * The socket options (TCP no delay, keep alive, connect timeout) are applied in both modes.
+     *
+     * @param config the sink configuration to read the connection settings from
+     * @return a session holding the client, the connection and its async commands
+     * @throws IllegalArgumentException if the client mode is not a {@link ClientMode} value
+     *                                  or no Redis host is configured
+     */
+    public static RedisSession create(RedisSinkConfig config) {
+        RedisSession redisSession;
+        final RedisCodec<byte[], byte[]> codec = new ByteArrayCodec();
+
+        final SocketOptions socketOptions = SocketOptions.builder()
+            .tcpNoDelay(config.isTcpNoDelay())
+            .connectTimeout(Duration.ofMillis(config.getConnectTimeout()))
+            .keepAlive(config.isKeepAlive())
+            .build();
+
+        final ClientMode clientMode;
+        try {
+            clientMode = ClientMode.valueOf(config.getClientMode().toUpperCase());
+        } catch (IllegalArgumentException e) {
+            // Keep the original exception as the cause so the rejected value stays visible.
+            throw new IllegalArgumentException("Illegal Redis client mode, valid values are: "
+                + Arrays.asList(ClientMode.values()), e);
+        }
+
+        List<RedisURI> redisURIs = redisURIs(config.getHostAndPorts(), config);
+        if (redisURIs.isEmpty()) {
+            // Fail with a clear message instead of an IndexOutOfBoundsException from get(0) below.
+            throw new IllegalArgumentException("redisHosts must contain at least one host.");
+        }
+
+        if (clientMode == ClientMode.STANDALONE) {
+            ClientOptions.Builder clientOptions = ClientOptions.builder()
+                .socketOptions(socketOptions)
+                .requestQueueSize(config.getRequestQueue())
+                .autoReconnect(config.isAutoReconnect());
+
+            // Standalone mode connects to the first configured host only.
+            final RedisClient client = RedisClient.create(redisURIs.get(0));
+            client.setOptions(clientOptions.build());
+            final StatefulRedisConnection<byte[], byte[]> connection = client.connect(codec);
+            redisSession = new RedisSession(client, connection, connection.async());
+        } else if (clientMode == ClientMode.CLUSTER) {
+            ClusterClientOptions.Builder clientOptions = ClusterClientOptions.builder()
+                .socketOptions(socketOptions)
+                .requestQueueSize(config.getRequestQueue())
+                .autoReconnect(config.isAutoReconnect());
+
+            final RedisClusterClient client = RedisClusterClient.create(redisURIs);
+            client.setOptions(clientOptions.build());
+
+            final StatefulRedisClusterConnection<byte[], byte[]> connection = client.connect(codec);
+            redisSession = new RedisSession(client, connection, connection.async());
+        } else {
+            throw new UnsupportedOperationException(
+                String.format("%s is not supported", config.getClientMode())
+            );
+        }
+
+        return redisSession;
+    }
+
+    private static List<RedisURI> redisURIs(List<HostAndPort> hostAndPorts, RedisSinkConfig config) {
+        List<RedisURI> redisURIs = Lists.newArrayList();
+        for (HostAndPort hostAndPort : hostAndPorts) {
+            RedisURI.Builder builder = RedisURI.builder();
+            builder.withHost(hostAndPort.getHost());
+            builder.withPort(hostAndPort.getPort());
+            builder.withDatabase(config.getRedisDatabase());
+            if (!StringUtils.isBlank(config.getRedisPassword())) {
+                builder.withPassword(config.getRedisPassword());
+            }
+            redisURIs.add(builder.build());
+        }
+        return redisURIs;
+    }
+}
diff --git a/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSink.java b/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSink.java
new file mode 100644
index 0000000..d27448b
--- /dev/null
+++ b/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSink.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.redis.sink;
+
+import com.google.common.collect.Lists;
+import io.lettuce.core.RedisFuture;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+import org.apache.pulsar.io.core.annotations.Connector;
+import org.apache.pulsar.io.core.annotations.IOType;
+import org.apache.pulsar.io.redis.RedisSession;
+
+import java.nio.charset.StandardCharsets;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A Simple Redis sink, which stores the key/value records from Pulsar in redis.
+ * Note that records from Pulsar with null keys or values will be ignored.
+ * This class expects records from Pulsar to have a key and value that are stored as bytes or a string.
+ */
+@Connector(
+    name = "redis",
+    type = IOType.SINK,
+    help = "A sink connector is used for moving messages from Pulsar to Redis.",
+    configClass = RedisSinkConfig.class
+)
+@Slf4j
+public class RedisSink<T> implements Sink<T> {
+
+    private RedisSinkConfig redisSinkConfig;
+
+    private RedisSession redisSession;
+
+    private long batchTimeMs;
+
+    private long operationTimeoutMs;
+
+    private int batchSize;
+
+    private List<Record<T>> incomingList;
+
+    private ScheduledExecutorService flushExecutor;
+
+    /**
+     * Loads and validates the sink configuration, opens the Redis session and starts the
+     * single-threaded executor that flushes buffered records every {@code batchTimeMs} milliseconds.
+     *
+     * @param config the raw connector configuration
+     * @param sinkContext the sink context (not used here)
+     * @throws Exception if the configuration cannot be loaded or is invalid, or the session cannot be created
+     */
+    @Override
+    public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
+        log.info("Open Redis Sink");
+
+        redisSinkConfig = RedisSinkConfig.load(config);
+        redisSinkConfig.validate();
+        redisSession = RedisSession.create(redisSinkConfig);
+
+        operationTimeoutMs = redisSinkConfig.getOperationTimeout();
+        batchTimeMs = redisSinkConfig.getBatchTimeMs();
+        batchSize = redisSinkConfig.getBatchSize();
+        incomingList = Lists.newArrayList();
+
+        flushExecutor = Executors.newScheduledThreadPool(1);
+        flushExecutor.scheduleAtFixedRate(this::flush, batchTimeMs, batchTimeMs, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Buffers the record and, when the buffer reaches exactly {@code batchSize} records,
+     * submits a flush to the flush executor.
+     *
+     * @param record the record to buffer
+     */
+    @Override
+    public void write(Record<T> record) throws Exception {
+        final boolean batchIsFull;
+        synchronized (this) {
+            incomingList.add(record);
+            batchIsFull = incomingList.size() == batchSize;
+        }
+        if (batchIsFull) {
+            flushExecutor.submit(() -> flush());
+        }
+    }
+
+    /**
+     * Stops the flush executor and then closes the Redis session.
+     * <p>
+     * The executor is stopped first so that no flush runs against a closed connection. A flush
+     * that is still running gets up to {@code operationTimeoutMs} to finish before it is interrupted.
+     * Records that are still buffered are neither acked nor failed here.
+     *
+     * @throws Exception if waiting for the executor is interrupted or closing the session fails
+     */
+    @Override
+    public void close() throws Exception {
+        try {
+            if (null != flushExecutor) {
+                flushExecutor.shutdown();
+                if (!flushExecutor.awaitTermination(operationTimeoutMs, TimeUnit.MILLISECONDS)) {
+                    flushExecutor.shutdownNow();
+                }
+            }
+        } finally {
+            // Close the session even if waiting for the executor was interrupted.
+            if (null != redisSession) {
+                redisSession.close();
+            }
+        }
+    }
+
+    /**
+     * Sends all currently buffered records to Redis with a single MSET and acks or fails them.
+     * <p>
+     * The buffer is swapped out under the lock so that {@code write} can keep accepting records
+     * while the batch is sent. A record whose key or value is null, or is neither a String nor a
+     * byte array, is failed on its own and left out of the batch. The remaining records are acked
+     * when the MSET completes without error within {@code operationTimeoutMs}, and failed otherwise.
+     */
+    private void flush() {
+        final List<Record<T>> recordsToFlush;
+
+        synchronized (this) {
+            if (incomingList.isEmpty()) {
+                return;
+            }
+            recordsToFlush = incomingList;
+            incomingList = Lists.newArrayList();
+        }
+
+        // byte[] keys hash by identity, so the same key written twice in one batch gives two
+        // entries. Keeping insertion order hands them to mset in arrival order.
+        // NOTE(review): presumably this makes the newest value win in standalone mode; confirm for cluster mode.
+        final Map<byte[], byte[]> recordsToSet = new LinkedHashMap<>();
+        // Collected separately: removing from recordsToFlush while iterating over it throws
+        // ConcurrentModificationException.
+        final List<Record<T>> recordsToWrite = Lists.newArrayList();
+
+        for (Record<T> record : recordsToFlush) {
+            try {
+                byte[] key = toBytes("key", record.getKey().orElse(null));
+                byte[] value = toBytes("value", record.getValue());
+                if (null == key || null == value) {
+                    throw new IllegalArgumentException("The key and value for the record must not be null.");
+                }
+                recordsToSet.put(key, value);
+                recordsToWrite.add(record);
+            } catch (Exception e) {
+                record.fail();
+                log.warn("Record flush thread was exception ", e);
+            }
+        }
+
+        if (recordsToSet.isEmpty()) {
+            return;
+        }
+
+        boolean written = false;
+        try {
+            if (log.isDebugEnabled()) {
+                log.debug("Calling mset with {} values", recordsToSet.size());
+            }
+
+            RedisFuture<?> future = redisSession.asyncCommands().mset(recordsToSet);
+
+            written = future.await(operationTimeoutMs, TimeUnit.MILLISECONDS) && future.getError() == null;
+            if (!written) {
+                log.warn("Operation failed with error {} or timeout {} is exceeded", future.getError(), operationTimeoutMs);
+            }
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag so the executor thread can observe the interruption.
+            Thread.currentThread().interrupt();
+            log.error("Redis mset data interrupted exception ", e);
+        } catch (RuntimeException e) {
+            // An exception escaping a scheduleAtFixedRate task suppresses every later run,
+            // so it is handled here and the batch is failed instead.
+            log.error("Redis mset data failed ", e);
+        }
+
+        if (written) {
+            recordsToWrite.forEach(Record::ack);
+        } else {
+            recordsToWrite.forEach(Record::fail);
+        }
+    }
+
+    /**
+     * Converts a record key or value to bytes.
+     *
+     * @param src which part of the record is converted ("key" or "value"); only used in the error message
+     * @param obj the object to convert; may be null
+     * @return the UTF-8 bytes of a String, the array itself for a byte array, or null for null
+     * @throws IllegalArgumentException if {@code obj} is of any other type
+     */
+    private byte[] toBytes(String src, Object obj) {
+        if (null == obj) {
+            return null;
+        }
+        if (obj instanceof String) {
+            return ((String) obj).getBytes(StandardCharsets.UTF_8);
+        }
+        if (obj instanceof byte[]) {
+            return (byte[]) obj;
+        }
+        throw new IllegalArgumentException(String.format("The %s for the record must be String or Bytes.", src));
+    }
+}
diff --git a/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSinkConfig.java b/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSinkConfig.java
new file mode 100644
index 0000000..4ff3c3b
--- /dev/null
+++ b/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSinkConfig.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.redis.sink;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import com.google.common.base.Preconditions;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.experimental.Accessors;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
+import org.apache.pulsar.io.redis.RedisAbstractConfig;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+
+@Data
+@Setter
+@Getter
+@EqualsAndHashCode(callSuper = false)
+@ToString
+@Accessors(chain = true)
+public class RedisSinkConfig extends RedisAbstractConfig implements Serializable {
+
+    private static final long serialVersionUID = 4686456460365805717L;
+
+    @FieldDoc(
+        required = false,
+        defaultValue = "10000L",
+        help = "The amount of time in milliseconds before an operation is marked as timed out")
+    private long operationTimeout = 10000L;
+
+    // Used by RedisSink as both the initial delay and the period of its scheduled flush.
+    @FieldDoc(
+        required = false,
+        defaultValue = "1000L",
+        help = "The interval in milliseconds between periodic flushes of buffered records to Redis")
+    private long batchTimeMs = 1000L;
+
+    @FieldDoc(
+        required = false,
+        defaultValue = "200",
+        help = "The batch size of write to Redis database"
+    )
+    private int batchSize = 200;
+
+    /**
+     * Reads a sink configuration from a YAML file.
+     *
+     * @param yamlFile path of the YAML file
+     * @return the parsed configuration
+     * @throws IOException if the file cannot be read or parsed
+     */
+    public static RedisSinkConfig load(String yamlFile) throws IOException {
+        final File source = new File(yamlFile);
+        final ObjectMapper yamlMapper = new ObjectMapper(new YAMLFactory());
+        return yamlMapper.readValue(source, RedisSinkConfig.class);
+    }
+
+    /**
+     * Builds a sink configuration from a key/value map by serializing the map to JSON
+     * and reading that JSON back as a {@code RedisSinkConfig}.
+     *
+     * @param map the configuration entries
+     * @return the parsed configuration
+     * @throws IOException if the map cannot be serialized or the JSON cannot be mapped
+     */
+    public static RedisSinkConfig load(Map<String, Object> map) throws IOException {
+        final String json = new ObjectMapper().writeValueAsString(map);
+        final ObjectMapper jsonMapper = new ObjectMapper();
+        return jsonMapper.readValue(json, RedisSinkConfig.class);
+    }
+
+    /**
+     * Runs the base validation and then requires the sink specific settings to be positive.
+     *
+     * @throws IllegalArgumentException if {@code operationTimeout}, {@code batchTimeMs}
+     *                                  or {@code batchSize} is not greater than zero
+     */
+    @Override
+    public void validate() {
+        super.validate();
+        if (operationTimeout <= 0) {
+            throw new IllegalArgumentException("operationTimeout must be a positive long.");
+        }
+        if (batchTimeMs <= 0) {
+            throw new IllegalArgumentException("batchTimeMs must be a positive long.");
+        }
+        if (batchSize <= 0) {
+            throw new IllegalArgumentException("batchSize must be a positive integer.");
+        }
+    }
+}
diff --git a/pulsar-io/redis/src/main/resources/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/redis/src/main/resources/resources/META-INF/services/pulsar-io.yaml
new file mode 100644
index 0000000..39a8629
--- /dev/null
+++ b/pulsar-io/redis/src/main/resources/resources/META-INF/services/pulsar-io.yaml
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+name: redis
+description: Writes data into Redis
+sinkClass: org.apache.pulsar.io.redis.sink.RedisSink
diff --git a/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/EmbeddedRedisUtils.java b/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/EmbeddedRedisUtils.java
new file mode 100644
index 0000000..1bd5295
--- /dev/null
+++ b/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/EmbeddedRedisUtils.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.redis;
+
+import lombok.extern.slf4j.Slf4j;
+import redis.embedded.RedisExecProvider;
+import redis.embedded.RedisServer;
+import redis.embedded.util.Architecture;
+import redis.embedded.util.OS;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+/**
+ * Test helper that starts and stops an embedded Redis server on port 6379.
+ */
+@Slf4j
+public final class EmbeddedRedisUtils {
+
+    private final Path dbPath;
+    private final RedisServer redisServer;
+
+    /**
+     * Configures, but does not start, the embedded server.
+     *
+     * @param testId prefix of the {@code <testId>/redis} path that is deleted on set up and tear down
+     */
+    public EmbeddedRedisUtils(String testId) {
+        this.dbPath = Paths.get(testId + "/redis");
+        final RedisExecProvider execProvider = RedisExecProvider.defaultProvider()
+            .override(OS.UNIX, Architecture.x86_64, "redis-server-2.8.19");
+        this.redisServer = RedisServer.builder()
+            .redisExecProvider(execProvider)
+            .port(6379)
+            .slaveOf("localhost", 6378)
+            .setting("daemonize no")
+            .setting("appendonly no")
+            .build();
+    }
+
+    /**
+     * Starts the server and then deletes the database path if it exists.
+     *
+     * @throws IOException if the path cannot be deleted
+     */
+    public void setUp() throws IOException {
+        redisServer.start();
+        Files.deleteIfExists(dbPath);
+    }
+
+    /**
+     * Stops the server and then deletes the database path if it exists.
+     *
+     * @throws IOException if the path cannot be deleted
+     */
+    public void tearDown() throws IOException {
+        redisServer.stop();
+        Files.deleteIfExists(dbPath);
+    }
+
+}
diff --git a/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkConfigTest.java b/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkConfigTest.java
new file mode 100644
index 0000000..5726bbf
--- /dev/null
+++ b/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkConfigTest.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.redis.sink;
+
+import org.apache.pulsar.io.redis.RedisAbstractConfig;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+/**
+ * Unit tests for loading and validating {@link RedisSinkConfig}.
+ */
+public class RedisSinkConfigTest {
+
+    /** Loads the config from the {@code sinkConfig.yaml} test resource. */
+    @Test
+    public final void loadFromYamlFileTest() throws IOException {
+        File yamlFile = getFile("sinkConfig.yaml");
+        RedisSinkConfig config = RedisSinkConfig.load(yamlFile.getAbsolutePath());
+        assertExpectedValues(config);
+    }
+
+    /** Loads the config from an in-memory map. */
+    @Test
+    public final void loadFromMapTest() throws IOException {
+        RedisSinkConfig config = RedisSinkConfig.load(validConfigMap());
+        assertExpectedValues(config);
+    }
+
+    /** A fully populated config must pass validation. */
+    @Test
+    public final void validValidateTest() throws IOException {
+        RedisSinkConfig config = RedisSinkConfig.load(validConfigMap());
+        config.validate();
+    }
+
+    /** Validation must reject a config without {@code redisHosts}. */
+    @Test(expectedExceptions = NullPointerException.class,
+        expectedExceptionsMessageRegExp = "redisHosts property not set.")
+    public final void missingValidValidateTableNameTest() throws IOException {
+        Map<String, Object> map = validConfigMap();
+        map.remove("redisHosts");
+
+        RedisSinkConfig config = RedisSinkConfig.load(map);
+        config.validate();
+    }
+
+    /** Validation must reject a negative {@code batchTimeMs}. */
+    @Test(expectedExceptions = IllegalArgumentException.class,
+        expectedExceptionsMessageRegExp = "batchTimeMs must be a positive long.")
+    public final void invalidBatchTimeMsTest() throws IOException {
+        Map<String, Object> map = validConfigMap();
+        map.put("batchTimeMs", "-100");
+
+        RedisSinkConfig config = RedisSinkConfig.load(map);
+        config.validate();
+    }
+
+    /** An unknown client mode must not resolve to a {@code ClientMode} constant. */
+    @Test(expectedExceptions = IllegalArgumentException.class,
+        expectedExceptionsMessageRegExp = "No enum constant org.apache.pulsar.io.redis.RedisAbstractConfig.ClientMode.NOTSUPPORT")
+    public final void invalidClientModeTest() throws IOException {
+        Map<String, Object> map = validConfigMap();
+        map.put("clientMode", "NotSupport");
+
+        RedisSinkConfig config = RedisSinkConfig.load(map);
+        config.validate();
+
+        // The expected message above is the one Enum.valueOf produces for an
+        // unknown constant name.
+        RedisAbstractConfig.ClientMode.valueOf(config.getClientMode().toUpperCase());
+    }
+
+    /**
+     * Returns a fresh, mutable map holding the same settings as {@code sinkConfig.yaml}.
+     * Each test adjusts its own copy to build the case it needs.
+     */
+    private static Map<String, Object> validConfigMap() {
+        Map<String, Object> map = new HashMap<>();
+        map.put("redisHosts", "localhost:6379");
+        map.put("redisPassword", "fake@123");
+        map.put("redisDatabase", "1");
+        map.put("clientMode", "Standalone");
+        map.put("operationTimeout", "2000");
+        map.put("batchSize", "100");
+        map.put("batchTimeMs", "1000");
+        map.put("connectTimeout", "3000");
+        return map;
+    }
+
+    /**
+     * Asserts that {@code config} carries the values shared by the YAML fixture and
+     * {@link #validConfigMap()}.
+     */
+    private static void assertExpectedValues(RedisSinkConfig config) {
+        assertNotNull(config);
+        // TestNG's argument order is (actual, expected).
+        assertEquals(config.getRedisHosts(), "localhost:6379");
+        assertEquals(config.getRedisPassword(), "fake@123");
+        assertEquals(config.getRedisDatabase(), 1);
+        assertEquals(config.getClientMode(), "Standalone");
+        assertEquals(config.getOperationTimeout(), 2000L);
+        assertEquals(config.getBatchSize(), 100);
+        assertEquals(config.getBatchTimeMs(), 1000L);
+        assertEquals(config.getConnectTimeout(), 3000L);
+    }
+
+    /**
+     * Resolves a test resource from the classpath.
+     *
+     * @param name resource name relative to the classpath root
+     * @return the resource as a {@link File}
+     */
+    private File getFile(String name) {
+        java.net.URL resource = getClass().getClassLoader().getResource(name);
+        // Fail with a readable message instead of a bare NullPointerException.
+        assertNotNull(resource, "Test resource not found on classpath: " + name);
+        return new File(resource.getFile());
+    }
+}
diff --git a/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkTest.java b/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkTest.java
new file mode 100644
index 0000000..455ef89
--- /dev/null
+++ b/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkTest.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.redis.sink;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
+import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.functions.source.PulsarRecord;
+import org.apache.pulsar.io.redis.EmbeddedRedisUtils;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Smoke test for {@link RedisSink} against an embedded Redis server.
+ */
+@Slf4j
+public class RedisSinkTest {
+
+    private EmbeddedRedisUtils embeddedRedisUtils;
+
+    /**
+     * Simple value type that is Avro-encoded to form the test message payload.
+     */
+    @Data
+    @ToString
+    @EqualsAndHashCode
+    public static class Foo {
+        private String field1;
+        private String field2;
+    }
+
+    /** Starts a fresh embedded Redis server before each test. */
+    @BeforeMethod
+    public void setUp() throws Exception {
+        embeddedRedisUtils = new EmbeddedRedisUtils(getClass().getSimpleName());
+        embeddedRedisUtils.setUp();
+    }
+
+    /** Stops the embedded Redis server after each test. */
+    @AfterMethod
+    public void tearDown() throws Exception {
+        embeddedRedisUtils.tearDown();
+    }
+
+    /**
+     * Opens the sink against the embedded server and writes a single record.
+     * The test passes when no call throws; it does not read the value back.
+     */
+    @Test
+    public void TestOpenAndWriteSink() throws Exception {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put("redisHosts", "localhost:6379");
+        configs.put("redisPassword", "");
+        configs.put("redisDatabase", "1");
+        configs.put("clientMode", "Standalone");
+        configs.put("operationTimeout", "3000");
+        configs.put("batchSize", "10");
+
+        RedisSink sink = new RedisSink();
+
+        // prepare a foo Record
+        Foo obj = new Foo();
+        obj.setField1("FakeFiled1");
+        obj.setField2("FakeFiled1");
+        AvroSchema<Foo> schema = AvroSchema.of(Foo.class);
+
+        byte[] bytes = schema.encode(obj);
+        ByteBuf payload = Unpooled.copiedBuffer(bytes);
+        AutoConsumeSchema autoConsumeSchema = new AutoConsumeSchema();
+        autoConsumeSchema.setSchema(GenericSchemaImpl.of(schema.getSchemaInfo()));
+
+        // NOTE(review): the sink config map is also passed as the message's third
+        // constructor argument; confirm that reuse is intended.
+        Message<GenericRecord> message = new MessageImpl("fake_topic_name", "77:777", configs, payload, autoConsumeSchema);
+        Record<GenericRecord> record = PulsarRecord.<GenericRecord>builder()
+            .message(message)
+            .topicName("fake_topic_name")
+            .build();
+
+        log.info("foo:{}, Message.getValue: {}, record.getValue: {}",
+            obj.toString(),
+            message.getValue().toString(),
+            record.getValue().toString());
+
+        // open should succeed
+        sink.open(configs, null);
+        try {
+            // write should succeed
+            sink.write(record);
+            log.info("executed write");
+
+            // sleep to wait for the backend flush to complete
+            Thread.sleep(1000);
+        } finally {
+            // Release whatever the sink acquired in open() before tearDown()
+            // stops the embedded server.
+            sink.close();
+        }
+    }
+}
diff --git a/pulsar-io/redis/src/test/resources/sinkConfig.yaml b/pulsar-io/redis/src/test/resources/sinkConfig.yaml
new file mode 100644
index 0000000..4cc1990
--- /dev/null
+++ b/pulsar-io/redis/src/test/resources/sinkConfig.yaml
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Test fixture read by RedisSinkConfigTest#loadFromYamlFileTest; the values
+# below must stay in sync with that test's assertions.
+{
+"redisHosts": "localhost:6379",
+"redisPassword": "fake@123",
+"redisDatabase": "1",
+"clientMode": "Standalone",
+"operationTimeout": "2000",
+"batchSize": "100",
+"batchTimeMs": "1000",
+"connectTimeout": "3000"
+}