Posted to issues@storm.apache.org by "Alexandre Vermeerbergen (JIRA)" <ji...@apache.org> on 2018/02/01 21:47:00 UTC

[jira] [Comment Edited] (STORM-2914) Remove enable.auto.commit support from storm-kafka-client

    [ https://issues.apache.org/jira/browse/STORM-2914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16349315#comment-16349315 ] 

Alexandre Vermeerbergen edited comment on STORM-2914 at 2/1/18 9:46 PM:
------------------------------------------------------------------------

Hello [~pshah] and [~Srdo],

Regarding storm-kafka-monitor*.jar: thanks for your explanation. It was indeed already in our toollib/ directory from Storm 1.2.0 RC2:

{{[root@ip-172-31-18-84 storm-stable]# ls toollib/}}
 {{storm-kafka-monitor-1.2.0.jar  storm-kafka-monitor-1.2.0-javadoc.jar  storm-kafka-monitor-1.2.0-sources.jar}}

But I was no longer seeing Kafka spout stats in Nimbus UI, and judging from its log it seems that this was broken because of some missing class issue (very strange that it's trying to load a class from the "*javadoc.jar" file, isn't it?):

{{org.apache.storm.utils.ShellUtils$ExitCodeException: Error: Could not find or load main class .usr.local.Storm.storm-stable.toollib.storm-kafka-monitor-1.2.0-javadoc.jar}}

{{        at org.apache.storm.utils.ShellUtils.runCommand(ShellUtils.java:231) ~[storm-core-1.2.0.jar:1.2.0]}}
 {{        at org.apache.storm.utils.ShellUtils.run(ShellUtils.java:161) ~[storm-core-1.2.0.jar:1.2.0]}}
 {{        at org.apache.storm.utils.ShellUtils$ShellCommandExecutor.execute(ShellUtils.java:371) ~[storm-core-1.2.0.jar:1.2.0]}}
 {{        at org.apache.storm.utils.ShellUtils.execCommand(ShellUtils.java:461) ~[storm-core-1.2.0.jar:1.2.0]}}
 {{        at org.apache.storm.utils.ShellUtils.execCommand(ShellUtils.java:444) ~[storm-core-1.2.0.jar:1.2.0]}}
 {{        at org.apache.storm.utils.TopologySpoutLag.getLagResultForKafka(TopologySpoutLag.java:163) ~[storm-core-1.2.0.jar:1.2.0]}}
 {{        at org.apache.storm.utils.TopologySpoutLag.getLagResultForNewKafkaSpout(TopologySpoutLag.java:189) ~[storm-core-1.2.0.jar:1.2.0]}}
 {{        at org.apache.storm.utils.TopologySpoutLag.lag(TopologySpoutLag.java:57) ~[storm-core-1.2.0.jar:1.2.0]}}
 {{        at org.apache.storm.ui.core$topology_lag.invoke(core.clj:805) ~[storm-core-1.2.0.jar:1.2.0]}}
 {{        at org.apache.storm.ui.core$fn__9572.invoke(core.clj:1165) ~[storm-core-1.2.0.jar:1.2.0]}}
 {{        at org.apache.storm.shade.compojure.core$make_route$fn__5965.invoke(core.clj:100) ~[storm-core-1.2.0.jar:1.2.0]}}
 {{        at org.apache.storm.shade.compojure.core$if_route$fn__5953.invoke(core.clj:46) ~[storm-core-1.2.0.jar:1.2.0]}}
 {{        at org.apache.storm.shade.compojure.core$if_method$fn__5946.invoke(core.clj:31) ~[storm-core-1.2.0.jar:1.2.0]}}
 {{        at org.apache.storm.shade.compojure.core$routing$fn__5971.invoke(core.clj:113) ~[storm-core-1.2.0.jar:1.2.0]}}
 {{        at clojure.core$some.invoke(core.clj:2570) ~[clojure-1.7.0.jar:?]}}
 {{        at org.apache.storm.shade.compojure.core$routing.doInvoke(core.clj:113) ~[storm-core-1.2.0.jar:1.2.0]}}
 {{        at clojure.lang.RestFn.applyTo(RestFn.java:139) ~[clojure-1.7.0.jar:?]}}
 {{        at clojure.core$apply.invoke(core.clj:632) ~[clojure-1.7.0.jar:?]}}
 {{        at org.apache.storm.shade.compojure.core$routes$fn__5975.invoke(core.clj:118) ~[storm-core-1.2.0.jar:1.2.0]}}
 {{        at org.apache.storm.shade.ring.middleware.cors$wrap_cors$fn__8880.invoke(cors.clj:149) ~[storm-core-1.2.0.jar:1.2.0]}}
 {{        at org.apache.storm.shade.ring.middleware.json$wrap_json_params$fn__8827.invoke(json.clj:56) ~[storm-core-1.2.0.jar:1.2.0]}}
 {{        at org.apache.storm.shade.ring.middleware.multipart_params$wrap_multipart_params$fn__6607.invoke(multipart_params.clj:118) ~[storm-core-1.2.0.jar:1.2.0]}}
 {{        at org.apache.storm.shade.ring.middleware.reload$wrap_reload$fn__7890.invoke(reload.clj:22) ~[storm-core-1.2.0.jar:1.2.0]}}
 {{        at org.apache.storm.ui.helpers$requests_middleware$fn__6860.invoke(helpers.clj:52) ~[storm-core-1.2.0.jar:1.2.0]}}
 {{        at org.apache.storm.ui.core$catch_errors$fn__9747.invoke(core.clj:1428) ~[storm-core-1.2.0.jar:1.2.0]}}
 {{        at org.apache.storm.shade.ring.middleware.keyword_params$wrap_keyword_params$fn__6527.invoke(keyword_params.clj:35) ~[storm-core-1.2.0.jar:1.2.0]}}
 {{        at org.apache.storm.shade.ring.middleware.nested_params$wrap_nested_params$fn__6570.invoke(nested_params.clj:84) ~[storm-core-1.2.0.jar:1.2.0]}}
 {{        at org.apache.storm.shade.ring.middleware.params$wrap_params$fn__6499.invoke(params.clj:64) ~[storm-core-1.2.0.jar:1.2.0]}}
 {{        at org.apache.storm.shade.ring.middleware.multipart_params$wrap_multipart_params$fn__6607.invoke(multipart_params.clj:118) ~[storm-core-1.2.0.jar:1.2.0]}}
 {{        at org.apache.storm.shade.ring.middleware.flash$wrap_flash$fn__6822.invoke(flash.clj:35) ~[storm-core-1.2.0.jar:1.2.0]}}
 {{        at org.apache.storm.shade.ring.middleware.session$wrap_session$fn__6808.invoke(session.clj:98) ~[storm-core-1.2.0.jar:1.2.0]}}
 {{        at org.apache.storm.shade.ring.util.servlet$make_service_method$fn__6357.invoke(servlet.clj:127) ~[storm-core-1.2.0.jar:1.2.0]}}
 {{        at org.apache.storm.shade.ring.util.servlet$servlet$fn__6361.invoke(servlet.clj:136) ~[storm-core-1.2.0.jar:1.2.0]}}
 {{        at org.apache.storm.shade.ring.util.servlet.proxy$javax.servlet.http.HttpServlet$ff19274a.service(Unknown Source) ~[storm-core-1.2.0.jar:1.2.0]}}
 {{        at org.apache.storm.shade.org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:654) ~[storm-core-1.2.0.jar:1.2.0]}}
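
A possible explanation for the javadoc.jar oddity, offered as an assumption because the actual launch command is not shown in this thread: if the lag tool is started with an unquoted wildcard such as toollib/storm-kafka-monitor*.jar right after -cp, the shell expands it to all three jars listed above. The java launcher then takes only the first jar as the classpath and treats the next argument as the main class name, printing it with '/' replaced by '.'. The sketch below is a simplified model of that argument handling, not the launcher's real code; the class name, method names and the "intended.MainClass" placeholder are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;

// Simplified, hypothetical model of how the `java` launcher reads its arguments.
// It only understands "-cp <value>" and "-classpath <value>"; the real launcher
// handles many more options.
class LauncherArgsModel {

    /** Returns the argument that would be treated as the main class, or null if none. */
    static String mainClassOf(List<String> args) {
        int i = 0;
        while (i < args.size()) {
            String arg = args.get(i);
            if (arg.equals("-cp") || arg.equals("-classpath")) {
                i += 2; // the option consumes exactly one value
            } else if (arg.startsWith("-")) {
                i += 1; // any other option: assumed here to take no separate value
            } else {
                return arg; // the first non-option argument is taken as the main class
            }
        }
        return null;
    }

    /** Mimics how the name shows up in the error message: '/' displayed as '.'. */
    static String asShownInError(String mainClass) {
        return mainClass.replace('/', '.');
    }

    public static void main(String[] argv) {
        String dir = "/usr/local/Storm/storm-stable/toollib/";
        // What an unquoted storm-kafka-monitor*.jar would expand to,
        // in the same order as the `ls toollib/` output above.
        List<String> expanded = Arrays.asList(
            "-cp",
            dir + "storm-kafka-monitor-1.2.0.jar",
            dir + "storm-kafka-monitor-1.2.0-javadoc.jar",
            dir + "storm-kafka-monitor-1.2.0-sources.jar",
            "intended.MainClass"); // placeholder for the tool's real main class
        System.out.println("Error: Could not find or load main class "
            + asShownInError(mainClassOf(expanded)));
    }
}
```

Under this model the second jar of the expansion ends up in the main class position, which reproduces the name in the exception above. It would also be consistent with the stats coming back once toollib/ contains a single storm-kafka-monitor jar.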

Now in the .jar files built using Stig's 1.x branch, I see that there are two different versions of the storm-kafka-monitor jar file:

{{./storm-STORM-2914-2913-1.x/external/storm-kafka-monitor/target/storm-kafka-monitor-1.2.1-SNAPSHOT.jar}}
 {{./storm-STORM-2914-2913-1.x/external/storm-kafka-monitor/target/original-storm-kafka-monitor-1.2.1-SNAPSHOT.jar}}

Which one would be the "right" one? What's the difference between them?

+Note:+ I couldn't resist, so I removed everything from toollib/, copied {{storm-kafka-monitor-1.2.1-SNAPSHOT.jar}} into it, and restarted everything (including Nimbus UI) => the Kafka spout metrics are back in Nimbus UI :) But why was it broken by default in Storm 1.2.0 RC2?

 [~Srdo]:

The WARN messages now seem to be gone, so from my perspective this confirms that https://issues.apache.org/jira/browse/STORM-2913 is solved in your 1.x branch, thanks!

Regarding your proposal to compare NONE and AT_MOST_ONCE with my load: why not, but then please give me clear instructions on how to set up my Kafka spout, so that I don't make any mistakes in this setup (and I guess some output will be printed to confirm the setup).

Best regards,

Alexandre Vermeerbergen

 



> Remove enable.auto.commit support from storm-kafka-client
> ---------------------------------------------------------
>
>                 Key: STORM-2914
>                 URL: https://issues.apache.org/jira/browse/STORM-2914
>             Project: Apache Storm
>          Issue Type: Improvement
>          Components: storm-kafka-client
>    Affects Versions: 2.0.0, 1.2.0
>            Reporter: Stig Rohde Døssing
>            Assignee: Stig Rohde Døssing
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> The enable.auto.commit option causes the KafkaConsumer to periodically commit the latest offsets it has returned from poll(). It is convenient for use cases where messages are polled from Kafka and processed synchronously, in a loop. 
> Due to https://issues.apache.org/jira/browse/STORM-2913 we'd really like to store some metadata in Kafka when the spout commits. This is not possible with enable.auto.commit. I took a look at what that setting actually does, and it just causes the KafkaConsumer to call commitAsync during poll (and during a few other operations, e.g. close and assign) with some interval.
> Ideally I'd like to get rid of ProcessingGuarantee.NONE, since I think ProcessingGuarantee.AT_MOST_ONCE covers the same use cases, and is likely almost as fast. The primary difference between them is that AT_MOST_ONCE commits synchronously.
> If we really want to keep ProcessingGuarantee.NONE, I think we should make our ProcessingGuarantee.NONE setting cause the spout to call commitAsync after poll, and never use the enable.auto.commit option. This allows us to include metadata in the commit.
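
To make the last proposal in the description more concrete, here is a rough sketch of the "commit explicitly after poll, with metadata" idea. It deliberately does not depend on the Kafka client: OffsetWithMetadata and InMemoryOffsetLog are hypothetical stand-ins, loosely shaped after Kafka's OffsetAndMetadata and the offsets map passed to commitAsync, and the metadata string is an arbitrary example. Asynchrony is left out entirely. The sketch only illustrates why an explicit commit can carry caller-supplied metadata while an automatic one cannot.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins only; none of these types belong to kafka-clients
// or storm-kafka-client.
class CommitWithMetadataSketch {

    /** An offset plus free-form metadata, loosely modelled on Kafka's OffsetAndMetadata. */
    static final class OffsetWithMetadata {
        final long offset;
        final String metadata;

        OffsetWithMetadata(long offset, String metadata) {
            this.offset = offset;
            this.metadata = metadata;
        }
    }

    /** Stand-in for the committed-offset storage: remembers the last commit per partition. */
    static final class InMemoryOffsetLog {
        final Map<Integer, OffsetWithMetadata> committed = new HashMap<>();

        // What an automatic commit amounts to in this model: the caller has no
        // way to supply metadata, so only the offset is recorded.
        void autoCommit(int partition, long offset) {
            committed.put(partition, new OffsetWithMetadata(offset, ""));
        }

        // Explicit commit: the caller decides which metadata travels with the offset.
        void commit(int partition, OffsetWithMetadata value) {
            committed.put(partition, value);
        }
    }

    /** One spout-like iteration: after "polling" a batch, commit with metadata. */
    static void pollThenCommit(InMemoryOffsetLog log, int partition,
                               long lastPolledOffset, String metadata) {
        // By Kafka convention the committed offset is the next record to read,
        // i.e. the last polled offset + 1.
        log.commit(partition, new OffsetWithMetadata(lastPolledOffset + 1, metadata));
    }

    public static void main(String[] args) {
        InMemoryOffsetLog log = new InMemoryOffsetLog();
        log.autoCommit(0, 42);                               // offset only
        pollThenCommit(log, 1, 41, "topology-id=example");   // offset + metadata
        System.out.println("partition 0 metadata: '" + log.committed.get(0).metadata + "'");
        System.out.println("partition 1 metadata: '" + log.committed.get(1).metadata + "'");
    }
}
```

In a real spout the explicit path would go through the consumer's own commit call rather than this in-memory map; the point here is only the shape of the data that gets committed.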



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)