Posted to dev@storm.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2014/11/04 13:43:34 UTC
[jira] [Commented] (STORM-378) SleepSpoutWaitStrategy.emptyEmit should use the variable "streak"
[ https://issues.apache.org/jira/browse/STORM-378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14196059#comment-14196059 ]
ASF GitHub Bot commented on STORM-378:
--------------------------------------
Github user clockfly commented on the pull request:
https://github.com/apache/storm/pull/295#issuecomment-61634029
To keep the whole topology responsive, the spout needs to stay active so that it frequently pulls data from the acker or the system tick.
Setting "topology.sleep.spout.wait.strategy.time.ms" to 1 ms should be good enough; the resulting system load is relatively small.
What is the motivation for making the sleep time increase?
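To make the question concrete, here is a minimal sketch comparing the sleep duration before and after the patch quoted below. It assumes that "streak" counts consecutive empty emits and grows by one each time; the class and method names (SleepSketch, fixedSleep, proposedSleep) are hypothetical and are not part of Storm.

```java
// Illustrative sketch only. SleepSketch and its methods are hypothetical
// names, not Storm APIs. Assumes "streak" is the number of consecutive
// empty emits seen so far.
final class SleepSketch {

    // Behaviour before the patch: the sleep time ignores the streak.
    static long fixedSleep(long sleepMillis, long streak) {
        return sleepMillis;
    }

    // Behaviour proposed in the patch: Math.abs(sleepMillis + streak),
    // so the sleep time grows as the streak grows.
    static long proposedSleep(long sleepMillis, long streak) {
        return Math.abs(sleepMillis + streak);
    }

    public static void main(String[] args) {
        long sleepMillis = 1; // the 1 ms setting suggested in the comment
        long[] streaks = {0, 10, 100, 1000};
        for (long streak : streaks) {
            System.out.println("streak=" + streak
                    + " fixed=" + fixedSleep(sleepMillis, streak) + " ms"
                    + " proposed=" + proposedSleep(sleepMillis, streak) + " ms");
        }
    }
}
```

Under these assumptions the proposed sleep time has no upper bound, so a spout that has been idle for a long streak would wake up less and less often, which is the responsiveness concern raised above.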
> SleepSpoutWaitStrategy.emptyEmit should use the variable "streak"
> ------------------------------------------------------------------
>
> Key: STORM-378
> URL: https://issues.apache.org/jira/browse/STORM-378
> Project: Apache Storm
> Issue Type: Bug
> Affects Versions: 0.9.2-incubating
> Reporter: caofangkun
> Priority: Minor
>
> {code:java}
> Index: src/jvm/backtype/storm/spout/SleepSpoutWaitStrategy.java
> ===================================================================
> --- src/jvm/backtype/storm/spout/SleepSpoutWaitStrategy.java (revision 2868)
> +++ src/jvm/backtype/storm/spout/SleepSpoutWaitStrategy.java (working copy)
> @@ -18,6 +18,8 @@
> package backtype.storm.spout;
>
> import backtype.storm.Config;
> +import backtype.storm.utils.Utils;
> +
> import java.util.Map;
>
>
> @@ -27,13 +29,14 @@
>
> @Override
> public void prepare(Map conf) {
> - sleepMillis = ((Number) conf.get(Config.TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS)).longValue();
> + sleepMillis = Utils.getLong(
> + conf.get(Config.TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS), 500);
> }
>
> @Override
> public void emptyEmit(long streak) {
> try {
> - Thread.sleep(sleepMillis);
> + Thread.sleep(Math.abs(sleepMillis + streak));
> } catch (InterruptedException e) {
> throw new RuntimeException(e);
> }
> Index: src/jvm/backtype/storm/utils/Utils.java
> ===================================================================
> --- src/jvm/backtype/storm/utils/Utils.java (revision 2888)
> +++ src/jvm/backtype/storm/utils/Utils.java (working copy)
> @@ -325,6 +325,24 @@
> throw new IllegalArgumentException("Don't know how to convert " + o + " + to int");
> }
> }
> +
> + public static Long getLong(Object o, long defaultValue) {
> +
> + if (o == null) {
> + return defaultValue;
> + }
> +
> + if (o instanceof String) {
> + return Long.valueOf(String.valueOf(o));
> + } else if (o instanceof Integer) {
> + Integer value = (Integer) o;
> + return Long.valueOf((Integer) value);
> + } else if (o instanceof Long) {
> + return (Long) o;
> + } else {
> + return defaultValue;
> + }
> + }
>
> public static boolean getBoolean(Object o, boolean defaultValue) {
> if (null == o) {
> {code}
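One detail of the patch above: the line it replaces cast the config value to Number, which accepts any numeric type, while the new Utils.getLong falls back to the default for anything other than String, Integer, or Long (for example a Short or a Double). The following is a hedged sketch of one possible alternative that keeps the Number handling; GetLongSketch is a hypothetical standalone class, not the code in the pull request.

```java
// Illustrative sketch only. GetLongSketch is a hypothetical class and is
// not the Utils.getLong from the patch; it shows one way to convert a
// config value to a long without silently dropping numeric types.
final class GetLongSketch {

    static long getLong(Object o, long defaultValue) {
        if (o == null) {
            // Missing config key: use the caller's default.
            return defaultValue;
        }
        if (o instanceof Number) {
            // Covers Integer, Long, Short, Double, and so on.
            return ((Number) o).longValue();
        }
        if (o instanceof String) {
            // Throws NumberFormatException if the string is not a valid long.
            return Long.parseLong(((String) o).trim());
        }
        // Assumption: failing loudly is preferable to using the default
        // when the value has an unexpected type.
        throw new IllegalArgumentException(
                "Don't know how to convert " + o + " to long");
    }

    public static void main(String[] args) {
        System.out.println(getLong(null, 500));
        System.out.println(getLong("42", 500));
        System.out.println(getLong(Integer.valueOf(7), 500));
        System.out.println(getLong(Short.valueOf((short) 3), 500));
    }
}
```

Whether an unexpected type should throw or fall back to the default is a design choice; the sketch throws, whereas the patch returns the default.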