You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/06/05 08:40:14 UTC
[3/3] camel git commit: Simplefied camel-spring-redis
Simplefied camel-spring-redis
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3232a4a4
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3232a4a4
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3232a4a4
Branch: refs/heads/master
Commit: 3232a4a4df0bdc92f062456a1a4bd4f8df347bbe
Parents: 6eab973
Author: Arno Noordover <an...@users.noreply.github.com>
Authored: Sat Jun 4 14:10:07 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sun Jun 5 09:30:50 2016 +0200
----------------------------------------------------------------------
.../component/redis/CommandDispatcher.java | 483 -------------------
.../component/redis/ExchangeConverter.java | 145 ++++++
.../camel/component/redis/RedisClient.java | 43 +-
.../camel/component/redis/RedisComponent.java | 6 +
.../camel/component/redis/RedisConsumer.java | 2 +-
.../camel/component/redis/RedisEndpoint.java | 10 +-
.../camel/component/redis/RedisProducer.java | 288 +++++++++--
.../camel/component/redis/RedisHashTest.java | 10 +-
.../camel/component/redis/RedisKeyTest.java | 6 +-
.../camel/component/redis/RedisListTest.java | 2 +-
.../camel/component/redis/RedisSetTest.java | 22 +-
.../component/redis/RedisSortedSetTest.java | 14 +-
.../camel/component/redis/RedisStringTest.java | 8 +-
.../component/redis/RedisTransactionTest.java | 2 +-
14 files changed, 470 insertions(+), 571 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/3232a4a4/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/CommandDispatcher.java
----------------------------------------------------------------------
diff --git a/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/CommandDispatcher.java b/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/CommandDispatcher.java
deleted file mode 100755
index 0b56ba5..0000000
--- a/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/CommandDispatcher.java
+++ /dev/null
@@ -1,483 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.redis;
-
-import java.util.Collection;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.camel.Exchange;
-import org.apache.camel.Message;
-import org.apache.camel.RuntimeExchangeException;
-
-class CommandDispatcher {
- private final RedisConfiguration configuration;
-
- CommandDispatcher(RedisConfiguration configuration) {
- this.configuration = configuration;
- }
-
- // CHECKSTYLE:OFF
- public void execute(final RedisClient redisClient, Exchange exchange) {
- final Command command = determineCommand(exchange);
- switch (command) {
-
- case PING:
- setResult(exchange, redisClient.ping());
- break;
- case SET:
- redisClient.set(getKey(exchange), getValue(exchange));
- break;
- case GET:
- setResult(exchange, redisClient.get(getKey(exchange)));
- break;
- case QUIT:
- redisClient.quit();
- break;
- case EXISTS:
- setResult(exchange, redisClient.exists(getKey(exchange)));
- break;
- case DEL:
- redisClient.del(getKeys(exchange));
- break;
- case TYPE:
- setResult(exchange, redisClient.type(getKey(exchange)));
- break;
- case KEYS:
- setResult(exchange, redisClient.keys(getPattern(exchange)));
- break;
- case RANDOMKEY:
- setResult(exchange, redisClient.randomkey());
- break;
- case RENAME:
- redisClient.rename(getKey(exchange), getStringValue(exchange));
- break;
- case RENAMENX:
- setResult(exchange, redisClient.renamenx(getKey(exchange), getStringValue(exchange)));
- break;
- case EXPIRE:
- setResult(exchange, redisClient.expire(getKey(exchange), getTimeout(exchange)));
- break;
- case EXPIREAT:
- setResult(exchange, redisClient.expireat(getKey(exchange), getTimestamp(exchange)));
- break;
- case PEXPIRE:
- setResult(exchange, redisClient.pexpire(getKey(exchange), getTimeout(exchange)));
- break;
- case PEXPIREAT:
- setResult(exchange, redisClient.pexpireat(getKey(exchange), getTimestamp(exchange)));
- break;
- case TTL:
- setResult(exchange, redisClient.ttl(getKey(exchange)));
- break;
- case MOVE:
- setResult(exchange, redisClient.move(getKey(exchange), getDb(exchange)));
- break;
- case GETSET:
- setResult(exchange, redisClient.getset(getKey(exchange), getValue(exchange)));
- break;
- case MGET:
- setResult(exchange, redisClient.mget(getFields(exchange)));
- break;
- case SETNX:
- setResult(exchange, redisClient.setnx(getKey(exchange), getValue(exchange)));
- break;
- case SETEX:
- redisClient.setex(getKey(exchange), getValue(exchange), getTimeout(exchange), TimeUnit.SECONDS);
- break;
- case MSET:
- redisClient.mset(getValuesAsMap(exchange));
- break;
- case MSETNX:
- redisClient.msetnx(getValuesAsMap(exchange));
- break;
- case DECRBY:
- setResult(exchange, redisClient.decrby(getKey(exchange), getLongValue(exchange)));
- break;
- case DECR:
- setResult(exchange, redisClient.decr(getKey(exchange)));
- break;
- case INCRBY:
- setResult(exchange, redisClient.incrby(getKey(exchange), getLongValue(exchange)));
- break;
- case INCR:
- setResult(exchange, redisClient.incr(getKey(exchange)));
- break;
- case APPEND:
- setResult(exchange, redisClient.append(getKey(exchange), getStringValue(exchange)));
- break;
- case HSET:
- redisClient.hset(getKey(exchange), getField(exchange), getValue(exchange));
- break;
- case HGET:
- setResult(exchange, redisClient.hget(getKey(exchange), getField(exchange)));
- break;
- case HSETNX:
- setResult(exchange, redisClient.hsetnx(getKey(exchange), getField(exchange), getValue(exchange)));
- break;
- case HMSET:
- redisClient.hmset(getKey(exchange), getValuesAsMap(exchange));
- break;
- case HMGET:
- setResult(exchange, redisClient.hmget(getKey(exchange), getFields(exchange)));
- break;
- case HINCRBY:
- setResult(exchange, redisClient.hincrBy(getKey(exchange), getField(exchange), getValueAsLong(exchange)));
- break;
- case HEXISTS:
- setResult(exchange, redisClient.hexists(getKey(exchange), getField(exchange)));
- break;
- case HDEL:
- redisClient.hdel(getKey(exchange), getField(exchange));
- break;
- case HLEN:
- setResult(exchange, redisClient.hlen(getKey(exchange)));
- break;
- case HKEYS:
- setResult(exchange, redisClient.hkeys(getKey(exchange)));
- break;
- case HVALS:
- setResult(exchange, redisClient.hvals(getKey(exchange)));
- break;
- case HGETALL:
- setResult(exchange, redisClient.hgetAll(getKey(exchange)));
- break;
- case RPUSH:
- setResult(exchange, redisClient.rpush(getKey(exchange), getValue(exchange)));
- break;
- case LPUSH:
- setResult(exchange, redisClient.lpush(getKey(exchange), getValue(exchange)));
- break;
- case LLEN:
- setResult(exchange, redisClient.llen(getKey(exchange)));
- break;
- case LRANGE:
- setResult(exchange, redisClient.lrange(getKey(exchange), getStart(exchange), getEnd(exchange)));
- break;
- case LTRIM:
- redisClient.ltrim(getKey(exchange), getStart(exchange), getEnd(exchange));
- break;
- case LINDEX:
- setResult(exchange, redisClient.lindex(getKey(exchange), getIndex(exchange)));
- break;
- case LSET:
- redisClient.lset(getKey(exchange), getValue(exchange), getIndex(exchange));
- break;
- case LREM:
- setResult(exchange, redisClient.lrem(getKey(exchange), getValue(exchange), getCount(exchange)));
- break;
- case LPOP:
- setResult(exchange, redisClient.lpop(getKey(exchange)));
- break;
- case RPOP:
- setResult(exchange, redisClient.rpop(getKey(exchange)));
- break;
- case RPOPLPUSH:
- setResult(exchange, redisClient.rpoplpush(getKey(exchange), getDestination(exchange)));
- break;
- case SADD:
- setResult(exchange, redisClient.sadd(getKey(exchange), getValue(exchange)));
- break;
- case SMEMBERS:
- setResult(exchange, redisClient.smembers(getKey(exchange)));
- break;
- case SREM:
- setResult(exchange, redisClient.srem(getKey(exchange), getValue(exchange)));
- break;
- case SPOP:
- setResult(exchange, redisClient.spop(getKey(exchange)));
- break;
- case SMOVE:
- setResult(exchange, redisClient.smove(getKey(exchange), getValue(exchange), getDestination(exchange)));
- break;
- case SCARD:
- setResult(exchange, redisClient.scard(getKey(exchange)));
- break;
- case SISMEMBER:
- setResult(exchange, redisClient.sismember(getKey(exchange), getValue(exchange)));
- break;
- case SINTER:
- setResult(exchange, redisClient.sinter(getKey(exchange), getKeys(exchange)));
- break;
- case SINTERSTORE:
- redisClient.sinterstore(getKey(exchange), getKeys(exchange), getDestination(exchange));
- break;
- case SUNION:
- setResult(exchange, redisClient.sunion(getKey(exchange), getKeys(exchange)));
- break;
- case SUNIONSTORE:
- redisClient.sunionstore(getKey(exchange), getKeys(exchange), getDestination(exchange));
- break;
- case SDIFF:
- setResult(exchange, redisClient.sdiff(getKey(exchange), getKeys(exchange)));
- break;
- case SDIFFSTORE:
- redisClient.sdiffstore(getKey(exchange), getKeys(exchange), getDestination(exchange));
- break;
- case SRANDMEMBER:
- setResult(exchange, redisClient.srandmember(getKey(exchange)));
- break;
- case ZADD:
- setResult(exchange, redisClient.zadd(getKey(exchange), getValue(exchange), getScore(exchange)));
- break;
- case ZRANGE:
- setResult(exchange, redisClient.zrange(getKey(exchange), getStart(exchange), getEnd(exchange), getWithScore(exchange)));
- break;
- case ZREM:
- setResult(exchange, redisClient.zrem(getKey(exchange), getValue(exchange)));
- break;
- case ZINCRBY:
- setResult(exchange, redisClient.zincrby(getKey(exchange), getValue(exchange), getIncrement(exchange)));
- break;
- case ZRANK:
- setResult(exchange, redisClient.zrank(getKey(exchange), getValue(exchange)));
- break;
- case ZREVRANK:
- setResult(exchange, redisClient.zrevrank(getKey(exchange), getValue(exchange)));
- break;
- case ZREVRANGE:
- setResult(exchange, redisClient.zrevrange(getKey(exchange), getStart(exchange), getEnd(exchange), getWithScore(exchange)));
- break;
- case ZCARD:
- setResult(exchange, redisClient.zcard(getKey(exchange)));
- break;
- case MULTI:
- redisClient.multi();
- break;
- case DISCARD:
- redisClient.discard();
- break;
- case EXEC:
- redisClient.exec();
- break;
- case WATCH:
- redisClient.watch(getKeys(exchange));
- break;
- case UNWATCH:
- redisClient.unwatch();
- break;
- case SORT:
- setResult(exchange, redisClient.sort(getKey(exchange)));
- break;
- case BLPOP:
- setResult(exchange, redisClient.blpop(getKey(exchange), getTimeout(exchange)));
- break;
- case BRPOP:
- setResult(exchange, redisClient.brpop(getKey(exchange), getTimeout(exchange)));
- break;
- case PUBLISH:
- redisClient.publish(getChannel(exchange), getMessage(exchange));
- break;
- case ZCOUNT:
- setResult(exchange, redisClient.zcount(getKey(exchange), getMin(exchange), getMax(exchange)));
- break;
- case ZRANGEBYSCORE:
- setResult(exchange, redisClient.zrangebyscore(getKey(exchange), getMin(exchange), getMax(exchange)));
- break;
- case ZREVRANGEBYSCORE:
- setResult(exchange, redisClient.zrevrangebyscore(getKey(exchange), getMin(exchange), getMax(exchange)));
- break;
- case ZREMRANGEBYRANK:
- redisClient.zremrangebyrank(getKey(exchange), getStart(exchange), getEnd(exchange));
- break;
- case ZREMRANGEBYSCORE:
- redisClient.zremrangebyscore(getKey(exchange), getStart(exchange), getEnd(exchange));
- break;
- case ZUNIONSTORE:
- redisClient.zunionstore(getKey(exchange), getKeys(exchange), getDestination(exchange));
- break;
- case ZINTERSTORE:
- redisClient.zinterstore(getKey(exchange), getKeys(exchange), getDestination(exchange));
- break;
- case STRLEN:
- setResult(exchange, redisClient.strlen(getKey(exchange)));
- break;
- case PERSIST:
- setResult(exchange, redisClient.persist(getKey(exchange)));
- break;
- case RPUSHX:
- setResult(exchange, redisClient.rpushx(getKey(exchange), getValue(exchange)));
- break;
- case ECHO:
- setResult(exchange, redisClient.echo(getStringValue(exchange)));
- break;
- case LINSERT:
- setResult(exchange, redisClient.linsert(getKey(exchange), getValue(exchange), getPivot(exchange), getPosition(exchange)));
- break;
- case BRPOPLPUSH:
- setResult(exchange, redisClient.brpoplpush(getKey(exchange), getDestination(exchange), getTimeout(exchange)));
- break;
- case SETBIT:
- redisClient.setbit(getKey(exchange), getOffset(exchange), getBooleanValue(exchange));
- break;
- case GETBIT:
- setResult(exchange, redisClient.getbit(getKey(exchange), getOffset(exchange)));
- break;
- case SETRANGE:
- redisClient.setex(getKey(exchange), getValue(exchange), getOffset(exchange));
- break;
- case GETRANGE:
- setResult(exchange, redisClient.getrange(getKey(exchange), getStart(exchange), getEnd(exchange)));
- break;
- default:
- throw new RuntimeExchangeException("Unsupported command: " + command, exchange);
- }
- }
- // CHECKSTYLE:ON
-
- private Command determineCommand(Exchange exchange) {
- Command command = exchange.getIn().getHeader(RedisConstants.COMMAND, Command.class);
- if (command == null) {
- command = configuration.getCommand();
- }
- if (command == null) {
- command = Command.SET;
- }
- return command;
- }
-
- private static <T> T getInHeaderValue(Exchange exchange, String key, Class<T> aClass) {
- return exchange.getIn().getHeader(key, aClass);
- }
-
- private void setResult(Exchange exchange, Object result) {
- Message message;
- if (exchange.getPattern().isOutCapable()) {
- message = exchange.getOut();
- message.copyFrom(exchange.getIn());
- } else {
- message = exchange.getIn();
- }
- message.setBody(result);
- }
-
- public String getDestination(Exchange exchange) {
- return getInHeaderValue(exchange, RedisConstants.DESTINATION, String.class);
- }
-
- private String getChannel(Exchange exchange) {
- return getInHeaderValue(exchange, RedisConstants.CHANNEL, String.class);
- }
-
- private Object getMessage(Exchange exchange) {
- return getInHeaderValue(exchange, RedisConstants.MESSAGE, Object.class);
- }
-
- public Long getIndex(Exchange exchange) {
- return getInHeaderValue(exchange, RedisConstants.INDEX, Long.class);
- }
-
- public String getPivot(Exchange exchange) {
- return getInHeaderValue(exchange, RedisConstants.PIVOT, String.class);
- }
-
- public String getPosition(Exchange exchange) {
- return getInHeaderValue(exchange, RedisConstants.POSITION, String.class);
- }
-
- public Long getCount(Exchange exchange) {
- return getInHeaderValue(exchange, RedisConstants.COUNT, Long.class);
- }
-
- private Long getStart(Exchange exchange) {
- return getInHeaderValue(exchange, RedisConstants.START, Long.class);
- }
-
- private Long getEnd(Exchange exchange) {
- return getInHeaderValue(exchange, RedisConstants.END, Long.class);
- }
-
- private Long getTimeout(Exchange exchange) {
- return getInHeaderValue(exchange, RedisConstants.TIMEOUT, Long.class);
- }
-
- private Long getOffset(Exchange exchange) {
- return getInHeaderValue(exchange, RedisConstants.OFFSET, Long.class);
- }
-
- private Long getValueAsLong(Exchange exchange) {
- return getInHeaderValue(exchange, RedisConstants.VALUE, Long.class);
- }
-
- private Collection<String> getFields(Exchange exchange) {
- return getInHeaderValue(exchange, RedisConstants.FIELDS, Collection.class);
- }
-
- private Map<String, Object> getValuesAsMap(Exchange exchange) {
- return getInHeaderValue(exchange, RedisConstants.VALUES, Map.class);
- }
-
- private String getKey(Exchange exchange) {
- return getInHeaderValue(exchange, RedisConstants.KEY, String.class);
- }
-
- public Collection<String> getKeys(Exchange exchange) {
- return getInHeaderValue(exchange, RedisConstants.KEYS, Collection.class);
- }
-
- private Object getValue(Exchange exchange) {
- return getInHeaderValue(exchange, RedisConstants.VALUE, Object.class);
- }
-
- private String getStringValue(Exchange exchange) {
- return getInHeaderValue(exchange, RedisConstants.VALUE, String.class);
- }
-
- private Long getLongValue(Exchange exchange) {
- return getInHeaderValue(exchange, RedisConstants.VALUE, Long.class);
- }
-
- private Boolean getBooleanValue(Exchange exchange) {
- return getInHeaderValue(exchange, RedisConstants.VALUE, Boolean.class);
- }
-
- private String getField(Exchange exchange) {
- return getInHeaderValue(exchange, RedisConstants.FIELD, String.class);
- }
-
- public Long getTimestamp(Exchange exchange) {
- return getInHeaderValue(exchange, RedisConstants.TIMESTAMP, Long.class);
- }
-
- public String getPattern(Exchange exchange) {
- return getInHeaderValue(exchange, RedisConstants.PATTERN, String.class);
- }
-
- public Integer getDb(Exchange exchange) {
- return getInHeaderValue(exchange, RedisConstants.DB, Integer.class);
- }
-
- public Double getScore(Exchange exchange) {
- return getInHeaderValue(exchange, RedisConstants.SCORE, Double.class);
- }
-
- public Double getMin(Exchange exchange) {
- return getInHeaderValue(exchange, RedisConstants.MIN, Double.class);
- }
-
- public Double getMax(Exchange exchange) {
- return getInHeaderValue(exchange, RedisConstants.MAX, Double.class);
- }
-
- public Double getIncrement(Exchange exchange) {
- return getInHeaderValue(exchange, RedisConstants.INCREMENT, Double.class);
- }
-
- public Boolean getWithScore(Exchange exchange) {
- return getInHeaderValue(exchange, RedisConstants.WITHSCORE, Boolean.class);
- }
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/3232a4a4/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/ExchangeConverter.java
----------------------------------------------------------------------
diff --git a/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/ExchangeConverter.java b/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/ExchangeConverter.java
new file mode 100644
index 0000000..cdae518
--- /dev/null
+++ b/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/ExchangeConverter.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.redis;
+
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.camel.Exchange;
+
+class ExchangeConverter {
+ String getKey(Exchange exchange) {
+ return getInHeaderValue(exchange, RedisConstants.KEY, String.class);
+ }
+
+ String getStringValue(Exchange exchange) {
+ return getInHeaderValue(exchange, RedisConstants.VALUE, String.class);
+ }
+
+ Long getLongValue(Exchange exchange) {
+ return getInHeaderValue(exchange, RedisConstants.VALUE, Long.class);
+ }
+
+ String getDestination(Exchange exchange) {
+ return getInHeaderValue(exchange, RedisConstants.DESTINATION, String.class);
+ }
+
+ String getChannel(Exchange exchange) {
+ return getInHeaderValue(exchange, RedisConstants.CHANNEL, String.class);
+ }
+
+ Object getMessage(Exchange exchange) {
+ return getInHeaderValue(exchange, RedisConstants.MESSAGE, Object.class);
+ }
+
+ Long getIndex(Exchange exchange) {
+ return getInHeaderValue(exchange, RedisConstants.INDEX, Long.class);
+ }
+
+ String getPivot(Exchange exchange) {
+ return getInHeaderValue(exchange, RedisConstants.PIVOT, String.class);
+ }
+
+ String getPosition(Exchange exchange) {
+ return getInHeaderValue(exchange, RedisConstants.POSITION, String.class);
+ }
+
+ Long getCount(Exchange exchange) {
+ return getInHeaderValue(exchange, RedisConstants.COUNT, Long.class);
+ }
+
+ Long getStart(Exchange exchange) {
+ return getInHeaderValue(exchange, RedisConstants.START, Long.class);
+ }
+
+ Long getEnd(Exchange exchange) {
+ return getInHeaderValue(exchange, RedisConstants.END, Long.class);
+ }
+
+ Long getTimeout(Exchange exchange) {
+ return getInHeaderValue(exchange, RedisConstants.TIMEOUT, Long.class);
+ }
+
+ Long getOffset(Exchange exchange) {
+ return getInHeaderValue(exchange, RedisConstants.OFFSET, Long.class);
+ }
+
+ Long getValueAsLong(Exchange exchange) {
+ return getInHeaderValue(exchange, RedisConstants.VALUE, Long.class);
+ }
+
+ Collection<String> getFields(Exchange exchange) {
+ return getInHeaderValue(exchange, RedisConstants.FIELDS, Collection.class);
+ }
+
+ Map<String, Object> getValuesAsMap(Exchange exchange) {
+ return getInHeaderValue(exchange, RedisConstants.VALUES, Map.class);
+ }
+
+ Collection<String> getKeys(Exchange exchange) {
+ return getInHeaderValue(exchange, RedisConstants.KEYS, Collection.class);
+ }
+
+ Object getValue(Exchange exchange) {
+ return getInHeaderValue(exchange, RedisConstants.VALUE, Object.class);
+ }
+
+ Boolean getBooleanValue(Exchange exchange) {
+ return getInHeaderValue(exchange, RedisConstants.VALUE, Boolean.class);
+ }
+
+ String getField(Exchange exchange) {
+ return getInHeaderValue(exchange, RedisConstants.FIELD, String.class);
+ }
+
+ Long getTimestamp(Exchange exchange) {
+ return getInHeaderValue(exchange, RedisConstants.TIMESTAMP, Long.class);
+ }
+
+ String getPattern(Exchange exchange) {
+ return getInHeaderValue(exchange, RedisConstants.PATTERN, String.class);
+ }
+
+ Integer getDb(Exchange exchange) {
+ return getInHeaderValue(exchange, RedisConstants.DB, Integer.class);
+ }
+
+ Double getScore(Exchange exchange) {
+ return getInHeaderValue(exchange, RedisConstants.SCORE, Double.class);
+ }
+
+ Double getMin(Exchange exchange) {
+ return getInHeaderValue(exchange, RedisConstants.MIN, Double.class);
+ }
+
+ Double getMax(Exchange exchange) {
+ return getInHeaderValue(exchange, RedisConstants.MAX, Double.class);
+ }
+
+ Double getIncrement(Exchange exchange) {
+ return getInHeaderValue(exchange, RedisConstants.INCREMENT, Double.class);
+ }
+
+ Boolean getWithScore(Exchange exchange) {
+ return getInHeaderValue(exchange, RedisConstants.WITHSCORE, Boolean.class);
+ }
+
+ private static <T> T getInHeaderValue(Exchange exchange, String key, Class<T> aClass) {
+ return exchange.getIn().getHeader(key, aClass);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/3232a4a4/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisClient.java
----------------------------------------------------------------------
diff --git a/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisClient.java b/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisClient.java
index 3613d6a..5a06e42 100755
--- a/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisClient.java
+++ b/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisClient.java
@@ -82,12 +82,9 @@ public class RedisClient {
}
public void quit() {
- redisTemplate.execute(new RedisCallback<Object>() {
- @Override
- public Object doInRedis(RedisConnection connection) throws DataAccessException {
- connection.close();
- return null;
- }
+ redisTemplate.execute((RedisCallback<Object>) connection -> {
+ connection.close();
+ return null;
});
}
@@ -156,21 +153,15 @@ public class RedisClient {
}
public void setbit(final String key, final Long offset, final Boolean value) {
- redisTemplate.execute(new RedisCallback<Object>() {
- @Override
- public Object doInRedis(RedisConnection connection) throws DataAccessException {
- connection.setBit(key.getBytes(), offset, value);
- return null;
- }
+ redisTemplate.execute((RedisCallback<Object>) connection -> {
+ connection.setBit(key.getBytes(), offset, value);
+ return null;
});
}
public Boolean getbit(final String key, final Long offset) {
- return redisTemplate.execute(new RedisCallback<Boolean>() {
- @Override
- public Boolean doInRedis(RedisConnection connection) throws DataAccessException {
- return connection.getBit(key.getBytes(), offset);
- }
+ return redisTemplate.execute((RedisCallback<Boolean>) connection -> {
+ return connection.getBit(key.getBytes(), offset);
});
}
@@ -255,20 +246,14 @@ public class RedisClient {
}
public String echo(final String value) {
- return redisTemplate.execute(new RedisCallback<String>() {
- @Override
- public String doInRedis(RedisConnection connection) throws DataAccessException {
- return new String(connection.echo(value.getBytes()));
- }
+ return redisTemplate.execute((RedisCallback<String>) connection -> {
+ return new String(connection.echo(value.getBytes()));
});
}
public String ping() {
- return redisTemplate.execute(new RedisCallback<String>() {
- @Override
- public String doInRedis(RedisConnection connection) throws DataAccessException {
- return connection.ping();
- }
+ return redisTemplate.execute((RedisCallback<String>) connection -> {
+ return connection.ping();
});
}
@@ -346,6 +331,10 @@ public class RedisClient {
return redisTemplate.opsForList().leftPush(key, value);
}
+ public Long lpushx(String key, Object value) {
+ return redisTemplate.opsForList().leftPushIfPresent(key, value);
+ }
+
public void del(Collection<String> keys) {
redisTemplate.delete(keys);
}
http://git-wip-us.apache.org/repos/asf/camel/blob/3232a4a4/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisComponent.java b/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisComponent.java
index 568dda3..dca4fd7 100755
--- a/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisComponent.java
+++ b/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisComponent.java
@@ -26,6 +26,8 @@ import org.apache.camel.impl.UriEndpointComponent;
*/
public class RedisComponent extends UriEndpointComponent {
+ private final ExchangeConverter exchangeConverter = new ExchangeConverter();
+
public RedisComponent() {
super(RedisEndpoint.class);
}
@@ -48,4 +50,8 @@ public class RedisComponent extends UriEndpointComponent {
configuration.setPort(Integer.parseInt(hostAndPort[1]));
}
}
+
+ public ExchangeConverter getExchangeConverter() {
+ return exchangeConverter;
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/3232a4a4/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisConsumer.java b/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisConsumer.java
index 84d2338..e0fe095 100755
--- a/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisConsumer.java
+++ b/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisConsumer.java
@@ -48,7 +48,7 @@ public class RedisConsumer extends DefaultConsumer implements MessageListener {
private Collection<Topic> toTopics(String channels) {
String[] channelsArrays = channels.split(",");
- List<Topic> topics = new ArrayList<Topic>();
+ List<Topic> topics = new ArrayList<>();
for (String channel : channelsArrays) {
String name = channel.trim();
if (Command.PSUBSCRIBE.equals(redisConfiguration.getCommand())) {
http://git-wip-us.apache.org/repos/asf/camel/blob/3232a4a4/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisEndpoint.java b/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisEndpoint.java
index d3a8294..aaa2d33 100755
--- a/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisEndpoint.java
+++ b/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisEndpoint.java
@@ -38,7 +38,15 @@ public class RedisEndpoint extends DefaultEndpoint {
}
public Producer createProducer() throws Exception {
- return new RedisProducer(this, configuration);
+ Command defaultCommand = configuration.getCommand();
+ if (defaultCommand == null) {
+ defaultCommand = Command.SET;
+ }
+ return new RedisProducer(this,
+ new RedisClient(configuration.getRedisTemplate()),
+ RedisConstants.COMMAND,
+ defaultCommand.name(),
+ ((RedisComponent)getComponent()).getExchangeConverter());
}
public Consumer createConsumer(Processor processor) throws Exception {
http://git-wip-us.apache.org/repos/asf/camel/blob/3232a4a4/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisProducer.java b/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisProducer.java
index bfadab5..3df1ccb 100755
--- a/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisProducer.java
+++ b/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisProducer.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -16,39 +16,273 @@
*/
package org.apache.camel.component.redis;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
-import org.apache.camel.impl.DefaultProducer;
-import org.apache.camel.util.URISupport;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.impl.HeaderSelectorProducer;
-/**
- * The Redis producer.
- */
-public class RedisProducer extends DefaultProducer {
- private final RedisClient redisClient;
- private final CommandDispatcher commandDispatcher;
-
- private transient String redisProducerToString;
-
- public RedisProducer(RedisEndpoint endpoint, RedisConfiguration configuration) {
- super(endpoint);
- this.redisClient = new RedisClient(configuration.getRedisTemplate());
- this.commandDispatcher = new CommandDispatcher(configuration);
- }
+final class RedisProducer extends HeaderSelectorProducer {
+ private final Map<String, Processor> processors = new HashMap<>();
+
+ public RedisProducer(Endpoint endpoint,
+ RedisClient redisClient,
+ String header,
+ String defaultHeaderValue,
+ ExchangeConverter exchangeConverter) {
+ super(endpoint, header, defaultHeaderValue);
+ //bind key commands
+ bind(Command.APPEND, wrap(exchange -> redisClient.append(exchangeConverter.getKey(exchange),
+ exchangeConverter.getStringValue(exchange))));
+ bind(Command.DECR, wrap(exchange -> redisClient.decr(exchangeConverter.getKey(exchange))));
+ bind(Command.DECRBY, wrap(exchange -> redisClient.decrby(exchangeConverter.getKey(exchange),
+ exchangeConverter.getLongValue(exchange))));
+ bind(Command.GET, wrap(exchange -> redisClient.get(exchangeConverter.getKey(exchange))));
+ bind(Command.GETBIT, wrap(exchange -> redisClient.getbit(exchangeConverter.getKey(exchange),
+ exchangeConverter.getOffset(exchange))));
+ bind(Command.GETRANGE, wrap(exchange -> redisClient.getrange(exchangeConverter.getKey(exchange),
+ exchangeConverter.getStart(exchange),
+ exchangeConverter.getEnd(exchange))));
+ bind(Command.GETSET, wrap(exchange -> redisClient.getset(exchangeConverter.getKey(exchange),
+ exchangeConverter.getValue(exchange))));
+ bind(Command.INCR, wrap(exchange -> redisClient.incr(exchangeConverter.getKey(exchange))));
+ bind(Command.INCRBY, wrap(exchange -> redisClient.incrby(exchangeConverter.getKey(exchange),
+ exchangeConverter.getLongValue(exchange))));
+ bind(Command.MGET, wrap(exchange -> redisClient.mget(exchangeConverter.getFields(exchange))));
+ bind(Command.MSET, exchange -> redisClient.mset(exchangeConverter.getValuesAsMap(exchange)));
+ bind(Command.MSETNX, exchange -> redisClient.msetnx(exchangeConverter.getValuesAsMap(exchange)));
+ bind(Command.SET, exchange -> redisClient.set(exchangeConverter.getKey(exchange),
+ exchangeConverter.getValue(exchange)));
+ bind(Command.SETBIT, exchange -> redisClient.setbit(exchangeConverter.getKey(exchange),
+ exchangeConverter.getOffset(exchange),
+ exchangeConverter.getBooleanValue(exchange)));
+ bind(Command.SETEX, exchange -> redisClient.setex(exchangeConverter.getKey(exchange),
+ exchangeConverter.getValue(exchange),
+ exchangeConverter.getTimeout(exchange),
+ TimeUnit.SECONDS));
+ bind(Command.SETNX, wrap(exchange -> redisClient.setnx(exchangeConverter.getKey(exchange),
+ exchangeConverter.getValue(exchange))));
+ bind(Command.SETRANGE, exchange -> redisClient.setex(exchangeConverter.getKey(exchange),
+ exchangeConverter.getValue(exchange),
+ exchangeConverter.getOffset(exchange)));
+ bind(Command.STRLEN, wrap(exchange -> redisClient.strlen(exchangeConverter.getKey(exchange))));
+ //missing bitcount, bitfield, bitop, bitpos, incrbyfloat, psetex
+
+ //bind sorted set commands
+ bind(Command.ZADD, wrap(exchange -> redisClient.zadd(exchangeConverter.getKey(exchange),
+ exchangeConverter.getValue(exchange),
+ exchangeConverter.getScore(exchange))));
+ bind(Command.ZCARD, wrap(exchange -> redisClient.zcard(exchangeConverter.getKey(exchange))));
+ bind(Command.ZCOUNT, wrap(exchange -> redisClient.zcount(exchangeConverter.getKey(exchange),
+ exchangeConverter.getMin(exchange),
+ exchangeConverter.getMax(exchange))));
+ bind(Command.ZINCRBY, wrap(exchange -> redisClient.zincrby(exchangeConverter.getKey(exchange),
+ exchangeConverter.getValue(exchange),
+ exchangeConverter.getIncrement(exchange))));
+ bind(Command.ZINTERSTORE, exchange -> redisClient.zinterstore(exchangeConverter.getKey(exchange),
+ exchangeConverter.getKeys(exchange),
+ exchangeConverter.getDestination(exchange)));
+ bind(Command.ZRANGE, wrap(exchange -> redisClient.zrange(exchangeConverter.getKey(exchange),
+ exchangeConverter.getStart(exchange),
+ exchangeConverter.getEnd(exchange),
+ exchangeConverter.getWithScore(exchange))));
+ bind(Command.ZRANGEBYSCORE, wrap(exchange -> redisClient.zrangebyscore(exchangeConverter.getKey(exchange),
+ exchangeConverter.getMin(exchange),
+ exchangeConverter.getMax(exchange))));
+ bind(Command.ZRANK, wrap(exchange -> redisClient.zrank(exchangeConverter.getKey(exchange),
+ exchangeConverter.getValue(exchange))));
+ bind(Command.ZREM, wrap(exchange -> redisClient.zrem(exchangeConverter.getKey(exchange),
+ exchangeConverter.getValue(exchange))));
+ bind(Command.ZREMRANGEBYRANK, exchange -> redisClient.zremrangebyrank(exchangeConverter.getKey(exchange),
+ exchangeConverter.getStart(exchange),
+ exchangeConverter.getEnd(exchange)));
+ bind(Command.ZREMRANGEBYSCORE, exchange -> redisClient.zremrangebyscore(exchangeConverter.getKey(exchange),
+ exchangeConverter.getStart(exchange),
+ exchangeConverter.getEnd(exchange)));
+ bind(Command.ZREVRANGE, wrap(exchange -> redisClient.zrevrange(exchangeConverter.getKey(exchange),
+ exchangeConverter.getStart(exchange),
+ exchangeConverter.getEnd(exchange),
+ exchangeConverter.getWithScore(exchange))));
+ bind(Command.ZREVRANGEBYSCORE, wrap(exchange -> redisClient.zrevrangebyscore(exchangeConverter.getKey(exchange),
+ exchangeConverter.getMin(exchange),
+ exchangeConverter.getMax(exchange))));
+ bind(Command.ZREVRANK, wrap(exchange -> redisClient.zrevrank(exchangeConverter.getKey(exchange),
+ exchangeConverter.getValue(exchange))));
+ bind(Command.ZUNIONSTORE, exchange -> redisClient.zunionstore(exchangeConverter.getKey(exchange),
+ exchangeConverter.getKeys(exchange),
+ exchangeConverter.getDestination(exchange)));
+ //missing zlexcount, zrangebylex, zrevrangebylex, zremrangebylex, zscore, zscan
+
+ //bind sets commands
+ bind(Command.SADD, wrap(exchange -> redisClient.sadd(exchangeConverter.getKey(exchange),
+ exchangeConverter.getValue(exchange))));
+ bind(Command.SCARD, wrap(exchange -> redisClient.scard(exchangeConverter.getKey(exchange))));
+ bind(Command.SDIFF, wrap(exchange -> redisClient.sdiff(exchangeConverter.getKey(exchange),
+ exchangeConverter.getKeys(exchange))));
+ bind(Command.SDIFFSTORE, exchange -> redisClient.sdiffstore(exchangeConverter.getKey(exchange),
+ exchangeConverter.getKeys(exchange),
+ exchangeConverter.getDestination(exchange)));
+ bind(Command.SINTER, wrap(exchange -> redisClient.sinter(exchangeConverter.getKey(exchange),
+ exchangeConverter.getKeys(exchange))));
+ bind(Command.SINTERSTORE, exchange -> redisClient.sinterstore(exchangeConverter.getKey(exchange),
+ exchangeConverter.getKeys(exchange),
+ exchangeConverter.getDestination(exchange)));
+ bind(Command.SISMEMBER, wrap(exchange -> redisClient.sismember(exchangeConverter.getKey(exchange),
+ exchangeConverter.getValue(exchange))));
+ bind(Command.SMEMBERS, wrap(exchange -> redisClient.smembers(exchangeConverter.getKey(exchange))));
+ bind(Command.SMOVE, wrap(exchange -> redisClient.smove(exchangeConverter.getKey(exchange),
+ exchangeConverter.getValue(exchange),
+ exchangeConverter.getDestination(exchange))));
+ bind(Command.SPOP, wrap(exchange -> redisClient.spop(exchangeConverter.getKey(exchange))));
+ bind(Command.SRANDMEMBER, wrap(exchange -> redisClient.srandmember(exchangeConverter.getKey(exchange))));
+ bind(Command.SREM, wrap(exchange -> redisClient.srem(exchangeConverter.getKey(exchange),
+ exchangeConverter.getValue(exchange))));
+ bind(Command.SUNION, wrap(exchange -> redisClient.sunion(exchangeConverter.getKey(exchange),
+ exchangeConverter.getKeys(exchange))));
+ bind(Command.SUNIONSTORE, exchange -> redisClient.sunionstore(exchangeConverter.getKey(exchange),
+ exchangeConverter.getKeys(exchange),
+ exchangeConverter.getDestination(exchange)));
+ //missing command sscan
+
+ //bind pubsub commands
+ bind(Command.PUBLISH, exchange -> redisClient.publish(exchangeConverter.getChannel(exchange),
+ exchangeConverter.getMessage(exchange)));
+ //missing psubscribe, pubsub, punsubscribe, subscribe, unsubscribe
+ //psubscribe, subscribe are used in consumer
+
+ //create list commands
+ bind(Command.BLPOP, wrap(exchange -> redisClient.blpop(exchangeConverter.getKey(exchange),
+ exchangeConverter.getTimeout(exchange))));
+ bind(Command.BRPOP, wrap(exchange -> redisClient.brpop(exchangeConverter.getKey(exchange),
+ exchangeConverter.getTimeout(exchange))));
+ bind(Command.BRPOPLPUSH, wrap(exchange -> redisClient.brpoplpush(exchangeConverter.getKey(exchange),
+ exchangeConverter.getDestination(exchange),
+ exchangeConverter.getTimeout(exchange))));
+ bind(Command.LINDEX, wrap(exchange -> redisClient.lindex(exchangeConverter.getKey(exchange),
+ exchangeConverter.getIndex(exchange))));
+ bind(Command.LINSERT, wrap(exchange -> redisClient.linsert(exchangeConverter.getKey(exchange),
+ exchangeConverter.getValue(exchange),
+ exchangeConverter.getPivot(exchange),
+ exchangeConverter.getPosition(exchange))));
+ bind(Command.LLEN, wrap(exchange -> redisClient.llen(exchangeConverter.getKey(exchange))));
+ bind(Command.LPOP, wrap(exchange -> redisClient.lpop(exchangeConverter.getKey(exchange))));
+ bind(Command.LPUSH, wrap(exchange -> redisClient.lpush(exchangeConverter.getKey(exchange),
+ exchangeConverter.getValue(exchange))));
+ //nieuwe actie
+ bind(Command.LPUSHX, wrap(exchange -> redisClient.lpushx(exchangeConverter.getKey(exchange),
+ exchangeConverter.getValue(exchange))));
+ bind(Command.LRANGE, wrap(exchange -> redisClient.lrange(exchangeConverter.getKey(exchange),
+ exchangeConverter.getStart(exchange),
+ exchangeConverter.getEnd(exchange))));
+ bind(Command.LREM, wrap(exchange -> redisClient.lrem(exchangeConverter.getKey(exchange),
+ exchangeConverter.getValue(exchange),
+ exchangeConverter.getCount(exchange))));
+ bind(Command.LSET, exchange -> redisClient.lset(exchangeConverter.getKey(exchange),
+ exchangeConverter.getValue(exchange),
+ exchangeConverter.getIndex(exchange)));
+ bind(Command.LTRIM, exchange -> redisClient.ltrim(exchangeConverter.getKey(exchange),
+ exchangeConverter.getStart(exchange),
+ exchangeConverter.getEnd(exchange)));
+ bind(Command.RPOP, exchange -> setResult(exchange,
+ redisClient.rpop(exchangeConverter.getKey(exchange))));
+ bind(Command.RPOPLPUSH, wrap(exchange -> redisClient.rpoplpush(exchangeConverter.getKey(exchange),
+ exchangeConverter.getDestination(exchange))));
+ bind(Command.RPUSH, wrap(exchange -> redisClient.rpush(exchangeConverter.getKey(exchange),
+ exchangeConverter.getValue(exchange))));
+ bind(Command.RPUSHX, wrap(exchange -> redisClient.rpushx(exchangeConverter.getKey(exchange),
+ exchangeConverter.getValue(exchange))));
- public void process(final Exchange exchange) throws Exception {
- commandDispatcher.execute(redisClient, exchange);
+ //bind hashes commands
+ bind(Command.HDEL, exchange -> redisClient.hdel(exchangeConverter.getKey(exchange),
+ exchangeConverter.getField(exchange)));
+ bind(Command.HEXISTS, wrap(exchange -> redisClient.hexists(exchangeConverter.getKey(exchange),
+ exchangeConverter.getField(exchange))));
+ bind(Command.HGET, wrap(exchange -> redisClient.hget(exchangeConverter.getKey(exchange),
+ exchangeConverter.getField(exchange))));
+ bind(Command.HGETALL, wrap(exchange -> redisClient.hgetAll(exchangeConverter.getKey(exchange))));
+ bind(Command.HINCRBY, wrap(exchange -> redisClient.hincrBy(exchangeConverter.getKey(exchange),
+ exchangeConverter.getField(exchange),
+ exchangeConverter.getValueAsLong(exchange))));
+ bind(Command.HKEYS, wrap(exchange -> redisClient.hkeys(exchangeConverter.getKey(exchange))));
+ bind(Command.HLEN, wrap(exchange -> redisClient.hlen(exchangeConverter.getKey(exchange))));
+ bind(Command.HMGET, wrap(exchange -> redisClient.hmget(exchangeConverter.getKey(exchange),
+ exchangeConverter.getFields(exchange))));
+ bind(Command.HMSET, exchange -> redisClient.hmset(exchangeConverter.getKey(exchange),
+ exchangeConverter.getValuesAsMap(exchange)));
+ bind(Command.HSET, exchange -> redisClient.hset(exchangeConverter.getKey(exchange),
+ exchangeConverter.getField(exchange),
+ exchangeConverter.getValue(exchange)));
+ bind(Command.HSETNX, wrap(exchange -> redisClient.hsetnx(exchangeConverter.getKey(exchange),
+ exchangeConverter.getField(exchange),
+ exchangeConverter.getValue(exchange))));
+ bind(Command.HVALS, wrap(exchange -> redisClient.hvals(exchangeConverter.getKey(exchange))));
+ //missing: hincrbyfloat, hstrlen, hscan
+
+ //bind connection commands
+ bind(Command.ECHO, wrap(exchange -> redisClient.echo(exchangeConverter.getStringValue(exchange))));
+ bind(Command.PING, wrap(exchange -> redisClient.ping()));
+ bind(Command.QUIT, exchange -> redisClient.quit());
+
+ //bind key commands
+ bind(Command.DEL, exchange -> redisClient.del(exchangeConverter.getKeys(exchange)));
+ bind(Command.EXISTS, wrap(exchange -> redisClient.exists(exchangeConverter.getKey(exchange))));
+ bind(Command.EXPIRE, wrap(exchange -> redisClient.expire(exchangeConverter.getKey(exchange),
+ exchangeConverter.getTimeout(exchange))));
+ bind(Command.EXPIREAT, wrap(exchange -> redisClient.expireat(exchangeConverter.getKey(exchange),
+ exchangeConverter.getTimestamp(exchange))));
+ bind(Command.KEYS, wrap(exchange -> redisClient.keys(exchangeConverter.getPattern(exchange))));
+ bind(Command.MOVE, wrap(exchange -> redisClient.move(exchangeConverter.getKey(exchange),
+ exchangeConverter.getDb(exchange))));
+ bind(Command.PERSIST, wrap(exchange -> redisClient.persist(exchangeConverter.getKey(exchange))));
+ bind(Command.PEXPIRE, wrap(exchange -> redisClient.pexpire(exchangeConverter.getKey(exchange),
+ exchangeConverter.getTimeout(exchange))));
+ bind(Command.PEXPIREAT, wrap(exchange -> redisClient.pexpireat(exchangeConverter.getKey(exchange),
+ exchangeConverter.getTimestamp(exchange))));
+ bind(Command.RANDOMKEY, wrap(exchange -> redisClient.randomkey()));
+ bind(Command.RENAME, exchange -> redisClient.rename(exchangeConverter.getKey(exchange),
+ exchangeConverter.getStringValue(exchange)));
+ bind(Command.RENAMENX, wrap(exchange -> redisClient.renamenx(exchangeConverter.getKey(exchange),
+ exchangeConverter.getStringValue(exchange))));
+ bind(Command.SORT, wrap(exchange -> redisClient.sort(exchangeConverter.getKey(exchange))));
+ bind(Command.TTL, wrap(exchange -> redisClient.ttl(exchangeConverter.getKey(exchange))));
+ bind(Command.TYPE, wrap(exchange -> redisClient.type(exchangeConverter.getKey(exchange))));
+ //missing: dump, migrate, object, pttl, restore, wait, scan
+
+ //bind transaction commands
+ bind(Command.DISCARD, exchange -> redisClient.discard());
+ bind(Command.EXEC, exchange -> redisClient.exec());
+ bind(Command.MULTI, exchange -> redisClient.multi());
+ bind(Command.WATCH, exchange -> redisClient.watch(exchangeConverter.getKeys(exchange)));
+ bind(Command.UNWATCH, exchange -> redisClient.unwatch());
}
- @Override
- public RedisEndpoint getEndpoint() {
- return (RedisEndpoint)super.getEndpoint();
+ private void bind(Command command, Processor processor) {
+ String cmd = command.name();
+ bind(cmd, processor);
}
- @Override
- public String toString() {
- if (redisProducerToString == null) {
- redisProducerToString = "RedisProducer[" + URISupport.sanitizeUri(getEndpoint().getEndpointUri()) + "]";
+
+ private void setResult(Exchange exchange, Object result) {
+ Message message;
+ if (exchange.getPattern().isOutCapable()) {
+ message = exchange.getOut();
+ message.copyFrom(exchange.getIn());
+ } else {
+ message = exchange.getIn();
}
- return redisProducerToString;
+ message.setBody(result);
}
+
+ public Processor wrap(Function<Exchange, Object> supplier) {
+ return exchange -> {
+ Object result = supplier.apply(exchange);
+ setResult(exchange, result);
+ };
+ }
+
}
http://git-wip-us.apache.org/repos/asf/camel/blob/3232a4a4/components/camel-spring-redis/src/test/java/org/apache/camel/component/redis/RedisHashTest.java
----------------------------------------------------------------------
diff --git a/components/camel-spring-redis/src/test/java/org/apache/camel/component/redis/RedisHashTest.java b/components/camel-spring-redis/src/test/java/org/apache/camel/component/redis/RedisHashTest.java
index 4d14205..da60087 100755
--- a/components/camel-spring-redis/src/test/java/org/apache/camel/component/redis/RedisHashTest.java
+++ b/components/camel-spring-redis/src/test/java/org/apache/camel/component/redis/RedisHashTest.java
@@ -96,7 +96,7 @@ public class RedisHashTest extends RedisTestSupport {
@Test
public void shouldExecuteHKEYS() throws Exception {
- Set<String> fields = new HashSet<String>(Arrays.asList(new String[] {"field1, field2"}));
+ Set<String> fields = new HashSet<>(Arrays.asList(new String[]{"field1, field2"}));
when(hashOperations.keys(anyString())).thenReturn(fields);
Object result = sendHeaders(
@@ -110,7 +110,7 @@ public class RedisHashTest extends RedisTestSupport {
@Test
public void shouldExecuteHMSET() throws Exception {
- Map<String, String> values = new HashMap<String, String>();
+ Map<String, String> values = new HashMap<>();
values.put("field1", "value1");
values.put("field2", "value");
@@ -124,7 +124,7 @@ public class RedisHashTest extends RedisTestSupport {
@Test
public void shouldExecuteHVALS() throws Exception {
- List<String> values = new ArrayList<String>();
+ List<String> values = new ArrayList<>();
values.add("val1");
values.add("val2");
@@ -192,7 +192,7 @@ public class RedisHashTest extends RedisTestSupport {
@Test
public void shouldExecuteHGETALL() throws Exception {
- HashMap<String, String> values = new HashMap<String, String>();
+ HashMap<String, String> values = new HashMap<>();
values.put("field1", "valu1");
when(hashOperations.entries(anyString())).thenReturn(values);
@@ -206,7 +206,7 @@ public class RedisHashTest extends RedisTestSupport {
@Test
public void shouldExecuteHMGET() throws Exception {
- List<String> fields = new ArrayList<String>();
+ List<String> fields = new ArrayList<>();
fields.add("field1");
when(hashOperations.multiGet(anyString(), anyCollection())).thenReturn(fields);
http://git-wip-us.apache.org/repos/asf/camel/blob/3232a4a4/components/camel-spring-redis/src/test/java/org/apache/camel/component/redis/RedisKeyTest.java
----------------------------------------------------------------------
diff --git a/components/camel-spring-redis/src/test/java/org/apache/camel/component/redis/RedisKeyTest.java b/components/camel-spring-redis/src/test/java/org/apache/camel/component/redis/RedisKeyTest.java
index a8decce..98e301f 100755
--- a/components/camel-spring-redis/src/test/java/org/apache/camel/component/redis/RedisKeyTest.java
+++ b/components/camel-spring-redis/src/test/java/org/apache/camel/component/redis/RedisKeyTest.java
@@ -57,7 +57,7 @@ public class RedisKeyTest extends RedisTestSupport {
@Test
public void shouldExecuteDEL() throws Exception {
- Collection<String> keys = new HashSet<String>();
+ Collection<String> keys = new HashSet<>();
keys.add("key1");
keys.add("key2");
sendHeaders(
@@ -109,7 +109,7 @@ public class RedisKeyTest extends RedisTestSupport {
@Test
public void shouldExecuteKEYS() throws Exception {
- Set<String> keys = new HashSet<String>();
+ Set<String> keys = new HashSet<>();
keys.add("key1");
keys.add("key2");
when(redisTemplate.keys(anyString())).thenReturn(keys);
@@ -211,7 +211,7 @@ public class RedisKeyTest extends RedisTestSupport {
@Test
public void shouldExecuteSORT() throws Exception {
- List<Integer> list = new ArrayList<Integer>();
+ List<Integer> list = new ArrayList<>();
list.add(5);
when(redisTemplate.sort(any(SortQuery.class))).thenReturn(list);
http://git-wip-us.apache.org/repos/asf/camel/blob/3232a4a4/components/camel-spring-redis/src/test/java/org/apache/camel/component/redis/RedisListTest.java
----------------------------------------------------------------------
diff --git a/components/camel-spring-redis/src/test/java/org/apache/camel/component/redis/RedisListTest.java b/components/camel-spring-redis/src/test/java/org/apache/camel/component/redis/RedisListTest.java
index 9ab6841..78bef4f 100755
--- a/components/camel-spring-redis/src/test/java/org/apache/camel/component/redis/RedisListTest.java
+++ b/components/camel-spring-redis/src/test/java/org/apache/camel/component/redis/RedisListTest.java
@@ -204,7 +204,7 @@ public class RedisListTest extends RedisTestSupport {
@Test
public void shouldExecuteLRANGE() throws Exception {
- List<String> values = new ArrayList<String>();
+ List<String> values = new ArrayList<>();
values.add("value");
when(listOperations.range(anyString(), anyLong(), anyLong()))
http://git-wip-us.apache.org/repos/asf/camel/blob/3232a4a4/components/camel-spring-redis/src/test/java/org/apache/camel/component/redis/RedisSetTest.java
----------------------------------------------------------------------
diff --git a/components/camel-spring-redis/src/test/java/org/apache/camel/component/redis/RedisSetTest.java b/components/camel-spring-redis/src/test/java/org/apache/camel/component/redis/RedisSetTest.java
index a528c08..3323d0f 100755
--- a/components/camel-spring-redis/src/test/java/org/apache/camel/component/redis/RedisSetTest.java
+++ b/components/camel-spring-redis/src/test/java/org/apache/camel/component/redis/RedisSetTest.java
@@ -80,12 +80,12 @@ public class RedisSetTest extends RedisTestSupport {
@Test
public void shouldExecuteSDIFF() throws Exception {
- Set<String> difference = new HashSet<String>();
+ Set<String> difference = new HashSet<>();
difference.add("a");
difference.add("b");
when(setOperations.difference(anyString(), anyCollection())).thenReturn(difference);
- Set<String> keys = new HashSet<String>();
+ Set<String> keys = new HashSet<>();
keys.add("key2");
keys.add("key3");
Object result = sendHeaders(
@@ -99,7 +99,7 @@ public class RedisSetTest extends RedisTestSupport {
@Test
public void shouldExecuteSDIFFSTORE() throws Exception {
- Set<String> keys = new HashSet<String>();
+ Set<String> keys = new HashSet<>();
keys.add("key2");
keys.add("key3");
sendHeaders(
@@ -113,12 +113,12 @@ public class RedisSetTest extends RedisTestSupport {
@Test
public void shouldExecuteSINTER() throws Exception {
- Set<String> difference = new HashSet<String>();
+ Set<String> difference = new HashSet<>();
difference.add("a");
difference.add("b");
when(setOperations.intersect(anyString(), anyCollection())).thenReturn(difference);
- Set<String> keys = new HashSet<String>();
+ Set<String> keys = new HashSet<>();
keys.add("key2");
keys.add("key3");
Object result = sendHeaders(
@@ -132,7 +132,7 @@ public class RedisSetTest extends RedisTestSupport {
@Test
public void shouldExecuteSINTERSTORE() throws Exception {
- Set<String> keys = new HashSet<String>();
+ Set<String> keys = new HashSet<>();
keys.add("key2");
keys.add("key3");
sendHeaders(
@@ -159,7 +159,7 @@ public class RedisSetTest extends RedisTestSupport {
@Test
public void shouldExecuteSMEMBERS() throws Exception {
- Set<String> keys = new HashSet<String>();
+ Set<String> keys = new HashSet<>();
keys.add("key2");
keys.add("key3");
@@ -220,18 +220,18 @@ public class RedisSetTest extends RedisTestSupport {
RedisConstants.VALUE, "value");
verify(setOperations).remove("key", "value");
- assertEquals(Long.valueOf(1), result);
+ assertEquals(1L, result);
}
@Test
public void shouldExecuteSUNION() throws Exception {
- Set<String> resultKeys = new HashSet<String>();
+ Set<String> resultKeys = new HashSet<>();
resultKeys.add("key2");
resultKeys.add("key3");
when(setOperations.union(anyString(), anyCollection())).thenReturn(resultKeys);
- Set<String> keys = new HashSet<String>();
+ Set<String> keys = new HashSet<>();
keys.add("key2");
keys.add("key4");
@@ -246,7 +246,7 @@ public class RedisSetTest extends RedisTestSupport {
@Test
public void shouldExecuteSUNIONSTORE() throws Exception {
- Set<String> keys = new HashSet<String>();
+ Set<String> keys = new HashSet<>();
keys.add("key2");
keys.add("key4");
http://git-wip-us.apache.org/repos/asf/camel/blob/3232a4a4/components/camel-spring-redis/src/test/java/org/apache/camel/component/redis/RedisSortedSetTest.java
----------------------------------------------------------------------
diff --git a/components/camel-spring-redis/src/test/java/org/apache/camel/component/redis/RedisSortedSetTest.java b/components/camel-spring-redis/src/test/java/org/apache/camel/component/redis/RedisSortedSetTest.java
index 5e209f8..01cb157 100755
--- a/components/camel-spring-redis/src/test/java/org/apache/camel/component/redis/RedisSortedSetTest.java
+++ b/components/camel-spring-redis/src/test/java/org/apache/camel/component/redis/RedisSortedSetTest.java
@@ -110,7 +110,7 @@ public class RedisSortedSetTest extends RedisTestSupport {
@Test
public void shouldExecuteZINTERSTORE() {
- Set<String> keys = new HashSet<String>();
+ Set<String> keys = new HashSet<>();
keys.add("key2");
keys.add("key3");
sendHeaders(
@@ -124,7 +124,7 @@ public class RedisSortedSetTest extends RedisTestSupport {
@Test
public void shouldExecuteZRANGE() {
- Set<String> keys = new HashSet<String>();
+ Set<String> keys = new HashSet<>();
keys.add("key2");
keys.add("key3");
when(zSetOperations.range(anyString(), anyLong(), anyLong())).thenReturn(keys);
@@ -158,7 +158,7 @@ public class RedisSortedSetTest extends RedisTestSupport {
@Test
public void shouldExecuteZRANGEBYSCORE() {
- Set<String> keys = new HashSet<String>();
+ Set<String> keys = new HashSet<>();
keys.add("key2");
keys.add("key3");
when(zSetOperations.rangeByScore(anyString(), anyDouble(), anyDouble())).thenReturn(keys);
@@ -197,7 +197,7 @@ public class RedisSortedSetTest extends RedisTestSupport {
RedisConstants.VALUE, "value");
verify(zSetOperations).remove("key", "value");
- assertEquals(Long.valueOf(1), result);
+ assertEquals(1L, result);
}
@@ -226,7 +226,7 @@ public class RedisSortedSetTest extends RedisTestSupport {
@Test
public void shouldExecuteZREVRANGE() {
- Set<String> keys = new HashSet<String>();
+ Set<String> keys = new HashSet<>();
keys.add("key2");
keys.add("key3");
when(zSetOperations.reverseRange(anyString(), anyLong(), anyLong())).thenReturn(keys);
@@ -260,7 +260,7 @@ public class RedisSortedSetTest extends RedisTestSupport {
@Test
public void shouldExecuteZREVRANGEBYSCORE() {
- Set<String> keys = new HashSet<String>();
+ Set<String> keys = new HashSet<>();
keys.add("key2");
keys.add("key3");
when(zSetOperations.reverseRangeByScore(anyString(), anyDouble(), anyDouble())).thenReturn(keys);
@@ -291,7 +291,7 @@ public class RedisSortedSetTest extends RedisTestSupport {
@Test
public void shouldExecuteZUNIONSTORE() {
- Set<String> keys = new HashSet<String>();
+ Set<String> keys = new HashSet<>();
keys.add("key2");
keys.add("key3");
sendHeaders(
http://git-wip-us.apache.org/repos/asf/camel/blob/3232a4a4/components/camel-spring-redis/src/test/java/org/apache/camel/component/redis/RedisStringTest.java
----------------------------------------------------------------------
diff --git a/components/camel-spring-redis/src/test/java/org/apache/camel/component/redis/RedisStringTest.java b/components/camel-spring-redis/src/test/java/org/apache/camel/component/redis/RedisStringTest.java
index a68c3e5..274fe31 100755
--- a/components/camel-spring-redis/src/test/java/org/apache/camel/component/redis/RedisStringTest.java
+++ b/components/camel-spring-redis/src/test/java/org/apache/camel/component/redis/RedisStringTest.java
@@ -231,10 +231,10 @@ public class RedisStringTest extends RedisTestSupport {
@Test
public void shouldExecuteMGET() throws Exception {
- List<String> fields = new ArrayList<String>();
+ List<String> fields = new ArrayList<>();
fields.add("field1");
- List<String> values = new ArrayList<String>();
+ List<String> values = new ArrayList<>();
values.add("value1");
when(valueOperations.multiGet(fields)).thenReturn(values);
@@ -250,7 +250,7 @@ public class RedisStringTest extends RedisTestSupport {
@Test
public void shouldExecuteMSET() throws Exception {
- Map<String, String> values = new HashMap<String, String>();
+ Map<String, String> values = new HashMap<>();
values.put("field1", "valu1");
sendHeaders(
@@ -263,7 +263,7 @@ public class RedisStringTest extends RedisTestSupport {
@Test
public void shouldExecuteMSETNX() throws Exception {
- Map<String, String> values = new HashMap<String, String>();
+ Map<String, String> values = new HashMap<>();
values.put("field1", "valu1");
sendHeaders(
http://git-wip-us.apache.org/repos/asf/camel/blob/3232a4a4/components/camel-spring-redis/src/test/java/org/apache/camel/component/redis/RedisTransactionTest.java
----------------------------------------------------------------------
diff --git a/components/camel-spring-redis/src/test/java/org/apache/camel/component/redis/RedisTransactionTest.java b/components/camel-spring-redis/src/test/java/org/apache/camel/component/redis/RedisTransactionTest.java
index b333563..64b1039 100755
--- a/components/camel-spring-redis/src/test/java/org/apache/camel/component/redis/RedisTransactionTest.java
+++ b/components/camel-spring-redis/src/test/java/org/apache/camel/component/redis/RedisTransactionTest.java
@@ -69,7 +69,7 @@ public class RedisTransactionTest extends RedisTestSupport {
@Test
public void shouldExecuteWATCH() throws Exception {
- List<String> keys = new ArrayList<String>();
+ List<String> keys = new ArrayList<>();
keys.add("key");
sendHeaders(