Posted to users@kafka.apache.org by Pushkar Deole <pd...@gmail.com> on 2019/07/25 08:51:11 UTC

Getting started with Kafka topic to store multiple types

Hi All,

I am new to Kafka and still getting acquainted with the product. I have a
basic question about using it. In a single Kafka topic, I want to store a
String value against some keys and a HashMap value against others. For this
purpose I have declared two different producers, as shown below, which I
instantiate as two separate producer instances. I need two producers because
I want to use generic types properly: to store both a String and a Map with
a single producer, I would have to declare it as <String, Object>, and
Object is too generic for what I want to allow. So I defined two different
producers:

private static Producer<String, String> basicProducer = null;
private static Producer<String, Map> hashProducer = null;

Now, if I want to read my data through other Streams classes such as KTable
or GlobalKTable, those classes also require generic types. For example, a
GlobalKTable defined like the one below won't work for a HashMap stored
against a key:

private static GlobalKTable<String, String> eventsGTable;

So, how should I go about allowing either a String or a Map to be stored
against a key in a topic? Do I need to use <String, Object> as the generic
types in all definitions and then cast from Object to String or Map,
depending on the type of the instance stored in the object?

Re: Getting started with Kafka topic to store multiple types

Posted by Peter Levart <pe...@gmail.com>.
Hi Pushkar,



If you want to store String(s) as well as Map(s) into values of a single 
topic and still be type-safe, I recommend introducing an indirection in 
the form of an "Either" class. For example:


import java.util.Objects;

public final class Either<FST, SND> {

     public static <FST, SND> Either<FST, SND> first(FST first) {
         return new Either<>(Objects.requireNonNull(first), null);
     }

     public static <FST, SND> Either<FST, SND> second(SND second) {
         return new Either<>(null, Objects.requireNonNull(second));
     }

     private final FST fst;
     private final SND snd;

     private Either(FST fst, SND snd) {
         this.fst = fst;
         this.snd = snd;
     }

     public boolean isFirst() {
         return fst != null;
     }

     public boolean isSecond() {
         return snd != null;
     }

     public FST getFirst() {
         return Objects.requireNonNull(fst);
     }

     public SND getSecond() {
         return Objects.requireNonNull(snd);
     }
}
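
As a minimal sketch of how calling code might branch on such a value
instead of casting from Object: the EitherDemo class and its describe
helper are hypothetical names, and the nested Either is a trimmed copy
of the class above so that the snippet compiles on its own.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Objects;

// Hypothetical demo class; not part of Kafka.
class EitherDemo {

    // Trimmed copy of the Either class above, nested to keep the sketch self-contained.
    static final class Either<FST, SND> {
        private final FST fst;
        private final SND snd;

        private Either(FST fst, SND snd) {
            this.fst = fst;
            this.snd = snd;
        }

        static <FST, SND> Either<FST, SND> first(FST first) {
            return new Either<>(Objects.requireNonNull(first), null);
        }

        static <FST, SND> Either<FST, SND> second(SND second) {
            return new Either<>(null, Objects.requireNonNull(second));
        }

        boolean isFirst() { return fst != null; }
        FST getFirst() { return Objects.requireNonNull(fst); }
        SND getSecond() { return Objects.requireNonNull(snd); }
    }

    // Check which variant is present, then use the matching typed getter.
    static String describe(Either<String, Map<String, String>> value) {
        if (value.isFirst()) {
            return "string of length " + value.getFirst().length();
        }
        return "map with " + value.getSecond().size() + " entries";
    }

    public static void main(String[] args) {
        Either<String, Map<String, String>> a = Either.first("hello");
        Either<String, Map<String, String>> b =
            Either.second(Collections.singletonMap("k", "v"));
        System.out.println(describe(a));
        System.out.println(describe(b));
    }
}
```

Calling the getter for the variant that is not present fails fast with a
NullPointerException, so a wrong branch shows up immediately rather than
as a ClassCastException somewhere downstream.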


(BTW, what type are the keys and values in your Map(s)? Let's say, for
the sake of this example, that you store Map<String, String> instances.)

So you can then create KafkaConsumer<String, Either<String,
Map<String, String>>> and KafkaProducer<String, Either<String,
Map<String, String>>> instances using custom (de)serialization
(Serializer, Deserializer, Serde). Here's what I have used on such
occasions:

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Objects;

/**
  * A {@link Serde} base implementation that is itself also a
  * {@link Serializer} and a {@link Deserializer}.
  */
public abstract class SelfSerde<T> implements Serde<T>, Serializer<T>, Deserializer<T> {

     @Override
     public void configure(Map<String, ?> configs, boolean isKey) { }

     @Override
     public void close() { }

     @Override
     public Serializer<T> serializer() {
         return this;
     }

     @Override
     public Deserializer<T> deserializer() {
         return this;
     }

     /**
      * A {@link SelfSerde} implementation that serializes {@link Void} type
      * which has the only possible value {@code null}.
      */
     public static final SelfSerde<Void> VOID = new SelfSerde<Void>() {
         @Override
         public Void deserialize(String topic, byte[] data) {
             assert data == null;
             return null;
         }

         @Override
         public byte[] serialize(String topic, Void data) {
             return null;
         }
     };

     /**
      * A {@link SelfSerde} base implementation that delegates (de)serializing
      * to methods using {@link DataInput} and {@link DataOutput} API.
      */
     public static abstract class DataIO<T> extends SelfSerde<T> {
         @Override
         public final T deserialize(String topic, byte[] data) {
             if (data == null) return null;
             try {
                 return deserialize(topic, data, new DataInputStream(new ByteArrayInputStream(data)));
             } catch (IOException e) {
                 throw new UncheckedIOException(e);
             }
         }

         @Override
         public final byte[] serialize(String topic, T data) {
             if (data == null) return null;
             try {
                 ByteArrayOutputStream baos = new ByteArrayOutputStream();
                 serialize(topic, data, new DataOutputStream(baos));
                 return baos.toByteArray();
             } catch (IOException e) {
                 throw new UncheckedIOException(e);
             }
         }

         protected abstract T deserialize(String topic, byte[] data, DataInput in) throws IOException;

         protected abstract void serialize(String topic, T data, DataOutput out) throws IOException;

         protected static <T> void write(String topic, DataOutput out, T value, Serde<T> serde) throws IOException {
             byte[] bytes = serde.serializer().serialize(topic, value);
             if (bytes == null) {
                 out.writeInt(-1);
             } else {
                 out.writeInt(bytes.length);
                 out.write(bytes);
             }
         }

         protected static <T> T read(String topic, DataInput in, Serde<T> serde) throws IOException {
             int size = in.readInt();
             byte[] bytes;
             if (size < 0) {
                 bytes = null;
             } else {
                 in.readFully(bytes = new byte[size]);
             }
             return serde.deserializer().deserialize(topic, bytes);
         }
     }

     static abstract class MapSerde<K, V> extends SelfSerde.DataIO<Map<K, V>> {

         final Serde<K> keySerde;
         final Serde<V> valSerde;

         MapSerde(Serde<K> keySerde, Serde<V> valSerde) {
             this.keySerde = Objects.requireNonNull(keySerde);
             this.valSerde = Objects.requireNonNull(valSerde);
         }

         @Override
         public void configure(Map<String, ?> configs, boolean isKey) {
             keySerde.configure(configs, isKey);
             valSerde.configure(configs, isKey);
         }

         @Override
         public void close() {
             keySerde.close();
             valSerde.close();
         }

         @Override
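         protected void serialize(String topic, Map<K, V> map, DataOutput out) throws IOException {
             try {
                 // Write the entries first, counting them as we go;
                 // the entry count is written last, after all entries.
                 int[] size = new int[1];
                 map.forEach((k, v) -> {
                     try {
                         write(topic, out, k, keySerde);
                         write(topic, out, v, valSerde);
                         size[0]++;
                     } catch (IOException e) {
                         // Tunnel the checked exception out of the lambda.
                         throw new UncheckedIOException(e);
                     }
                 });
                 out.writeInt(size[0]);
             } catch (UncheckedIOException e) {
                 // Unwrap and rethrow the original IOException.
                 throw e.getCause();
             }
         }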
     }

     public static class HashMap<K, V> extends MapSerde<K, V> {

         public HashMap(Serde<K> keySerde, Serde<V> valSerde) {
             super(keySerde, valSerde);
         }

         @Override
         protected Map<K, V> deserialize(String topic, byte[] data, DataInput in) throws IOException {
             // The entry count is stored in the last 4 bytes of the payload.
             ByteBuffer bb = ByteBuffer.wrap(data, data.length - Integer.BYTES, Integer.BYTES);
             int size = bb.getInt();
             Map<K, V> map = new java.util.HashMap<>((size * 4 + 2) / 3);
             for (int i = 0; i < size; i++) {
                 map.put(read(topic, in, keySerde), read(topic, in, valSerde));
             }
             return map;
         }
     }
}
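
To make the byte layout of the map serde above easier to see, here is a
minimal standalone sketch that uses only the JDK. It assumes String keys
and values encoded as UTF-8; the MapLayoutDemo class and its encode and
decode methods are hypothetical names, not part of the code above. Each
key and value is written as a 4-byte length followed by its bytes, and
the entry count goes last, which is why the reader fetches it from the
final four bytes before reading the entries. Unlike the serde above,
this sketch does not handle null keys or values.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical demo class illustrating the "entries first, count last" layout.
class MapLayoutDemo {

    static byte[] encode(Map<String, String> map) {
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(baos);
            int count = 0;
            for (Map.Entry<String, String> e : map.entrySet()) {
                writeString(out, e.getKey());
                writeString(out, e.getValue());
                count++;
            }
            out.writeInt(count); // trailing entry count, big-endian
            return baos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static Map<String, String> decode(byte[] data) {
        try {
            // Read the entry count from the last 4 bytes, then read entries from the start.
            int count = ByteBuffer.wrap(data, data.length - Integer.BYTES, Integer.BYTES).getInt();
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            Map<String, String> map = new LinkedHashMap<>();
            for (int i = 0; i < count; i++) {
                String key = readString(in);
                String value = readString(in);
                map.put(key, value);
            }
            return map;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Length-prefixed UTF-8 string: 4-byte length, then the bytes.
    private static void writeString(DataOutputStream out, String s) throws IOException {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        out.writeInt(bytes.length);
        out.write(bytes);
    }

    private static String readString(DataInputStream in) throws IOException {
        byte[] bytes = new byte[in.readInt()];
        in.readFully(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        Map<String, String> original = new LinkedHashMap<>();
        original.put("type", "login");
        original.put("user", "alice");
        byte[] bytes = encode(original);
        System.out.println("round trip ok: " + decode(bytes).equals(original));
    }
}
```

Both DataOutputStream.writeInt and ByteBuffer.getInt use big-endian byte
order by default, so the trailing count can be read back with either API.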


Here's also a Serde implementation for Either<FST, SND> instances:


import org.apache.kafka.common.serialization.Serde;

import java.io.StreamCorruptedException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Map;
import java.util.Objects;

public final class EitherSerde<FST, SND> extends SelfSerde<Either<FST, SND>> {
     private final Serde<FST> fstSerde;
     private final Serde<SND> sndSerde;

     public EitherSerde(Serde<FST> fstSerde,
                        Serde<SND> sndSerde) {
         this.fstSerde = Objects.requireNonNull(fstSerde);
         this.sndSerde = Objects.requireNonNull(sndSerde);
     }

     @Override
     public void configure(Map<String, ?> configs, boolean isKey) {
         fstSerde.configure(configs, isKey);
         sndSerde.configure(configs, isKey);
     }

     @Override
     public void close() {
         fstSerde.close();
         sndSerde.close();
     }

     @Override
     public Either<FST, SND> deserialize(String topic, byte[] data) {
         if (data == null) return null;
         byte[] elementData = Arrays.copyOfRange(data, 0, data.length - 1);
         byte id = data[data.length - 1];
         switch (id) {
             case 0:
                 return Either.first(fstSerde.deserializer().deserialize(topic, elementData));
             case 1:
                 return Either.second(sndSerde.deserializer().deserialize(topic, elementData));
             default:
                 throw new UncheckedIOException(new StreamCorruptedException("Expected 0 or 1 as last byte of Either"));
         }
     }

     @Override
     public byte[] serialize(String topic, Either<FST, SND> oneof2) {
         if (oneof2 == null) return null;
         byte[] elementData;
         byte id;
         if (oneof2.isFirst()) {
             elementData = fstSerde.serializer().serialize(topic, oneof2.getFirst());
             id = 0;
         } else { // oneof2.isSecond()
             elementData = sndSerde.serializer().serialize(topic, oneof2.getSecond());
             id = 1;
         }
         byte[] data = Arrays.copyOf(elementData, elementData.length + 1);
         data[data.length - 1] = id;
         return data;
     }
}
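
The framing used by EitherSerde above is simply the element's bytes
followed by one tag byte (0 for the first type, 1 for the second). A
minimal JDK-only sketch of that framing follows; TagByteDemo, frame,
tagOf and payloadOf are hypothetical names chosen for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical demo class illustrating the trailing tag byte framing.
class TagByteDemo {

    // Append the tag as the last byte of a copy of the payload.
    static byte[] frame(byte[] payload, byte tag) {
        byte[] data = Arrays.copyOf(payload, payload.length + 1);
        data[data.length - 1] = tag;
        return data;
    }

    // The tag is always the last byte of the framed message.
    static byte tagOf(byte[] data) {
        return data[data.length - 1];
    }

    // Everything before the last byte is the original payload.
    static byte[] payloadOf(byte[] data) {
        return Arrays.copyOfRange(data, 0, data.length - 1);
    }

    public static void main(String[] args) {
        byte[] framed = frame("hi".getBytes(StandardCharsets.UTF_8), (byte) 1);
        System.out.println("tag=" + tagOf(framed)
            + " payload=" + new String(payloadOf(framed), StandardCharsets.UTF_8));
    }
}
```

Putting the tag at the end keeps the payload as an unmodified prefix of
the message, at the cost of one array copy in each direction.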



You could then use the above utilities in the following way:


         SelfSerde<Either<String, Map<String, String>>> valSerde =
             new EitherSerde<>(
                 Serdes.String(),
                 new SelfSerde.HashMap<>(Serdes.String(), Serdes.String())
             );

         Properties props = ...;

         Consumer<String, Either<String, Map<String, String>>> consumer =
             new KafkaConsumer<>(
                 props,
                 Serdes.String().deserializer(),
                 valSerde
             );

// or:

         Properties props = ...;

         Producer<String, Either<String, Map<String, String>>> producer =
             new KafkaProducer<>(
                 props,
                 Serdes.String().serializer(),
                 valSerde
             );



With custom (de)serialization you get compact (small) messages and low
CPU usage, which you can't achieve with Java's default serialization.
And you still get to enjoy type-safe code :-)


Regards, Peter