Posted to users@kafka.apache.org by Pushkar Deole <pd...@gmail.com> on 2019/07/25 08:51:11 UTC
Getting started with Kafka topic to store multiple types
Hi All,
I am new to Kafka and still getting acquainted with the product. I have a
basic question about using it. I want to store, in a single Kafka topic, a
String value against some keys and a HashMap value against others. For this
purpose I have declared two differently typed producers, as shown below. I
need two because I want to use generic types properly: a single producer
that stores both String and Map values would have to be declared as
<String, Object>, and Object is more generic than I want to allow.
private static Producer<String, String> basicProducer = null;
private static Producer<String, Map> hashProducer = null;
Now, if I want to use Streams classes such as KTable or GlobalKTable to
read my data, these classes also require generic types. For example, if I
define a GlobalKTable as below, it won't work for keys that have a HashMap
stored against them:
private static GlobalKTable<String, String> eventsGTable;
So, to allow a String as well as a Map to be stored against keys in one
topic, how should I go about it? Do I need to use <String, Object> as the
generic types in all definitions and then cast from Object to String or Map
according to the type of the instance stored?
Re: Getting started with Kafka topic to store multiple types
Posted by Peter Levart <pe...@gmail.com>.
Hi Pushkar,
If you want to store String(s) as well as Map(s) into values of a single
topic and still be type-safe, I recommend introducing an indirection in
the form of an "Either" class. For example:
import java.util.Objects;

/** Holds exactly one of two values: a FST or a SND, never both. */
public final class Either<FST, SND> {

    public static <FST, SND> Either<FST, SND> first(FST first) {
        return new Either<>(Objects.requireNonNull(first), null);
    }

    public static <FST, SND> Either<FST, SND> second(SND second) {
        return new Either<>(null, Objects.requireNonNull(second));
    }

    private final FST fst;
    private final SND snd;

    private Either(FST fst, SND snd) {
        this.fst = fst;
        this.snd = snd;
    }

    public boolean isFirst() {
        return fst != null;
    }

    public boolean isSecond() {
        return snd != null;
    }

    /** Throws NullPointerException if this instance holds the second value. */
    public FST getFirst() {
        return Objects.requireNonNull(fst);
    }

    /** Throws NullPointerException if this instance holds the first value. */
    public SND getSecond() {
        return Objects.requireNonNull(snd);
    }
}
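As a minimal sketch of how calling code might branch on such a value
without casts: the class name EitherUsageSketch and the describe() helper
are hypothetical, and the nested Either is only a trimmed copy of the class
above so that the example compiles on its own.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Objects;

public class EitherUsageSketch {

    // Trimmed copy of the Either class above, nested here only so that
    // this sketch is self-contained.
    public static final class Either<FST, SND> {
        private final FST fst;
        private final SND snd;

        private Either(FST fst, SND snd) {
            this.fst = fst;
            this.snd = snd;
        }

        public static <FST, SND> Either<FST, SND> first(FST v) {
            return new Either<>(Objects.requireNonNull(v), null);
        }

        public static <FST, SND> Either<FST, SND> second(SND v) {
            return new Either<>(null, Objects.requireNonNull(v));
        }

        public boolean isFirst() { return fst != null; }
        public FST getFirst() { return Objects.requireNonNull(fst); }
        public SND getSecond() { return Objects.requireNonNull(snd); }
    }

    // Hypothetical helper: decides by which side is present, no casting.
    public static String describe(Either<String, Map<String, String>> value) {
        if (value.isFirst()) {
            return "string: " + value.getFirst();
        }
        return "map with " + value.getSecond().size() + " entries";
    }

    public static void main(String[] args) {
        Either<String, Map<String, String>> a = Either.first("hello");
        Map<String, String> m = Collections.singletonMap("k", "v");
        Either<String, Map<String, String>> b = Either.second(m);
        System.out.println(describe(a));
        System.out.println(describe(b));
    }
}
```

The compiler checks both branches against the declared types, which is what
a <String, Object> declaration with instanceof casts would not give you.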
(BTW, what type are the keys and values in your Map(s)? Let's say, for
the sake of this example, that you store Map<String, String> instances.)
You can then create KafkaConsumer<String, Either<String,
Map<String, String>>> and KafkaProducer<String, Either<String,
Map<String, String>>> instances using custom (de)serialization
(Serializer, Deserializer, Serde). Here's what I have used on such
occasions:
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Objects;

/**
 * A {@link Serde} base implementation that is itself also a
 * {@link Serializer} and a {@link Deserializer}.
 */
public abstract class SelfSerde<T> implements Serde<T>, Serializer<T>,
        Deserializer<T> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) { }

    @Override
    public void close() { }

    @Override
    public Serializer<T> serializer() {
        return this;
    }

    @Override
    public Deserializer<T> deserializer() {
        return this;
    }

    /**
     * A {@link SelfSerde} implementation that serializes {@link Void} type
     * which has the only possible value {@code null}.
     */
    public static final SelfSerde<Void> VOID = new SelfSerde<Void>() {
        @Override
        public Void deserialize(String topic, byte[] data) {
            assert data == null;
            return null;
        }

        @Override
        public byte[] serialize(String topic, Void data) {
            return null;
        }
    };

    /**
     * A {@link SelfSerde} base implementation that delegates (de)serializing
     * to methods using {@link DataInput} and {@link DataOutput} API.
     */
    public static abstract class DataIO<T> extends SelfSerde<T> {

        @Override
        public final T deserialize(String topic, byte[] data) {
            if (data == null) return null;
            try {
                return deserialize(topic, data,
                        new DataInputStream(new ByteArrayInputStream(data)));
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }

        @Override
        public final byte[] serialize(String topic, T data) {
            if (data == null) return null;
            try {
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                serialize(topic, data, new DataOutputStream(baos));
                return baos.toByteArray();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }

        protected abstract T deserialize(String topic, byte[] data,
                                         DataInput in) throws IOException;

        protected abstract void serialize(String topic, T data,
                                          DataOutput out) throws IOException;

        /** Writes a length-prefixed element; a length of -1 encodes null. */
        protected static <T> void write(String topic, DataOutput out, T value,
                                        Serde<T> serde) throws IOException {
            byte[] bytes = serde.serializer().serialize(topic, value);
            if (bytes == null) {
                out.writeInt(-1);
            } else {
                out.writeInt(bytes.length);
                out.write(bytes);
            }
        }

        /** Reads an element written by {@link #write}. */
        protected static <T> T read(String topic, DataInput in,
                                    Serde<T> serde) throws IOException {
            int size = in.readInt();
            byte[] bytes;
            if (size < 0) {
                bytes = null;
            } else {
                in.readFully(bytes = new byte[size]);
            }
            return serde.deserializer().deserialize(topic, bytes);
        }
    }

    static abstract class MapSerde<K, V> extends SelfSerde.DataIO<Map<K, V>> {

        final Serde<K> keySerde;
        final Serde<V> valSerde;

        MapSerde(Serde<K> keySerde, Serde<V> valSerde) {
            this.keySerde = Objects.requireNonNull(keySerde);
            this.valSerde = Objects.requireNonNull(valSerde);
        }

        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            keySerde.configure(configs, isKey);
            valSerde.configure(configs, isKey);
        }

        @Override
        public void close() {
            keySerde.close();
            valSerde.close();
        }

        // Entries are written first; the entry count follows as a trailing int.
        @Override
        protected void serialize(String topic, Map<K, V> map,
                                 DataOutput out) throws IOException {
            try {
                int[] size = new int[1];
                map.forEach((k, v) -> {
                    try {
                        write(topic, out, k, keySerde);
                        write(topic, out, v, valSerde);
                        size[0]++;
                    } catch (IOException e) {
                        // Tunnel the checked exception out of the lambda.
                        throw new UncheckedIOException(e);
                    }
                });
                out.writeInt(size[0]);
            } catch (UncheckedIOException e) {
                throw e.getCause();
            }
        }
    }

    public static class HashMap<K, V> extends MapSerde<K, V> {

        public HashMap(Serde<K> keySerde, Serde<V> valSerde) {
            super(keySerde, valSerde);
        }

        @Override
        protected Map<K, V> deserialize(String topic, byte[] data,
                                        DataInput in) throws IOException {
            // The entry count is stored in the last four bytes, so read it
            // from the end of the array before streaming the entries.
            ByteBuffer bb = ByteBuffer.wrap(data, data.length - Integer.BYTES,
                    Integer.BYTES);
            int size = bb.getInt();
            Map<K, V> map = new java.util.HashMap<>((size * 4 + 2) / 3);
            for (int i = 0; i < size; i++) {
                map.put(read(topic, in, keySerde), read(topic, in, valSerde));
            }
            return map;
        }
    }
}
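To make the byte layout used by the map Serde above easier to see, here is
a minimal sketch that uses only the JDK and fixes both keys and values to
UTF-8 strings. The class name MapLayoutSketch and its methods are
hypothetical and not part of the code above. It also takes the count from
map.size() instead of counting inside forEach, and it does not handle null
keys or values.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

public class MapLayoutSketch {

    // Writes one element as a 4-byte length followed by its UTF-8 bytes.
    private static void writeString(DataOutputStream out, String s)
            throws IOException {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        out.writeInt(bytes.length);
        out.write(bytes);
    }

    private static String readString(DataInputStream in) throws IOException {
        byte[] bytes = new byte[in.readInt()];
        in.readFully(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Layout: [len][key][len][value] per entry, then a 4-byte entry count.
    public static byte[] encode(Map<String, String> map) {
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(baos);
            for (Map.Entry<String, String> e : map.entrySet()) {
                writeString(out, e.getKey());
                writeString(out, e.getValue());
            }
            out.writeInt(map.size());
            return baos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static Map<String, String> decode(byte[] data) {
        try {
            // The count sits in the last four bytes, so read it first.
            int size = ByteBuffer.wrap(data, data.length - Integer.BYTES,
                    Integer.BYTES).getInt();
            DataInputStream in =
                    new DataInputStream(new ByteArrayInputStream(data));
            Map<String, String> map = new LinkedHashMap<>();
            for (int i = 0; i < size; i++) {
                String key = readString(in);
                map.put(key, readString(in));
            }
            return map;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> original = new LinkedHashMap<>();
        original.put("a", "1");
        original.put("b", "22");
        byte[] bytes = encode(original);
        System.out.println(bytes.length + " bytes, round trip equal: "
                + decode(bytes).equals(original));
    }
}
```

Putting the count at the end lets the writer stream entries without knowing
the size up front; the reader has the whole byte array anyway, so it can
look at the tail before decoding from the start.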
Here's also a Serde implementation for Either<FST, SND> instances:
import org.apache.kafka.common.serialization.Serde;

import java.io.StreamCorruptedException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Map;
import java.util.Objects;

/**
 * A Serde for {@link Either} values. The wire format is the bytes produced
 * by the element Serde followed by one tag byte: 0 for first, 1 for second.
 */
public final class EitherSerde<FST, SND> extends SelfSerde<Either<FST, SND>> {

    private final Serde<FST> fstSerde;
    private final Serde<SND> sndSerde;

    public EitherSerde(Serde<FST> fstSerde, Serde<SND> sndSerde) {
        this.fstSerde = Objects.requireNonNull(fstSerde);
        this.sndSerde = Objects.requireNonNull(sndSerde);
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        fstSerde.configure(configs, isKey);
        sndSerde.configure(configs, isKey);
    }

    @Override
    public void close() {
        fstSerde.close();
        sndSerde.close();
    }

    @Override
    public Either<FST, SND> deserialize(String topic, byte[] data) {
        if (data == null) return null;
        // Split the trailing tag byte from the element bytes.
        byte[] elementData = Arrays.copyOfRange(data, 0, data.length - 1);
        byte id = data[data.length - 1];
        switch (id) {
            case 0:
                return Either.first(
                        fstSerde.deserializer().deserialize(topic, elementData));
            case 1:
                return Either.second(
                        sndSerde.deserializer().deserialize(topic, elementData));
            default:
                throw new UncheckedIOException(new StreamCorruptedException(
                        "Expected 0 or 1 as last byte of Either"));
        }
    }

    @Override
    public byte[] serialize(String topic, Either<FST, SND> oneof2) {
        if (oneof2 == null) return null;
        byte[] elementData;
        byte id;
        if (oneof2.isFirst()) {
            elementData = fstSerde.serializer().serialize(topic,
                    oneof2.getFirst());
            id = 0;
        } else { // oneof2.isSecond()
            elementData = sndSerde.serializer().serialize(topic,
                    oneof2.getSecond());
            id = 1;
        }
        // Append the tag byte after the element bytes.
        byte[] data = Arrays.copyOf(elementData, elementData.length + 1);
        data[data.length - 1] = id;
        return data;
    }
}
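The framing that this Serde applies can be shown in isolation. The sketch
below uses only the JDK; the class name TagByteSketch and its three helper
methods are hypothetical and stand in for the copy operations inside
serialize() and deserialize() above.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class TagByteSketch {

    // Appends a one-byte tag after the payload bytes.
    public static byte[] frame(byte[] payload, byte tag) {
        byte[] data = Arrays.copyOf(payload, payload.length + 1);
        data[data.length - 1] = tag;
        return data;
    }

    // The tag is always the last byte of the framed message.
    public static byte tagOf(byte[] data) {
        return data[data.length - 1];
    }

    // Everything before the last byte is the element payload.
    public static byte[] payloadOf(byte[] data) {
        return Arrays.copyOfRange(data, 0, data.length - 1);
    }

    public static void main(String[] args) {
        byte[] framed =
                frame("hello".getBytes(StandardCharsets.UTF_8), (byte) 0);
        System.out.println("tag=" + tagOf(framed) + " payload="
                + new String(payloadOf(framed), StandardCharsets.UTF_8));
    }
}
```

The cost of telling the two value types apart is therefore one extra byte
per message, plus one array copy on each side.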
You could then use the above utilities in the following way:
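import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.Serdes;

import java.util.Map;
import java.util.Properties;

// One Serde instance works as both the value Serializer and Deserializer.
SelfSerde<Either<String, Map<String, String>>> valSerde =
    new EitherSerde<>(
        Serdes.String(),
        new SelfSerde.HashMap<>(Serdes.String(), Serdes.String())
    );

Properties props = ...;
Consumer<String, Either<String, Map<String, String>>> consumer =
    new KafkaConsumer<>(
        props,
        Serdes.String().deserializer(),
        valSerde
    );

// or:

Properties props = ...;
Producer<String, Either<String, Map<String, String>>> producer =
    new KafkaProducer<>(
        props,
        Serdes.String().serializer(),
        valSerde
    );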
With custom (de)serialization you get compact messages and low CPU usage,
which you can't achieve with Java's default serialization. And you can
still enjoy type-safe code :-)
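As a rough illustration of the size difference only (CPU cost is not
measured here), the following sketch compares the number of bytes produced
by Java's built-in object serialization with a hand-rolled length-prefixed
layout similar to the one above. The class name SizeComparisonSketch and
its methods are hypothetical, and keys and values are assumed to be
non-null strings.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class SizeComparisonSketch {

    // Byte count when the map is written with ObjectOutputStream.
    public static int javaSerializedSize(HashMap<String, String> map) {
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(baos)) {
                out.writeObject(map);
            }
            return baos.size();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Byte count with length-prefixed UTF-8 keys and values followed by
    // a 4-byte entry count.
    public static int customSize(Map<String, String> map) {
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(baos);
            for (Map.Entry<String, String> e : map.entrySet()) {
                for (String s : new String[] {e.getKey(), e.getValue()}) {
                    byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
                    out.writeInt(bytes.length);
                    out.write(bytes);
                }
            }
            out.writeInt(map.size());
            return baos.size();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        HashMap<String, String> map = new HashMap<>();
        map.put("k", "v");
        System.out.println("java serialization: " + javaSerializedSize(map)
                + " bytes, custom layout: " + customSize(map) + " bytes");
    }
}
```

Java serialization writes a stream header and a class descriptor for
java.util.HashMap in addition to the data, which is why small maps come out
noticeably larger than in the custom layout.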
Regards, Peter