Posted to user@storm.apache.org by Paul Jose <pa...@ugamsolutions.com> on 2020/09/17 13:57:12 UTC

Flux 2.2.0

Hi,

My name is Paul. I was previously using Storm 0.10.0 and now have to upgrade to the latest release, 2.2.0.
I am using Flux, but it isn't working. This is my config:


name: "AuditLogConsumerTopology"

components:

  - id: "kafkaConfigBuilder"
    className: "org.apache.storm.kafka.spout.KafkaSpoutConfig$Builder"
    constructorArgs:
      # bootstrap servers
      - "localhost:9092"
      # topics
      - ["topic"]

  #Spout Config
  - id: "SpoutConfig"
    className: "org.apache.storm.kafka.spout.KafkaSpoutConfig"
    constructorArgs:
      # builder
      - ref: "kafkaConfigBuilder"
    properties:
      - name: "group.id"
        value: "test-group"
      - name: "key.deserializer"
        value: "org.apache.kafka.common.serialization.StringDeserializer"
      - name: "value.deserializer"
        value: "org.apache.kafka.common.serialization.StringDeserializer"


config:
  topology.workers: 1
  topology.max.spout.pending: 1000
  topology.acker.executors: 1
  topology.executor.send.buffer.size: 16384
  topology.executor.receive.buffer.size: 16384
  topology.transfer.buffer.size: 32
  zookeeperHost: "localhost"
  zookeeperBasePath: "platformaudit"
  kafka.broker.properties:
    #bootstrap.servers: "localhost:9092"
    metadata.broker.list: "localhost:9092"
    request.required.acks: "1"
  topic: "topic"


spouts:
  - id: "TestSpout"
    className: "org.apache.storm.kafka.spout.KafkaSpout"
    parallelism: 1
    constructorArgs:
      - ref: "SpoutConfig"


bolts:
  - id: "TestBolt"
    className: "auditlog.TestBolt"
    parallelism: 1
    constructorArgs:
        #index
      - "auditlog"
        #type
      - "_doc"
        #hostname
      - localhost
        #port
      - 9200


streams:
  - name: "TestSpout --> TestBolt"
    from: "TestSpout"
    to: "TestBolt"
    grouping:
      type: SHUFFLE


By changing the configuration, referring to the actual Storm 2.2.0 consumer code, I was able to put together this Flux YAML file.


However, I am unable to specify any consumer properties for the spout from this YAML file.

I tried setting "group.id" and it gives an error:

Exception in thread "main" java.lang.NoSuchFieldException: groupId

at java.lang.Class.getField(Class.java:1703)
at org.apache.storm.flux.FluxBuilder.findPublicField(FluxBuilder.java:298)
at org.apache.storm.flux.FluxBuilder.applyProperties(FluxBuilder.java:288)
at org.apache.storm.flux.FluxBuilder.buildObject(FluxBuilder.java:390)
at org.apache.storm.flux.FluxBuilder.buildComponents(FluxBuilder.java:428)
at org.apache.storm.flux.FluxBuilder.buildTopology(FluxBuilder.java:102)
at org.apache.storm.flux.Flux.runCli(Flux.java:174)
at org.apache.storm.flux.Flux.main(Flux.java:119)
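From the stack trace, FluxBuilder's property handling seems to reflect on the target class itself. Here is my own stand-alone mimic of that lookup, not Storm's actual code (`SpoutConfigLike` and `canApplyProperty` are made-up names for illustration): the "group.id" property gets mapped to a field name like "groupId", and since the config class exposes neither a matching setter nor a public field, the lookup fails exactly as in the trace.

```java
import java.lang.reflect.Method;

public class PropertyLookup {
    // Hypothetical stand-in for KafkaSpoutConfig: the consumer settings live
    // in an internal Properties map, not as public fields or setters.
    static class SpoutConfigLike {
        private final java.util.Properties kafkaProps = new java.util.Properties();
    }

    // Rough mimic of what applyProperties appears to do, judging by the
    // stack trace: look for a setter first, then fall back to a public field.
    static boolean canApplyProperty(Class<?> clazz, String name) {
        String setter = "set" + Character.toUpperCase(name.charAt(0)) + name.substring(1);
        for (Method m : clazz.getMethods()) {
            if (m.getName().equals(setter)) {
                return true;
            }
        }
        try {
            // This is the call that throws NoSuchFieldException in the trace.
            clazz.getField(name);
            return true;
        } catch (NoSuchFieldException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // "group.id" is mapped to the field name "groupId", which does not exist.
        System.out.println(canApplyProperty(SpoutConfigLike.class, "groupId"));
    }
}
```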

Without specifying the properties, the topology does get submitted, but the spout then throws an error:


java.lang.RuntimeException: org.apache.kafka.common.config.ConfigException: Missing required configuration "key.deserializer" which has no default value.
at org.apache.storm.utils.Utils$1.run(Utils.java:409)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.config.ConfigException: Missing required configuration "key.deserializer" which has no default value.
at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:478)
at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:468)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:108)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:129)
at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:607)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:632)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:613)
at org.apache.storm.kafka.spout.internal.ConsumerFactoryDefault.createConsumer(ConsumerFactoryDefault.java:26)
at org.apache.storm.kafka.spout.internal.ConsumerFactoryDefault.createConsumer(ConsumerFactoryDefault.java:22)
at org.apache.storm.kafka.spout.KafkaSpout.open(KafkaSpout.java:147)
at org.apache.storm.executor.spout.SpoutExecutor.init(SpoutExecutor.java:149)
at org.apache.storm.executor.spout.SpoutExecutor.call(SpoutExecutor.java:159)
at org.apache.storm.executor.spout.SpoutExecutor.call(SpoutExecutor.java:56)
at org.apache.storm.utils.Utils$1.run(Utils.java:389)
... 1 more
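As far as I can tell, this second failure is plain Kafka client validation: the consumer is handed a property map that never received "key.deserializer", and required keys with no default are rejected when the consumer is constructed at spout open. A toy mimic of that check (`RequiredCheck` and `require` are my own illustration, not Kafka's actual code):

```java
import java.util.Map;

public class RequiredCheck {
    // Toy version of "required configuration with no default value":
    // if the key is absent and has no default, validation must fail.
    static String require(Map<String, String> props, String key) {
        String value = props.get(key);
        if (value == null) {
            throw new IllegalArgumentException(
                "Missing required configuration \"" + key + "\" which has no default value.");
        }
        return value;
    }

    public static void main(String[] args) {
        // The spout config only ever set the bootstrap servers and topic,
        // so the deserializer keys are missing when the consumer validates.
        Map<String, String> props = Map.of("bootstrap.servers", "localhost:9092");
        try {
            require(props, "key.deserializer");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```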

I went through a bit of the source code and found the following: the properties are not fields of "org.apache.storm.kafka.spout.KafkaSpoutConfig"; they are Kafka consumer settings defined in "ConsumerConfig". So when Flux tries to look up the field named in the properties section, it fails.

Previously, in version 0.10.0, the configuration fields were available on the "storm.kafka.SpoutConfig" class itself.
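What I am considering trying instead (untested): routing the consumer settings through the builder with Flux's `configMethods`, since `KafkaSpoutConfig.Builder` exposes a `setProp(String, Object)` method, rather than using `properties` on `KafkaSpoutConfig` itself. This assumes Flux's method matching accepts the `(String, Object)` signature for these arguments. Something like:

```yaml
components:
  - id: "kafkaConfigBuilder"
    className: "org.apache.storm.kafka.spout.KafkaSpoutConfig$Builder"
    constructorArgs:
      # bootstrap servers
      - "localhost:9092"
      # topics
      - ["topic"]
    configMethods:
      - name: "setProp"
        args: ["group.id", "test-group"]
      - name: "setProp"
        args: ["key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"]
      - name: "setProp"
        args: ["value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"]

  - id: "SpoutConfig"
    className: "org.apache.storm.kafka.spout.KafkaSpoutConfig"
    constructorArgs:
      - ref: "kafkaConfigBuilder"
```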


Can I please get some help on how to resolve this issue? The Flux Kafka spout example at "https://storm.apache.org/releases/2.2.0/flux.html" also appears to be wrong; I am not sure whether it has been updated.


Best Regards,

Paul Jose
---------------------------------------------------------------------------------------Disclaimer---------------------------------------------------------------------------------------------- 

****Views and opinions expressed in this e-mail belong to  their author and do not necessarily represent views and opinions  of Ugam. 
Our employees are obliged not to make any defamatory statement or infringe any legal right. 
Therefore, Ugam does not accept any responsibility or liability for such statements. The content of this email is confidential and intended for the recipient specified in message only. It is strictly forbidden to share any part of this message with any third party, without a written consent of the sender.
If you have received this message by mistake, please reply to this message and follow with its deletion, so that we can ensure such a mistake does not occur in the future. 
Warning: Sufficient measures have been taken to scan any presence of viruses however the recipient should check this email and any attachments for the presence of viruses as full security of the email cannot be ensured despite our best efforts.
Therefore, Ugam accepts no liability for any damage inflicted by viewing the content of this email. ****

Please do not print this email unless it is necessary. Every unprinted email helps the environment. 
