Posted to user@storm.apache.org by aurelien violette <au...@webgroup-limited.com> on 2019/06/06 10:32:44 UTC

Upgrading Kafka from 0.8.x to 0.10.x with Storm 1.1.x

Hello,

I was successfully using Kafka 0.8.x in a Storm topology based on Storm
Crawler, but I needed to upgrade to Kafka 0.10.x.

I tried to simulate my environment using Docker:
Storm 1.1 and Kafka 2.11-0.10.2.2

Unfortunately, on deployment I get the following error:

Caused by: java.lang.IllegalArgumentException: Invalid lambda deserialization
       at c.a.b.storm.topologies.ConfigurableTopology.$deserializeLambda$(AntConfigurableTopology.java:24) ~[stormjar.jar:?]
       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_212]
       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_212]
       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_212]
       at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_212]
       at java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230) ~[?:1.8.0_212]
       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_212]
       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_212]
       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_212]
       at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_212]
       at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1260) ~[?:1.8.0_212]
       at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2078) ~[?:1.8.0_212]
       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) ~[?:1.8.0_212]
       at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287) ~[?:1.8.0_212]
       at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211) ~[?:1.8.0_212]
       at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069) ~[?:1.8.0_212]
       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) ~[?:1.8.0_212]
       at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287) ~[?:1.8.0_212]
       at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211) ~[?:1.8.0_212]
       at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069) ~[?:1.8.0_212]
       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) ~[?:1.8.0_212]
       at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287) ~[?:1.8.0_212]
       at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211) ~[?:1.8.0_212]
       at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069) ~[?:1.8.0_212]
       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) ~[?:1.8.0_212]
       at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) ~[?:1.8.0_212]
       at org.apache.storm.utils.Utils.javaDeserialize(Utils.java:253) ~[storm-core-1.1.3.jar:1.1.3]

My ConfigurableTopology only gathers some configuration utilities for
building the topology. In particular, it defines the SpoutConfig:

/**
 * Get a spout config by topic, define the scheme.
 * @param topic configuration key under which the Kafka topic name is stored
 * @param deserializer deserializer to use from bytes to value.
 * @return the spout configuration for that topic
 */
KafkaSpoutConfig getSpoutConfig(
        String topic,
        Object deserializer)
{

  // Look up the actual Kafka topic name in the topology configuration.
  String topicName = (String) this.getConf().get(topic);

  // With Kafka 0.10.x, we use the KafkaSpoutConfig.builder
  KafkaSpoutConfig kafkaConfig = KafkaSpoutConfig.builder(bootstrapServers, topicName)
          // The consumer resumes from the last committed offset, or from the earliest offset if nothing has been committed yet.
          .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
          // Set up deserializers from bytes to String (a consumer needs Deserializer classes, not Serializer classes).
          // Careful: the key is dropped from here on.
          .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
          .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
          // Set up deserialization to fields: (String key, String json value) => (String key, unpacked object from json)
          // .setRecordTranslator(new ByTopicRecordTranslator<>((r) -> new Values(r.value()), new Fields("FieldNames")))
          .build();

  return kafkaConfig;

}

I don't understand the origin of the issue. My Maven build targets Java 1.8.
Any idea what could cause it?

Actually, I wanted to set up a RecordTranslator to handle the transition
from the input JSON String to my deserialized JSON object. Deserialization
is handled by Gson.

Thank you for your help,
BR,
Aurelien


-- 
BR,
Aurelien Violette

Re: Upgrading Kafka from 0.8.x to 0.10.x with Storm 1.1.x

Posted by Stig Rohde Døssing <st...@gmail.com>.
I still don't see anything wrong.

Googling the error leads to
https://bugs.eclipse.org/bugs/show_bug.cgi?id=516620. Is it possible that
you're compiling on a different JDK than the one running the cluster?
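
One way to start checking this, as a minimal sketch only: every .class file
begins with the magic number 0xCAFEBABE followed by a minor and a major
version (major 52 corresponds to Java 8). The helper below reads that header
for a class on the classpath and prints it next to the runtime's java.version.
The class and method names (ClassVersionCheck, majorVersion) are hypothetical
and not part of Storm. Running it on the build machine and inside the cluster
container shows whether the two disagree. It only reveals the bytecode level,
not which compiler produced the jar; since the linked report is in the Eclipse
bug tracker, the compiler used (javac versus Eclipse's) may matter as well.

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;

public class ClassVersionCheck {

    /** Returns the class-file major version of the given class (52 = Java 8). */
    static int majorVersion(Class<?> type) throws IOException {
        // Locate the .class resource that backs the given class.
        String resource = "/" + type.getName().replace('.', '/') + ".class";
        try (InputStream raw = type.getResourceAsStream(resource)) {
            if (raw == null) {
                throw new IOException("Class file not found: " + resource);
            }
            DataInputStream in = new DataInputStream(raw);
            // The first four bytes of every class file are 0xCAFEBABE.
            if (in.readInt() != 0xCAFEBABE) {
                throw new IOException("Not a class file: " + resource);
            }
            in.readUnsignedShort();        // minor version, not needed here
            return in.readUnsignedShort(); // major version
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("Runtime java.version: " + System.getProperty("java.version"));
        System.out.println("Runtime java.vendor:  " + System.getProperty("java.vendor"));
        System.out.println("Class-file major version of this class: "
                + majorVersion(ClassVersionCheck.class));
    }
}
```

To inspect the topology itself, the same method could be called with the
topology class from stormjar.jar on the classpath.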

A workaround would be to eliminate the lambda from ByTopicRecordTranslator,
by using an anonymous class instead?
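
A minimal sketch of that workaround, using only the JDK so it runs without
Storm on the classpath. SerializableTranslator is a hypothetical stand-in for
a serializable functional interface; it is not Storm API, and the assumption
is that the function handed to the record translator has to survive Java
serialization in the same way. A serializable lambda is written to the stream
as a java.lang.invoke.SerializedLambda and restored through a synthetic
$deserializeLambda$ method in the class that defined it, which is the frame at
the top of the stack trace. An anonymous (or named) class is serialized as an
ordinary object and does not take that path.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class AnonymousTranslatorSketch {

    /** Hypothetical stand-in for a serializable record-to-value function. */
    interface SerializableTranslator<T, R> extends Serializable {
        R apply(T record);
    }

    /** Lambda form: serialized as a SerializedLambda, restored via $deserializeLambda$. */
    static SerializableTranslator<String, String> lambdaTranslator() {
        return record -> record.trim();
    }

    /** Anonymous-class form: serialized as an ordinary object, no lambda machinery. */
    static SerializableTranslator<String, String> anonymousTranslator() {
        return new SerializableTranslator<String, String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public String apply(String record) {
                return record.trim();
            }
        };
    }

    /** Java-serializes a value to bytes and reads it back. */
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T value) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        SerializableTranslator<String, String> restored = roundTrip(anonymousTranslator());
        System.out.println(restored.apply("  {\"url\":\"http://example.com\"}  "));
    }
}
```

One caveat with this design: an anonymous class declared inside an instance
method keeps a reference to the enclosing instance, which then has to be
serializable too. Declaring it in a static method, as above, or using a static
nested or top-level class avoids that.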

In reply to aurelien violette <aurelien.v@webgroup-limited.com>, Thu 6 Jun 2019, 17:25.

Re: Upgrading Kafka from 0.8.x to 0.10.x with Storm 1.1.x

Posted by aurelien violette <au...@webgroup-limited.com>.
Hey,

Well, here it is. It extends ConfigurableTopology from storm-crawler, and
I've tried many simplifications to get rid of any code that could involve a
lambda. The only place I see one is in the RecordTranslator defaults.

Thank you for any ideas about where or what to look for.


import com.digitalpebble.stormcrawler.ConfigurableTopology;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.kafka.spout.ByTopicRecordTranslator;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;

import java.io.Serializable;

public abstract class AntConfigurableTopology extends ConfigurableTopology implements Serializable {

  String topologyName;
  // ZkHosts zkHosts;
  String bootstrapServers = "localhost:9092"; // Default local configuration
  int parallel = 1;

  void init(String topologyId) {
    topologyName = (String) this.getConf().get(topologyId);
    bootstrapServers = (String) this.getConf().get("metadata.broker");
    final Integer parallelismHint = (Integer) this.getConf().getOrDefault("parallelism", 1);
    parallel = parallelismHint;
    if (!this.getConf().containsKey("zkhost")) {
      this.getConf().put("zkhost", "localhost:2181");
    }
  }


  /**
   * Get a spout config by topic
   * @param topic configuration key under which the Kafka topic name is stored
   * @return the spout configuration for that topic
   */
  KafkaSpoutConfig getSpoutConfig(String topic) {
    return getSpoutConfig(topic, null); //new AntmarkTupleBuilder(this.getConf()));
  }


  /**
   * Get a spout config by topic, define the scheme.
   * @param topic configuration key under which the Kafka topic name is stored
   * @param deserializer deserializer to use from bytes to value.
   * @return the spout configuration for that topic
   */
  KafkaSpoutConfig getSpoutConfig(
          String topic,
          Object deserializer)
  {

    // Look up the actual Kafka topic name in the topology configuration.
    String addWeblinkTopic = (String) this.getConf().get(topic);

    // With Kafka 0.10.x, we use the KafkaSpoutConfig.builder
    KafkaSpoutConfig kafkaAddWeblinkConfig = KafkaSpoutConfig.builder(bootstrapServers, addWeblinkTopic)
            // The consumer resumes from the last committed offset, or from the earliest offset if nothing has been committed yet.
            .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
            // Set up deserializers from bytes to String (a consumer needs Deserializer classes, not Serializer classes).
            // Careful: the key is dropped from here on.
            .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
            .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
            // Set up deserialization to fields: (String key, String json value) => (String key, unpacked object from json)
            // .setRecordTranslator(new ByTopicRecordTranslator<>((TargetInterface Serializable)(r) -> new Values(r.value()), new Fields(FieldNames.ANTMARK)))
            //.setRecordTranslator(deserializer, deserializer.getFields())
            .build();

    return kafkaAddWeblinkConfig;
  }
}


In reply to Stig Rohde Døssing <st...@gmail.com>, Thu 6 Jun 2019, 16:39.

-- 
BR,
Aurelien Violette

Re: Upgrading Kafka from 0.8.x to 0.10.x with Storm 1.1.x

Posted by Stig Rohde Døssing <st...@gmail.com>.
I don't see anything wrong with the code you posted. Could you post the
full AntConfigurableTopology code? It's hard to tell from that snippet what
your topology setup looks like.

In reply to aurelien violette <aurelien.v@webgroup-limited.com>, Thu 6 Jun 2019, 12:33.