You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pa...@apache.org on 2015/04/01 19:49:33 UTC
[02/45] storm git commit: STORM-691 Add basic lookup / persist bolts
STORM-691 Add basic lookup / persist bolts
* Add Basic lookup / persist Bolts
** support data types : string, list, hash, set, sorted set, hyperloglog
* rename util package to common
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f7c0bf8a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f7c0bf8a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f7c0bf8a
Branch: refs/heads/nimbus-ha-branch
Commit: f7c0bf8a7c843c6e555ee982a85e3952d1c28b33
Parents: 64d7ac6
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sat Feb 28 22:29:32 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sat Feb 28 22:29:32 2015 +0900
----------------------------------------------------------------------
.../storm/redis/bolt/AbstractRedisBolt.java | 8 +-
.../storm/redis/bolt/RedisLookupBolt.java | 109 ++++++++++++++++++
.../apache/storm/redis/bolt/RedisStoreBolt.java | 97 ++++++++++++++++
.../redis/common/config/JedisClusterConfig.java | 82 +++++++++++++
.../redis/common/config/JedisPoolConfig.java | 97 ++++++++++++++++
.../common/container/JedisClusterContainer.java | 47 ++++++++
.../JedisCommandsContainerBuilder.java | 38 ++++++
.../JedisCommandsInstanceContainer.java | 25 ++++
.../redis/common/container/JedisContainer.java | 65 +++++++++++
.../common/mapper/RedisDataTypeDescription.java | 33 ++++++
.../redis/common/mapper/RedisLookupMapper.java | 40 +++++++
.../storm/redis/common/mapper/RedisMapper.java | 5 +
.../redis/common/mapper/RedisStoreMapper.java | 21 ++++
.../storm/redis/common/mapper/TupleMapper.java | 27 +++++
.../trident/mapper/TridentTupleMapper.java | 27 -----
.../trident/state/RedisClusterMapState.java | 2 +-
.../redis/trident/state/RedisClusterState.java | 2 +-
.../trident/state/RedisClusterStateQuerier.java | 10 +-
.../trident/state/RedisClusterStateUpdater.java | 10 +-
.../redis/trident/state/RedisMapState.java | 2 +-
.../storm/redis/trident/state/RedisState.java | 2 +-
.../redis/trident/state/RedisStateQuerier.java | 10 +-
.../state/RedisStateSetCountQuerier.java | 10 +-
.../trident/state/RedisStateSetUpdater.java | 10 +-
.../redis/trident/state/RedisStateUpdater.java | 10 +-
.../redis/util/config/JedisClusterConfig.java | 82 -------------
.../redis/util/config/JedisPoolConfig.java | 97 ----------------
.../util/container/JedisClusterContainer.java | 47 --------
.../JedisCommandsContainerBuilder.java | 38 ------
.../JedisCommandsInstanceContainer.java | 25 ----
.../redis/util/container/JedisContainer.java | 65 -----------
.../storm/redis/topology/LookupWordCount.java | 115 +++++++++++++------
.../redis/topology/PersistentWordCount.java | 46 +++++++-
.../storm/redis/topology/WordCounter.java | 19 ++-
.../redis/trident/WordCountTridentRedis.java | 7 +-
.../trident/WordCountTridentRedisCluster.java | 6 +-
.../WordCountTridentRedisClusterMap.java | 8 +-
.../redis/trident/WordCountTridentRedisMap.java | 9 +-
.../redis/trident/WordCountTupleMapper.java | 10 +-
39 files changed, 872 insertions(+), 491 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java
index 0b2a7f3..158fcaa 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java
@@ -20,10 +20,10 @@ package org.apache.storm.redis.bolt;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.base.BaseRichBolt;
-import org.apache.storm.redis.util.config.JedisClusterConfig;
-import org.apache.storm.redis.util.config.JedisPoolConfig;
-import org.apache.storm.redis.util.container.JedisCommandsContainerBuilder;
-import org.apache.storm.redis.util.container.JedisCommandsInstanceContainer;
+import org.apache.storm.redis.common.config.JedisClusterConfig;
+import org.apache.storm.redis.common.config.JedisPoolConfig;
+import org.apache.storm.redis.common.container.JedisCommandsContainerBuilder;
+import org.apache.storm.redis.common.container.JedisCommandsInstanceContainer;
import redis.clients.jedis.JedisCommands;
import java.util.Map;
http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisLookupBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisLookupBolt.java b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisLookupBolt.java
new file mode 100644
index 0000000..c40e983
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisLookupBolt.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.bolt;
+
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.storm.redis.common.mapper.RedisLookupMapper;
+import org.apache.storm.redis.common.config.JedisClusterConfig;
+import org.apache.storm.redis.common.config.JedisPoolConfig;
+import redis.clients.jedis.JedisCommands;
+
+import java.util.List;
+
+public class RedisLookupBolt extends AbstractRedisBolt {
+ private final RedisLookupMapper lookupMapper;
+ private final RedisDataTypeDescription.RedisDataType dataType;
+ private final String additionalKey;
+
+ public RedisLookupBolt(JedisPoolConfig config, RedisLookupMapper lookupMapper) {
+ super(config);
+
+ this.lookupMapper = lookupMapper;
+
+ RedisDataTypeDescription dataTypeDescription = lookupMapper.getDataTypeDescription();
+ this.dataType = dataTypeDescription.getDataType();
+ this.additionalKey = dataTypeDescription.getAdditionalKey();
+ }
+
+ public RedisLookupBolt(JedisClusterConfig config, RedisLookupMapper lookupMapper) {
+ super(config);
+
+ this.lookupMapper = lookupMapper;
+
+ RedisDataTypeDescription dataTypeDescription = lookupMapper.getDataTypeDescription();
+ this.dataType = dataTypeDescription.getDataType();
+ this.additionalKey = dataTypeDescription.getAdditionalKey();
+ }
+
+ @Override
+ public void execute(Tuple input) {
+ String key = lookupMapper.getKeyFromTuple(input);
+ Object lookupValue = null;
+
+ JedisCommands jedisCommand = null;
+ try {
+ jedisCommand = getInstance();
+
+ switch (dataType) {
+ case STRING:
+ lookupValue = jedisCommand.get(key);
+ break;
+
+ case LIST:
+ lookupValue = jedisCommand.lpop(key);
+ break;
+
+ case HASH:
+ lookupValue = jedisCommand.hget(additionalKey, key);
+ break;
+
+ case SET:
+ lookupValue = jedisCommand.scard(key);
+ break;
+
+ case SORTED_SET:
+ lookupValue = jedisCommand.zscore(additionalKey, key);
+ break;
+
+ case HYPER_LOG_LOG:
+ lookupValue = jedisCommand.pfcount(key);
+ break;
+ }
+
+ List<Values> values = lookupMapper.toTuple(input, lookupValue);
+ for (Values value : values) {
+ collector.emit(input, value);
+ }
+
+ collector.ack(input);
+ } catch (Exception e) {
+ this.collector.reportError(e);
+ this.collector.fail(input);
+ } finally {
+ returnInstance(jedisCommand);
+ }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ lookupMapper.declareOutputFields(declarer);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisStoreBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisStoreBolt.java b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisStoreBolt.java
new file mode 100644
index 0000000..5602c44
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisStoreBolt.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.bolt;
+
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+import org.apache.storm.redis.common.config.JedisClusterConfig;
+import org.apache.storm.redis.common.config.JedisPoolConfig;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.storm.redis.common.mapper.RedisStoreMapper;
+import redis.clients.jedis.JedisCommands;
+
+public class RedisStoreBolt extends AbstractRedisBolt {
+ private final RedisStoreMapper storeMapper;
+ private final RedisDataTypeDescription.RedisDataType dataType;
+ private final String additionalKey;
+
+ public RedisStoreBolt(JedisPoolConfig config, RedisStoreMapper storeMapper) {
+ super(config);
+ this.storeMapper = storeMapper;
+
+ RedisDataTypeDescription dataTypeDescription = storeMapper.getDataTypeDescription();
+ this.dataType = dataTypeDescription.getDataType();
+ this.additionalKey = dataTypeDescription.getAdditionalKey();
+ }
+
+ public RedisStoreBolt(JedisClusterConfig config, RedisStoreMapper storeMapper) {
+ super(config);
+ this.storeMapper = storeMapper;
+
+ RedisDataTypeDescription dataTypeDescription = storeMapper.getDataTypeDescription();
+ this.dataType = dataTypeDescription.getDataType();
+ this.additionalKey = dataTypeDescription.getAdditionalKey();
+ }
+
+ @Override
+ public void execute(Tuple input) {
+ String key = storeMapper.getKeyFromTuple(input);
+ String value = storeMapper.getValueFromTuple(input);
+
+ JedisCommands jedisCommand = null;
+ try {
+ jedisCommand = getInstance();
+
+ switch (dataType) {
+ case STRING:
+ jedisCommand.set(key, value);
+ break;
+
+ case LIST:
+ jedisCommand.rpush(key, value);
+ break;
+
+ case HASH:
+ jedisCommand.hset(additionalKey, key, value);
+ break;
+
+ case SET:
+ jedisCommand.sadd(key, value);
+ break;
+
+ case SORTED_SET:
+ jedisCommand.zadd(additionalKey, Double.valueOf(value), key);
+
+ case HYPER_LOG_LOG:
+ jedisCommand.pfadd(key, value);
+ break;
+ }
+
+ collector.ack(input);
+ } catch (Exception e) {
+ this.collector.reportError(e);
+ this.collector.fail(input);
+ } finally {
+ returnInstance(jedisCommand);
+ }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/common/config/JedisClusterConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/config/JedisClusterConfig.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/config/JedisClusterConfig.java
new file mode 100644
index 0000000..a13eced
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/config/JedisClusterConfig.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.common.config;
+
+import com.google.common.base.Preconditions;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.Protocol;
+
+import java.io.Serializable;
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Set;
+
+public class JedisClusterConfig implements Serializable {
+ private Set<InetSocketAddress> nodes;
+ private int timeout;
+ private int maxRedirections;
+
+ public JedisClusterConfig(Set<InetSocketAddress> nodes, int timeout, int maxRedirections) {
+ this.nodes = nodes;
+ this.timeout = timeout;
+ this.maxRedirections = maxRedirections;
+ }
+
+ public Set<HostAndPort> getNodes() {
+ Set<HostAndPort> ret = new HashSet<HostAndPort>();
+ for (InetSocketAddress node : nodes) {
+ ret.add(new HostAndPort(node.getHostName(), node.getPort()));
+ }
+ return ret;
+ }
+
+ public int getTimeout() {
+ return timeout;
+ }
+
+ public int getMaxRedirections() {
+ return maxRedirections;
+ }
+
+ public static class Builder {
+ private Set<InetSocketAddress> nodes;
+ private int timeout = Protocol.DEFAULT_TIMEOUT;
+ private int maxRedirections = 5;
+
+ public Builder setNodes(Set<InetSocketAddress> nodes) {
+ this.nodes = nodes;
+ return this;
+ }
+
+ public Builder setTimeout(int timeout) {
+ this.timeout = timeout;
+ return this;
+ }
+
+ public Builder setMaxRedirections(int maxRedirections) {
+ this.maxRedirections = maxRedirections;
+ return this;
+ }
+
+ public JedisClusterConfig build() {
+ Preconditions.checkNotNull(this.nodes, "Node information should be presented");
+
+ return new JedisClusterConfig(nodes, timeout, maxRedirections);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/common/config/JedisPoolConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/config/JedisPoolConfig.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/config/JedisPoolConfig.java
new file mode 100644
index 0000000..cc5f6e4
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/config/JedisPoolConfig.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.common.config;
+
+import redis.clients.jedis.Protocol;
+
+import java.io.Serializable;
+
+public class JedisPoolConfig implements Serializable {
+ public static final String DEFAULT_HOST = "127.0.0.1";
+
+ private String host;
+ private int port;
+ private int timeout;
+ private int database;
+ private String password;
+
+ public JedisPoolConfig(String host, int port, int timeout, String password, int database) {
+ this.host = host;
+ this.port = port;
+ this.timeout = timeout;
+ this.database = database;
+ this.password = password;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public int getTimeout() {
+ return timeout;
+ }
+
+ public int getDatabase() {
+ return database;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+
+ public static class Builder {
+ private String host = DEFAULT_HOST;
+ private int port = Protocol.DEFAULT_PORT;
+ private int timeout = Protocol.DEFAULT_TIMEOUT;
+ private int database = Protocol.DEFAULT_DATABASE;
+ private String password;
+
+ public Builder setHost(String host) {
+ this.host = host;
+ return this;
+ }
+
+ public Builder setPort(int port) {
+ this.port = port;
+ return this;
+ }
+
+ public Builder setTimeout(int timeout) {
+ this.timeout = timeout;
+ return this;
+ }
+
+ public Builder setDatabase(int database) {
+ this.database = database;
+ return this;
+ }
+
+ public Builder setPassword(String password) {
+ this.password = password;
+ return this;
+ }
+
+ public JedisPoolConfig build() {
+ return new JedisPoolConfig(host, port, timeout, password, database);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisClusterContainer.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisClusterContainer.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisClusterContainer.java
new file mode 100644
index 0000000..a1ff19f
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisClusterContainer.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.common.container;
+
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.JedisCommands;
+
+import java.io.Closeable;
+
+public class JedisClusterContainer implements JedisCommandsInstanceContainer, Closeable {
+
+ private JedisCluster jedisCluster;
+
+ public JedisClusterContainer(JedisCluster jedisCluster) {
+ this.jedisCluster = jedisCluster;
+ }
+
+ @Override
+ public JedisCommands getInstance() {
+ return this.jedisCluster;
+ }
+
+ @Override
+ public void returnInstance(JedisCommands jedisCommands) {
+ // do nothing
+ }
+
+ @Override
+ public void close() {
+ this.jedisCluster.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsContainerBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsContainerBuilder.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsContainerBuilder.java
new file mode 100644
index 0000000..a2f8c2e
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsContainerBuilder.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.common.container;
+
+import org.apache.storm.redis.common.config.JedisClusterConfig;
+import org.apache.storm.redis.common.config.JedisPoolConfig;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.JedisPool;
+
+public class JedisCommandsContainerBuilder {
+
+ public static final redis.clients.jedis.JedisPoolConfig DEFAULT_POOL_CONFIG = new redis.clients.jedis.JedisPoolConfig();
+
+ public static JedisCommandsInstanceContainer build(JedisPoolConfig config) {
+ JedisPool jedisPool = new JedisPool(DEFAULT_POOL_CONFIG, config.getHost(), config.getPort(), config.getTimeout(), config.getPassword(), config.getDatabase());
+ return new JedisContainer(jedisPool);
+ }
+
+ public static JedisCommandsInstanceContainer build(JedisClusterConfig config) {
+ JedisCluster jedisCluster = new JedisCluster(config.getNodes(), config.getTimeout(), config.getMaxRedirections(), DEFAULT_POOL_CONFIG);
+ return new JedisClusterContainer(jedisCluster);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsInstanceContainer.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsInstanceContainer.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsInstanceContainer.java
new file mode 100644
index 0000000..9ec32b9
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsInstanceContainer.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.common.container;
+
+import redis.clients.jedis.JedisCommands;
+
+public interface JedisCommandsInstanceContainer {
+ JedisCommands getInstance();
+ void returnInstance(JedisCommands jedisCommands);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisContainer.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisContainer.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisContainer.java
new file mode 100644
index 0000000..621c05b
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisContainer.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.common.container;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisCommands;
+import redis.clients.jedis.JedisPool;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+public class JedisContainer implements JedisCommandsInstanceContainer, Closeable {
+ private static final Logger LOG = LoggerFactory.getLogger(JedisContainer.class);
+
+ private JedisPool jedisPool;
+
+ public JedisContainer(JedisPool jedisPool) {
+ this.jedisPool = jedisPool;
+ }
+
+ @Override
+ public JedisCommands getInstance() {
+ return jedisPool.getResource();
+ }
+
+ @Override
+ public void returnInstance(JedisCommands jedisCommands) {
+ if (jedisCommands == null) {
+ return;
+ }
+
+ try {
+ ((Closeable) jedisCommands).close();
+ } catch (IOException e) {
+ LOG.warn("Failed to close (return) instance to pool");
+ try {
+ jedisPool.returnBrokenResource((Jedis) jedisCommands);
+ } catch (Exception e2) {
+ LOG.error("Failed to discard instance from pool");
+ }
+ }
+ }
+
+ @Override
+ public void close() {
+ jedisPool.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisDataTypeDescription.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisDataTypeDescription.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisDataTypeDescription.java
new file mode 100644
index 0000000..d2a4af2
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisDataTypeDescription.java
@@ -0,0 +1,33 @@
+package org.apache.storm.redis.common.mapper;
+
+import java.io.Serializable;
+
+public class RedisDataTypeDescription implements Serializable {
+ public enum RedisDataType { STRING, HASH, LIST, SET, SORTED_SET, HYPER_LOG_LOG }
+
+ private RedisDataType dataType;
+ private String additionalKey;
+
+ public RedisDataTypeDescription(RedisDataType dataType) {
+ this(dataType, null);
+ }
+
+ public RedisDataTypeDescription(RedisDataType dataType, String additionalKey) {
+ this.dataType = dataType;
+ this.additionalKey = additionalKey;
+
+ if (dataType == RedisDataType.HASH || dataType == RedisDataType.SORTED_SET) {
+ if (additionalKey == null) {
+ throw new IllegalArgumentException("Hash and Sorted Set should have additional key");
+ }
+ }
+ }
+
+ public RedisDataType getDataType() {
+ return dataType;
+ }
+
+ public String getAdditionalKey() {
+ return additionalKey;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisLookupMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisLookupMapper.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisLookupMapper.java
new file mode 100644
index 0000000..880aea1
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisLookupMapper.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.common.mapper;
+
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.ITuple;
+import backtype.storm.tuple.Values;
+
+import java.util.List;
+
+public interface RedisLookupMapper extends TupleMapper, RedisMapper {
+ /**
+ * Converts return value from Redis to a list of storm values that can be emitted.
+ * @param input the input tuple.
+ * @param value Redis query response value. Can be String, Boolean, Long regarding of data type.
+ * @return a List of storm values that can be emitted. Each item in list is emitted as an output tuple.
+ */
+ public List<Values> toTuple(ITuple input, Object value);
+
+ /**
+ * declare what are the fields that this code will output.
+ * @param declarer
+ */
+ void declareOutputFields(OutputFieldsDeclarer declarer);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisMapper.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisMapper.java
new file mode 100644
index 0000000..d19acaa
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisMapper.java
@@ -0,0 +1,5 @@
+package org.apache.storm.redis.common.mapper;
+
+public interface RedisMapper {
+ public RedisDataTypeDescription getDataTypeDescription();
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisStoreMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisStoreMapper.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisStoreMapper.java
new file mode 100644
index 0000000..b3d7adf
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisStoreMapper.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.common.mapper;
+
+public interface RedisStoreMapper extends TupleMapper, RedisMapper {
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/TupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/TupleMapper.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/TupleMapper.java
new file mode 100644
index 0000000..86664b8
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/TupleMapper.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.common.mapper;
+
+import backtype.storm.tuple.ITuple;
+
+import java.io.Serializable;
+
+public interface TupleMapper extends Serializable {
+ public String getKeyFromTuple(ITuple tuple);
+ public String getValueFromTuple(ITuple tuple);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/mapper/TridentTupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/mapper/TridentTupleMapper.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/mapper/TridentTupleMapper.java
deleted file mode 100644
index 4c10143..0000000
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/mapper/TridentTupleMapper.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.redis.trident.mapper;
-
-import storm.trident.tuple.TridentTuple;
-
-import java.io.Serializable;
-
-public interface TridentTupleMapper extends Serializable {
- public String getKeyFromTridentTuple(TridentTuple tuple);
- public String getValueFromTridentTuple(TridentTuple tuple);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
index 24c1df1..1154376 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
@@ -23,7 +23,7 @@ import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import org.apache.storm.redis.util.config.JedisClusterConfig;
+import org.apache.storm.redis.common.config.JedisClusterConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.JedisCluster;
http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterState.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterState.java
index 493ffdd..d74e838 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterState.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterState.java
@@ -18,7 +18,7 @@
package org.apache.storm.redis.trident.state;
import backtype.storm.task.IMetricsContext;
-import org.apache.storm.redis.util.config.JedisClusterConfig;
+import org.apache.storm.redis.common.config.JedisClusterConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.JedisCluster;
http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java
index e0207e2..17614a1 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java
@@ -19,7 +19,7 @@ package org.apache.storm.redis.trident.state;
import backtype.storm.tuple.Values;
import com.google.common.collect.Lists;
-import org.apache.storm.redis.trident.mapper.TridentTupleMapper;
+import org.apache.storm.redis.common.mapper.TupleMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.JedisCluster;
@@ -33,9 +33,9 @@ public class RedisClusterStateQuerier extends BaseQueryFunction<RedisClusterStat
private static final Logger logger = LoggerFactory.getLogger(RedisClusterState.class);
private final String redisKeyPrefix;
- private final TridentTupleMapper tupleMapper;
+ private final TupleMapper tupleMapper;
- public RedisClusterStateQuerier(String redisKeyPrefix, TridentTupleMapper tupleMapper) {
+ public RedisClusterStateQuerier(String redisKeyPrefix, TupleMapper tupleMapper) {
this.redisKeyPrefix = redisKeyPrefix;
this.tupleMapper = tupleMapper;
}
@@ -52,7 +52,7 @@ public class RedisClusterStateQuerier extends BaseQueryFunction<RedisClusterStat
for (TridentTuple input : inputs) {
- String key = this.tupleMapper.getKeyFromTridentTuple(input);
+ String key = this.tupleMapper.getKeyFromTuple(input);
if (redisKeyPrefix != null && redisKeyPrefix.length() > 0) {
key = redisKeyPrefix + key;
}
@@ -72,7 +72,7 @@ public class RedisClusterStateQuerier extends BaseQueryFunction<RedisClusterStat
@Override
public void execute(TridentTuple tuple, String s, TridentCollector collector) {
- String key = this.tupleMapper.getKeyFromTridentTuple(tuple);
+ String key = this.tupleMapper.getKeyFromTuple(tuple);
collector.emit(new Values(key, s));
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java
index e72735a..023b527 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java
@@ -17,7 +17,7 @@
*/
package org.apache.storm.redis.trident.state;
-import org.apache.storm.redis.trident.mapper.TridentTupleMapper;
+import org.apache.storm.redis.common.mapper.TupleMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.JedisCluster;
@@ -31,10 +31,10 @@ public class RedisClusterStateUpdater extends BaseStateUpdater<RedisClusterState
private static final Logger logger = LoggerFactory.getLogger(RedisClusterState.class);
private final String redisKeyPrefix;
- private final TridentTupleMapper tupleMapper;
+ private final TupleMapper tupleMapper;
private final int expireIntervalSec;
- public RedisClusterStateUpdater(String redisKeyPrefix, TridentTupleMapper tupleMapper, int expireIntervalSec) {
+ public RedisClusterStateUpdater(String redisKeyPrefix, TupleMapper tupleMapper, int expireIntervalSec) {
this.redisKeyPrefix = redisKeyPrefix;
this.tupleMapper = tupleMapper;
if (expireIntervalSec > 0) {
@@ -52,12 +52,12 @@ public class RedisClusterStateUpdater extends BaseStateUpdater<RedisClusterState
try {
jedisCluster = redisClusterState.getJedisCluster();
for (TridentTuple input : inputs) {
- String key = this.tupleMapper.getKeyFromTridentTuple(input);
+ String key = this.tupleMapper.getKeyFromTuple(input);
String redisKey = key;
if (redisKeyPrefix != null && redisKeyPrefix.length() > 0) {
redisKey = redisKeyPrefix + redisKey;
}
- String value = this.tupleMapper.getValueFromTridentTuple(input);
+ String value = this.tupleMapper.getValueFromTuple(input);
logger.debug("update key[" + key + "] redisKey[" + redisKey + "] value[" + value + "]");
http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
index f934cea..7f3edd1 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
@@ -22,7 +22,7 @@ import backtype.storm.tuple.Values;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
-import org.apache.storm.redis.util.config.JedisPoolConfig;
+import org.apache.storm.redis.common.config.JedisPoolConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisState.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisState.java
index f2fd624..2c7fd13 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisState.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisState.java
@@ -18,7 +18,7 @@
package org.apache.storm.redis.trident.state;
import backtype.storm.task.IMetricsContext;
-import org.apache.storm.redis.util.config.JedisPoolConfig;
+import org.apache.storm.redis.common.config.JedisPoolConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java
index 051088e..294e83b 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java
@@ -19,7 +19,7 @@ package org.apache.storm.redis.trident.state;
import backtype.storm.tuple.Values;
import com.google.common.collect.Lists;
-import org.apache.storm.redis.trident.mapper.TridentTupleMapper;
+import org.apache.storm.redis.common.mapper.TupleMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
@@ -33,9 +33,9 @@ public class RedisStateQuerier extends BaseQueryFunction<RedisState, String> {
private static final Logger logger = LoggerFactory.getLogger(RedisState.class);
private final String redisKeyPrefix;
- private final TridentTupleMapper tupleMapper;
+ private final TupleMapper tupleMapper;
- public RedisStateQuerier(String redisKeyPrefix, TridentTupleMapper tupleMapper) {
+ public RedisStateQuerier(String redisKeyPrefix, TupleMapper tupleMapper) {
this.redisKeyPrefix = redisKeyPrefix;
this.tupleMapper = tupleMapper;
}
@@ -44,7 +44,7 @@ public class RedisStateQuerier extends BaseQueryFunction<RedisState, String> {
public List<String> batchRetrieve(RedisState redisState, List<TridentTuple> inputs) {
List<String> keys = Lists.newArrayList();
for (TridentTuple input : inputs) {
- String key = this.tupleMapper.getKeyFromTridentTuple(input);
+ String key = this.tupleMapper.getKeyFromTuple(input);
if (redisKeyPrefix != null && redisKeyPrefix.length() > 0) {
key = redisKeyPrefix + key;
}
@@ -64,7 +64,7 @@ public class RedisStateQuerier extends BaseQueryFunction<RedisState, String> {
@Override
public void execute(TridentTuple tuple, String s, TridentCollector collector) {
- String key = this.tupleMapper.getKeyFromTridentTuple(tuple);
+ String key = this.tupleMapper.getKeyFromTuple(tuple);
collector.emit(new Values(key, s));
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetCountQuerier.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetCountQuerier.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetCountQuerier.java
index 5b04d59..6b75f31 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetCountQuerier.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetCountQuerier.java
@@ -18,7 +18,7 @@
package org.apache.storm.redis.trident.state;
import backtype.storm.tuple.Values;
-import org.apache.storm.redis.trident.mapper.TridentTupleMapper;
+import org.apache.storm.redis.common.mapper.TupleMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
@@ -33,9 +33,9 @@ public class RedisStateSetCountQuerier extends BaseQueryFunction<RedisState, Lon
private static final Logger logger = LoggerFactory.getLogger(RedisState.class);
private final String redisKeyPrefix;
- private final TridentTupleMapper tupleMapper;
+ private final TupleMapper tupleMapper;
- public RedisStateSetCountQuerier(String redisKeyPrefix, TridentTupleMapper tupleMapper) {
+ public RedisStateSetCountQuerier(String redisKeyPrefix, TupleMapper tupleMapper) {
this.redisKeyPrefix = redisKeyPrefix;
this.tupleMapper = tupleMapper;
}
@@ -48,7 +48,7 @@ public class RedisStateSetCountQuerier extends BaseQueryFunction<RedisState, Lon
try {
jedis = redisState.getJedis();
for (TridentTuple input : inputs) {
- String key = this.tupleMapper.getKeyFromTridentTuple(input);
+ String key = this.tupleMapper.getKeyFromTuple(input);
if (redisKeyPrefix != null && redisKeyPrefix.length() > 0) {
key = redisKeyPrefix + key;
}
@@ -68,7 +68,7 @@ public class RedisStateSetCountQuerier extends BaseQueryFunction<RedisState, Lon
@Override
public void execute(TridentTuple tuple, Long s, TridentCollector collector) {
- String key = this.tupleMapper.getKeyFromTridentTuple(tuple);
+ String key = this.tupleMapper.getKeyFromTuple(tuple);
collector.emit(new Values(key, s));
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetUpdater.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetUpdater.java
index c36d1f0..d7c43da 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetUpdater.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetUpdater.java
@@ -18,7 +18,7 @@
package org.apache.storm.redis.trident.state;
import backtype.storm.tuple.Values;
-import org.apache.storm.redis.trident.mapper.TridentTupleMapper;
+import org.apache.storm.redis.common.mapper.TupleMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
@@ -32,10 +32,10 @@ public class RedisStateSetUpdater extends BaseStateUpdater<RedisState> {
private static final Logger logger = LoggerFactory.getLogger(RedisState.class);
private final String redisKeyPrefix;
- private final TridentTupleMapper tupleMapper;
+ private final TupleMapper tupleMapper;
private final int expireIntervalSec;
- public RedisStateSetUpdater(String redisKeyPrefix, TridentTupleMapper tupleMapper, int expireIntervalSec) {
+ public RedisStateSetUpdater(String redisKeyPrefix, TupleMapper tupleMapper, int expireIntervalSec) {
this.redisKeyPrefix = redisKeyPrefix;
this.tupleMapper = tupleMapper;
if (expireIntervalSec > 0) {
@@ -53,12 +53,12 @@ public class RedisStateSetUpdater extends BaseStateUpdater<RedisState> {
try {
jedis = redisState.getJedis();
for (TridentTuple input : inputs) {
- String key = this.tupleMapper.getKeyFromTridentTuple(input);
+ String key = this.tupleMapper.getKeyFromTuple(input);
String redisKey = key;
if (redisKeyPrefix != null && redisKeyPrefix.length() > 0) {
redisKey = redisKeyPrefix + redisKey;
}
- String value = this.tupleMapper.getValueFromTridentTuple(input);
+ String value = this.tupleMapper.getValueFromTuple(input);
logger.debug("update key[" + key + "] redisKey[" + redisKey + "] value[" + value + "]");
http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
index 67f7c51..664a222 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
@@ -17,7 +17,7 @@
*/
package org.apache.storm.redis.trident.state;
-import org.apache.storm.redis.trident.mapper.TridentTupleMapper;
+import org.apache.storm.redis.common.mapper.TupleMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
@@ -31,10 +31,10 @@ public class RedisStateUpdater extends BaseStateUpdater<RedisState> {
private static final Logger logger = LoggerFactory.getLogger(RedisState.class);
private final String redisKeyPrefix;
- private final TridentTupleMapper tupleMapper;
+ private final TupleMapper tupleMapper;
private final int expireIntervalSec;
- public RedisStateUpdater(String redisKeyPrefix, TridentTupleMapper tupleMapper, int expireIntervalSec) {
+ public RedisStateUpdater(String redisKeyPrefix, TupleMapper tupleMapper, int expireIntervalSec) {
this.redisKeyPrefix = redisKeyPrefix;
this.tupleMapper = tupleMapper;
if (expireIntervalSec > 0) {
@@ -51,12 +51,12 @@ public class RedisStateUpdater extends BaseStateUpdater<RedisState> {
try {
jedis = redisState.getJedis();
for (TridentTuple input : inputs) {
- String key = this.tupleMapper.getKeyFromTridentTuple(input);
+ String key = this.tupleMapper.getKeyFromTuple(input);
String redisKey = key;
if (redisKeyPrefix != null && redisKeyPrefix.length() > 0) {
redisKey = redisKeyPrefix + redisKey;
}
- String value = this.tupleMapper.getValueFromTridentTuple(input);
+ String value = this.tupleMapper.getValueFromTuple(input);
logger.debug("update key[" + key + "] redisKey[" + redisKey + "] value[" + value + "]");
http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/util/config/JedisClusterConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/util/config/JedisClusterConfig.java b/external/storm-redis/src/main/java/org/apache/storm/redis/util/config/JedisClusterConfig.java
deleted file mode 100644
index 355119a..0000000
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/util/config/JedisClusterConfig.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.redis.util.config;
-
-import com.google.common.base.Preconditions;
-import redis.clients.jedis.HostAndPort;
-import redis.clients.jedis.Protocol;
-
-import java.io.Serializable;
-import java.net.InetSocketAddress;
-import java.util.HashSet;
-import java.util.Set;
-
-public class JedisClusterConfig implements Serializable {
- private Set<InetSocketAddress> nodes;
- private int timeout;
- private int maxRedirections;
-
- public JedisClusterConfig(Set<InetSocketAddress> nodes, int timeout, int maxRedirections) {
- this.nodes = nodes;
- this.timeout = timeout;
- this.maxRedirections = maxRedirections;
- }
-
- public Set<HostAndPort> getNodes() {
- Set<HostAndPort> ret = new HashSet<HostAndPort>();
- for (InetSocketAddress node : nodes) {
- ret.add(new HostAndPort(node.getHostName(), node.getPort()));
- }
- return ret;
- }
-
- public int getTimeout() {
- return timeout;
- }
-
- public int getMaxRedirections() {
- return maxRedirections;
- }
-
- public static class Builder {
- private Set<InetSocketAddress> nodes;
- private int timeout = Protocol.DEFAULT_TIMEOUT;
- private int maxRedirections = 5;
-
- public Builder setNodes(Set<InetSocketAddress> nodes) {
- this.nodes = nodes;
- return this;
- }
-
- public Builder setTimeout(int timeout) {
- this.timeout = timeout;
- return this;
- }
-
- public Builder setMaxRedirections(int maxRedirections) {
- this.maxRedirections = maxRedirections;
- return this;
- }
-
- public JedisClusterConfig build() {
- Preconditions.checkNotNull(this.nodes, "Node information should be presented");
-
- return new JedisClusterConfig(nodes, timeout, maxRedirections);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/util/config/JedisPoolConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/util/config/JedisPoolConfig.java b/external/storm-redis/src/main/java/org/apache/storm/redis/util/config/JedisPoolConfig.java
deleted file mode 100644
index 9a42cf7..0000000
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/util/config/JedisPoolConfig.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.redis.util.config;
-
-import redis.clients.jedis.Protocol;
-
-import java.io.Serializable;
-
-public class JedisPoolConfig implements Serializable {
- public static final String DEFAULT_HOST = "127.0.0.1";
-
- private String host;
- private int port;
- private int timeout;
- private int database;
- private String password;
-
- public JedisPoolConfig(String host, int port, int timeout, String password, int database) {
- this.host = host;
- this.port = port;
- this.timeout = timeout;
- this.database = database;
- this.password = password;
- }
-
- public String getHost() {
- return host;
- }
-
- public int getPort() {
- return port;
- }
-
- public int getTimeout() {
- return timeout;
- }
-
- public int getDatabase() {
- return database;
- }
-
- public String getPassword() {
- return password;
- }
-
- public static class Builder {
- private String host = DEFAULT_HOST;
- private int port = Protocol.DEFAULT_PORT;
- private int timeout = Protocol.DEFAULT_TIMEOUT;
- private int database = Protocol.DEFAULT_DATABASE;
- private String password;
-
- public Builder setHost(String host) {
- this.host = host;
- return this;
- }
-
- public Builder setPort(int port) {
- this.port = port;
- return this;
- }
-
- public Builder setTimeout(int timeout) {
- this.timeout = timeout;
- return this;
- }
-
- public Builder setDatabase(int database) {
- this.database = database;
- return this;
- }
-
- public Builder setPassword(String password) {
- this.password = password;
- return this;
- }
-
- public JedisPoolConfig build() {
- return new JedisPoolConfig(host, port, timeout, password, database);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisClusterContainer.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisClusterContainer.java b/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisClusterContainer.java
deleted file mode 100644
index 5fd4115..0000000
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisClusterContainer.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.redis.util.container;
-
-import redis.clients.jedis.JedisCluster;
-import redis.clients.jedis.JedisCommands;
-
-import java.io.Closeable;
-
-public class JedisClusterContainer implements JedisCommandsInstanceContainer, Closeable {
-
- private JedisCluster jedisCluster;
-
- public JedisClusterContainer(JedisCluster jedisCluster) {
- this.jedisCluster = jedisCluster;
- }
-
- @Override
- public JedisCommands getInstance() {
- return this.jedisCluster;
- }
-
- @Override
- public void returnInstance(JedisCommands jedisCommands) {
- // do nothing
- }
-
- @Override
- public void close() {
- this.jedisCluster.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisCommandsContainerBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisCommandsContainerBuilder.java b/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisCommandsContainerBuilder.java
deleted file mode 100644
index 8d2dd38..0000000
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisCommandsContainerBuilder.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.redis.util.container;
-
-import org.apache.storm.redis.util.config.JedisClusterConfig;
-import org.apache.storm.redis.util.config.JedisPoolConfig;
-import redis.clients.jedis.JedisCluster;
-import redis.clients.jedis.JedisPool;
-
-public class JedisCommandsContainerBuilder {
-
- public static final redis.clients.jedis.JedisPoolConfig DEFAULT_POOL_CONFIG = new redis.clients.jedis.JedisPoolConfig();
-
- public static JedisCommandsInstanceContainer build(JedisPoolConfig config) {
- JedisPool jedisPool = new JedisPool(DEFAULT_POOL_CONFIG, config.getHost(), config.getPort(), config.getTimeout(), config.getPassword(), config.getDatabase());
- return new JedisContainer(jedisPool);
- }
-
- public static JedisCommandsInstanceContainer build(JedisClusterConfig config) {
- JedisCluster jedisCluster = new JedisCluster(config.getNodes(), config.getTimeout(), config.getMaxRedirections(), DEFAULT_POOL_CONFIG);
- return new JedisClusterContainer(jedisCluster);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisCommandsInstanceContainer.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisCommandsInstanceContainer.java b/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisCommandsInstanceContainer.java
deleted file mode 100644
index 847d6a5..0000000
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisCommandsInstanceContainer.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.redis.util.container;
-
-import redis.clients.jedis.JedisCommands;
-
-public interface JedisCommandsInstanceContainer {
- JedisCommands getInstance();
- void returnInstance(JedisCommands jedisCommands);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisContainer.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisContainer.java b/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisContainer.java
deleted file mode 100644
index e75cccc..0000000
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisContainer.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.redis.util.container;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import redis.clients.jedis.Jedis;
-import redis.clients.jedis.JedisCommands;
-import redis.clients.jedis.JedisPool;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-public class JedisContainer implements JedisCommandsInstanceContainer, Closeable {
- private static final Logger LOG = LoggerFactory.getLogger(JedisContainer.class);
-
- private JedisPool jedisPool;
-
- public JedisContainer(JedisPool jedisPool) {
- this.jedisPool = jedisPool;
- }
-
- @Override
- public JedisCommands getInstance() {
- return jedisPool.getResource();
- }
-
- @Override
- public void returnInstance(JedisCommands jedisCommands) {
- if (jedisCommands == null) {
- return;
- }
-
- try {
- ((Closeable) jedisCommands).close();
- } catch (IOException e) {
- LOG.warn("Failed to close (return) instance to pool");
- try {
- jedisPool.returnBrokenResource((Jedis) jedisCommands);
- } catch (Exception e2) {
- LOG.error("Failed to discard instance from pool");
- }
- }
- }
-
- @Override
- public void close() {
- jedisPool.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/LookupWordCount.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/LookupWordCount.java b/external/storm-redis/src/test/java/org/apache/storm/redis/topology/LookupWordCount.java
index a62fdff..ae053de 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/LookupWordCount.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/topology/LookupWordCount.java
@@ -20,72 +20,65 @@ package org.apache.storm.redis.topology;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.ITuple;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
-import org.apache.storm.redis.bolt.AbstractRedisBolt;
-import org.apache.storm.redis.util.config.JedisClusterConfig;
-import org.apache.storm.redis.util.config.JedisPoolConfig;
+import com.google.common.collect.Lists;
+import org.apache.storm.redis.bolt.RedisLookupBolt;
+import org.apache.storm.redis.common.config.JedisPoolConfig;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.storm.redis.common.mapper.RedisLookupMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import redis.clients.jedis.JedisCommands;
-import redis.clients.jedis.exceptions.JedisConnectionException;
-import redis.clients.jedis.exceptions.JedisException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
import java.util.Random;
public class LookupWordCount {
private static final String WORD_SPOUT = "WORD_SPOUT";
private static final String LOOKUP_BOLT = "LOOKUP_BOLT";
+ private static final String PRINT_BOLT = "PRINT_BOLT";
private static final String TEST_REDIS_HOST = "127.0.0.1";
private static final int TEST_REDIS_PORT = 6379;
- public static class LookupWordTotalCountBolt extends AbstractRedisBolt {
- private static final Logger LOG = LoggerFactory.getLogger(LookupWordTotalCountBolt.class);
+ public static class PrintWordTotalCountBolt extends BaseRichBolt {
+ private static final Logger LOG = LoggerFactory.getLogger(PrintWordTotalCountBolt.class);
private static final Random RANDOM = new Random();
+ private OutputCollector collector;
- public LookupWordTotalCountBolt(JedisPoolConfig config) {
- super(config);
- }
-
- public LookupWordTotalCountBolt(JedisClusterConfig config) {
- super(config);
+ @Override
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ this.collector = collector;
}
@Override
public void execute(Tuple input) {
- JedisCommands jedisCommands = null;
- try {
- jedisCommands = getInstance();
- String wordName = input.getStringByField("word");
- String countStr = jedisCommands.get(wordName);
+ String wordName = input.getStringByField("wordName");
+ String countStr = input.getStringByField("count");
+
+ // print lookup result with low probability
+ if(RANDOM.nextInt(1000) > 995) {
+ int count = 0;
if (countStr != null) {
- int count = Integer.parseInt(countStr);
- this.collector.emit(new Values(wordName, count));
-
- // print lookup result with low probability
- if(RANDOM.nextInt(1000) > 995) {
- LOG.info("Lookup result - word : " + wordName + " / count : " + count);
- }
- } else {
- // skip
- LOG.warn("Word not found in Redis - word : " + wordName);
+ count = Integer.parseInt(countStr);
}
- } finally {
- if (jedisCommands != null) {
- returnInstance(jedisCommands);
- }
- this.collector.ack(input);
+ LOG.info("Lookup result - word : " + wordName + " / count : " + count);
}
+
+ collector.ack(input);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
- // wordName, count
- declarer.declare(new Fields("wordName", "count"));
}
}
@@ -104,12 +97,16 @@ public class LookupWordCount {
.setHost(host).setPort(port).build();
WordSpout spout = new WordSpout();
- LookupWordTotalCountBolt redisLookupBolt = new LookupWordTotalCountBolt(poolConfig);
+ RedisLookupMapper lookupMapper = setupLookupMapper();
+ RedisLookupBolt lookupBolt = new RedisLookupBolt(poolConfig, lookupMapper);
+
+ PrintWordTotalCountBolt printBolt = new PrintWordTotalCountBolt();
//wordspout -> lookupbolt
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(WORD_SPOUT, spout, 1);
- builder.setBolt(LOOKUP_BOLT, redisLookupBolt, 1).shuffleGrouping(WORD_SPOUT);
+ builder.setBolt(LOOKUP_BOLT, lookupBolt, 1).shuffleGrouping(WORD_SPOUT);
+ builder.setBolt(PRINT_BOLT, printBolt, 1).shuffleGrouping(LOOKUP_BOLT);
if (args.length == 2) {
LocalCluster cluster = new LocalCluster();
@@ -124,4 +121,46 @@ public class LookupWordCount {
System.out.println("Usage: LookupWordCount <redis host> <redis port> (topology name)");
}
}
+
+ private static RedisLookupMapper setupLookupMapper() {
+ return new WordCountRedisLookupMapper();
+ }
+
+ private static class WordCountRedisLookupMapper implements RedisLookupMapper {
+ private RedisDataTypeDescription description;
+ private final String hashKey = "wordCount";
+
+ public WordCountRedisLookupMapper() {
+ description = new RedisDataTypeDescription(
+ RedisDataTypeDescription.RedisDataType.HASH, hashKey);
+ }
+
+ @Override
+ public List<Values> toTuple(ITuple input, Object value) {
+ String member = getKeyFromTuple(input);
+ List<Values> values = Lists.newArrayList();
+ values.add(new Values(member, value));
+ return values;
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("wordName", "count"));
+ }
+
+ @Override
+ public RedisDataTypeDescription getDataTypeDescription() {
+ return description;
+ }
+
+ @Override
+ public String getKeyFromTuple(ITuple tuple) {
+ return tuple.getStringByField("word");
+ }
+
+ @Override
+ public String getValueFromTuple(ITuple tuple) {
+ return null;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/PersistentWordCount.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/PersistentWordCount.java b/external/storm-redis/src/test/java/org/apache/storm/redis/topology/PersistentWordCount.java
index 535d7b9..14a969d 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/PersistentWordCount.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/topology/PersistentWordCount.java
@@ -23,10 +23,14 @@ import backtype.storm.StormSubmitter;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.ITuple;
import backtype.storm.tuple.Tuple;
import org.apache.storm.redis.bolt.AbstractRedisBolt;
-import org.apache.storm.redis.util.config.JedisClusterConfig;
-import org.apache.storm.redis.util.config.JedisPoolConfig;
+import org.apache.storm.redis.bolt.RedisStoreBolt;
+import org.apache.storm.redis.common.config.JedisClusterConfig;
+import org.apache.storm.redis.common.config.JedisPoolConfig;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.storm.redis.common.mapper.RedisStoreMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.JedisCommands;
@@ -36,7 +40,7 @@ import redis.clients.jedis.exceptions.JedisException;
public class PersistentWordCount {
private static final String WORD_SPOUT = "WORD_SPOUT";
private static final String COUNT_BOLT = "COUNT_BOLT";
- private static final String REDIS_BOLT = "REDIS_BOLT";
+ private static final String STORE_BOLT = "STORE_BOLT";
private static final String TEST_REDIS_HOST = "127.0.0.1";
private static final int TEST_REDIS_PORT = 6379;
@@ -92,14 +96,15 @@ public class PersistentWordCount {
WordSpout spout = new WordSpout();
WordCounter bolt = new WordCounter();
- StoreCountRedisBolt redisBolt = new StoreCountRedisBolt(poolConfig);
+ RedisStoreMapper storeMapper = setupStoreMapper();
+ RedisStoreBolt storeBolt = new RedisStoreBolt(poolConfig, storeMapper);
// wordSpout ==> countBolt ==> RedisBolt
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(WORD_SPOUT, spout, 1);
- builder.setBolt(COUNT_BOLT, bolt, 1).shuffleGrouping(WORD_SPOUT);
- builder.setBolt(REDIS_BOLT, redisBolt, 1).fieldsGrouping(COUNT_BOLT, new Fields("word"));
+ builder.setBolt(COUNT_BOLT, bolt, 1).fieldsGrouping(WORD_SPOUT, new Fields("word"));
+ builder.setBolt(STORE_BOLT, storeBolt, 1).shuffleGrouping(COUNT_BOLT);
if (args.length == 2) {
LocalCluster cluster = new LocalCluster();
@@ -114,4 +119,33 @@ public class PersistentWordCount {
System.out.println("Usage: PersistentWordCount <redis host> <redis port> (topology name)");
}
}
+
+ private static RedisStoreMapper setupStoreMapper() {
+ return new WordCountStoreMapper();
+ }
+
+ private static class WordCountStoreMapper implements RedisStoreMapper {
+ private RedisDataTypeDescription description;
+ private final String hashKey = "wordCount";
+
+ public WordCountStoreMapper() {
+ description = new RedisDataTypeDescription(
+ RedisDataTypeDescription.RedisDataType.HASH, hashKey);
+ }
+
+ @Override
+ public RedisDataTypeDescription getDataTypeDescription() {
+ return description;
+ }
+
+ @Override
+ public String getKeyFromTuple(ITuple tuple) {
+ return tuple.getStringByField("word");
+ }
+
+ @Override
+ public String getValueFromTuple(ITuple tuple) {
+ return tuple.getStringByField("count");
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/WordCounter.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/WordCounter.java b/external/storm-redis/src/test/java/org/apache/storm/redis/topology/WordCounter.java
index 6a0548d..6f25038 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/WordCounter.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/topology/WordCounter.java
@@ -23,23 +23,32 @@ import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import com.google.common.collect.Maps;
import java.util.Map;
import static backtype.storm.utils.Utils.tuple;
public class WordCounter implements IBasicBolt {
-
+ private Map<String, Integer> wordCounter = Maps.newHashMap();
@SuppressWarnings("rawtypes")
public void prepare(Map stormConf, TopologyContext context) {
}
- /*
- * Just output the word value with a count of 1.
- */
public void execute(Tuple input, BasicOutputCollector collector) {
- collector.emit(tuple(input.getValues().get(0), 1));
+ String word = input.getStringByField("word");
+ int count;
+ if (wordCounter.containsKey(word)) {
+ count = wordCounter.get(word) + 1;
+ wordCounter.put(word, wordCounter.get(word) + 1);
+ } else {
+ count = 1;
+ }
+
+ wordCounter.put(word, count);
+ collector.emit(new Values(word, String.valueOf(count)));
}
public void cleanup() {
http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java
index 9a28cb7..8b6ebc5 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java
@@ -23,15 +23,14 @@ import backtype.storm.StormSubmitter;
import backtype.storm.generated.StormTopology;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
-import org.apache.storm.redis.trident.mapper.TridentTupleMapper;
+import org.apache.storm.redis.common.mapper.TupleMapper;
import org.apache.storm.redis.trident.state.RedisState;
import org.apache.storm.redis.trident.state.RedisStateQuerier;
import org.apache.storm.redis.trident.state.RedisStateUpdater;
-import org.apache.storm.redis.util.config.JedisPoolConfig;
+import org.apache.storm.redis.common.config.JedisPoolConfig;
import storm.trident.Stream;
import storm.trident.TridentState;
import storm.trident.TridentTopology;
-import storm.trident.state.StateFactory;
import storm.trident.testing.FixedBatchSpout;
public class WordCountTridentRedis {
@@ -48,7 +47,7 @@ public class WordCountTridentRedis {
JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
.setHost(redisHost).setPort(redisPort)
.build();
- TridentTupleMapper tupleMapper = new WordCountTupleMapper();
+ TupleMapper tupleMapper = new WordCountTupleMapper();
RedisState.Factory factory = new RedisState.Factory(poolConfig);
TridentTopology topology = new TridentTopology();