Posted to user@spark.apache.org by Tecno Brain <ce...@gmail.com> on 2018/01/24 20:32:44 UTC

Providing Kafka configuration as Map of Strings

On page
https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
there is this Java example:

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092,anotherhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);
Collection<String> topics = Arrays.asList("topicA", "topicB");
JavaInputDStream<ConsumerRecord<String, String>> stream =
  KafkaUtils.createDirectStream(
    streamingContext,
    LocationStrategies.PreferConsistent(),
    ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
  );

I would like to configure Kafka from properties loaded from a Properties
file or a Map<String, String>.

Is there any API that takes a Map<String, String> and produces the
Map<String, Object> required to set the Kafka parameters? Such code would
convert "true" to a Boolean, or a class name to the corresponding Class,
depending on the key.

It seems to me that I would need to know ALL possible Kafka parameters and
what data type each should be converted to in order to produce the
Map<String, Object> kafkaParams.

The older API accepted a Map<String, String> passed to
KafkaUtils.createDirectStream.

Thanks

Re: Providing Kafka configuration as Map of Strings

Posted by Cody Koeninger <co...@koeninger.org>.
Have you tried passing in a Map<String, Object> that happens to have
Strings for all the values?  I haven't tested this, but the underlying
Kafka consumer constructor is documented to take either strings or
objects as values, despite the static type.
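A minimal sketch of that approach, assuming the consumer really does accept String values for every setting: just widen a loaded Properties object into the Map<String, Object> the Spark API wants, with no per-key conversion at all. The class and helper name below are invented for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class KafkaParamsFromProperties {

    // Widen a Properties (e.g. loaded from a .properties file) into the
    // Map<String, Object> expected by the Spark Kafka API, leaving every
    // value as the String it was loaded as.
    public static Map<String, Object> toKafkaParams(Properties props) {
        Map<String, Object> kafkaParams = new HashMap<>();
        for (String name : props.stringPropertyNames()) {
            kafkaParams.put(name, props.getProperty(name));
        }
        return kafkaParams;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("enable.auto.commit", "false");

        Map<String, Object> kafkaParams = toKafkaParams(props);
        System.out.println(kafkaParams);
    }
}
```

The resulting map would then be handed to ConsumerStrategies.Subscribe exactly as in the documentation example; whether the consumer accepts the all-String values is, as noted above, untested here.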

On Wed, Jan 24, 2018 at 2:48 PM, Tecno Brain
<ce...@gmail.com> wrote:
> Basically, I am trying to avoid writing code like:
>
>       switch( key ) {
>                 case "key.deserializer" :  result.put(key ,
> Class.forName(value)); break;
>                 case "key.serializer"   :  result.put(key ,
> Class.forName(value)); break;
>                 case "value.deserializer" :  result.put(key ,
> Class.forName(value)); break;
>                 case "value.serializer"   :  result.put(key ,
> Class.forName(value)); break;
>                 case "max.partition.fetch.bytes" : result.put(key,
> Long.valueOf(value)); break;
>                 case "max.poll.interval.ms" : result.put(key,
> Long.valueOf(value)); break;
>                 case "enable.auto.commit" : result.put(key,
> Boolean.valueOf(value)); break;
>                 default:
>                     result.put(key, value);
>                     break;
>             }
>
> since I would need to go over all possible Kafka properties that are not
> expected as a String.
>
> On Wed, Jan 24, 2018 at 12:32 PM, Tecno Brain <ce...@gmail.com>
> wrote:
>>
>> On page
>> https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
>> there is this Java example:
>>
>> Map<String, Object> kafkaParams = new HashMap<>();
>> kafkaParams.put("bootstrap.servers", "localhost:9092,anotherhost:9092");
>> kafkaParams.put("key.deserializer", StringDeserializer.class);
>> kafkaParams.put("value.deserializer", StringDeserializer.class);
>> kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
>> kafkaParams.put("auto.offset.reset", "latest");
>> kafkaParams.put("enable.auto.commit", false);
>>
>> Collection<String> topics = Arrays.asList("topicA", "topicB");
>>
>> JavaInputDStream<ConsumerRecord<String, String>> stream =
>>   KafkaUtils.createDirectStream(
>>     streamingContext,
>>     LocationStrategies.PreferConsistent(),
>>     ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
>>   );
>>
>> I would like to configure Kafka from properties loaded from a Properties
>> file or a Map<String, String>.
>>
>> Is there any API to take a Map<String, String> and produce the required
>> Map<String, Object> required to set the Kafka parameters ? Such code would
>> convert "true" to a boolean, or a class name to the Class depending on the
>> key.
>>
>> Seems to me that I would need to know ALL possible Kafka parameters and
>> what data type they should be converted to in order to produce the
>> Map<String, Object> kafkaParams.
>>
>> The older API used a Map<String, String> passed to the
>> KafkaUtils.createDirectStream
>>
>> Thanks
>>

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


Re: Providing Kafka configuration as Map of Strings

Posted by Tecno Brain <ce...@gmail.com>.
Basically, I am trying to avoid writing code like:

      switch (key) {
          case "key.deserializer":
          case "key.serializer":
          case "value.deserializer":
          case "value.serializer":
              result.put(key, Class.forName(value));
              break;
          case "max.partition.fetch.bytes":
          case "max.poll.interval.ms":
              result.put(key, Long.valueOf(value));
              break;
          case "enable.auto.commit":
              result.put(key, Boolean.valueOf(value));
              break;
          default:
              result.put(key, value);
              break;
      }

since I would need to go over all possible Kafka properties that are not
expected as a String.
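If explicit conversion does turn out to be necessary, one way to avoid the long switch is a lookup table of per-key converters, with String pass-through as the default. A rough sketch; the keys and target types below are only the ones from the switch above, not a complete list of Kafka's non-String settings:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class KafkaParamConverter {

    // Converters for keys that are commonly set as non-String types;
    // any key not listed here is passed through unchanged.
    private static final Map<String, Function<String, Object>> CONVERTERS =
        new HashMap<>();

    static {
        Function<String, Object> toClass = v -> {
            try {
                return Class.forName(v);
            } catch (ClassNotFoundException e) {
                throw new IllegalArgumentException("Unknown class: " + v, e);
            }
        };
        CONVERTERS.put("key.deserializer", toClass);
        CONVERTERS.put("key.serializer", toClass);
        CONVERTERS.put("value.deserializer", toClass);
        CONVERTERS.put("value.serializer", toClass);
        CONVERTERS.put("max.partition.fetch.bytes", Long::valueOf);
        CONVERTERS.put("max.poll.interval.ms", Long::valueOf);
        CONVERTERS.put("enable.auto.commit", Boolean::valueOf);
    }

    public static Map<String, Object> convert(Map<String, String> in) {
        Map<String, Object> out = new HashMap<>();
        in.forEach((k, v) ->
            out.put(k, CONVERTERS.getOrDefault(k, s -> s).apply(v)));
        return out;
    }
}
```

Each non-String key then appears exactly once, and adding another one is a single map entry rather than another case label.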

On Wed, Jan 24, 2018 at 12:32 PM, Tecno Brain <ce...@gmail.com>
wrote:

> On page
> https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
> there is this Java example:
>
> Map<String, Object> kafkaParams = new HashMap<>();
> kafkaParams.put("bootstrap.servers", "localhost:9092,anotherhost:9092");
> kafkaParams.put("key.deserializer", StringDeserializer.class);
> kafkaParams.put("value.deserializer", StringDeserializer.class);
> kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
> kafkaParams.put("auto.offset.reset", "latest");
> kafkaParams.put("enable.auto.commit", false);
> Collection<String> topics = Arrays.asList("topicA", "topicB");
> JavaInputDStream<ConsumerRecord<String, String>> stream =
>   KafkaUtils.createDirectStream(
>     streamingContext,
>     LocationStrategies.PreferConsistent(),
>     ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
>   );
>
> I would like to configure Kafka from properties loaded from a Properties
> file or a Map<String, String>.
>
> Is there any API to take a Map<String, String> and produce the required
> Map<String, Object> required to set the Kafka parameters ? Such code would
> convert "true" to a boolean, or a class name to the Class depending on the
> key.
>
> Seems to me that I would need to know ALL possible Kafka parameters and
> what data type they should be converted to in order to produce the
> Map<String, Object> kafkaParams.
>
> The older API used a Map<String, String> passed to the
> KafkaUtils.createDirectStream
>
> Thanks
>