You are viewing a plain text version of this content. The canonical link for it is here.
Posted to log4j-user@logging.apache.org by Michael Williams <wi...@gmail.com> on 2020/06/01 15:23:55 UTC
Unable to send Log Events via the KafkaAppender with existing Kafka Schema
Hi
I’m trying to send logs to Kafka via log4j2. The Topic I’m sending to
already has a Schema and I cannot find a way for the KafkaAppender to get
the Schema Id as it always passes the string “bytes” to the Schema Registry.
In (1) KafkaAppender.tryAppend a byte[] is created representing the
LogEvent to be sent to Kafka. It is always a byte[], never a subclass of
GenericContainer.
Further down the call chain the Schema string is obtained by calling (2)
AvroSchemaUtils.getSchema(). This returns “bytes” as the object passed in
is the byte[] from tryAppend. This is then used in a call to the Schema
Registry which expects a string representation of the Schema and not the
string "bytes".
I haven’t found a way to set the Schema Id for the KafkaAppender or have
the appender use a GenericContainer subclass of my choosing. Is either
possible or is there another way?
Thanks
Michael
1) Creating the byte[] to be sent to Kafka:
private void tryAppend(final LogEvent event) throws ExecutionException,
InterruptedException, TimeoutException {
final Layout<? extends Serializable> layout = getLayout();
byte[] data;
if (layout instanceof SerializedLayout) {
final byte[] header = layout.getHeader();
final byte[] body = layout.toByteArray(event);
data = new byte[header.length + body.length];
System.arraycopy(header, 0, data, 0, header.length);
System.arraycopy(body, 0, data, header.length, body.length);
} else {
data = layout.toByteArray(event);
}
manager.send(data);
}
2) Getting the Schema string to pass to the Schema Registry to get the
Schema Id:
public static Schema getSchema(Object object) {
if (object == null) {
return (Schema)primitiveSchemas.get("Null");
} else if (object instanceof Boolean) {
return (Schema)primitiveSchemas.get("Boolean");
} else if (object instanceof Integer) {
return (Schema)primitiveSchemas.get("Integer");
} else if (object instanceof Long) {
return (Schema)primitiveSchemas.get("Long");
} else if (object instanceof Float) {
return (Schema)primitiveSchemas.get("Float");
} else if (object instanceof Double) {
return (Schema)primitiveSchemas.get("Double");
} else if (object instanceof CharSequence) {
return (Schema)primitiveSchemas.get("String");
} else if (object instanceof byte[]) {
return (Schema)primitiveSchemas.get("Bytes");
} else if (object instanceof GenericContainer) {
return ((GenericContainer)object).getSchema();
} else {
throw new IllegalArgumentException("Unsupported Avro type. Supported
types are null, Boolean, Integer, Long, Float, Double, String, byte[] and
IndexedRecord");
}
}