You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "Manikumar (JIRA)" <ji...@apache.org> on 2018/07/25 16:29:00 UTC

[jira] [Resolved] (KAFKA-5799) New KafkaSpoutConfig(Scheme)-ByteArrayKeyValueScheme

     [ https://issues.apache.org/jira/browse/KAFKA-5799?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Manikumar resolved KAFKA-5799.
------------------------------
    Resolution: Auto Closed

Closing Apache Storm - Kafka Spout related query.  If this still issue, please contact storm mailing list.

> New KafkaSpoutConfig(Scheme)-ByteArrayKeyValueScheme
> ----------------------------------------------------
>
>                 Key: KAFKA-5799
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5799
>             Project: Kafka
>          Issue Type: New Feature
>    Affects Versions: 0.11.0.0
>         Environment: apache-storm 1.1.0
>            Reporter: Juhong NamGung
>            Priority: Minor
>         Attachments: 1.JPG, 2.JPG, bakvs.JPG
>
>
> I try to integrate Kafka with Apache Strom.
> I want to get data from Kafka, using KafkaSpout in Apache Storm. 
> To get data from Kafka using KafkaSpout, SpoutConfig-scheme must be setting. (Scheme is an interface that dictates how the ByteBuffer consumed from Kafka gets transformed into a storm tuple)
> I want to get both key and value in Kafka, so I used to KafkaSpoutConfig ‘KeyValueSchemeAsMultiScheme’.
> KeyValueSchemeAsMultiScheme’s Constructor is as follows.
> [^2.JPG]
> But, as you can see in the picture, implementing classes of Interface KeyValueScheme are only StringKeyValueScheme.
> [^1.JPG]
> Using StringKeyValueShceme causes problems when importing Integer data from Kafka. Because StringKeyValueScheme deserialize Bytebuffer to String.
> So I implement ByteArrayKeyValueScheme that deserialize ByteBuffer to ByteArray.
> ByteArrayKeyValueScheme imports data as BtyeArray.
> If you use ByteArrayKeyValueScheme, you can import data regardless of data type from Kafka without error.
> (But, you should convert data type ByteArray to data type that you want(e.g. String, Integer...))
> [^bakvs.JPG]
> {code:java}
> // Some comments here
> import java.nio.ByteBuffer;
> import java.util.List;
> import org.apache.storm.kafka.KeyValueScheme;
> import org.apache.storm.spout.RawScheme;
> import org.apache.storm.tuple.Values;
> import com.google.common.collect.ImmutableMap;
> public class ByteArrayKeyValueScheme extends RawScheme implements KeyValueScheme {
> 	@Override
> 	public List<Object> deserializeKeyAndValue(ByteBuffer key, ByteBuffer value) {
> 		// TODO Auto-generated method stub
> 		if (key == null) {
> 			return deserialize(value);
> 		}
> 		Object keytuple = deserialize(key).get(0);
> 		Object valuetuple = deserialize(value).get(0);
> 		return new Values(ImmutableMap.of(keytuple, valuetuple));
> 	}
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)