You are viewing a plain text version of this content. The canonical link for it is here.
Posted to log4j-user@logging.apache.org by Michael Williams <wi...@gmail.com> on 2020/06/01 15:23:55 UTC

Unable to send Log Events via the KafkaAppender with existing Kafka Schema

Hi

I’m trying to send logs to Kafka via log4j2. The Topic I’m sending to
already has a Schema and I cannot find a way for the KafkaAppender to get
the Schema Id as it always passes the string “bytes” to the Schema Registry.

In (1) KafkaAppender.tryAppend a byte[] is created representing the
LogEvent to be sent to Kafka. It is always a byte[], never a subclass of
GenericContainer.

Further down the call chain the Schema string is obtained by calling (2)
AvroSchemaUtils.getSchema(). This returns “bytes” as the object passed in
is the byte[] from tryAppend. This is then used in a call to the Schema
Registry which expects a string representation of the Schema and not the
string "bytes".

I haven’t found a way to set the Schema Id for the KafkaAppender or have
the appender use a GenericContainer subclass of my choosing. Is either
possible or is there another way?

Thanks
Michael

1) Creating the byte[] to be sent to Kafka:

private void tryAppend(final LogEvent event) throws ExecutionException,
 InterruptedException, TimeoutException {
    final Layout<? extends Serializable> layout = getLayout();
    byte[] data;
    if (layout instanceof SerializedLayout) {
        final byte[] header = layout.getHeader();
        final byte[] body = layout.toByteArray(event);
        data = new byte[header.length + body.length];
        System.arraycopy(header, 0, data, 0, header.length);
        System.arraycopy(body, 0, data, header.length, body.length);
    } else {
        data = layout.toByteArray(event);
    }
    manager.send(data);
}


2) Getting the Schema string to pass to the Schema Registry to get the
Schema Id:

public static Schema getSchema(Object object) {
  if (object == null) {
    return (Schema)primitiveSchemas.get("Null");
  } else if (object instanceof Boolean) {
    return (Schema)primitiveSchemas.get("Boolean");
  } else if (object instanceof Integer) {
    return (Schema)primitiveSchemas.get("Integer");
  } else if (object instanceof Long) {
    return (Schema)primitiveSchemas.get("Long");
  } else if (object instanceof Float) {
    return (Schema)primitiveSchemas.get("Float");
  } else if (object instanceof Double) {
    return (Schema)primitiveSchemas.get("Double");
  } else if (object instanceof CharSequence) {
    return (Schema)primitiveSchemas.get("String");
  } else if (object instanceof byte[]) {
    return (Schema)primitiveSchemas.get("Bytes");
  } else if (object instanceof GenericContainer) {
    return ((GenericContainer)object).getSchema();
  } else {
    throw new IllegalArgumentException("Unsupported Avro type. Supported
types are null, Boolean, Integer, Long, Float, Double, String, byte[] and
IndexedRecord");
  }
}