You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/07/10 23:23:10 UTC
[1/4] storm git commit: STORM-2594: Apply new code style to
storm-rocketmq
Repository: storm
Updated Branches:
refs/heads/master 29a8a42c8 -> 64fab1e58
http://git-wip-us.apache.org/repos/asf/storm/blob/1d7be760/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/scheme/StringKeyValueScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/scheme/StringKeyValueScheme.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/scheme/StringKeyValueScheme.java
index 47ef996..21a762e 100644
--- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/scheme/StringKeyValueScheme.java
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/scheme/StringKeyValueScheme.java
@@ -15,19 +15,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.rocketmq.spout.scheme;
import com.google.common.collect.ImmutableMap;
-import org.apache.storm.tuple.Values;
import java.nio.ByteBuffer;
import java.util.List;
+import org.apache.storm.tuple.Values;
+
public class StringKeyValueScheme extends StringScheme implements KeyValueScheme {
@Override
public List<Object> deserializeKeyAndValue(ByteBuffer key, ByteBuffer value) {
- if ( key == null ) {
+ if (key == null) {
return deserialize(value);
}
String keyString = StringScheme.deserializeString(key);
http://git-wip-us.apache.org/repos/asf/storm/blob/1d7be760/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/scheme/StringScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/scheme/StringScheme.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/scheme/StringScheme.java
index 64c8241..810cb98 100644
--- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/scheme/StringScheme.java
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/scheme/StringScheme.java
@@ -15,17 +15,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.rocketmq.spout.scheme;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
import org.apache.storm.spout.Scheme;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.util.List;
-
public class StringScheme implements Scheme {
public static final String STRING_SCHEME_KEY = "str";
@@ -33,12 +33,17 @@ public class StringScheme implements Scheme {
return new Values(deserializeString(bytes));
}
- public static String deserializeString(ByteBuffer string) {
- if (string.hasArray()) {
- int base = string.arrayOffset();
- return new String(string.array(), base + string.position(), string.remaining());
+ /**
+ * Deserialize ByteBuffer to String.
+ * @param byteBuffer input ByteBuffer
+ * @return deserialized string
+ */
+ public static String deserializeString(ByteBuffer byteBuffer) {
+ if (byteBuffer.hasArray()) {
+ int base = byteBuffer.arrayOffset();
+ return new String(byteBuffer.array(), base + byteBuffer.position(), byteBuffer.remaining());
} else {
- return new String(Utils.toByteArray(string), StandardCharsets.UTF_8);
+ return new String(Utils.toByteArray(byteBuffer), StandardCharsets.UTF_8);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/1d7be760/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMQState.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMQState.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMQState.java
deleted file mode 100644
index b6413ad..0000000
--- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMQState.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.rocketmq.trident.state;
-
-import org.apache.commons.lang.Validate;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.producer.DefaultMQProducer;
-import org.apache.rocketmq.client.producer.MQProducer;
-import org.apache.rocketmq.common.message.Message;
-import org.apache.storm.rocketmq.RocketMQConfig;
-import org.apache.storm.rocketmq.common.mapper.TupleToMessageMapper;
-import org.apache.storm.rocketmq.common.selector.TopicSelector;
-import org.apache.storm.topology.FailedException;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.state.State;
-import org.apache.storm.trident.tuple.TridentTuple;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-public class RocketMQState implements State {
-
- private static final Logger LOG = LoggerFactory.getLogger(RocketMQState.class);
-
- private Options options;
- private MQProducer producer;
-
- protected RocketMQState(Map map, Options options) {
- this.options = options;
- }
-
- public static class Options implements Serializable {
- private TopicSelector selector;
- private TupleToMessageMapper mapper;
- private Properties properties;
-
- public Options withSelector(TopicSelector selector) {
- this.selector = selector;
- return this;
- }
-
- public Options withMapper(TupleToMessageMapper mapper) {
- this.mapper = mapper;
- return this;
- }
-
- public Options withProperties(Properties properties) {
- this.properties = properties;
- return this;
- }
- }
-
- protected void prepare() {
- Validate.notEmpty(options.properties, "Producer properties can not be empty");
-
- producer = new DefaultMQProducer();
- RocketMQConfig.buildProducerConfigs(options.properties, (DefaultMQProducer)producer);
-
- try {
- producer.start();
- } catch (MQClientException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public void beginCommit(Long txid) {
- LOG.debug("beginCommit is noop.");
- }
-
- @Override
- public void commit(Long txid) {
- LOG.debug("commit is noop.");
- }
-
- public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
- try {
- for (TridentTuple tuple : tuples) {
- String topic = options.selector.getTopic(tuple);
- String tag = options.selector.getTag(tuple);
- String key = options.mapper.getKeyFromTuple(tuple);
- byte[] value = options.mapper.getValueFromTuple(tuple);
-
- if (topic == null) {
- LOG.warn("skipping Message with Key = " + key + ", topic selector returned null.");
- continue;
- }
-
- Message msg = new Message(topic,tag, key, value);
- this.producer.send(msg);
- }
- } catch (Exception e) {
- LOG.warn("Batch write failed but some requests might have succeeded. Triggering replay.", e);
- throw new FailedException(e);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/1d7be760/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMQStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMQStateFactory.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMQStateFactory.java
deleted file mode 100644
index 2b4cc37..0000000
--- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMQStateFactory.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.rocketmq.trident.state;
-
-import org.apache.storm.task.IMetricsContext;
-import org.apache.storm.trident.state.State;
-import org.apache.storm.trident.state.StateFactory;
-
-import java.util.Map;
-
-public class RocketMQStateFactory implements StateFactory {
-
- private RocketMQState.Options options;
-
- public RocketMQStateFactory(RocketMQState.Options options) {
- this.options = options;
- }
-
- @Override
- public State makeState(Map<String, Object> conf, IMetricsContext metrics,
- int partitionIndex, int numPartitions) {
- RocketMQState state = new RocketMQState(conf, options);
- state.prepare();
- return state;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/1d7be760/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMQStateUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMQStateUpdater.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMQStateUpdater.java
deleted file mode 100644
index a548ce8..0000000
--- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMQStateUpdater.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.rocketmq.trident.state;
-
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.state.BaseStateUpdater;
-import org.apache.storm.trident.tuple.TridentTuple;
-
-import java.util.List;
-
-public class RocketMQStateUpdater extends BaseStateUpdater<RocketMQState> {
-
- @Override
- public void updateState(RocketMQState state, List<TridentTuple> tuples,
- TridentCollector collector) {
- state.updateState(tuples, collector);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/1d7be760/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMqState.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMqState.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMqState.java
new file mode 100644
index 0000000..9a8a46e
--- /dev/null
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMqState.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.rocketmq.trident.state;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.commons.lang.Validate;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.MQProducer;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.storm.rocketmq.RocketMqConfig;
+import org.apache.storm.rocketmq.common.mapper.TupleToMessageMapper;
+import org.apache.storm.rocketmq.common.selector.TopicSelector;
+import org.apache.storm.topology.FailedException;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RocketMqState implements State {
+
+ private static final Logger LOG = LoggerFactory.getLogger(RocketMqState.class);
+
+ private Options options;
+ private MQProducer producer;
+
+ protected RocketMqState(Map map, Options options) {
+ this.options = options;
+ }
+
+ public static class Options implements Serializable {
+ private TopicSelector selector;
+ private TupleToMessageMapper mapper;
+ private Properties properties;
+
+ public Options withSelector(TopicSelector selector) {
+ this.selector = selector;
+ return this;
+ }
+
+ public Options withMapper(TupleToMessageMapper mapper) {
+ this.mapper = mapper;
+ return this;
+ }
+
+ public Options withProperties(Properties properties) {
+ this.properties = properties;
+ return this;
+ }
+ }
+
+ protected void prepare() {
+ Validate.notEmpty(options.properties, "Producer properties can not be empty");
+
+ producer = new DefaultMQProducer();
+ RocketMqConfig.buildProducerConfigs(options.properties, (DefaultMQProducer)producer);
+
+ try {
+ producer.start();
+ } catch (MQClientException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void beginCommit(Long txid) {
+ LOG.debug("beginCommit is noop.");
+ }
+
+ @Override
+ public void commit(Long txid) {
+ LOG.debug("commit is noop.");
+ }
+
+ /**
+ * Update the RocketMQ state.
+ * @param tuples trident tuples
+ * @param collector trident collector
+ */
+ public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
+ try {
+ for (TridentTuple tuple : tuples) {
+ String topic = options.selector.getTopic(tuple);
+ String tag = options.selector.getTag(tuple);
+ String key = options.mapper.getKeyFromTuple(tuple);
+ byte[] value = options.mapper.getValueFromTuple(tuple);
+
+ if (topic == null) {
+ LOG.warn("skipping Message with Key = " + key + ", topic selector returned null.");
+ continue;
+ }
+
+ Message msg = new Message(topic,tag, key, value);
+ this.producer.send(msg);
+ }
+ } catch (Exception e) {
+ LOG.warn("Batch write failed but some requests might have succeeded. Triggering replay.", e);
+ throw new FailedException(e);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/1d7be760/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMqStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMqStateFactory.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMqStateFactory.java
new file mode 100644
index 0000000..64b8277
--- /dev/null
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMqStateFactory.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.rocketmq.trident.state;
+
+import java.util.Map;
+
+import org.apache.storm.task.IMetricsContext;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.state.StateFactory;
+
+public class RocketMqStateFactory implements StateFactory {
+
+ private RocketMqState.Options options;
+
+ public RocketMqStateFactory(RocketMqState.Options options) {
+ this.options = options;
+ }
+
+ @Override
+ public State makeState(Map<String, Object> conf, IMetricsContext metrics,
+ int partitionIndex, int numPartitions) {
+ RocketMqState state = new RocketMqState(conf, options);
+ state.prepare();
+ return state;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/1d7be760/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMqStateUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMqStateUpdater.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMqStateUpdater.java
new file mode 100644
index 0000000..589ac5e
--- /dev/null
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMqStateUpdater.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.rocketmq.trident.state;
+
+import java.util.List;
+
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.state.BaseStateUpdater;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+public class RocketMqStateUpdater extends BaseStateUpdater<RocketMqState> {
+
+ @Override
+ public void updateState(RocketMqState state, List<TridentTuple> tuples,
+ TridentCollector collector) {
+ state.updateState(tuples, collector);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/1d7be760/external/storm-rocketmq/src/test/java/org/apache/storm/rocketmq/TestMessageRetryManager.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/test/java/org/apache/storm/rocketmq/TestMessageRetryManager.java b/external/storm-rocketmq/src/test/java/org/apache/storm/rocketmq/TestMessageRetryManager.java
index 47de20d..50aaf74 100644
--- a/external/storm-rocketmq/src/test/java/org/apache/storm/rocketmq/TestMessageRetryManager.java
+++ b/external/storm-rocketmq/src/test/java/org/apache/storm/rocketmq/TestMessageRetryManager.java
@@ -15,21 +15,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.rocketmq;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.storm.utils.Utils;
-import org.junit.Before;
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.storm.utils.Utils;
+import org.junit.Before;
+import org.junit.Test;
public class TestMessageRetryManager {
MessageRetryManager messageRetryManager;
[3/4] storm git commit: Merge branch 'STORM-2594' of
https://github.com/vesense/storm into STORM-2594-merge
Posted by ka...@apache.org.
Merge branch 'STORM-2594' of https://github.com/vesense/storm into STORM-2594-merge
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/88e691aa
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/88e691aa
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/88e691aa
Branch: refs/heads/master
Commit: 88e691aa3009cf14f433333775d37b964fcd6c90
Parents: 29a8a42 1d7be76
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Tue Jul 11 08:22:26 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Jul 11 08:22:26 2017 +0900
----------------------------------------------------------------------
docs/storm-rocketmq.md | 28 +--
.../rocketmq/topology/WordCountTopology.java | 12 +-
.../rocketmq/trident/WordCountTrident.java | 16 +-
external/storm-rocketmq/README.md | 28 +--
external/storm-rocketmq/pom.xml | 3 -
.../apache/storm/rocketmq/ConsumerMessage.java | 1 +
.../rocketmq/DefaultMessageBodySerializer.java | 3 +-
.../rocketmq/DefaultMessageRetryManager.java | 20 +-
.../storm/rocketmq/MessageBodySerializer.java | 5 +-
.../storm/rocketmq/MessageRetryManager.java | 13 +-
.../apache/storm/rocketmq/RocketMQConfig.java | 162 --------------
.../apache/storm/rocketmq/RocketMQUtils.java | 64 ------
.../apache/storm/rocketmq/RocketMqConfig.java | 178 +++++++++++++++
.../apache/storm/rocketmq/RocketMqUtils.java | 76 +++++++
.../org/apache/storm/rocketmq/SpoutConfig.java | 3 +-
.../storm/rocketmq/bolt/RocketMQBolt.java | 160 -------------
.../storm/rocketmq/bolt/RocketMqBolt.java | 160 +++++++++++++
.../FieldNameBasedTupleToMessageMapper.java | 14 +-
.../common/mapper/TupleToMessageMapper.java | 6 +-
.../common/selector/DefaultTopicSelector.java | 5 +-
.../selector/FieldNameBasedTopicSelector.java | 9 +-
.../rocketmq/common/selector/TopicSelector.java | 6 +-
.../storm/rocketmq/spout/RocketMQSpout.java | 218 ------------------
.../storm/rocketmq/spout/RocketMqSpout.java | 223 +++++++++++++++++++
.../rocketmq/spout/scheme/KeyValueScheme.java | 4 +-
.../spout/scheme/StringKeyValueScheme.java | 6 +-
.../rocketmq/spout/scheme/StringScheme.java | 23 +-
.../rocketmq/trident/state/RocketMQState.java | 117 ----------
.../trident/state/RocketMQStateFactory.java | 42 ----
.../trident/state/RocketMQStateUpdater.java | 34 ---
.../rocketmq/trident/state/RocketMqState.java | 123 ++++++++++
.../trident/state/RocketMqStateFactory.java | 43 ++++
.../trident/state/RocketMqStateUpdater.java | 35 +++
.../storm/rocketmq/TestMessageRetryManager.java | 15 +-
34 files changed, 964 insertions(+), 891 deletions(-)
----------------------------------------------------------------------
[4/4] storm git commit: STORM-2594: CHANGELOG
Posted by ka...@apache.org.
STORM-2594: CHANGELOG
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/64fab1e5
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/64fab1e5
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/64fab1e5
Branch: refs/heads/master
Commit: 64fab1e58349f5057d9bfd95be018ed8224a88c4
Parents: 88e691a
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Tue Jul 11 08:22:46 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Jul 11 08:22:46 2017 +0900
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/64fab1e5/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4a5f7b1..5a58d75 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 2.0.0
+ * STORM-2594: Apply new code style to storm-rocketmq
* STORM-2615: Add topology readonly user configuration
* STORM-2093: Fix permissions in multi-tenant, secure mode
* STORM-2610: Fixed throttle metrics
[2/4] storm git commit: STORM-2594: Apply new code style to
storm-rocketmq
Posted by ka...@apache.org.
STORM-2594: Apply new code style to storm-rocketmq
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1d7be760
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1d7be760
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1d7be760
Branch: refs/heads/master
Commit: 1d7be760198189711165abac48acfd3bc332083a
Parents: d7c7818
Author: Xin Wang <be...@163.com>
Authored: Sun Jul 9 21:02:30 2017 +0800
Committer: Xin Wang <be...@163.com>
Committed: Sun Jul 9 21:02:30 2017 +0800
----------------------------------------------------------------------
docs/storm-rocketmq.md | 28 +--
.../rocketmq/topology/WordCountTopology.java | 12 +-
.../rocketmq/trident/WordCountTrident.java | 16 +-
external/storm-rocketmq/README.md | 28 +--
external/storm-rocketmq/pom.xml | 3 -
.../apache/storm/rocketmq/ConsumerMessage.java | 1 +
.../rocketmq/DefaultMessageBodySerializer.java | 3 +-
.../rocketmq/DefaultMessageRetryManager.java | 20 +-
.../storm/rocketmq/MessageBodySerializer.java | 5 +-
.../storm/rocketmq/MessageRetryManager.java | 13 +-
.../apache/storm/rocketmq/RocketMQConfig.java | 162 --------------
.../apache/storm/rocketmq/RocketMQUtils.java | 64 ------
.../apache/storm/rocketmq/RocketMqConfig.java | 178 +++++++++++++++
.../apache/storm/rocketmq/RocketMqUtils.java | 76 +++++++
.../org/apache/storm/rocketmq/SpoutConfig.java | 3 +-
.../storm/rocketmq/bolt/RocketMQBolt.java | 160 -------------
.../storm/rocketmq/bolt/RocketMqBolt.java | 160 +++++++++++++
.../FieldNameBasedTupleToMessageMapper.java | 14 +-
.../common/mapper/TupleToMessageMapper.java | 6 +-
.../common/selector/DefaultTopicSelector.java | 5 +-
.../selector/FieldNameBasedTopicSelector.java | 9 +-
.../rocketmq/common/selector/TopicSelector.java | 6 +-
.../storm/rocketmq/spout/RocketMQSpout.java | 218 ------------------
.../storm/rocketmq/spout/RocketMqSpout.java | 223 +++++++++++++++++++
.../rocketmq/spout/scheme/KeyValueScheme.java | 4 +-
.../spout/scheme/StringKeyValueScheme.java | 6 +-
.../rocketmq/spout/scheme/StringScheme.java | 23 +-
.../rocketmq/trident/state/RocketMQState.java | 117 ----------
.../trident/state/RocketMQStateFactory.java | 42 ----
.../trident/state/RocketMQStateUpdater.java | 34 ---
.../rocketmq/trident/state/RocketMqState.java | 123 ++++++++++
.../trident/state/RocketMqStateFactory.java | 43 ++++
.../trident/state/RocketMqStateUpdater.java | 35 +++
.../storm/rocketmq/TestMessageRetryManager.java | 15 +-
34 files changed, 964 insertions(+), 891 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/1d7be760/docs/storm-rocketmq.md
----------------------------------------------------------------------
diff --git a/docs/storm-rocketmq.md b/docs/storm-rocketmq.md
index 17daf8c..2661ddf 100644
--- a/docs/storm-rocketmq.md
+++ b/docs/storm-rocketmq.md
@@ -6,10 +6,10 @@ Storm/Trident integration for [RocketMQ](https://rocketmq.incubator.apache.org/)
## Read from Topic
The spout included in this package for reading data from a topic.
-### RocketMQSpout
-To use the `RocketMQSpout`, you construct an instance of it by specifying a Properties instance which including rocketmq configs.
-RocketMQSpout uses RocketMQ MQPushConsumer as the default implementation. PushConsumer is a high level consumer API, wrapping the pulling details. Looks like broker push messages to consumer.
-RocketMQSpout will retry 3(use `SpoutConfig.DEFAULT_MESSAGES_MAX_RETRY` to change the value) times when messages are failed.
+### RocketMqSpout
+To use the `RocketMqSpout`, you construct an instance of it by specifying a Properties instance which including rocketmq configs.
+RocketMqSpout uses RocketMQ MQPushConsumer as the default implementation. PushConsumer is a high level consumer API, wrapping the pulling details. Looks like broker push messages to consumer.
+RocketMqSpout will retry 3(use `SpoutConfig.DEFAULT_MESSAGES_MAX_RETRY` to change the value) times when messages are failed.
```java
Properties properties = new Properties();
@@ -17,7 +17,7 @@ RocketMQSpout will retry 3(use `SpoutConfig.DEFAULT_MESSAGES_MAX_RETRY` to chang
properties.setProperty(SpoutConfig.CONSUMER_GROUP, group);
properties.setProperty(SpoutConfig.CONSUMER_TOPIC, topic);
- RocketMQSpout spout = new RocketMQSpout(properties);
+ RocketMqSpout spout = new RocketMqSpout(properties);
```
@@ -51,18 +51,18 @@ public interface TopicSelector extends Serializable {
`storm-rocketmq` includes general purpose `TopicSelector` implementations called `DefaultTopicSelector` and `FieldNameBasedTopicSelector`.
-### RocketMQBolt
-To use the `RocketMQBolt`, you construct an instance of it by specifying TupleToMessageMapper, TopicSelector and Properties instances.
-RocketMQBolt send messages async by default. You can change this by invoking `withAsync(false)`.
+### RocketMqBolt
+To use the `RocketMqBolt`, you construct an instance of it by specifying TupleToMessageMapper, TopicSelector and Properties instances.
+RocketMqBolt send messages async by default. You can change this by invoking `withAsync(false)`.
```java
TupleToMessageMapper mapper = new FieldNameBasedTupleToMessageMapper("word", "count");
TopicSelector selector = new DefaultTopicSelector(topic);
properties = new Properties();
- properties.setProperty(RocketMQConfig.NAME_SERVER_ADDR, nameserverAddr);
+ properties.setProperty(RocketMqConfig.NAME_SERVER_ADDR, nameserverAddr);
- RocketMQBolt insertBolt = new RocketMQBolt()
+ RocketMqBolt insertBolt = new RocketMqBolt()
.withMapper(mapper)
.withSelector(selector)
.withProperties(properties);
@@ -76,19 +76,19 @@ We support trident persistent state that can be used with trident topologies. To
TopicSelector selector = new DefaultTopicSelector(topic);
Properties properties = new Properties();
- properties.setProperty(RocketMQConfig.NAME_SERVER_ADDR, nameserverAddr);
+ properties.setProperty(RocketMqConfig.NAME_SERVER_ADDR, nameserverAddr);
- RocketMQState.Options options = new RocketMQState.Options()
+ RocketMqState.Options options = new RocketMqState.Options()
.withMapper(mapper)
.withSelector(selector)
.withProperties(properties);
- StateFactory factory = new RocketMQStateFactory(options);
+ StateFactory factory = new RocketMqStateFactory(options);
TridentTopology topology = new TridentTopology();
Stream stream = topology.newStream("spout1", spout);
stream.partitionPersist(factory, fields,
- new RocketMQStateUpdater(), new Fields());
+ new RocketMqStateUpdater(), new Fields());
```
http://git-wip-us.apache.org/repos/asf/storm/blob/1d7be760/examples/storm-rocketmq-examples/src/main/java/org/apache/storm/rocketmq/topology/WordCountTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-rocketmq-examples/src/main/java/org/apache/storm/rocketmq/topology/WordCountTopology.java b/examples/storm-rocketmq-examples/src/main/java/org/apache/storm/rocketmq/topology/WordCountTopology.java
index 0ce844e..5ae4bca 100644
--- a/examples/storm-rocketmq-examples/src/main/java/org/apache/storm/rocketmq/topology/WordCountTopology.java
+++ b/examples/storm-rocketmq-examples/src/main/java/org/apache/storm/rocketmq/topology/WordCountTopology.java
@@ -22,14 +22,14 @@ import org.apache.storm.LocalCluster;
import org.apache.storm.LocalCluster.LocalTopology;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
-import org.apache.storm.rocketmq.RocketMQConfig;
+import org.apache.storm.rocketmq.RocketMqConfig;
import org.apache.storm.rocketmq.SpoutConfig;
-import org.apache.storm.rocketmq.bolt.RocketMQBolt;
+import org.apache.storm.rocketmq.bolt.RocketMqBolt;
import org.apache.storm.rocketmq.common.mapper.FieldNameBasedTupleToMessageMapper;
import org.apache.storm.rocketmq.common.mapper.TupleToMessageMapper;
import org.apache.storm.rocketmq.common.selector.DefaultTopicSelector;
import org.apache.storm.rocketmq.common.selector.TopicSelector;
-import org.apache.storm.rocketmq.spout.RocketMQSpout;
+import org.apache.storm.rocketmq.spout.RocketMqSpout;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
@@ -49,7 +49,7 @@ public class WordCountTopology {
properties.setProperty(SpoutConfig.CONSUMER_GROUP, CONSUMER_GROUP);
properties.setProperty(SpoutConfig.CONSUMER_TOPIC, CONSUMER_TOPIC);
- RocketMQSpout spout = new RocketMQSpout(properties);
+ RocketMqSpout spout = new RocketMqSpout(properties);
WordCounter bolt = new WordCounter();
@@ -57,9 +57,9 @@ public class WordCountTopology {
TopicSelector selector = new DefaultTopicSelector(topic);
properties = new Properties();
- properties.setProperty(RocketMQConfig.NAME_SERVER_ADDR, nameserverAddr);
+ properties.setProperty(RocketMqConfig.NAME_SERVER_ADDR, nameserverAddr);
- RocketMQBolt insertBolt = new RocketMQBolt()
+ RocketMqBolt insertBolt = new RocketMqBolt()
.withMapper(mapper)
.withSelector(selector)
.withProperties(properties);
http://git-wip-us.apache.org/repos/asf/storm/blob/1d7be760/examples/storm-rocketmq-examples/src/main/java/org/apache/storm/rocketmq/trident/WordCountTrident.java
----------------------------------------------------------------------
diff --git a/examples/storm-rocketmq-examples/src/main/java/org/apache/storm/rocketmq/trident/WordCountTrident.java b/examples/storm-rocketmq-examples/src/main/java/org/apache/storm/rocketmq/trident/WordCountTrident.java
index 1817d2e..3d1f37c 100644
--- a/examples/storm-rocketmq-examples/src/main/java/org/apache/storm/rocketmq/trident/WordCountTrident.java
+++ b/examples/storm-rocketmq-examples/src/main/java/org/apache/storm/rocketmq/trident/WordCountTrident.java
@@ -22,14 +22,14 @@ import org.apache.storm.LocalCluster;
import org.apache.storm.LocalCluster.LocalTopology;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
-import org.apache.storm.rocketmq.RocketMQConfig;
+import org.apache.storm.rocketmq.RocketMqConfig;
import org.apache.storm.rocketmq.common.mapper.FieldNameBasedTupleToMessageMapper;
import org.apache.storm.rocketmq.common.mapper.TupleToMessageMapper;
import org.apache.storm.rocketmq.common.selector.DefaultTopicSelector;
import org.apache.storm.rocketmq.common.selector.TopicSelector;
-import org.apache.storm.rocketmq.trident.state.RocketMQState;
-import org.apache.storm.rocketmq.trident.state.RocketMQStateFactory;
-import org.apache.storm.rocketmq.trident.state.RocketMQStateUpdater;
+import org.apache.storm.rocketmq.trident.state.RocketMqState;
+import org.apache.storm.rocketmq.trident.state.RocketMqStateFactory;
+import org.apache.storm.rocketmq.trident.state.RocketMqStateUpdater;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.state.StateFactory;
@@ -55,20 +55,20 @@ public class WordCountTrident {
TopicSelector selector = new DefaultTopicSelector(topic);
Properties properties = new Properties();
- properties.setProperty(RocketMQConfig.NAME_SERVER_ADDR, nameserverAddr);
+ properties.setProperty(RocketMqConfig.NAME_SERVER_ADDR, nameserverAddr);
- RocketMQState.Options options = new RocketMQState.Options()
+ RocketMqState.Options options = new RocketMqState.Options()
.withMapper(mapper)
.withSelector(selector)
.withProperties(properties);
- StateFactory factory = new RocketMQStateFactory(options);
+ StateFactory factory = new RocketMqStateFactory(options);
TridentTopology topology = new TridentTopology();
Stream stream = topology.newStream("spout1", spout);
stream.partitionPersist(factory, fields,
- new RocketMQStateUpdater(), new Fields());
+ new RocketMqStateUpdater(), new Fields());
return topology.build();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/1d7be760/external/storm-rocketmq/README.md
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/README.md b/external/storm-rocketmq/README.md
index 341bb07..84c2ae2 100644
--- a/external/storm-rocketmq/README.md
+++ b/external/storm-rocketmq/README.md
@@ -6,10 +6,10 @@ Storm/Trident integration for [RocketMQ](https://rocketmq.incubator.apache.org/)
## Read from Topic
The spout included in this package for reading data from a topic.
-### RocketMQSpout
-To use the `RocketMQSpout`, you construct an instance of it by specifying a Properties instance which including rocketmq configs.
-RocketMQSpout uses RocketMQ MQPushConsumer as the default implementation. PushConsumer is a high level consumer API, wrapping the pulling details. Looks like broker push messages to consumer.
-RocketMQSpout will retry 3(use `SpoutConfig.DEFAULT_MESSAGES_MAX_RETRY` to change the value) times when messages are failed.
+### RocketMqSpout
+To use the `RocketMqSpout`, you construct an instance of it by specifying a Properties instance which including rocketmq configs.
+RocketMqSpout uses RocketMQ MQPushConsumer as the default implementation. PushConsumer is a high level consumer API, wrapping the pulling details. Looks like broker push messages to consumer.
+RocketMqSpout will retry 3(use `SpoutConfig.DEFAULT_MESSAGES_MAX_RETRY` to change the value) times when messages are failed.
```java
Properties properties = new Properties();
@@ -17,7 +17,7 @@ RocketMQSpout will retry 3(use `SpoutConfig.DEFAULT_MESSAGES_MAX_RETRY` to chang
properties.setProperty(SpoutConfig.CONSUMER_GROUP, group);
properties.setProperty(SpoutConfig.CONSUMER_TOPIC, topic);
- RocketMQSpout spout = new RocketMQSpout(properties);
+ RocketMqSpout spout = new RocketMqSpout(properties);
```
@@ -51,18 +51,18 @@ public interface TopicSelector extends Serializable {
`storm-rocketmq` includes general purpose `TopicSelector` implementations called `DefaultTopicSelector` and `FieldNameBasedTopicSelector`.
-### RocketMQBolt
-To use the `RocketMQBolt`, you construct an instance of it by specifying TupleToMessageMapper, TopicSelector and Properties instances.
-RocketMQBolt send messages async by default. You can change this by invoking `withAsync(false)`.
+### RocketMqBolt
+To use the `RocketMqBolt`, you construct an instance of it by specifying TupleToMessageMapper, TopicSelector and Properties instances.
+RocketMqBolt send messages async by default. You can change this by invoking `withAsync(false)`.
```java
TupleToMessageMapper mapper = new FieldNameBasedTupleToMessageMapper("word", "count");
TopicSelector selector = new DefaultTopicSelector(topic);
properties = new Properties();
- properties.setProperty(RocketMQConfig.NAME_SERVER_ADDR, nameserverAddr);
+ properties.setProperty(RocketMqConfig.NAME_SERVER_ADDR, nameserverAddr);
- RocketMQBolt insertBolt = new RocketMQBolt()
+ RocketMqBolt insertBolt = new RocketMqBolt()
.withMapper(mapper)
.withSelector(selector)
.withProperties(properties);
@@ -76,20 +76,20 @@ We support trident persistent state that can be used with trident topologies. To
TopicSelector selector = new DefaultTopicSelector(topic);
Properties properties = new Properties();
- properties.setProperty(RocketMQConfig.NAME_SERVER_ADDR, nameserverAddr);
+ properties.setProperty(RocketMqConfig.NAME_SERVER_ADDR, nameserverAddr);
- RocketMQState.Options options = new RocketMQState.Options()
+ RocketMqState.Options options = new RocketMqState.Options()
.withMapper(mapper)
.withSelector(selector)
.withProperties(properties);
- StateFactory factory = new RocketMQStateFactory(options);
+ StateFactory factory = new RocketMqStateFactory(options);
TridentTopology topology = new TridentTopology();
Stream stream = topology.newStream("spout1", spout);
stream.partitionPersist(factory, fields,
- new RocketMQStateUpdater(), new Fields());
+ new RocketMqStateUpdater(), new Fields());
```
http://git-wip-us.apache.org/repos/asf/storm/blob/1d7be760/external/storm-rocketmq/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/pom.xml b/external/storm-rocketmq/pom.xml
index 4e3d729..efdbeab 100644
--- a/external/storm-rocketmq/pom.xml
+++ b/external/storm-rocketmq/pom.xml
@@ -74,9 +74,6 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<!--Note - the version would be inherited-->
- <configuration>
- <maxAllowedViolations>100</maxAllowedViolations>
- </configuration>
</plugin>
</plugins>
</build>
http://git-wip-us.apache.org/repos/asf/storm/blob/1d7be760/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/ConsumerMessage.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/ConsumerMessage.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/ConsumerMessage.java
index 97afae1..a3fd70e 100644
--- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/ConsumerMessage.java
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/ConsumerMessage.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.rocketmq;
import org.apache.rocketmq.common.message.MessageExt;
http://git-wip-us.apache.org/repos/asf/storm/blob/1d7be760/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/DefaultMessageBodySerializer.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/DefaultMessageBodySerializer.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/DefaultMessageBodySerializer.java
index 5e7e314..0394721 100644
--- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/DefaultMessageBodySerializer.java
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/DefaultMessageBodySerializer.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.rocketmq;
import java.nio.charset.StandardCharsets;
@@ -25,7 +26,7 @@ public class DefaultMessageBodySerializer implements MessageBodySerializer {
* Currently, we just convert string to bytes using UTF-8 charset.
* Note: in this way, object.toString() method is invoked.
* @param body RocketMQ Message body
- * @return
+ * @return serialized byte[]
*/
@Override
public byte[] serialize(Object body) {
http://git-wip-us.apache.org/repos/asf/storm/blob/1d7be760/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/DefaultMessageRetryManager.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/DefaultMessageRetryManager.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/DefaultMessageRetryManager.java
index bcc7e99..528d8ac 100644
--- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/DefaultMessageRetryManager.java
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/DefaultMessageRetryManager.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.rocketmq;
import java.util.Map;
@@ -24,14 +25,20 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
/**
- * An implementation of MessageRetryManager
+ * An implementation of MessageRetryManager.
*/
-public class DefaultMessageRetryManager implements MessageRetryManager{
+public class DefaultMessageRetryManager implements MessageRetryManager {
private Map<String,ConsumerMessage> cache = new ConcurrentHashMap<>(500);
private BlockingQueue<ConsumerMessage> queue;
private int maxRetry;
private int ttl;
+ /**
+ * DefaultMessageRetryManager Constructor.
+ * @param queue messages BlockingQueue
+ * @param maxRetry max retry times
+ * @param ttl TTL
+ */
public DefaultMessageRetryManager(BlockingQueue<ConsumerMessage> queue, int maxRetry, int ttl) {
this.queue = queue;
this.maxRetry = maxRetry;
@@ -53,10 +60,12 @@ public class DefaultMessageRetryManager implements MessageRetryManager{
}, period, period);
}
+ @Override
public void ack(String id) {
cache.remove(id);
}
+ @Override
public void fail(String id) {
ConsumerMessage message = cache.remove(id);
if (message == null) {
@@ -70,16 +79,13 @@ public class DefaultMessageRetryManager implements MessageRetryManager{
}
}
+ @Override
public void mark(ConsumerMessage message) {
message.setTimestamp(System.currentTimeMillis());
cache.put(message.getId(), message);
}
- /**
- * Whether the message need retry.
- * @param message
- * @return
- */
+ @Override
public boolean needRetry(ConsumerMessage message) {
return message.getRetries() < maxRetry;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/1d7be760/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/MessageBodySerializer.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/MessageBodySerializer.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/MessageBodySerializer.java
index f86ea26..fbf16cd 100644
--- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/MessageBodySerializer.java
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/MessageBodySerializer.java
@@ -15,13 +15,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.rocketmq;
import java.io.Serializable;
/**
- * RocketMQ message body serializer
+ * RocketMQ message body serializer.
*/
-public interface MessageBodySerializer extends Serializable{
+public interface MessageBodySerializer extends Serializable {
byte[] serialize(Object body);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/1d7be760/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/MessageRetryManager.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/MessageRetryManager.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/MessageRetryManager.java
index 5e59d9c..1b24817 100644
--- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/MessageRetryManager.java
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/MessageRetryManager.java
@@ -15,35 +15,36 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.rocketmq;
/**
- * Interface for messages retry manager
+ * Interface for messages retry manager.
*/
public interface MessageRetryManager {
/**
* Remove from the cache. Message with the id is successful.
- * @param id
+ * @param id message id
*/
void ack(String id);
/**
* Remove from the cache. Message with the id is failed.
* Invoke retry logics if necessary.
- * @param id
+ * @param id message id
*/
void fail(String id);
/**
* Mark message in the cache.
- * @param message
+ * @param message message
*/
void mark(ConsumerMessage message);
/**
* Whether the message need retry.
- * @param message
- * @return
+ * @param message ConsumerMessage
+ * @return true if need retry, otherwise false
*/
boolean needRetry(ConsumerMessage message);
http://git-wip-us.apache.org/repos/asf/storm/blob/1d7be760/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMQConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMQConfig.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMQConfig.java
deleted file mode 100644
index fcf6ff4..0000000
--- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMQConfig.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.rocketmq;
-
-import org.apache.commons.lang.Validate;
-import org.apache.rocketmq.client.ClientConfig;
-import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.producer.DefaultMQProducer;
-import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
-import org.apache.rocketmq.remoting.common.RemotingUtil;
-
-import java.util.Properties;
-import java.util.UUID;
-
-import static org.apache.storm.rocketmq.RocketMQUtils.getInteger;
-
-/**
- * RocketMQConfig for Consumer/Producer
- */
-public class RocketMQConfig {
- // common
- public static final String NAME_SERVER_ADDR = "nameserver.addr"; // Required
-
- public static final String CLIENT_NAME = "client.name";
-
- public static final String CLIENT_IP = "client.ip";
- public static final String DEFAULT_CLIENT_IP = RemotingUtil.getLocalAddress();
-
- public static final String CLIENT_CALLBACK_EXECUTOR_THREADS = "client.callback.executor.threads";
- public static final int DEFAULT_CLIENT_CALLBACK_EXECUTOR_THREADS = Runtime.getRuntime().availableProcessors();;
-
- public static final String NAME_SERVER_POLL_INTERVAL = "nameserver.poll.interval";
- public static final int DEFAULT_NAME_SERVER_POLL_INTERVAL = 30000; // 30 seconds
-
- public static final String BROKER_HEART_BEAT_INTERVAL = "brokerserver.heartbeat.interval";
- public static final int DEFAULT_BROKER_HEART_BEAT_INTERVAL = 30000; // 30 seconds
-
-
- // producer
- public static final String PRODUCER_GROUP = "producer.group";
-
- public static final String PRODUCER_RETRY_TIMES = "producer.retry.times";
- public static final int DEFAULT_PRODUCER_RETRY_TIMES = 2;
-
- public static final String PRODUCER_TIMEOUT = "producer.timeout";
- public static final int DEFAULT_PRODUCER_TIMEOUT = 3000; // 3 seconds
-
-
- // consumer
- public static final String CONSUMER_GROUP = "consumer.group"; // Required
-
- public static final String CONSUMER_TOPIC = "consumer.topic"; // Required
-
- public static final String CONSUMER_TAG = "consumer.tag";
- public static final String DEFAULT_TAG = "*";
-
- public static final String CONSUMER_OFFSET_RESET_TO = "consumer.offset.reset.to";
- public static final String CONSUMER_OFFSET_LATEST = "latest";
- public static final String CONSUMER_OFFSET_EARLIEST = "earliest";
- public static final String CONSUMER_OFFSET_TIMESTAMP = "timestamp";
-
- public static final String CONSUMER_MESSAGES_ORDERLY = "consumer.messages.orderly";
-
- public static final String CONSUMER_OFFSET_PERSIST_INTERVAL = "consumer.offset.persist.interval";
- public static final int DEFAULT_CONSUMER_OFFSET_PERSIST_INTERVAL = 5000; // 5 seconds
-
- public static final String CONSUMER_MIN_THREADS = "consumer.min.threads";
- public static final int DEFAULT_CONSUMER_MIN_THREADS = 20;
-
- public static final String CONSUMER_MAX_THREADS = "consumer.max.threads";
- public static final int DEFAULT_CONSUMER_MAX_THREADS = 64;
-
-
- public static void buildProducerConfigs(Properties props, DefaultMQProducer producer) {
- buildCommonConfigs(props, producer);
-
- // According to the RocketMQ official docs, "only one instance is allowed per producer group"
- // So, we use UUID as the producer group by default, to allow many producer instances for one topic
- String defaultGroup = UUID.randomUUID().toString();
- producer.setProducerGroup(props.getProperty(PRODUCER_GROUP, defaultGroup));
-
- producer.setRetryTimesWhenSendFailed(getInteger(props,
- PRODUCER_RETRY_TIMES, DEFAULT_PRODUCER_RETRY_TIMES));
- producer.setRetryTimesWhenSendAsyncFailed(getInteger(props,
- PRODUCER_RETRY_TIMES, DEFAULT_PRODUCER_RETRY_TIMES));
- producer.setSendMsgTimeout(getInteger(props,
- PRODUCER_TIMEOUT, DEFAULT_PRODUCER_TIMEOUT));
- }
-
- public static void buildConsumerConfigs(Properties props, DefaultMQPushConsumer consumer) {
- buildCommonConfigs(props, consumer);
-
- String group = props.getProperty(CONSUMER_GROUP);
- Validate.notEmpty(group);
- consumer.setConsumerGroup(group);
-
- consumer.setPersistConsumerOffsetInterval(getInteger(props,
- CONSUMER_OFFSET_PERSIST_INTERVAL, DEFAULT_CONSUMER_OFFSET_PERSIST_INTERVAL));
- consumer.setConsumeThreadMin(getInteger(props,
- CONSUMER_MIN_THREADS, DEFAULT_CONSUMER_MIN_THREADS));
- consumer.setConsumeThreadMax(getInteger(props,
- CONSUMER_MAX_THREADS, DEFAULT_CONSUMER_MAX_THREADS));
-
- String initOffset = props.getProperty(CONSUMER_OFFSET_RESET_TO, CONSUMER_OFFSET_LATEST);
- switch (initOffset) {
- case CONSUMER_OFFSET_EARLIEST:
- consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
- break;
- case CONSUMER_OFFSET_LATEST:
- consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
- break;
- case CONSUMER_OFFSET_TIMESTAMP:
- consumer.setConsumeTimestamp(initOffset);
- break;
- default:
- consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
- }
-
- String topic = props.getProperty(CONSUMER_TOPIC);
- Validate.notEmpty(topic);
- try {
- consumer.subscribe(topic, props.getProperty(CONSUMER_TAG, DEFAULT_TAG));
- } catch (MQClientException e) {
- throw new IllegalArgumentException(e);
- }
- }
-
- public static void buildCommonConfigs(Properties props, ClientConfig client) {
- String namesvr = props.getProperty(NAME_SERVER_ADDR);
- Validate.notEmpty(namesvr);
- client.setNamesrvAddr(namesvr);
-
- client.setClientIP(props.getProperty(CLIENT_IP, DEFAULT_CLIENT_IP));
- // According to the RocketMQ official docs, "only one instance is allowed per machine"
- // So, we use UUID as the client name by default, to allow RocketMQ spout/bolt instances in one machine.
- String defaultClientName = UUID.randomUUID().toString();
- client.setInstanceName(props.getProperty(CLIENT_NAME, defaultClientName));
-
- client.setClientCallbackExecutorThreads(getInteger(props,
- CLIENT_CALLBACK_EXECUTOR_THREADS, DEFAULT_CLIENT_CALLBACK_EXECUTOR_THREADS));
- client.setPollNameServerInteval(getInteger(props,
- NAME_SERVER_POLL_INTERVAL, DEFAULT_NAME_SERVER_POLL_INTERVAL));
- client.setHeartbeatBrokerInterval(getInteger(props,
- BROKER_HEART_BEAT_INTERVAL, DEFAULT_BROKER_HEART_BEAT_INTERVAL));
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/1d7be760/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMQUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMQUtils.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMQUtils.java
deleted file mode 100644
index 7cbf749..0000000
--- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMQUtils.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.rocketmq;
-
-import org.apache.rocketmq.common.message.Message;
-import org.apache.storm.rocketmq.spout.scheme.KeyValueScheme;
-import org.apache.storm.spout.Scheme;
-
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.util.List;
-import java.util.Properties;
-
-public final class RocketMQUtils {
-
- public static int getInteger(Properties props, String key, int defaultValue) {
- return Integer.parseInt(props.getProperty(key, String.valueOf(defaultValue)));
- }
-
- public static boolean getBoolean(Properties props, String key, boolean defaultValue) {
- return Boolean.parseBoolean(props.getProperty(key, String.valueOf(defaultValue)));
- }
-
- public static Scheme createScheme(Properties props) {
- String schemeString = props.getProperty(SpoutConfig.SCHEME, SpoutConfig.DEFAULT_SCHEME);
- Scheme scheme;
- try {
- Class clazz = Class.forName(schemeString);
- scheme = (Scheme)clazz.newInstance();
- } catch (Exception e) {
- throw new IllegalArgumentException("Cannot create Scheme for " + schemeString
- + " due to " + e.getMessage());
- }
- return scheme;
- }
-
- public static List<Object> generateTuples(Message msg, Scheme scheme) {
- List<Object> tup;
- String rawKey = msg.getKeys();
- ByteBuffer body = ByteBuffer.wrap(msg.getBody());
- if (rawKey != null && scheme instanceof KeyValueScheme) {
- ByteBuffer key = ByteBuffer.wrap(rawKey.getBytes(StandardCharsets.UTF_8));
- tup = ((KeyValueScheme)scheme).deserializeKeyAndValue(key, body);
- } else {
- tup = scheme.deserialize(body);
- }
- return tup;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/1d7be760/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMqConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMqConfig.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMqConfig.java
new file mode 100644
index 0000000..a69aaeb
--- /dev/null
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMqConfig.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.rocketmq;
+
+import static org.apache.storm.rocketmq.RocketMqUtils.getInteger;
+
+import java.util.Properties;
+import java.util.UUID;
+
+import org.apache.commons.lang.Validate;
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+
+/**
+ * RocketMqConfig for Consumer/Producer.
+ */
+public class RocketMqConfig {
+ // common
+ public static final String NAME_SERVER_ADDR = "nameserver.addr"; // Required
+
+ public static final String CLIENT_NAME = "client.name";
+
+ public static final String CLIENT_IP = "client.ip";
+ public static final String DEFAULT_CLIENT_IP = RemotingUtil.getLocalAddress();
+
+ public static final String CLIENT_CALLBACK_EXECUTOR_THREADS = "client.callback.executor.threads";
+ public static final int DEFAULT_CLIENT_CALLBACK_EXECUTOR_THREADS = Runtime.getRuntime().availableProcessors();
+
+ public static final String NAME_SERVER_POLL_INTERVAL = "nameserver.poll.interval";
+ public static final int DEFAULT_NAME_SERVER_POLL_INTERVAL = 30000; // 30 seconds
+
+ public static final String BROKER_HEART_BEAT_INTERVAL = "brokerserver.heartbeat.interval";
+ public static final int DEFAULT_BROKER_HEART_BEAT_INTERVAL = 30000; // 30 seconds
+
+
+ // producer
+ public static final String PRODUCER_GROUP = "producer.group";
+
+ public static final String PRODUCER_RETRY_TIMES = "producer.retry.times";
+ public static final int DEFAULT_PRODUCER_RETRY_TIMES = 2;
+
+ public static final String PRODUCER_TIMEOUT = "producer.timeout";
+ public static final int DEFAULT_PRODUCER_TIMEOUT = 3000; // 3 seconds
+
+
+ // consumer
+ public static final String CONSUMER_GROUP = "consumer.group"; // Required
+
+ public static final String CONSUMER_TOPIC = "consumer.topic"; // Required
+
+ public static final String CONSUMER_TAG = "consumer.tag";
+ public static final String DEFAULT_TAG = "*";
+
+ public static final String CONSUMER_OFFSET_RESET_TO = "consumer.offset.reset.to";
+ public static final String CONSUMER_OFFSET_LATEST = "latest";
+ public static final String CONSUMER_OFFSET_EARLIEST = "earliest";
+ public static final String CONSUMER_OFFSET_TIMESTAMP = "timestamp";
+
+ public static final String CONSUMER_MESSAGES_ORDERLY = "consumer.messages.orderly";
+
+ public static final String CONSUMER_OFFSET_PERSIST_INTERVAL = "consumer.offset.persist.interval";
+ public static final int DEFAULT_CONSUMER_OFFSET_PERSIST_INTERVAL = 5000; // 5 seconds
+
+ public static final String CONSUMER_MIN_THREADS = "consumer.min.threads";
+ public static final int DEFAULT_CONSUMER_MIN_THREADS = 20;
+
+ public static final String CONSUMER_MAX_THREADS = "consumer.max.threads";
+ public static final int DEFAULT_CONSUMER_MAX_THREADS = 64;
+
+
+ /**
+ * Build Producer Configs.
+ * @param props Properties
+ * @param producer DefaultMQProducer
+ */
+ public static void buildProducerConfigs(Properties props, DefaultMQProducer producer) {
+ buildCommonConfigs(props, producer);
+
+ // According to the RocketMQ official docs, "only one instance is allowed per producer group"
+ // So, we use UUID as the producer group by default, to allow many producer instances for one topic
+ String defaultGroup = UUID.randomUUID().toString();
+ producer.setProducerGroup(props.getProperty(PRODUCER_GROUP, defaultGroup));
+
+ producer.setRetryTimesWhenSendFailed(getInteger(props,
+ PRODUCER_RETRY_TIMES, DEFAULT_PRODUCER_RETRY_TIMES));
+ producer.setRetryTimesWhenSendAsyncFailed(getInteger(props,
+ PRODUCER_RETRY_TIMES, DEFAULT_PRODUCER_RETRY_TIMES));
+ producer.setSendMsgTimeout(getInteger(props,
+ PRODUCER_TIMEOUT, DEFAULT_PRODUCER_TIMEOUT));
+ }
+
+ /**
+ * Build Consumer Configs.
+ * @param props Properties
+ * @param consumer DefaultMQPushConsumer
+ */
+ public static void buildConsumerConfigs(Properties props, DefaultMQPushConsumer consumer) {
+ buildCommonConfigs(props, consumer);
+
+ String group = props.getProperty(CONSUMER_GROUP);
+ Validate.notEmpty(group);
+ consumer.setConsumerGroup(group);
+
+ consumer.setPersistConsumerOffsetInterval(getInteger(props,
+ CONSUMER_OFFSET_PERSIST_INTERVAL, DEFAULT_CONSUMER_OFFSET_PERSIST_INTERVAL));
+ consumer.setConsumeThreadMin(getInteger(props,
+ CONSUMER_MIN_THREADS, DEFAULT_CONSUMER_MIN_THREADS));
+ consumer.setConsumeThreadMax(getInteger(props,
+ CONSUMER_MAX_THREADS, DEFAULT_CONSUMER_MAX_THREADS));
+
+ String initOffset = props.getProperty(CONSUMER_OFFSET_RESET_TO, CONSUMER_OFFSET_LATEST);
+ switch (initOffset) {
+ case CONSUMER_OFFSET_EARLIEST:
+ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+ break;
+ case CONSUMER_OFFSET_LATEST:
+ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
+ break;
+ case CONSUMER_OFFSET_TIMESTAMP:
+ consumer.setConsumeTimestamp(initOffset);
+ break;
+ default:
+ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
+ }
+
+ String topic = props.getProperty(CONSUMER_TOPIC);
+ Validate.notEmpty(topic);
+ try {
+ consumer.subscribe(topic, props.getProperty(CONSUMER_TAG, DEFAULT_TAG));
+ } catch (MQClientException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+ /**
+ * Build Common Configs.
+ * @param props Properties
+ * @param client ClientConfig
+ */
+ public static void buildCommonConfigs(Properties props, ClientConfig client) {
+ String namesvr = props.getProperty(NAME_SERVER_ADDR);
+ Validate.notEmpty(namesvr);
+ client.setNamesrvAddr(namesvr);
+
+ client.setClientIP(props.getProperty(CLIENT_IP, DEFAULT_CLIENT_IP));
+ // According to the RocketMQ official docs, "only one instance is allowed per machine"
+ // So, we use UUID as the client name by default, to allow RocketMQ spout/bolt instances in one machine.
+ String defaultClientName = UUID.randomUUID().toString();
+ client.setInstanceName(props.getProperty(CLIENT_NAME, defaultClientName));
+
+ client.setClientCallbackExecutorThreads(getInteger(props,
+ CLIENT_CALLBACK_EXECUTOR_THREADS, DEFAULT_CLIENT_CALLBACK_EXECUTOR_THREADS));
+ client.setPollNameServerInteval(getInteger(props,
+ NAME_SERVER_POLL_INTERVAL, DEFAULT_NAME_SERVER_POLL_INTERVAL));
+ client.setHeartbeatBrokerInterval(getInteger(props,
+ BROKER_HEART_BEAT_INTERVAL, DEFAULT_BROKER_HEART_BEAT_INTERVAL));
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/1d7be760/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMqUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMqUtils.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMqUtils.java
new file mode 100644
index 0000000..f971ae9
--- /dev/null
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMqUtils.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.rocketmq;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.rocketmq.common.message.Message;
+import org.apache.storm.rocketmq.spout.scheme.KeyValueScheme;
+import org.apache.storm.spout.Scheme;
+
+public final class RocketMqUtils {
+
+ public static int getInteger(Properties props, String key, int defaultValue) {
+ return Integer.parseInt(props.getProperty(key, String.valueOf(defaultValue)));
+ }
+
+ public static boolean getBoolean(Properties props, String key, boolean defaultValue) {
+ return Boolean.parseBoolean(props.getProperty(key, String.valueOf(defaultValue)));
+ }
+
+ /**
+ * Create Scheme by Properties.
+ * @param props Properties
+ * @return Scheme
+ */
+ public static Scheme createScheme(Properties props) {
+ String schemeString = props.getProperty(SpoutConfig.SCHEME, SpoutConfig.DEFAULT_SCHEME);
+ Scheme scheme;
+ try {
+ Class clazz = Class.forName(schemeString);
+ scheme = (Scheme)clazz.newInstance();
+ } catch (Exception e) {
+ throw new IllegalArgumentException("Cannot create Scheme for " + schemeString
+ + " due to " + e.getMessage());
+ }
+ return scheme;
+ }
+
+ /**
+ * Generate Storm tuple values by Message and Scheme.
+ * @param msg RocketMQ Message
+ * @param scheme Scheme for deserializing
+ * @return tuple values
+ */
+ public static List<Object> generateTuples(Message msg, Scheme scheme) {
+ List<Object> tup;
+ String rawKey = msg.getKeys();
+ ByteBuffer body = ByteBuffer.wrap(msg.getBody());
+ if (rawKey != null && scheme instanceof KeyValueScheme) {
+ ByteBuffer key = ByteBuffer.wrap(rawKey.getBytes(StandardCharsets.UTF_8));
+ tup = ((KeyValueScheme)scheme).deserializeKeyAndValue(key, body);
+ } else {
+ tup = scheme.deserialize(body);
+ }
+ return tup;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/1d7be760/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/SpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/SpoutConfig.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/SpoutConfig.java
index 05d7f76..5990274 100644
--- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/SpoutConfig.java
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/SpoutConfig.java
@@ -15,11 +15,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.rocketmq;
import org.apache.storm.rocketmq.spout.scheme.StringScheme;
-public class SpoutConfig extends RocketMQConfig {
+public class SpoutConfig extends RocketMqConfig {
public static final String QUEUE_SIZE = "spout.queue.size";
public static final String MESSAGES_MAX_RETRY = "spout.messages.max.retry";
http://git-wip-us.apache.org/repos/asf/storm/blob/1d7be760/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/bolt/RocketMQBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/bolt/RocketMQBolt.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/bolt/RocketMQBolt.java
deleted file mode 100644
index 3932381..0000000
--- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/bolt/RocketMQBolt.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.rocketmq.bolt;
-
-import org.apache.commons.lang.Validate;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.producer.DefaultMQProducer;
-import org.apache.rocketmq.client.producer.MQProducer;
-import org.apache.rocketmq.client.producer.SendCallback;
-import org.apache.rocketmq.client.producer.SendResult;
-import org.apache.rocketmq.common.message.Message;
-import org.apache.storm.rocketmq.RocketMQConfig;
-import org.apache.storm.rocketmq.common.mapper.TupleToMessageMapper;
-import org.apache.storm.rocketmq.common.selector.TopicSelector;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.IRichBolt;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Tuple;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.Properties;
-
-public class RocketMQBolt implements IRichBolt {
- private static final Logger LOG = LoggerFactory.getLogger(RocketMQBolt.class);
-
- private static MQProducer producer;
- private OutputCollector collector;
- private boolean async = true;
- private TopicSelector selector;
- private TupleToMessageMapper mapper;
- private Properties properties;
-
- @Override
- public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
- Validate.notEmpty(properties, "Producer properties can not be empty");
-
- // Since RocketMQ Producer is thread-safe, RocketMQBolt uses a single
- // producer instance across threads to improve the performance.
- synchronized (RocketMQBolt.class) {
- if (producer == null) {
- producer = new DefaultMQProducer();
- RocketMQConfig.buildProducerConfigs(properties, (DefaultMQProducer)producer);
-
- try {
- producer.start();
- } catch (MQClientException e) {
- throw new RuntimeException(e);
- }
- }
- }
-
- this.collector = collector;
-
- Validate.notNull(selector, "TopicSelector can not be null");
- Validate.notNull(mapper, "TupleToMessageMapper can not be null");
- }
-
- public RocketMQBolt withSelector(TopicSelector selector) {
- this.selector = selector;
- return this;
- }
-
- public RocketMQBolt withMapper(TupleToMessageMapper mapper) {
- this.mapper = mapper;
- return this;
- }
-
- public RocketMQBolt withAsync(boolean async) {
- this.async = async;
- return this;
- }
-
- public RocketMQBolt withProperties(Properties properties) {
- this.properties = properties;
- return this;
- }
-
- @Override
- public void execute(Tuple input) {
- // Mapping: from storm tuple -> rocketmq Message
- String topic = selector.getTopic(input);
- String tag = selector.getTag(input);
- String key = mapper.getKeyFromTuple(input);
- byte[] value = mapper.getValueFromTuple(input);
-
- if (topic == null) {
- LOG.warn("skipping Message with Key = " + key + ", topic selector returned null.");
- collector.ack(input);
- return;
- }
-
- Message msg = new Message(topic,tag, key, value);
-
- try {
- if (async) {
- // async sending
- producer.send(msg, new SendCallback() {
- @Override
- public void onSuccess(SendResult sendResult) {
- collector.ack(input);
- }
-
- @Override
- public void onException(Throwable throwable) {
- if (throwable != null) {
- collector.reportError(throwable);
- collector.fail(input);
- }
- }
- });
- } else {
- // sync sending, will return a SendResult
- producer.send(msg);
- collector.ack(input);
- }
- } catch (Exception e) {
- collector.reportError(e);
- collector.fail(input);
- }
-
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
-
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- return null;
- }
-
- @Override
- public void cleanup() {
- synchronized (RocketMQBolt.class) {
- if (producer != null) {
- producer.shutdown();
- producer = null;
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/1d7be760/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/bolt/RocketMqBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/bolt/RocketMqBolt.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/bolt/RocketMqBolt.java
new file mode 100644
index 0000000..e945334
--- /dev/null
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/bolt/RocketMqBolt.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.rocketmq.bolt;
+
+import java.util.Map;
+import java.util.Properties;
+import org.apache.commons.lang.Validate;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.MQProducer;
+import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.storm.rocketmq.RocketMqConfig;
+import org.apache.storm.rocketmq.common.mapper.TupleToMessageMapper;
+import org.apache.storm.rocketmq.common.selector.TopicSelector;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RocketMqBolt implements IRichBolt {
+ private static final Logger LOG = LoggerFactory.getLogger(RocketMqBolt.class);
+
+ private static MQProducer producer;
+ private OutputCollector collector;
+ private boolean async = true;
+ private TopicSelector selector;
+ private TupleToMessageMapper mapper;
+ private Properties properties;
+
+ @Override
+ public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
+ Validate.notEmpty(properties, "Producer properties can not be empty");
+
+ // Since RocketMQ Producer is thread-safe, RocketMQBolt uses a single
+ // producer instance across threads to improve the performance.
+ synchronized (RocketMqBolt.class) {
+ if (producer == null) {
+ producer = new DefaultMQProducer();
+ RocketMqConfig.buildProducerConfigs(properties, (DefaultMQProducer)producer);
+
+ try {
+ producer.start();
+ } catch (MQClientException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ this.collector = collector;
+
+ Validate.notNull(selector, "TopicSelector can not be null");
+ Validate.notNull(mapper, "TupleToMessageMapper can not be null");
+ }
+
+ public RocketMqBolt withSelector(TopicSelector selector) {
+ this.selector = selector;
+ return this;
+ }
+
+ public RocketMqBolt withMapper(TupleToMessageMapper mapper) {
+ this.mapper = mapper;
+ return this;
+ }
+
+ public RocketMqBolt withAsync(boolean async) {
+ this.async = async;
+ return this;
+ }
+
+ public RocketMqBolt withProperties(Properties properties) {
+ this.properties = properties;
+ return this;
+ }
+
+ @Override
+ public void execute(Tuple input) {
+ // Mapping: from storm tuple -> rocketmq Message
+ String topic = selector.getTopic(input);
+ String tag = selector.getTag(input);
+ String key = mapper.getKeyFromTuple(input);
+ byte[] value = mapper.getValueFromTuple(input);
+
+ if (topic == null) {
+ LOG.warn("skipping Message with Key = " + key + ", topic selector returned null.");
+ collector.ack(input);
+ return;
+ }
+
+ Message msg = new Message(topic,tag, key, value);
+
+ try {
+ if (async) {
+ // async sending
+ producer.send(msg, new SendCallback() {
+ @Override
+ public void onSuccess(SendResult sendResult) {
+ collector.ack(input);
+ }
+
+ @Override
+ public void onException(Throwable throwable) {
+ if (throwable != null) {
+ collector.reportError(throwable);
+ collector.fail(input);
+ }
+ }
+ });
+ } else {
+ // sync sending, will return a SendResult
+ producer.send(msg);
+ collector.ack(input);
+ }
+ } catch (Exception e) {
+ collector.reportError(e);
+ collector.fail(input);
+ }
+
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ return null;
+ }
+
+ @Override
+ public void cleanup() {
+ synchronized (RocketMqBolt.class) {
+ if (producer != null) {
+ producer.shutdown();
+ producer = null;
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/1d7be760/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/mapper/FieldNameBasedTupleToMessageMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/mapper/FieldNameBasedTupleToMessageMapper.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/mapper/FieldNameBasedTupleToMessageMapper.java
index 622cbdb..a38a365 100644
--- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/mapper/FieldNameBasedTupleToMessageMapper.java
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/mapper/FieldNameBasedTupleToMessageMapper.java
@@ -15,14 +15,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.rocketmq.common.mapper;
import org.apache.storm.rocketmq.DefaultMessageBodySerializer;
import org.apache.storm.rocketmq.MessageBodySerializer;
import org.apache.storm.tuple.ITuple;
-import java.nio.charset.StandardCharsets;
-
public class FieldNameBasedTupleToMessageMapper implements TupleToMessageMapper {
public static final String BOLT_KEY = "key";
public static final String BOLT_MESSAGE = "message";
@@ -34,6 +33,11 @@ public class FieldNameBasedTupleToMessageMapper implements TupleToMessageMapper
this(BOLT_KEY, BOLT_MESSAGE);
}
+ /**
+ * FieldNameBasedTupleToMessageMapper Constructor.
+ * @param boltKeyField tuple field for selecting the key
+ * @param boltMessageField tuple field for selecting the value
+ */
public FieldNameBasedTupleToMessageMapper(String boltKeyField, String boltMessageField) {
this.boltKeyField = boltKeyField;
this.boltMessageField = boltMessageField;
@@ -55,9 +59,9 @@ public class FieldNameBasedTupleToMessageMapper implements TupleToMessageMapper
}
/**
- * using this method can override the default MessageBodySerializer
- * @param serializer
- * @return
+ * using this method can override the default MessageBodySerializer.
+ * @param serializer MessageBodySerializer
+ * @return this object
*/
public FieldNameBasedTupleToMessageMapper withMessageBodySerializer(MessageBodySerializer serializer) {
this.messageBodySerializer = serializer;
http://git-wip-us.apache.org/repos/asf/storm/blob/1d7be760/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/mapper/TupleToMessageMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/mapper/TupleToMessageMapper.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/mapper/TupleToMessageMapper.java
index 84ff4a2..ef716a4 100644
--- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/mapper/TupleToMessageMapper.java
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/mapper/TupleToMessageMapper.java
@@ -15,16 +15,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.storm.rocketmq.common.mapper;
-import org.apache.storm.tuple.ITuple;
+package org.apache.storm.rocketmq.common.mapper;
import java.io.Serializable;
+import org.apache.storm.tuple.ITuple;
/**
* Interface defining a mapping from storm tuple to rocketmq key and message.
*/
public interface TupleToMessageMapper extends Serializable {
+
String getKeyFromTuple(ITuple tuple);
+
byte[] getValueFromTuple(ITuple tuple);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/1d7be760/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/selector/DefaultTopicSelector.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/selector/DefaultTopicSelector.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/selector/DefaultTopicSelector.java
index 5332036..f45bdf0 100644
--- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/selector/DefaultTopicSelector.java
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/selector/DefaultTopicSelector.java
@@ -15,9 +15,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.rocketmq.common.selector;
-import org.apache.storm.rocketmq.RocketMQConfig;
+import org.apache.storm.rocketmq.RocketMqConfig;
import org.apache.storm.tuple.ITuple;
public class DefaultTopicSelector implements TopicSelector {
@@ -30,7 +31,7 @@ public class DefaultTopicSelector implements TopicSelector {
}
public DefaultTopicSelector(final String topicName) {
- this(topicName, RocketMQConfig.DEFAULT_TAG);
+ this(topicName, RocketMqConfig.DEFAULT_TAG);
}
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/1d7be760/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/selector/FieldNameBasedTopicSelector.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/selector/FieldNameBasedTopicSelector.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/selector/FieldNameBasedTopicSelector.java
index 60865c1..253221e 100644
--- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/selector/FieldNameBasedTopicSelector.java
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/selector/FieldNameBasedTopicSelector.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.rocketmq.common.selector;
import org.apache.storm.tuple.ITuple;
@@ -33,7 +34,13 @@ public class FieldNameBasedTopicSelector implements TopicSelector {
private final String tagFieldName;
private final String defaultTagName;
-
+ /**
+ * FieldNameBasedTopicSelector Constructor.
+ * @param topicFieldName field name used for selecting topic
+ * @param defaultTopicName default field name used for selecting topic
+ * @param tagFieldName field name used for selecting tag
+ * @param defaultTagName default field name used for selecting tag
+ */
public FieldNameBasedTopicSelector(String topicFieldName, String defaultTopicName, String tagFieldName, String defaultTagName) {
this.topicFieldName = topicFieldName;
this.defaultTopicName = defaultTopicName;
http://git-wip-us.apache.org/repos/asf/storm/blob/1d7be760/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/selector/TopicSelector.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/selector/TopicSelector.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/selector/TopicSelector.java
index f33d4a6..2940920 100644
--- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/selector/TopicSelector.java
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/selector/TopicSelector.java
@@ -15,13 +15,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.storm.rocketmq.common.selector;
-import org.apache.storm.tuple.ITuple;
+package org.apache.storm.rocketmq.common.selector;
import java.io.Serializable;
+import org.apache.storm.tuple.ITuple;
public interface TopicSelector extends Serializable {
+
String getTopic(ITuple tuple);
+
String getTag(ITuple tuple);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/1d7be760/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/RocketMQSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/RocketMQSpout.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/RocketMQSpout.java
deleted file mode 100644
index d4dfc61..0000000
--- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/RocketMQSpout.java
+++ /dev/null
@@ -1,218 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.rocketmq.spout;
-
-import org.apache.commons.lang.Validate;
-import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
-import org.apache.rocketmq.client.consumer.MQPushConsumer;
-import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
-import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
-import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
-import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
-import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
-import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.storm.Config;
-import org.apache.storm.rocketmq.ConsumerMessage;
-import org.apache.storm.rocketmq.DefaultMessageRetryManager;
-import org.apache.storm.rocketmq.MessageRetryManager;
-import org.apache.storm.rocketmq.RocketMQConfig;
-import org.apache.storm.rocketmq.RocketMQUtils;
-import org.apache.storm.rocketmq.SpoutConfig;
-import org.apache.storm.spout.Scheme;
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.IRichSpout;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.utils.ObjectReader;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import static org.apache.storm.rocketmq.RocketMQUtils.getBoolean;
-import static org.apache.storm.rocketmq.RocketMQUtils.getInteger;
-
-/**
- * RocketMQSpout uses MQPushConsumer as the default implementation.
- * PushConsumer is a high level consumer API, wrapping the pulling details
- * Looks like broker push messages to consumer
- */
-public class RocketMQSpout implements IRichSpout {
- // TODO add metrics
-
- private static MQPushConsumer consumer;
- private SpoutOutputCollector collector;
- private BlockingQueue<ConsumerMessage> queue;
- private BlockingQueue<ConsumerMessage> pending;
-
- private Properties properties;
- private MessageRetryManager messageRetryManager;
- private Scheme scheme;
-
- public RocketMQSpout(Properties properties) {
- Validate.notEmpty(properties, "Consumer properties can not be empty");
- this.properties = properties;
- scheme = RocketMQUtils.createScheme(properties);
- }
-
- @Override
- public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
- // Since RocketMQ Consumer is thread-safe, RocketMQSpout uses a single
- // consumer instance across threads to improve the performance.
- synchronized (RocketMQSpout.class) {
- if (consumer == null) {
- buildAndStartConsumer();
- }
- }
-
- int queueSize = getInteger(properties, SpoutConfig.QUEUE_SIZE, ObjectReader.getInt(conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING)));
- queue = new LinkedBlockingQueue<>(queueSize);
- pending = new LinkedBlockingQueue<>(queueSize);
- int maxRetry = getInteger(properties, SpoutConfig.MESSAGES_MAX_RETRY, SpoutConfig.DEFAULT_MESSAGES_MAX_RETRY);
- int ttl = getInteger(properties, SpoutConfig.MESSAGES_TTL, SpoutConfig.DEFAULT_MESSAGES_TTL);
-
- this.messageRetryManager = new DefaultMessageRetryManager(queue, maxRetry, ttl);
- this.collector = collector;
- }
-
- protected void buildAndStartConsumer() {
- consumer = new DefaultMQPushConsumer();
- RocketMQConfig.buildConsumerConfigs(properties, (DefaultMQPushConsumer)consumer);
-
- boolean ordered = getBoolean(properties, RocketMQConfig.CONSUMER_MESSAGES_ORDERLY, false);
- if (ordered) {
- consumer.registerMessageListener(new MessageListenerOrderly() {
- @Override
- public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
- ConsumeOrderlyContext context) {
- if (process(msgs)) {
- return ConsumeOrderlyStatus.SUCCESS;
- } else {
- return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
- }
- }
- });
- } else {
- consumer.registerMessageListener(new MessageListenerConcurrently() {
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
- ConsumeConcurrentlyContext context) {
- if (process(msgs)) {
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- } else {
- return ConsumeConcurrentlyStatus.RECONSUME_LATER;
- }
- }
- });
- }
-
- try {
- consumer.start();
- } catch (MQClientException e) {
- throw new RuntimeException(e);
- }
- }
-
- /**
- * process pushed messages
- * @param msgs
- * @return
- */
- protected boolean process(List<MessageExt> msgs) {
- if (msgs.isEmpty()) {
- return true;
- }
-
- boolean notFull = true;
- for (MessageExt msg : msgs) {
- ConsumerMessage message = new ConsumerMessage(msg);
- // returning true upon success and false if this queue is full.
- if(!queue.offer(message)){
- notFull = false;
- pending.offer(message);
- }
- }
- return notFull;
- }
-
- @Override
- public void nextTuple() {
- ConsumerMessage message;
- if (!pending.isEmpty()) {
- message = pending.poll();
- } else {
- message = queue.poll();
- }
-
- if (message == null) {
- return;
- }
-
- messageRetryManager.mark(message);
- List<Object> tup = RocketMQUtils.generateTuples(message.getData(), scheme);
- if (tup != null) {
- collector.emit(tup, message.getId());
- }
- }
-
- @Override
- public void ack(Object msgId) {
- String id = msgId.toString();
- messageRetryManager.ack(id);
- }
-
- @Override
- public void fail(Object msgId) {
- String id = msgId.toString();
- messageRetryManager.fail(id);
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(scheme.getOutputFields());
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- return null;
- }
-
- @Override
- public void close() {
- synchronized (RocketMQSpout.class) {
- if (consumer != null) {
- consumer.shutdown();
- consumer = null;
- }
- }
- }
-
- @Override
- public void activate() {
- consumer.resume();
- }
-
- @Override
- public void deactivate() {
- consumer.suspend();
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/1d7be760/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/RocketMqSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/RocketMqSpout.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/RocketMqSpout.java
new file mode 100644
index 0000000..cebc230
--- /dev/null
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/RocketMqSpout.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.rocketmq.spout;
+
+import static org.apache.storm.rocketmq.RocketMqUtils.getBoolean;
+import static org.apache.storm.rocketmq.RocketMqUtils.getInteger;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.commons.lang.Validate;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.MQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.storm.Config;
+import org.apache.storm.rocketmq.ConsumerMessage;
+import org.apache.storm.rocketmq.DefaultMessageRetryManager;
+import org.apache.storm.rocketmq.MessageRetryManager;
+import org.apache.storm.rocketmq.RocketMqConfig;
+import org.apache.storm.rocketmq.RocketMqUtils;
+import org.apache.storm.rocketmq.SpoutConfig;
+import org.apache.storm.spout.Scheme;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.utils.ObjectReader;
+
+/**
+ * RocketMqSpout uses MQPushConsumer as the default implementation.
+ * PushConsumer is a high level consumer API, wrapping the pulling details
+ * Looks like broker push messages to consumer
+ */
+public class RocketMqSpout implements IRichSpout {
+ // TODO add metrics
+
+ private static MQPushConsumer consumer;
+ private SpoutOutputCollector collector;
+ private BlockingQueue<ConsumerMessage> queue;
+ private BlockingQueue<ConsumerMessage> pending;
+
+ private Properties properties;
+ private MessageRetryManager messageRetryManager;
+ private Scheme scheme;
+
+ /**
+ * RocketMqSpout Constructor.
+ * @param properties Properties Config
+ */
+ public RocketMqSpout(Properties properties) {
+ Validate.notEmpty(properties, "Consumer properties can not be empty");
+ this.properties = properties;
+ scheme = RocketMqUtils.createScheme(properties);
+ }
+
+ @Override
+ public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
+ // Since RocketMQ Consumer is thread-safe, RocketMQSpout uses a single
+ // consumer instance across threads to improve the performance.
+ synchronized (RocketMqSpout.class) {
+ if (consumer == null) {
+ buildAndStartConsumer();
+ }
+ }
+
+ int queueSize = getInteger(properties, SpoutConfig.QUEUE_SIZE, ObjectReader.getInt(conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING)));
+ queue = new LinkedBlockingQueue<>(queueSize);
+ pending = new LinkedBlockingQueue<>(queueSize);
+ int maxRetry = getInteger(properties, SpoutConfig.MESSAGES_MAX_RETRY, SpoutConfig.DEFAULT_MESSAGES_MAX_RETRY);
+ int ttl = getInteger(properties, SpoutConfig.MESSAGES_TTL, SpoutConfig.DEFAULT_MESSAGES_TTL);
+
+ this.messageRetryManager = new DefaultMessageRetryManager(queue, maxRetry, ttl);
+ this.collector = collector;
+ }
+
+ protected void buildAndStartConsumer() {
+ consumer = new DefaultMQPushConsumer();
+ RocketMqConfig.buildConsumerConfigs(properties, (DefaultMQPushConsumer)consumer);
+
+ boolean ordered = getBoolean(properties, RocketMqConfig.CONSUMER_MESSAGES_ORDERLY, false);
+ if (ordered) {
+ consumer.registerMessageListener(new MessageListenerOrderly() {
+ @Override
+ public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
+ ConsumeOrderlyContext context) {
+ if (process(msgs)) {
+ return ConsumeOrderlyStatus.SUCCESS;
+ } else {
+ return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
+ }
+ }
+ });
+ } else {
+ consumer.registerMessageListener(new MessageListenerConcurrently() {
+ @Override
+ public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
+ ConsumeConcurrentlyContext context) {
+ if (process(msgs)) {
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+ } else {
+ return ConsumeConcurrentlyStatus.RECONSUME_LATER;
+ }
+ }
+ });
+ }
+
+ try {
+ consumer.start();
+ } catch (MQClientException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Process pushed messages.
+ * @param msgs messages
+ * @return the boolean flag processed result
+ */
+ protected boolean process(List<MessageExt> msgs) {
+ if (msgs.isEmpty()) {
+ return true;
+ }
+
+ boolean notFull = true;
+ for (MessageExt msg : msgs) {
+ ConsumerMessage message = new ConsumerMessage(msg);
+ // returning true upon success and false if this queue is full.
+ if (!queue.offer(message)) {
+ notFull = false;
+ pending.offer(message);
+ }
+ }
+ return notFull;
+ }
+
+ @Override
+ public void nextTuple() {
+ ConsumerMessage message;
+ if (!pending.isEmpty()) {
+ message = pending.poll();
+ } else {
+ message = queue.poll();
+ }
+
+ if (message == null) {
+ return;
+ }
+
+ messageRetryManager.mark(message);
+ List<Object> tup = RocketMqUtils.generateTuples(message.getData(), scheme);
+ if (tup != null) {
+ collector.emit(tup, message.getId());
+ }
+ }
+
+ @Override
+ public void ack(Object msgId) {
+ String id = msgId.toString();
+ messageRetryManager.ack(id);
+ }
+
+ @Override
+ public void fail(Object msgId) {
+ String id = msgId.toString();
+ messageRetryManager.fail(id);
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(scheme.getOutputFields());
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ return null;
+ }
+
+ @Override
+ public void close() {
+ synchronized (RocketMqSpout.class) {
+ if (consumer != null) {
+ consumer.shutdown();
+ consumer = null;
+ }
+ }
+ }
+
+ @Override
+ public void activate() {
+ consumer.resume();
+ }
+
+ @Override
+ public void deactivate() {
+ consumer.suspend();
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/1d7be760/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/scheme/KeyValueScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/scheme/KeyValueScheme.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/scheme/KeyValueScheme.java
index e066784..f4cf803 100644
--- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/scheme/KeyValueScheme.java
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/scheme/KeyValueScheme.java
@@ -15,12 +15,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.storm.rocketmq.spout.scheme;
-import org.apache.storm.spout.Scheme;
+package org.apache.storm.rocketmq.spout.scheme;
import java.nio.ByteBuffer;
import java.util.List;
+import org.apache.storm.spout.Scheme;
public interface KeyValueScheme extends Scheme {
List<Object> deserializeKeyAndValue(ByteBuffer key, ByteBuffer value);