Posted to issues@storm.apache.org by "liangxp (JIRA)" <ji...@apache.org> on 2018/10/27 09:39:00 UTC

[jira] [Updated] (STORM-3277) Flux use in storm-kafka-client not work?

     [ https://issues.apache.org/jira/browse/STORM-3277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

liangxp updated STORM-3277:
---------------------------
    Environment: CentOS7
    Description: 
I use a YAML topology definition like this:
{code:java}
# Topology definition
name: "testTopology"
config:
  topology.workers: 1
components:
  - id: "spoutConfigBuilder"
    className: "org.apache.storm.kafka.spout.KafkaSpoutConfig$Builder"
    # constructorArgs is a list whose elements are objects
    constructorArgs:
      # bootstrapServers
      - "10.7.3.45:9092"
      # topics
      - ["test-topic"]
    # Looks up the setter in the class and sets the value for this property name
    properties:
      - name: "firstPollOffsetStrategy"
        value: EARLIEST
      - name: "offsetCommitPeriodMs"
        value: 200
    # Config methods, alongside properties and constructor arguments
    configMethods:
    - name: "setProp"
      args:
        - {
          "key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
          "value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
          "max.partition.fetch.bytes": 200,
          "group.id": "kafkaSpoutTestGroup_test",
          }
  - id: "spoutConfig"
    className: "org.apache.storm.kafka.spout.KafkaSpoutConfig"
    constructorArgs:
      - ref: "spoutConfigBuilder"
# Spout definitions
spouts:
  - id: "spout-1"
    className: "org.apache.storm.kafka.spout.KafkaSpout"
    parallelism: 1
    constructorArgs:
      - ref: "spoutConfig"

# Bolt definitions
bolts:
  - id: "bolt-1"
    className: "com.crfchina.stream.test.OneBolt"
    parallelism: 1
  - id: "bolt-2"
    className: "com.crfchina.stream.test.TwoBolt"
    parallelism: 1

# Stream definitions
streams:
  - name: "spout-1 --> bolt-1" # name is currently unused (it can serve as a placeholder in logging, the UI, etc.)
    from: "spout-1"
    to: "bolt-1"
    grouping:
      type: SHUFFLE
      args: ["text"]

  - name: "bolt-1 --> bolt2"
    from: "bolt-1"
    to: "bolt-2"
    grouping:
      type: SHUFFLE
{code}
The error is:
{code:java}
737 [main] WARN o.a.s.f.FluxBuilder - Found multiple invokable constructors for class class org.apache.storm.kafka.spout.KafkaSpoutConfig$Builder, given arguments [10.7.3.45:9092, [test-topic]]. Using the last one found.
Exception in thread "main" java.lang.IllegalArgumentException: argument type mismatch
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.storm.flux.FluxBuilder.applyProperties(FluxBuilder.java:282)
at org.apache.storm.flux.FluxBuilder.buildObject(FluxBuilder.java:383)
at org.apache.storm.flux.FluxBuilder.buildComponents(FluxBuilder.java:421)
at org.apache.storm.flux.FluxBuilder.buildTopology(FluxBuilder.java:101)
at org.apache.storm.flux.Flux.runCli(Flux.java:158)
at org.apache.storm.flux.Flux.main(Flux.java:103){code}
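The stack trace above shows the exception surfacing from `Method.invoke`, called by `FluxBuilder.applyProperties`. One plausible cause, assumed here rather than confirmed against the Flux source, is that a YAML scalar such as `EARLIEST` reaches the setter as a plain `String` while the setter expects an enum. The sketch below reproduces that kind of failure with plain Java reflection. `ReflectiveSetterDemo`, its `Builder`, and its `Strategy` enum are hypothetical stand-ins, not storm-kafka-client or Flux classes.

```java
import java.lang.reflect.Method;

public class ReflectiveSetterDemo {

    // Hypothetical enum standing in for an enum-typed setting.
    public enum Strategy { EARLIEST, LATEST }

    // Hypothetical builder with an enum-typed setter.
    public static class Builder {
        private Strategy strategy;

        public Builder setStrategy(Strategy strategy) {
            this.strategy = strategy;
            return this;
        }

        public Strategy getStrategy() {
            return strategy;
        }
    }

    // Finds a public one-argument method by name only and invokes it with the raw value.
    // Returns true if the call went through, false if reflection rejected the argument type.
    public static boolean invokeSetter(Object target, String setterName, Object rawValue) {
        for (Method m : target.getClass().getMethods()) {
            if (m.getName().equals(setterName) && m.getParameterCount() == 1) {
                try {
                    m.invoke(target, rawValue);
                    return true;
                } catch (IllegalArgumentException e) {
                    // Thrown by Method.invoke itself when the argument cannot be converted
                    // to the declared parameter type.
                    return false;
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException(e);
                }
            }
        }
        throw new IllegalStateException("No one-argument method named " + setterName);
    }

    public static void main(String[] args) {
        Builder builder = new Builder();
        // A String is not converted to the enum, so the reflective call is rejected.
        System.out.println("String value accepted: " + invokeSetter(builder, "setStrategy", "EARLIEST"));
        // Passing the enum constant itself succeeds.
        System.out.println("Enum value accepted: " + invokeSetter(builder, "setStrategy", Strategy.EARLIEST));
    }
}
```

If this is indeed what happens here, the value would have to reach the setter already typed as the enum. Whether Flux 1.2.2 can do that conversion for entries under `properties` is not established by this report.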
 

  was:
I use a YAML topology definition like this:
{code:java}
# Topology definition
name: "testTopology"
config:
  topology.workers: 1
components:
  - id: "spoutConfigBuilder"
    className: "org.apache.storm.kafka.spout.KafkaSpoutConfig$Builder"
    # constructorArgs is a list whose elements are objects
    constructorArgs:
      # bootstrapServers
      - "10.7.3.45:9092"
      # topics
      - ["test-topic"]
    # Looks up the setter in the class and sets the value for this property name
    properties:
      - name: "firstPollOffsetStrategy"
        value: EARLIEST
      - name: "offsetCommitPeriodMs"
        value: 200
    # Config methods, alongside properties and constructor arguments
    configMethods:
    - name: "setProp"
      args:
        - {
          "key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
          "value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
          "max.partition.fetch.bytes": 200,
          "group.id": "kafkaSpoutTestGroup_test",
          }
  - id: "spoutConfig"
    className: "org.apache.storm.kafka.spout.KafkaSpoutConfig"
    constructorArgs:
      - ref: "spoutConfigBuilder"
# Spout definitions
spouts:
  - id: "spout-1"
    className: "org.apache.storm.kafka.spout.KafkaSpout"
    parallelism: 1
    constructorArgs:
      - ref: "spoutConfig"

# Bolt definitions
bolts:
  - id: "bolt-1"
    className: "com.crfchina.stream.test.OneBolt"
    parallelism: 1
  - id: "bolt-2"
    className: "com.crfchina.stream.test.TwoBolt"
    parallelism: 1

# Stream definitions
streams:
  - name: "spout-1 --> bolt-1" # name is currently unused (it can serve as a placeholder in logging, the UI, etc.)
    from: "spout-1"
    to: "bolt-1"
    grouping:
      type: SHUFFLE
      args: ["text"]

  - name: "bolt-1 --> bolt2"
    from: "bolt-1"
    to: "bolt-2"
    grouping:
      type: SHUFFLE
{code}
The error is that no method matching the given arguments can be found for:
{code:java}
org.apache.storm.kafka.spout.KafkaSpoutConfig$Builder{code}


> Flux use in storm-kafka-client not work?
> ----------------------------------------
>
>                 Key: STORM-3277
>                 URL: https://issues.apache.org/jira/browse/STORM-3277
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: Flux, storm-kafka-client
>    Affects Versions: 1.2.2
>         Environment: CentOS7
>            Reporter: liangxp
>            Priority: Critical



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)