Posted to jira@kafka.apache.org by "Manikumar (JIRA)" <ji...@apache.org> on 2018/07/25 16:29:00 UTC
[jira] [Resolved] (KAFKA-5799) New KafkaSpoutConfig(Scheme)-ByteArrayKeyValueScheme
[ https://issues.apache.org/jira/browse/KAFKA-5799?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Manikumar resolved KAFKA-5799.
------------------------------
Resolution: Auto Closed
Closing this Apache Storm Kafka Spout related query. If this is still an issue, please contact the Storm mailing list.
> New KafkaSpoutConfig(Scheme)-ByteArrayKeyValueScheme
> ----------------------------------------------------
>
> Key: KAFKA-5799
> URL: https://issues.apache.org/jira/browse/KAFKA-5799
> Project: Kafka
> Issue Type: New Feature
> Affects Versions: 0.11.0.0
> Environment: apache-storm 1.1.0
> Reporter: Juhong NamGung
> Priority: Minor
> Attachments: 1.JPG, 2.JPG, bakvs.JPG
>
>
> I am trying to integrate Kafka with Apache Storm.
> I want to get data from Kafka using KafkaSpout in Apache Storm.
> To get data from Kafka using KafkaSpout, the SpoutConfig scheme must be set. (Scheme is an interface that dictates how the ByteBuffer consumed from Kafka gets transformed into a Storm tuple.)
> I want to get both the key and the value from Kafka, so I used the KafkaSpoutConfig scheme ‘KeyValueSchemeAsMultiScheme’.
> The constructor of KeyValueSchemeAsMultiScheme is as follows.
> [^2.JPG]
> But, as you can see in the picture, the only implementing class of the KeyValueScheme interface is StringKeyValueScheme.
> [^1.JPG]
> Using StringKeyValueScheme causes problems when importing Integer data from Kafka, because StringKeyValueScheme deserializes the ByteBuffer to a String.
> So I implemented ByteArrayKeyValueScheme, which deserializes the ByteBuffer to a byte array.
> ByteArrayKeyValueScheme imports data as a byte array.
> If you use ByteArrayKeyValueScheme, you can import data from Kafka regardless of its data type without error.
> (But you must then convert the byte array to the data type that you want, e.g. String, Integer...)
> [^bakvs.JPG]
> {code:java}
> import java.nio.ByteBuffer;
> import java.util.List;
>
> import org.apache.storm.kafka.KeyValueScheme;
> import org.apache.storm.spout.RawScheme;
> import org.apache.storm.tuple.Values;
>
> import com.google.common.collect.ImmutableMap;
>
> // Emits Kafka keys and values as raw byte arrays instead of Strings.
> public class ByteArrayKeyValueScheme extends RawScheme implements KeyValueScheme {
>
>     @Override
>     public List<Object> deserializeKeyAndValue(ByteBuffer key, ByteBuffer value) {
>         // Without a key, fall back to RawScheme and emit only the value bytes.
>         if (key == null) {
>             return deserialize(value);
>         }
>         // RawScheme.deserialize returns a one-element tuple holding the byte array.
>         Object keytuple = deserialize(key).get(0);
>         Object valuetuple = deserialize(value).get(0);
>         // Emit a single field containing a one-entry map of key bytes to value bytes.
>         return new Values(ImmutableMap.of(keytuple, valuetuple));
>     }
> }
> {code}
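The description above notes that a consumer of ByteArrayKeyValueScheme still has to convert each byte array into the type it wants. A minimal sketch of that conversion step follows. It assumes that integers were written as 4-byte big-endian values (the layout Kafka's IntegerSerializer produces) and that strings were written as UTF-8. The class ByteArrayDecoding and its method names are hypothetical helpers for illustration, not part of Storm or Kafka.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration only; not part of Storm or Kafka.
final class ByteArrayDecoding {

    private ByteArrayDecoding() {
    }

    // Assumes the producer wrote a 4-byte big-endian int.
    static int toInt(byte[] bytes) {
        if (bytes == null || bytes.length != Integer.BYTES) {
            throw new IllegalArgumentException("expected exactly 4 bytes");
        }
        // ByteBuffer reads in big-endian order by default.
        return ByteBuffer.wrap(bytes).getInt();
    }

    // Assumes the producer wrote the string as UTF-8.
    static String toUtf8String(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
```

A bolt receiving the tuple would call whichever method matches the serializer used on the producer side; if the producer used a different encoding, the decoding has to change accordingly.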
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)