You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/03/12 11:18:05 UTC
[pulsar] branch master updated: [pulsar-io] Add a Pulsar IO
connector for Redis sink (#3700)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c5e3baa [pulsar-io] Add a Pulsar IO connector for Redis sink (#3700)
c5e3baa is described below
commit c5e3baafdadd26790358a80d5cf375726a186409
Author: Fangbin Sun <su...@gmail.com>
AuthorDate: Tue Mar 12 19:17:58 2019 +0800
[pulsar-io] Add a Pulsar IO connector for Redis sink (#3700)
### Motivation
This PR provides a built-in Redis sink connector that caches messages as key-value pairs in Redis. This effectively turns Redis into a cache that other applications can query for the latest value of each key.
### Modifications
Add a new sub-module in the `pulsar-io` module.
### Verifying this change
This change can be verified as follows:
* deploy the redis sink connector with configuration file containing the following fields:
```
configs:
  redisHosts: "localhost:6379"
  redisPassword: "redis@123"
  redisDatabase: "1"
  clientMode: "Standalone"
  operationTimeout: "3000"
  batchSize: "100"
```
* start a Redis instance with password authentication enabled
* send messages with non-null keys and values to the input topic declared when deploying the connector
* check in Redis that the messages' key-value pairs have been stored in the database configured above
### Documentation
```
# Submit a Redis Sink
$ bin/pulsar-admin sink create --tenant public --namespace default --name redis-test-sink --sink-type redis --sink-config-file examples/redis-sink.yaml --inputs test_redis
# List Sink
$ bin/pulsar-admin sink list --tenant public --namespace default
# Get Sink Info
$ bin/pulsar-admin sink get --tenant public --namespace default --name redis-test-sink
# Get Sink Status
$ bin/pulsar-admin sink status --tenant public --namespace default --name redis-test-sink
# Delete the Redis Sink
$ bin/pulsar-admin sink delete --tenant public --namespace default --name redis-test-sink
```
---
pulsar-io/pom.xml | 1 +
pulsar-io/redis/pom.xml | 100 ++++++++++++
.../pulsar/io/redis/RedisAbstractConfig.java | 124 +++++++++++++++
.../org/apache/pulsar/io/redis/RedisSession.java | 140 +++++++++++++++++
.../org/apache/pulsar/io/redis/sink/RedisSink.java | 173 +++++++++++++++++++++
.../pulsar/io/redis/sink/RedisSinkConfig.java | 84 ++++++++++
.../resources/META-INF/services/pulsar-io.yaml | 21 +++
.../apache/pulsar/io/redis/EmbeddedRedisUtils.java | 63 ++++++++
.../pulsar/io/redis/sink/RedisSinkConfigTest.java | 149 ++++++++++++++++++
.../apache/pulsar/io/redis/sink/RedisSinkTest.java | 118 ++++++++++++++
pulsar-io/redis/src/test/resources/sinkConfig.yaml | 29 ++++
11 files changed, 1002 insertions(+)
diff --git a/pulsar-io/pom.xml b/pulsar-io/pom.xml
index ffd1e67..d45ba7b 100644
--- a/pulsar-io/pom.xml
+++ b/pulsar-io/pom.xml
@@ -53,6 +53,7 @@
<module>hbase</module>
<module>mongo</module>
<module>flume</module>
+ <module>redis</module>
</modules>
</project>
diff --git a/pulsar-io/redis/pom.xml b/pulsar-io/redis/pom.xml
new file mode 100644
index 0000000..62af469
--- /dev/null
+++ b/pulsar-io/redis/pom.xml
@@ -0,0 +1,100 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>pulsar-io</artifactId>
+ <groupId>org.apache.pulsar</groupId>
+ <version>2.4.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>pulsar-io-redis</artifactId>
+ <name>Pulsar IO :: Redis</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>${project.parent.groupId}</groupId>
+ <artifactId>pulsar-io-core</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-functions-instance</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-client-original</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.lettuce</groupId>
+ <artifactId>lettuce-core</artifactId>
+ <version>5.0.2.RELEASE</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.dataformat</groupId>
+ <artifactId>jackson-dataformat-yaml</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <version>3.4</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-collections</groupId>
+ <artifactId>commons-collections</artifactId>
+ <version>3.2.2</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.parent.groupId}</groupId>
+ <artifactId>buildtools</artifactId>
+ <version>${project.parent.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.github.kstyrc</groupId>
+ <artifactId>embedded-redis</artifactId>
+ <version>0.6</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-nar-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+</project>
\ No newline at end of file
diff --git a/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/RedisAbstractConfig.java b/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/RedisAbstractConfig.java
new file mode 100644
index 0000000..519b398
--- /dev/null
+++ b/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/RedisAbstractConfig.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.redis;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.net.HostAndPort;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.experimental.Accessors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Configuration object shared by all Redis connector components.
+ *
+ * <p>Holds the connection settings (hosts, credentials, database, client mode and
+ * socket/client options). Call {@link #validate()} before using an instance.
+ */
+@Data
+@Setter
+@Getter
+@EqualsAndHashCode
+@ToString(exclude = "redisPassword") // never print the credential in logs
+@Accessors(chain = true)
+public class RedisAbstractConfig implements Serializable {
+
+    private static final long serialVersionUID = -7860917032537872317L;
+
+    /** Port used for a {@code redisHosts} entry that does not specify one. */
+    private static final int DEFAULT_REDIS_PORT = 6379;
+
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "A comma separated list of Redis hosts to connect to")
+    private String redisHosts;
+
+    @FieldDoc(
+        required = false,
+        defaultValue = "",
+        help = "The password used to connect to Redis")
+    private String redisPassword;
+
+    @FieldDoc(
+        required = true,
+        defaultValue = "0",
+        help = "The Redis database to connect to")
+    private int redisDatabase = 0;
+
+    @FieldDoc(
+        required = false,
+        defaultValue = "Standalone",
+        help = "The client mode to use when interacting with the Redis cluster. Possible values [Standalone, Cluster]")
+    private String clientMode = "Standalone";
+
+    @FieldDoc(
+        required = false,
+        defaultValue = "true",
+        help = "Flag to determine if the Redis client should automatically reconnect")
+    private boolean autoReconnect = true;
+
+    @FieldDoc(
+        required = false,
+        defaultValue = "2147483647",
+        help = "The maximum number of queued requests to Redis")
+    private int requestQueue = Integer.MAX_VALUE;
+
+    @FieldDoc(
+        required = false,
+        defaultValue = "false",
+        help = "Flag to enable TCP no delay should be used")
+    private boolean tcpNoDelay = false;
+
+    @FieldDoc(
+        required = false,
+        defaultValue = "false",
+        help = "Flag to enable a keepalive to Redis")
+    private boolean keepAlive = false;
+
+    @FieldDoc(
+        required = false,
+        defaultValue = "10000L",
+        help = "The amount of time in milliseconds to wait before timing out when connecting")
+    private long connectTimeout = 10000L;
+
+    /**
+     * Checks that the mandatory connection settings are present and sane.
+     *
+     * @throws NullPointerException if {@code redisHosts} or {@code clientMode} is not set
+     * @throws IllegalArgumentException if {@code redisHosts} is blank or {@code redisDatabase} is negative
+     */
+    public void validate() {
+        Preconditions.checkNotNull(redisHosts, "redisHosts property not set.");
+        // An empty host list would otherwise only fail later, when the session is created.
+        Preconditions.checkArgument(StringUtils.isNotBlank(redisHosts), "redisHosts property is empty.");
+        // redisDatabase is a primitive int and can never be null; check its range instead.
+        Preconditions.checkArgument(redisDatabase >= 0, "redisDatabase must be a non-negative integer.");
+        Preconditions.checkNotNull(clientMode, "clientMode property not set.");
+    }
+
+    public enum ClientMode {
+        STANDALONE,
+        CLUSTER
+    }
+
+    /**
+     * Parses {@code redisHosts} into host/port pairs.
+     *
+     * <p>Entries are separated by commas. Whitespace around an entry is ignored, empty entries
+     * are skipped, and an entry without an explicit port gets port 6379.
+     *
+     * @return the parsed hosts, in configuration order
+     * @throws NullPointerException if {@code redisHosts} is not set
+     * @throws IllegalArgumentException if an entry cannot be parsed as {@code host[:port]}
+     */
+    public List<HostAndPort> getHostAndPorts() {
+        Preconditions.checkNotNull(redisHosts, "redisHosts property not set.");
+        List<HostAndPort> hostAndPorts = Lists.newArrayList();
+        for (String host : StringUtils.split(redisHosts, ",")) {
+            String trimmed = host.trim();
+            if (trimmed.isEmpty()) {
+                continue;
+            }
+            hostAndPorts.add(HostAndPort.fromString(trimmed).withDefaultPort(DEFAULT_REDIS_PORT));
+        }
+        return hostAndPorts;
+    }
+}
diff --git a/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/RedisSession.java b/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/RedisSession.java
new file mode 100644
index 0000000..a7d0228
--- /dev/null
+++ b/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/RedisSession.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.redis;
+
+import com.google.common.collect.Lists;
+import com.google.common.net.HostAndPort;
+import io.lettuce.core.AbstractRedisClient;
+import io.lettuce.core.ClientOptions;
+import io.lettuce.core.RedisClient;
+import io.lettuce.core.RedisURI;
+import io.lettuce.core.SocketOptions;
+import io.lettuce.core.api.StatefulConnection;
+import io.lettuce.core.api.StatefulRedisConnection;
+import io.lettuce.core.cluster.ClusterClientOptions;
+import io.lettuce.core.cluster.RedisClusterClient;
+import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
+import io.lettuce.core.cluster.api.async.RedisClusterAsyncCommands;
+import io.lettuce.core.codec.ByteArrayCodec;
+import io.lettuce.core.codec.RedisCodec;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.io.redis.RedisAbstractConfig.ClientMode;
+import org.apache.pulsar.io.redis.sink.RedisSinkConfig;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+
+public class RedisSession {
+
+ private final AbstractRedisClient client;
+ private final StatefulConnection connection;
+ private final RedisClusterAsyncCommands<byte[], byte[]> asyncCommands;
+
+ public RedisSession(AbstractRedisClient client, StatefulConnection connection, RedisClusterAsyncCommands<byte[], byte[]> asyncCommands) {
+ this.client = client;
+ this.connection = connection;
+ this.asyncCommands = asyncCommands;
+ }
+
+ public AbstractRedisClient client() {
+ return this.client;
+ }
+
+ public StatefulConnection connection() {
+ return this.connection;
+ }
+
+ public RedisClusterAsyncCommands<byte[], byte[]> asyncCommands() {
+ return this.asyncCommands;
+ }
+
+ public void close() throws Exception {
+ if (null != this.connection) {
+ this.connection.close();
+ }
+ if (null != this.client) {
+ this.client.shutdown();
+ }
+ }
+
+ public static RedisSession create(RedisSinkConfig config) {
+ RedisSession redisSession;
+ final RedisCodec<byte[], byte[]> codec = new ByteArrayCodec();
+
+ final SocketOptions socketOptions = SocketOptions.builder()
+ .tcpNoDelay(config.isTcpNoDelay())
+ .connectTimeout(Duration.ofMillis(config.getConnectTimeout()))
+ .keepAlive(config.isKeepAlive())
+ .build();
+
+ final ClientMode clientMode;
+ try {
+ clientMode = ClientMode.valueOf(config.getClientMode().toUpperCase());
+ } catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException("Illegal Redis client mode, valid values are: "
+ + Arrays.asList(ClientMode.values()));
+ }
+
+ List<RedisURI> redisURIs = redisURIs(config.getHostAndPorts(), config);
+
+ if (clientMode == ClientMode.STANDALONE) {
+ ClientOptions.Builder clientOptions = ClientOptions.builder()
+ .socketOptions(socketOptions)
+ .requestQueueSize(config.getRequestQueue())
+ .autoReconnect(config.isAutoReconnect());
+
+ final RedisClient client = RedisClient.create(redisURIs.get(0));
+ client.setOptions(clientOptions.build());
+ final StatefulRedisConnection<byte[], byte[]> connection = client.connect(codec);
+ redisSession = new RedisSession(client, connection, connection.async());
+ } else if (clientMode == ClientMode.CLUSTER) {
+ ClusterClientOptions.Builder clientOptions = ClusterClientOptions.builder()
+ .requestQueueSize(config.getRequestQueue())
+ .autoReconnect(config.isAutoReconnect());
+
+ final RedisClusterClient client = RedisClusterClient.create(redisURIs);
+ client.setOptions(clientOptions.build());
+
+ final StatefulRedisClusterConnection<byte[], byte[]> connection = client.connect(codec);
+ redisSession = new RedisSession(client, connection, connection.async());
+ } else {
+ throw new UnsupportedOperationException(
+ String.format("%s is not supported", config.getClientMode())
+ );
+ }
+
+ return redisSession;
+ }
+
+ private static List<RedisURI> redisURIs(List<HostAndPort> hostAndPorts, RedisSinkConfig config) {
+ List<RedisURI> redisURIs = Lists.newArrayList();
+ for (HostAndPort hostAndPort : hostAndPorts) {
+ RedisURI.Builder builder = RedisURI.builder();
+ builder.withHost(hostAndPort.getHost());
+ builder.withPort(hostAndPort.getPort());
+ builder.withDatabase(config.getRedisDatabase());
+ if (!StringUtils.isBlank(config.getRedisPassword())) {
+ builder.withPassword(config.getRedisPassword());
+ }
+ redisURIs.add(builder.build());
+ }
+ return redisURIs;
+ }
+}
diff --git a/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSink.java b/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSink.java
new file mode 100644
index 0000000..d27448b
--- /dev/null
+++ b/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSink.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.redis.sink;
+
+import com.google.common.collect.Lists;
+import io.lettuce.core.RedisFuture;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+import org.apache.pulsar.io.core.annotations.Connector;
+import org.apache.pulsar.io.core.annotations.IOType;
+import org.apache.pulsar.io.redis.RedisSession;
+
+import java.nio.charset.StandardCharsets;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A simple Redis sink, which stores the key/value records from Pulsar in Redis.
+ *
+ * <p>Records are buffered and written with one {@code MSET} per batch. A batch is flushed when
+ * {@code batchSize} records have accumulated, and also every {@code batchTimeMs} milliseconds.
+ * Records with a null key or value are ignored: they are acknowledged without being written.
+ * Keys and values must be a {@code String} (stored as UTF-8) or a {@code byte[]}; records
+ * with any other key or value type are failed.
+ */
+@Connector(
+    name = "redis",
+    type = IOType.SINK,
+    help = "A sink connector is used for moving messages from Pulsar to Redis.",
+    configClass = RedisSinkConfig.class
+)
+@Slf4j
+public class RedisSink<T> implements Sink<T> {
+
+    private RedisSinkConfig redisSinkConfig;
+
+    private RedisSession redisSession;
+
+    private long batchTimeMs;
+
+    private long operationTimeoutMs;
+
+    private int batchSize;
+
+    private List<Record<T>> incomingList;
+
+    private ScheduledExecutorService flushExecutor;
+
+    @Override
+    public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
+        log.info("Open Redis Sink");
+
+        redisSinkConfig = RedisSinkConfig.load(config);
+        redisSinkConfig.validate();
+
+        redisSession = RedisSession.create(redisSinkConfig);
+
+        operationTimeoutMs = redisSinkConfig.getOperationTimeout();
+
+        batchTimeMs = redisSinkConfig.getBatchTimeMs();
+        batchSize = redisSinkConfig.getBatchSize();
+        incomingList = Lists.newArrayList();
+        flushExecutor = Executors.newScheduledThreadPool(1);
+        flushExecutor.scheduleAtFixedRate(this::flush, batchTimeMs, batchTimeMs, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public void write(Record<T> record) throws Exception {
+        int currentSize;
+        synchronized (this) {
+            incomingList.add(record);
+            currentSize = incomingList.size();
+        }
+        if (currentSize == batchSize) {
+            flushExecutor.submit(this::flush);
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        // Stop the flush executor before closing the session so that a flush
+        // in progress does not run against a closed connection.
+        try {
+            if (null != flushExecutor) {
+                flushExecutor.shutdown();
+                if (!flushExecutor.awaitTermination(operationTimeoutMs, TimeUnit.MILLISECONDS)) {
+                    log.warn("Timed out waiting for pending Redis flushes to finish");
+                }
+            }
+        } finally {
+            if (null != redisSession) {
+                redisSession.close();
+            }
+        }
+    }
+
+    /**
+     * Drains the buffered records and writes them to Redis with a single MSET.
+     *
+     * <p>This runs on the scheduled flush executor. An exception escaping this method would
+     * suppress all later scheduled runs, so every failure is caught and turned into
+     * {@code Record#fail()} calls.
+     */
+    private void flush() {
+        final List<Record<T>> recordsToFlush;
+        synchronized (this) {
+            if (incomingList.isEmpty()) {
+                return;
+            }
+            recordsToFlush = incomingList;
+            incomingList = Lists.newArrayList();
+        }
+
+        // Keep insertion order: MSET applies pairs left to right, so for a key that occurs
+        // several times in one batch the value of the most recent record wins. byte[] keys
+        // compare by identity, so such duplicates stay separate entries in the map.
+        final Map<byte[], byte[]> recordsToSet = new LinkedHashMap<>();
+        final List<Record<T>> recordsToWrite = Lists.newArrayList();
+        for (Record<T> record : recordsToFlush) {
+            final byte[] key;
+            final byte[] value;
+            try {
+                key = toBytes("key", record.getKey().orElse(null));
+                value = toBytes("value", record.getValue());
+            } catch (Exception e) {
+                log.warn("Record flush thread was exception ", e);
+                record.fail();
+                continue;
+            }
+            if (null == key || null == value) {
+                // records with null keys or values are ignored: acknowledged, not written
+                record.ack();
+                continue;
+            }
+            recordsToSet.put(key, value);
+            recordsToWrite.add(record);
+        }
+
+        if (recordsToSet.isEmpty()) {
+            return;
+        }
+
+        try {
+            if (log.isDebugEnabled()) {
+                log.debug("Calling mset with {} values", recordsToSet.size());
+            }
+
+            RedisFuture<?> future = redisSession.asyncCommands().mset(recordsToSet);
+
+            if (!future.await(operationTimeoutMs, TimeUnit.MILLISECONDS) || future.getError() != null) {
+                log.warn("Operation failed with error {} or timeout {} is exceeded", future.getError(), operationTimeoutMs);
+                recordsToWrite.forEach(Record::fail);
+                return;
+            }
+            recordsToWrite.forEach(Record::ack);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            recordsToWrite.forEach(Record::fail);
+            log.error("Redis mset data interrupted exception ", e);
+        } catch (Exception e) {
+            recordsToWrite.forEach(Record::fail);
+            log.error("Redis mset failed", e);
+        }
+    }
+
+    /**
+     * Converts a record key or value to bytes.
+     *
+     * @param src "key" or "value", used in the error message
+     * @param obj a {@code String} (encoded as UTF-8), a {@code byte[]} (returned as-is) or null
+     * @return the bytes, or null when {@code obj} is null
+     * @throws IllegalArgumentException for any other type
+     */
+    private byte[] toBytes(String src, Object obj) {
+        final byte[] result;
+        if (obj instanceof String) {
+            String s = (String) obj;
+            result = s.getBytes(StandardCharsets.UTF_8);
+        } else if (obj instanceof byte[]) {
+            result = (byte[]) obj;
+        } else if (null == obj) {
+            result = null;
+        } else {
+            throw new IllegalArgumentException(String.format("The %s for the record must be String or Bytes.", src));
+        }
+        return result;
+    }
+}
diff --git a/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSinkConfig.java b/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSinkConfig.java
new file mode 100644
index 0000000..4ff3c3b
--- /dev/null
+++ b/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSinkConfig.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.redis.sink;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import com.google.common.base.Preconditions;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.experimental.Accessors;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
+import org.apache.pulsar.io.redis.RedisAbstractConfig;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Configuration of the Redis sink: the shared Redis connection settings plus the
+ * operation timeout and the batching parameters.
+ */
+@Data
+@Setter
+@Getter
+@EqualsAndHashCode(callSuper = true) // inherited connection settings take part in equality
+@ToString
+@Accessors(chain = true)
+public class RedisSinkConfig extends RedisAbstractConfig implements Serializable {
+
+    private static final long serialVersionUID = 4686456460365805717L;
+
+    @FieldDoc(
+        required = false,
+        defaultValue = "10000L",
+        help = "The amount of time in milliseconds before an operation is marked as timed out")
+    private long operationTimeout = 10000L;
+
+    @FieldDoc(
+        required = false,
+        defaultValue = "1000L",
+        help = "The interval in milliseconds at which buffered records are flushed to Redis")
+    private long batchTimeMs = 1000L;
+
+    @FieldDoc(
+        required = false,
+        defaultValue = "200",
+        help = "The batch size of write to Redis database"
+    )
+    private int batchSize = 200;
+
+    /**
+     * Loads a configuration from a YAML file.
+     *
+     * @param yamlFile path of the YAML file
+     * @return the parsed configuration (not yet validated)
+     * @throws IOException if the file cannot be read or parsed
+     */
+    public static RedisSinkConfig load(String yamlFile) throws IOException {
+        ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+        return mapper.readValue(new File(yamlFile), RedisSinkConfig.class);
+    }
+
+    /**
+     * Loads a configuration from a property map.
+     *
+     * @param map configuration properties; values may be given as strings (e.g. {@code "100"})
+     * @return the parsed configuration (not yet validated)
+     * @throws IOException if the map cannot be converted
+     */
+    public static RedisSinkConfig load(Map<String, Object> map) throws IOException {
+        ObjectMapper mapper = new ObjectMapper();
+        // Round-trip through JSON so string values are coerced to the field types.
+        return mapper.readValue(mapper.writeValueAsString(map), RedisSinkConfig.class);
+    }
+
+    /**
+     * Validates the shared connection settings and the sink-specific settings.
+     *
+     * @throws IllegalArgumentException if a timeout or the batch size is not positive
+     */
+    @Override
+    public void validate() {
+        super.validate();
+        Preconditions.checkArgument(operationTimeout > 0, "operationTimeout must be a positive long.");
+        Preconditions.checkArgument(batchTimeMs > 0, "batchTimeMs must be a positive long.");
+        Preconditions.checkArgument(batchSize > 0, "batchSize must be a positive integer.");
+    }
+}
diff --git a/pulsar-io/redis/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/redis/src/main/resources/META-INF/services/pulsar-io.yaml
new file mode 100644
index 0000000..39a8629
--- /dev/null
+++ b/pulsar-io/redis/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+name: redis
+description: Writes data into Redis
+sinkClass: org.apache.pulsar.io.redis.sink.RedisSink
diff --git a/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/EmbeddedRedisUtils.java b/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/EmbeddedRedisUtils.java
new file mode 100644
index 0000000..1bd5295
--- /dev/null
+++ b/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/EmbeddedRedisUtils.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.redis;
+
+import lombok.extern.slf4j.Slf4j;
+import redis.embedded.RedisExecProvider;
+import redis.embedded.RedisServer;
+import redis.embedded.util.Architecture;
+import redis.embedded.util.OS;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+/**
+ * Starts and stops a local Redis server on port 6379 for tests.
+ */
+@Slf4j
+public final class EmbeddedRedisUtils {
+
+    private final Path dbPath;
+    private final RedisServer redisServer;
+
+    public EmbeddedRedisUtils(String testId) {
+        dbPath = Paths.get(testId + "/redis");
+        String execFile = "redis-server-2.8.19";
+        RedisExecProvider customProvider = RedisExecProvider
+            .defaultProvider()
+            .override(OS.UNIX, Architecture.x86_64, execFile);
+        // Run as a standalone master. Configuring the server as a replica (slaveof) of a
+        // master that does not exist leaves it read-only, and writes are rejected.
+        redisServer = RedisServer.builder()
+            .redisExecProvider(customProvider)
+            .port(6379)
+            .setting("daemonize no")
+            .setting("appendonly no")
+            .build();
+    }
+
+    /** Removes leftovers from a previous run, then starts the server. */
+    public void setUp() throws IOException {
+        Files.deleteIfExists(dbPath);
+        redisServer.start();
+    }
+
+    /** Stops the server and removes {@code dbPath} if it exists. */
+    public void tearDown() throws IOException {
+        redisServer.stop();
+        Files.deleteIfExists(dbPath);
+    }
+
+}
diff --git a/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkConfigTest.java b/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkConfigTest.java
new file mode 100644
index 0000000..5726bbf
--- /dev/null
+++ b/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkConfigTest.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.redis.sink;
+
+import org.apache.pulsar.io.redis.RedisAbstractConfig;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+/**
+ * Tests for loading and validating {@link RedisSinkConfig}.
+ */
+public class RedisSinkConfigTest {
+
+    @Test
+    public final void loadFromYamlFileTest() throws IOException {
+        File yamlFile = getFile("sinkConfig.yaml");
+        String path = yamlFile.getAbsolutePath();
+        RedisSinkConfig config = RedisSinkConfig.load(path);
+        assertNotNull(config);
+        assertExpectedValues(config);
+    }
+
+    @Test
+    public final void loadFromMapTest() throws IOException {
+        RedisSinkConfig config = RedisSinkConfig.load(validConfigMap());
+        assertNotNull(config);
+        assertExpectedValues(config);
+    }
+
+    @Test
+    public final void validValidateTest() throws IOException {
+        RedisSinkConfig config = RedisSinkConfig.load(validConfigMap());
+        config.validate();
+    }
+
+    @Test(expectedExceptions = NullPointerException.class,
+        expectedExceptionsMessageRegExp = "redisHosts property not set.")
+    public final void missingRedisHostsValidateTest() throws IOException {
+        Map<String, Object> map = validConfigMap();
+        map.remove("redisHosts");
+
+        RedisSinkConfig config = RedisSinkConfig.load(map);
+        config.validate();
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class,
+        expectedExceptionsMessageRegExp = "batchTimeMs must be a positive long.")
+    public final void invalidBatchTimeMsTest() throws IOException {
+        Map<String, Object> map = validConfigMap();
+        map.put("batchTimeMs", "-100");
+
+        RedisSinkConfig config = RedisSinkConfig.load(map);
+        config.validate();
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class,
+        expectedExceptionsMessageRegExp = "No enum constant org.apache.pulsar.io.redis.RedisAbstractConfig.ClientMode.NOTSUPPORT")
+    public final void invalidClientModeTest() throws IOException {
+        Map<String, Object> map = validConfigMap();
+        map.put("clientMode", "NotSupport");
+
+        RedisSinkConfig config = RedisSinkConfig.load(map);
+        config.validate();
+
+        RedisAbstractConfig.ClientMode.valueOf(config.getClientMode().toUpperCase());
+    }
+
+    /** Returns a mutable map with a valid value for every property checked by these tests. */
+    private static Map<String, Object> validConfigMap() {
+        Map<String, Object> map = new HashMap<>();
+        map.put("redisHosts", "localhost:6379");
+        map.put("redisPassword", "fake@123");
+        map.put("redisDatabase", "1");
+        map.put("clientMode", "Standalone");
+        map.put("operationTimeout", "2000");
+        map.put("batchSize", "100");
+        map.put("batchTimeMs", "1000");
+        map.put("connectTimeout", "3000");
+        return map;
+    }
+
+    /** Asserts the values defined by {@link #validConfigMap()}, which sinkConfig.yaml also uses. */
+    private static void assertExpectedValues(RedisSinkConfig config) {
+        // TestNG's assertEquals takes (actual, expected)
+        assertEquals(config.getRedisHosts(), "localhost:6379");
+        assertEquals(config.getRedisPassword(), "fake@123");
+        assertEquals(config.getRedisDatabase(), 1);
+        assertEquals(config.getClientMode(), "Standalone");
+        assertEquals(config.getOperationTimeout(), 2000L);
+        assertEquals(config.getBatchSize(), 100);
+        assertEquals(config.getBatchTimeMs(), 1000L);
+        assertEquals(config.getConnectTimeout(), 3000L);
+    }
+
+    /** Resolves a test resource on the classpath to a file. */
+    private File getFile(String name) {
+        java.net.URL resource = getClass().getClassLoader().getResource(name);
+        assertNotNull(resource, "Test resource not found: " + name);
+        try {
+            // toURI() decodes escaped characters (e.g. %20 for a space) that URL#getFile() keeps
+            return new File(resource.toURI());
+        } catch (java.net.URISyntaxException e) {
+            throw new IllegalStateException("Invalid resource URL: " + resource, e);
+        }
+    }
+}
diff --git a/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkTest.java b/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkTest.java
new file mode 100644
index 0000000..455ef89
--- /dev/null
+++ b/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkTest.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.redis.sink;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
+import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.functions.source.PulsarRecord;
+import org.apache.pulsar.io.redis.EmbeddedRedisUtils;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Tests for {@link RedisSink}, using {@link EmbeddedRedisUtils} to provide the Redis backend.
+ */
+@Slf4j
+public class RedisSinkTest {
+
+    private EmbeddedRedisUtils embeddedRedisUtils;
+
+    /**
+     * A simple POJO that is Avro-encoded and used as the record value written by the sink.
+     */
+    @Data
+    @ToString
+    @EqualsAndHashCode
+    public static class Foo {
+        private String field1;
+        private String field2;
+    }
+
+    /**
+     * Creates and sets up a fresh {@link EmbeddedRedisUtils} before each test method.
+     */
+    @BeforeMethod
+    public void setUp() throws Exception {
+        embeddedRedisUtils = new EmbeddedRedisUtils(getClass().getSimpleName());
+        embeddedRedisUtils.setUp();
+    }
+
+    /**
+     * Tears down the {@link EmbeddedRedisUtils} after each test method.
+     */
+    @AfterMethod
+    public void tearDown() throws Exception {
+        embeddedRedisUtils.tearDown();
+    }
+
+    /**
+     * Opens a {@link RedisSink}, writes one Avro-encoded record to it and closes it again.
+     * The test passes when none of these steps throws.
+     */
+    @Test
+    public void TestOpenAndWriteSink() throws Exception {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put("redisHosts", "localhost:6379");
+        configs.put("redisPassword", "");
+        configs.put("redisDatabase", "1");
+        configs.put("clientMode", "Standalone");
+        configs.put("operationTimeout", "3000");
+        configs.put("batchSize", "10");
+
+        RedisSink sink = new RedisSink();
+
+        // prepare a foo Record
+        Foo obj = new Foo();
+        obj.setField1("FakeFiled1");
+        // distinct from field1 so the two fields cannot be confused in the encoded value
+        obj.setField2("FakeFiled2");
+        AvroSchema<Foo> schema = AvroSchema.of(Foo.class);
+
+        byte[] bytes = schema.encode(obj);
+        ByteBuf payload = Unpooled.copiedBuffer(bytes);
+        AutoConsumeSchema autoConsumeSchema = new AutoConsumeSchema();
+        autoConsumeSchema.setSchema(GenericSchemaImpl.of(schema.getSchemaInfo()));
+
+        // Message properties are unrelated to the sink configuration; reusing the sink configs
+        // (which include redisPassword) here would attach them to the message for no reason.
+        Map<String, String> properties = new HashMap<>();
+        Message<GenericRecord> message = new MessageImpl("fake_topic_name", "77:777", properties, payload, autoConsumeSchema);
+        Record<GenericRecord> record = PulsarRecord.<GenericRecord>builder()
+            .message(message)
+            .topicName("fake_topic_name")
+            .build();
+
+        log.info("foo:{}, Message.getValue: {}, record.getValue: {}",
+            obj.toString(),
+            message.getValue().toString(),
+            record.getValue().toString());
+
+        // open should succeed
+        sink.open(configs, null);
+        try {
+            // write should succeed
+            sink.write(record);
+            log.info("executed write");
+
+            // sleep to give the sink time to flush its batch before it is closed
+            Thread.sleep(1000);
+        } finally {
+            // close the sink so whatever open() acquired is released even if write() fails
+            sink.close();
+        }
+    }
+}
diff --git a/pulsar-io/redis/src/test/resources/sinkConfig.yaml b/pulsar-io/redis/src/test/resources/sinkConfig.yaml
new file mode 100644
index 0000000..4cc1990
--- /dev/null
+++ b/pulsar-io/redis/src/test/resources/sinkConfig.yaml
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+{
+"redisHosts": "localhost:6379",
+"redisPassword": "fake@123",
+"redisDatabase": "1",
+"clientMode": "Standalone",
+"operationTimeout": "2000",
+"batchSize": "100",
+"batchTimeMs": "1000",
+"connectTimeout": "3000"
+}