You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@fineract.apache.org by ar...@apache.org on 2022/11/07 10:16:24 UTC

[fineract] branch develop updated: FINERACT-1724: Refactoring the business event serializers for better extensibility

This is an automated email from the ASF dual-hosted git repository.

arnold pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/fineract.git


The following commit(s) were added to refs/heads/develop by this push:
     new a7d737b21 FINERACT-1724: Refactoring the business event serializers for better extensibility
a7d737b21 is described below

commit a7d737b2150a3698af89d44ace0ef526ec1628a2
Author: Arnold Galovics <ga...@gmail.com>
AuthorDate: Fri Nov 4 18:57:43 2022 +0100

    FINERACT-1724: Refactoring the business event serializers for better extensibility
---
 fineract-avro-schemas/build.gradle                 |   1 +
 .../avro/generator/ByteBufferSerializable.java     |  28 +
 .../resources/avro-generator-templates/enum.vm     |  54 ++
 .../resources/avro-generator-templates/fixed.vm    |  82 ++
 .../resources/avro-generator-templates/protocol.vm | 114 +++
 .../resources/avro-generator-templates/record.vm   | 893 +++++++++++++++++++++
 ...r.java => AbstractBusinessEventSerializer.java} |  31 +-
 .../client/ClientBusinessEventSerializer.java      |  15 +-
 ...FixedDepositAccountBusinessEventSerializer.java |  15 +-
 .../group/GroupsBusinessEventSerializer.java       |  15 +-
 ...anAdjustTransactionBusinessEventSerializer.java |  16 +-
 .../loan/LoanBusinessEventSerializer.java          |  15 +-
 .../loan/LoanChargeBusinessEventSerializer.java    |  15 +-
 .../LoanChargeDeletedBusinessEventSerializer.java  |  13 +-
 .../loan/LoanProductBusinessEventSerializer.java   |  15 +-
 .../LoanTransactionBusinessEventSerializer.java    |  15 +-
 ...rringDepositAccountBusinessEventSerializer.java |  15 +-
 .../SavingsAccountBusinessEventSerializer.java     |  15 +-
 ...sAccountTransactionBusinessEventSerializer.java |  15 +-
 .../share/ShareAccountBusinessEventSerializer.java |  15 +-
 ...ductDividentsCreateBusinessEventSerializer.java |  15 +-
 21 files changed, 1251 insertions(+), 161 deletions(-)

diff --git a/fineract-avro-schemas/build.gradle b/fineract-avro-schemas/build.gradle
index ebea32170..ea2051dd3 100644
--- a/fineract-avro-schemas/build.gradle
+++ b/fineract-avro-schemas/build.gradle
@@ -41,6 +41,7 @@ task preprocessAvroSchemas() {
 task buildJavaSdk(type: GenerateAvroJavaTask) {
     source("$buildDir/generated/avro/src/main/avro")
     outputDir = file("$buildDir/generated/java/src/main/java")
+    templateDirectory = "$projectDir/src/main/resources/avro-generator-templates/"
 }
 
 spotless {
diff --git a/fineract-avro-schemas/src/main/java/org/apache/fineract/avro/generator/ByteBufferSerializable.java b/fineract-avro-schemas/src/main/java/org/apache/fineract/avro/generator/ByteBufferSerializable.java
new file mode 100644
index 000000000..db401154b
--- /dev/null
+++ b/fineract-avro-schemas/src/main/java/org/apache/fineract/avro/generator/ByteBufferSerializable.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.fineract.avro.generator;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public interface ByteBufferSerializable {
+
+    ByteBuffer toByteBuffer() throws IOException;
+}
diff --git a/fineract-avro-schemas/src/main/resources/avro-generator-templates/enum.vm b/fineract-avro-schemas/src/main/resources/avro-generator-templates/enum.vm
new file mode 100644
index 000000000..813a77c7c
--- /dev/null
+++ b/fineract-avro-schemas/src/main/resources/avro-generator-templates/enum.vm
@@ -0,0 +1,54 @@
+#*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *#
+##
+## Licensed to the Apache Software Foundation (ASF) under one
+## or more contributor license agreements.  See the NOTICE file
+## distributed with this work for additional information
+## regarding copyright ownership.  The ASF licenses this file
+## to you under the Apache License, Version 2.0 (the
+## "License"); you may not use this file except in compliance
+## with the License.  You may obtain a copy of the License at
+##
+##     https://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+##
+#if ($schema.getNamespace())
+package $this.mangle($schema.getNamespace());
+#end
+#if ($schema.getDoc())
+/** $schema.getDoc() */
+#end
+#foreach ($annotation in $this.javaAnnotations($schema))
+@$annotation
+#end
+@org.apache.avro.specific.AvroGenerated
+public enum ${this.mangleTypeIdentifier($schema.getName())} implements org.apache.avro.generic.GenericEnumSymbol<${this.mangleTypeIdentifier($schema.getName())}> {
+  #foreach ($symbol in ${schema.getEnumSymbols()})${this.mangle($symbol)}#if ($foreach.hasNext), #end#end
+  ;
+  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("${this.javaEscape($schema.toString())}");
+  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
+
+  @Override
+  public org.apache.avro.Schema getSchema() { return SCHEMA$; }
+}
\ No newline at end of file
diff --git a/fineract-avro-schemas/src/main/resources/avro-generator-templates/fixed.vm b/fineract-avro-schemas/src/main/resources/avro-generator-templates/fixed.vm
new file mode 100644
index 000000000..9815f0048
--- /dev/null
+++ b/fineract-avro-schemas/src/main/resources/avro-generator-templates/fixed.vm
@@ -0,0 +1,82 @@
+#*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *#
+##
+## Licensed to the Apache Software Foundation (ASF) under one
+## or more contributor license agreements.  See the NOTICE file
+## distributed with this work for additional information
+## regarding copyright ownership.  The ASF licenses this file
+## to you under the Apache License, Version 2.0 (the
+## "License"); you may not use this file except in compliance
+## with the License.  You may obtain a copy of the License at
+##
+##     https://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+##
+#if ($schema.getNamespace())
+package $this.mangle($schema.getNamespace());
+#end
+#if ($schema.getDoc())
+/** $schema.getDoc() */
+#end
+#foreach ($annotation in $this.javaAnnotations($schema))
+@$annotation
+#end
+@org.apache.avro.specific.FixedSize($schema.getFixedSize())
+@org.apache.avro.specific.AvroGenerated
+public class ${this.mangleTypeIdentifier($schema.getName())} extends org.apache.avro.specific.SpecificFixed {
+  private static final long serialVersionUID = ${this.fingerprint64($schema)}L;
+  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("${this.javaEscape($schema.toString())}");
+  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
+  public org.apache.avro.Schema getSchema() { return SCHEMA$; }
+
+  /** Creates a new ${this.mangleTypeIdentifier($schema.getName())} */
+  public ${this.mangleTypeIdentifier($schema.getName())}() {
+    super();
+  }
+
+  /**
+   * Creates a new ${this.mangleTypeIdentifier($schema.getName())} with the given bytes.
+   * @param bytes The bytes to create the new ${this.mangleTypeIdentifier($schema.getName())}.
+   */
+  public ${this.mangleTypeIdentifier($schema.getName())}(byte[] bytes) {
+    super(bytes);
+  }
+
+  private static final org.apache.avro.io.DatumWriter<${this.mangleTypeIdentifier($schema.getName())}>
+    WRITER$ = new org.apache.avro.specific.SpecificDatumWriter<${this.mangleTypeIdentifier($schema.getName())}>(SCHEMA$);
+
+  @Override public void writeExternal(java.io.ObjectOutput out)
+    throws java.io.IOException {
+    WRITER$.write(this, org.apache.avro.specific.SpecificData.getEncoder(out));
+  }
+
+  private static final org.apache.avro.io.DatumReader<${this.mangleTypeIdentifier($schema.getName())}>
+    READER$ = new org.apache.avro.specific.SpecificDatumReader<${this.mangleTypeIdentifier($schema.getName())}>(SCHEMA$);
+
+  @Override public void readExternal(java.io.ObjectInput in)
+    throws java.io.IOException {
+    READER$.read(this, org.apache.avro.specific.SpecificData.getDecoder(in));
+  }
+
+}
\ No newline at end of file
diff --git a/fineract-avro-schemas/src/main/resources/avro-generator-templates/protocol.vm b/fineract-avro-schemas/src/main/resources/avro-generator-templates/protocol.vm
new file mode 100644
index 000000000..a4a3190f1
--- /dev/null
+++ b/fineract-avro-schemas/src/main/resources/avro-generator-templates/protocol.vm
@@ -0,0 +1,114 @@
+#*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *#
+##
+## Licensed to the Apache Software Foundation (ASF) under one
+## or more contributor license agreements.  See the NOTICE file
+## distributed with this work for additional information
+## regarding copyright ownership.  The ASF licenses this file
+## to you under the Apache License, Version 2.0 (the
+## "License"); you may not use this file except in compliance
+## with the License.  You may obtain a copy of the License at
+##
+##     https://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+##
+#if ($protocol.getNamespace())
+package $this.mangle($protocol.getNamespace());
+#end
+
+#if ($protocol.getDoc())
+/** $protocol.getDoc() */
+#end
+#foreach ($annotation in $this.javaAnnotations($protocol))
+@$annotation
+#end
+@org.apache.avro.specific.AvroGenerated
+public interface $this.mangleTypeIdentifier($protocol.getName()) {
+  public static final org.apache.avro.Protocol PROTOCOL = org.apache.avro.Protocol.parse(${this.javaSplit($protocol.toString())});
+#foreach ($e in $protocol.getMessages().entrySet())
+#set ($name = $e.getKey())
+#set ($message = $e.getValue())
+#set ($response = $message.getResponse())
+  /**
+#if ($message.getDoc())
+   * $this.escapeForJavadoc($message.getDoc())
+#end
+#foreach ($p in $message.getRequest().getFields())##
+#if ($p.doc())   * @param ${this.mangle($p.name())} $p.doc()
+#end
+#end
+   */
+#foreach ($annotation in $this.javaAnnotations($message))
+  @$annotation
+#end
+  #if ($message.isOneWay())void#else${this.javaUnbox($response, true)}#end
+ ${this.mangle($name)}(##
+#foreach ($p in $message.getRequest().getFields())##
+#*      *#${this.javaUnbox($p.schema(), false)} ${this.mangle($p.name())}#if ($foreach.hasNext), #end
+#end
+)#if (! $message.isOneWay() && $message.getErrors().getTypes().size() > 1)
+ throws ##
+## The first error is always "string", so we skip it.
+#foreach ($error in $message.getErrors().getTypes().subList(1, $message.getErrors().getTypes().size()))
+${this.mangle($error.getFullName())}##
+#if ($foreach.hasNext), #end
+#end##    (error list)
+#end##    (one way)
+;
+#end## (requests)
+
+## Generate nested callback API
+#if ($protocol.getDoc())
+  /** $protocol.getDoc() */
+#end
+  @org.apache.avro.specific.AvroGenerated
+  public interface Callback extends $this.mangleTypeIdentifier($protocol.getName()) {
+    public static final org.apache.avro.Protocol PROTOCOL = #if ($this.mangle($protocol.getNamespace()))$this.mangle($protocol.getNamespace()).#end${this.mangleTypeIdentifier($protocol.getName())}.PROTOCOL;
+#foreach ($e in $protocol.getMessages().entrySet())
+#set ($name = $e.getKey())
+#set ($message = $e.getValue())
+#set ($response = $message.getResponse())
+## Generate callback method if the message is not one-way:
+#if (! $message.isOneWay())
+    /**
+#if ($message.getDoc())
+     * $this.escapeForJavadoc($message.getDoc())
+#end
+#foreach ($p in $message.getRequest().getFields())##
+#if ($p.doc())     * @param ${this.mangle($p.name())} $p.doc()
+#end
+#end
+     * @throws java.io.IOException The async call could not be completed.
+     */
+    void ${this.mangle($name)}(##
+#foreach ($p in $message.getRequest().getFields())##
+#*      *#${this.javaUnbox($p.schema(), false)} ${this.mangle($p.name())}#if ($foreach.hasNext), #end
+#end
+#if ($message.getRequest().getFields().size() > 0), #end
+org.apache.avro.ipc.Callback<${this.javaType($response)}> callback) throws java.io.IOException;
+#end## (generate callback method)
+#end## (requests)
+  }## End of Callback interface
+
+}## End of protocol interface
\ No newline at end of file
diff --git a/fineract-avro-schemas/src/main/resources/avro-generator-templates/record.vm b/fineract-avro-schemas/src/main/resources/avro-generator-templates/record.vm
new file mode 100644
index 000000000..d7ee925ab
--- /dev/null
+++ b/fineract-avro-schemas/src/main/resources/avro-generator-templates/record.vm
@@ -0,0 +1,893 @@
+#*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *#
+##
+## Licensed to the Apache Software Foundation (ASF) under one
+## or more contributor license agreements.  See the NOTICE file
+## distributed with this work for additional information
+## regarding copyright ownership.  The ASF licenses this file
+## to you under the Apache License, Version 2.0 (the
+## "License"); you may not use this file except in compliance
+## with the License.  You may obtain a copy of the License at
+##
+##     https://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+##
+#if ($schema.getNamespace())
+package $this.mangle($schema.getNamespace());
+#end
+
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.util.Utf8;
+#if (!$schema.isError())
+import org.apache.avro.message.BinaryMessageEncoder;
+import org.apache.avro.message.BinaryMessageDecoder;
+import org.apache.avro.message.SchemaStore;
+#end
+#if (${this.gettersReturnOptional} || ${this.createOptionalGetters})import java.util.Optional;#end
+
+#if ($schema.getDoc())
+/** $schema.getDoc() */
+#end
+#foreach ($annotation in $this.javaAnnotations($schema))
+@$annotation
+#end
+@org.apache.avro.specific.AvroGenerated
+public class ${this.mangleTypeIdentifier($schema.getName())}#if ($schema.isError()) extends org.apache.avro.specific.SpecificExceptionBase#else extends org.apache.avro.specific.SpecificRecordBase#end implements org.apache.avro.specific.SpecificRecord, org.apache.fineract.avro.generator.ByteBufferSerializable {
+  private static final long serialVersionUID = ${this.fingerprint64($schema)}L;
+
+#set ($schemaString = $this.javaSplit($schema.toString()))
+#set ($customLogicalTypeFactories = $this.getUsedCustomLogicalTypeFactories($schema).entrySet())
+#if (!$customLogicalTypeFactories.isEmpty())
+  static {
+#foreach ($customLogicalTypeFactory in $customLogicalTypeFactories)
+    org.apache.avro.LogicalTypes.register("${customLogicalTypeFactory.getKey()}", new ${customLogicalTypeFactory.getValue()}());
+#end
+  }
+#end
+
+  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse($schemaString);
+  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
+
+  private static final SpecificData MODEL$ = new SpecificData();
+#set ($usedConversions = $this.getUsedConversionClasses($schema))
+#if (!$usedConversions.isEmpty())
+  static {
+#foreach ($conversion in $usedConversions)
+    MODEL$.addLogicalTypeConversion(new ${conversion}());
+#end
+  }
+#end
+
+#if (!$schema.isError())
+  private static final BinaryMessageEncoder<${this.mangleTypeIdentifier($schema.getName())}> ENCODER =
+      new BinaryMessageEncoder<>(MODEL$, SCHEMA$);
+
+  private static final BinaryMessageDecoder<${this.mangleTypeIdentifier($schema.getName())}> DECODER =
+      new BinaryMessageDecoder<>(MODEL$, SCHEMA$);
+
+  /**
+   * Return the BinaryMessageEncoder instance used by this class.
+   * @return the message encoder used by this class
+   */
+  public static BinaryMessageEncoder<${this.mangleTypeIdentifier($schema.getName())}> getEncoder() {
+    return ENCODER;
+  }
+
+  /**
+   * Return the BinaryMessageDecoder instance used by this class.
+   * @return the message decoder used by this class
+   */
+  public static BinaryMessageDecoder<${this.mangleTypeIdentifier($schema.getName())}> getDecoder() {
+    return DECODER;
+  }
+
+  /**
+   * Create a new BinaryMessageDecoder instance for this class that uses the specified {@link SchemaStore}.
+   * @param resolver a {@link SchemaStore} used to find schemas by fingerprint
+   * @return a BinaryMessageDecoder instance for this class backed by the given SchemaStore
+   */
+  public static BinaryMessageDecoder<${this.mangleTypeIdentifier($schema.getName())}> createDecoder(SchemaStore resolver) {
+    return new BinaryMessageDecoder<>(MODEL$, SCHEMA$, resolver);
+  }
+
+  /**
+   * Serializes this ${schema.getName()} to a ByteBuffer.
+   * @return a buffer holding the serialized data for this instance
+   * @throws java.io.IOException if this instance could not be serialized
+   */
+  @Override
+  public java.nio.ByteBuffer toByteBuffer() throws java.io.IOException {
+    return ENCODER.encode(this);
+  }
+
+  /**
+   * Deserializes a ${schema.getName()} from a ByteBuffer.
+   * @param b a byte buffer holding serialized data for an instance of this class
+   * @return a ${schema.getName()} instance decoded from the given buffer
+   * @throws java.io.IOException if the given bytes could not be deserialized into an instance of this class
+   */
+  public static ${this.mangleTypeIdentifier($schema.getName())} fromByteBuffer(
+      java.nio.ByteBuffer b) throws java.io.IOException {
+    return DECODER.decode(b);
+  }
+#end
+
+#foreach ($field in $schema.getFields())
+#if ($field.doc())
+  /** $field.doc() */
+#end
+#foreach ($annotation in $this.javaAnnotations($field))
+  @$annotation
+#end
+  #if (${this.publicFields()})public#elseif (${this.privateFields()})private#end ${this.javaUnbox($field.schema(), false)} ${this.mangle($field.name(), $schema.isError())};
+#end
+#if ($schema.isError())
+
+  public ${this.mangleTypeIdentifier($schema.getName())}() {
+    super();
+  }
+
+  public ${this.mangleTypeIdentifier($schema.getName())}(Object value) {
+    super(value);
+  }
+
+  public ${this.mangleTypeIdentifier($schema.getName())}(Throwable cause) {
+    super(cause);
+  }
+
+  public ${this.mangleTypeIdentifier($schema.getName())}(Object value, Throwable cause) {
+    super(value, cause);
+  }
+
+#else
+#if ($schema.getFields().size() > 0)
+
+  /**
+   * Default constructor.  Note that this does not initialize fields
+   * to their default values from the schema.  If that is desired then
+   * one should use <code>newBuilder()</code>.
+   */
+  public ${this.mangleTypeIdentifier($schema.getName())}() {}
+#if ($this.isCreateAllArgsConstructor())
+
+  /**
+   * All-args constructor.
+#foreach ($field in $schema.getFields())
+#if ($field.doc())   * @param ${this.mangle($field.name())} $field.doc()
+#else   * @param ${this.mangle($field.name())} The new value for ${field.name()}
+#end
+#end
+   */
+  public ${this.mangleTypeIdentifier($schema.getName())}(#foreach($field in $schema.getFields())${this.javaType($field.schema())} ${this.mangle($field.name())}#if($foreach.count < $schema.getFields().size()), #end#end) {
+#foreach ($field in $schema.getFields())
+    ${this.generateSetterCode($field.schema(), ${this.mangle($field.name())}, ${this.mangle($field.name())})}
+#end
+  }
+#else
+  /**
+   * This schema contains more than 254 fields which exceeds the maximum number
+   * of permitted constructor parameters in the JVM. An all-args constructor
+   * will not be generated. Please use <code>newBuilder()</code> to instantiate
+   * objects instead.
+   */
+#end
+#end
+
+#end
+  @Override
+  public org.apache.avro.specific.SpecificData getSpecificData() { return MODEL$; }
+
+  @Override
+  public org.apache.avro.Schema getSchema() { return SCHEMA$; }
+
+  // Used by DatumWriter.  Applications should not call.
+  @Override
+  public java.lang.Object get(int field$) {
+    switch (field$) {
+#set ($i = 0)
+#foreach ($field in $schema.getFields())
+    case $i: return ${this.mangle($field.name(), $schema.isError())};
+#set ($i = $i + 1)
+#end
+    default: throw new IndexOutOfBoundsException("Invalid index: " + field$);
+    }
+  }
+
+#if ($this.hasLogicalTypeField($schema))
+  private static final org.apache.avro.Conversion<?>[] conversions =
+      new org.apache.avro.Conversion<?>[] {
+#foreach ($field in $schema.getFields())
+      ${this.conversionInstance($field.schema())},
+#end
+      null
+  };
+
+  @Override
+  public org.apache.avro.Conversion<?> getConversion(int field) {
+    return conversions[field];
+  }
+
+#end
+  // Used by DatumReader.  Applications should not call.
+  @Override
+  @SuppressWarnings(value="unchecked")
+  public void put(int field$, java.lang.Object value$) {
+    switch (field$) {
+#set ($i = 0)
+#foreach ($field in $schema.getFields())
+    case $i: ${this.mangle($field.name(), $schema.isError())} = #if(${this.javaType($field.schema())} != "java.lang.Object" && ${this.javaType($field.schema())} != "java.lang.String")(${this.javaType($field.schema())})#{end}value$#if(${this.javaType($field.schema())} == "java.lang.String") != null ? value$.toString() : null#{end}; break;
+#set ($i = $i + 1)
+#end
+    default: throw new IndexOutOfBoundsException("Invalid index: " + field$);
+    }
+  }
+
+#foreach ($field in $schema.getFields())
+#if (${this.gettersReturnOptional} && (!${this.optionalGettersForNullableFieldsOnly} || ${field.schema().isNullable()}))
+  /**
+   * Gets the value of the '${this.mangle($field.name(), $schema.isError())}' field as an Optional&lt;${this.javaType($field.schema())}&gt;.
+#if ($field.doc())   * $field.doc()
+#end
+   * @return The value wrapped in an Optional&lt;${this.javaType($field.schema())}&gt;.
+   */
+  public Optional<${this.javaType($field.schema())}> ${this.generateGetMethod($schema, $field)}() {
+    return Optional.<${this.javaType($field.schema())}>ofNullable(${this.mangle($field.name(), $schema.isError())});
+  }
+#else
+  /**
+   * Gets the value of the '${this.mangle($field.name(), $schema.isError())}' field.
+#if ($field.doc())   * @return $field.doc()
+#else   * @return The value of the '${this.mangle($field.name(), $schema.isError())}' field.
+#end
+   */
+  public ${this.javaUnbox($field.schema(), false)} ${this.generateGetMethod($schema, $field)}() {
+    return ${this.mangle($field.name(), $schema.isError())};
+  }
+#end
+
+#if (${this.createOptionalGetters})
+  /**
+   * Gets the value of the '${this.mangle($field.name(), $schema.isError())}' field as an Optional&lt;${this.javaType($field.schema())}&gt;.
+#if ($field.doc())   * $field.doc()
+#end
+   * @return The value wrapped in an Optional&lt;${this.javaType($field.schema())}&gt;.
+   */
+  public Optional<${this.javaType($field.schema())}> ${this.generateGetOptionalMethod($schema, $field)}() {
+    return Optional.<${this.javaType($field.schema())}>ofNullable(${this.mangle($field.name(), $schema.isError())});
+  }
+#end
+
+#if ($this.createSetters)
+  /**
+   * Sets the value of the '${this.mangle($field.name(), $schema.isError())}' field.
+#if ($field.doc())   * $field.doc()
+#end
+   * @param value the value to set.
+   */
+  public void ${this.generateSetMethod($schema, $field)}(${this.javaUnbox($field.schema(), false)} value) {
+    ${this.generateSetterCode($field.schema(), ${this.mangle($field.name(), $schema.isError())}, "value")}
+  }
+#end
+
+#end
+  /**
+   * Creates a new ${this.mangleTypeIdentifier($schema.getName())} RecordBuilder.
+   * @return A new ${this.mangleTypeIdentifier($schema.getName())} RecordBuilder
+   */
+  public static #if ($schema.getNamespace())$this.mangle($schema.getNamespace()).#end${this.mangleTypeIdentifier($schema.getName())}.Builder newBuilder() {
+    return new #if ($schema.getNamespace())$this.mangle($schema.getNamespace()).#end${this.mangleTypeIdentifier($schema.getName())}.Builder();
+  }
+
+  /**
+   * Creates a new ${this.mangleTypeIdentifier($schema.getName())} RecordBuilder by copying an existing Builder.
+   * @param other The existing builder to copy.
+   * @return A new ${this.mangleTypeIdentifier($schema.getName())} RecordBuilder
+   */
+  public static #if ($schema.getNamespace())$this.mangle($schema.getNamespace()).#end${this.mangleTypeIdentifier($schema.getName())}.Builder newBuilder(#if ($schema.getNamespace())$this.mangle($schema.getNamespace()).#end${this.mangleTypeIdentifier($schema.getName())}.Builder other) {
+    if (other == null) {
+      return new #if ($schema.getNamespace())$this.mangle($schema.getNamespace()).#end${this.mangleTypeIdentifier($schema.getName())}.Builder();
+    } else {
+      return new #if ($schema.getNamespace())$this.mangle($schema.getNamespace()).#end${this.mangleTypeIdentifier($schema.getName())}.Builder(other);
+    }
+  }
+
+  /**
+   * Creates a new ${this.mangleTypeIdentifier($schema.getName())} RecordBuilder by copying an existing $this.mangleTypeIdentifier($schema.getName()) instance.
+   * @param other The existing instance to copy.
+   * @return A new ${this.mangleTypeIdentifier($schema.getName())} RecordBuilder
+   */
+  public static #if ($schema.getNamespace())$this.mangle($schema.getNamespace()).#end${this.mangleTypeIdentifier($schema.getName())}.Builder newBuilder(#if ($schema.getNamespace())$this.mangle($schema.getNamespace()).#end${this.mangleTypeIdentifier($schema.getName())} other) {
+    if (other == null) {
+      return new #if ($schema.getNamespace())$this.mangle($schema.getNamespace()).#end${this.mangleTypeIdentifier($schema.getName())}.Builder();
+    } else {
+      return new #if ($schema.getNamespace())$this.mangle($schema.getNamespace()).#end${this.mangleTypeIdentifier($schema.getName())}.Builder(other);
+    }
+  }
+
+  /**
+   * RecordBuilder for ${this.mangleTypeIdentifier($schema.getName())} instances.
+   */
+  @org.apache.avro.specific.AvroGenerated
+  public static class Builder extends#if ($schema.isError()) org.apache.avro.specific.SpecificErrorBuilderBase<${this.mangleTypeIdentifier($schema.getName())}>#else org.apache.avro.specific.SpecificRecordBuilderBase<${this.mangleTypeIdentifier($schema.getName())}>#end
+
+    implements#if ($schema.isError()) org.apache.avro.data.ErrorBuilder<${this.mangleTypeIdentifier($schema.getName())}>#else org.apache.avro.data.RecordBuilder<${this.mangleTypeIdentifier($schema.getName())}>#end {
+
+#foreach ($field in $schema.getFields())
+#if ($field.doc())
+    /** $field.doc() */
+#end
+    private ${this.javaUnbox($field.schema(), false)} ${this.mangle($field.name(), $schema.isError())};
+#if (${this.hasBuilder($field.schema())})
+    private ${this.javaUnbox($field.schema(), false)}.Builder ${this.mangle($field.name(), $schema.isError())}Builder;
+#end
+#end
+
+    /** Creates a new Builder */
+    private Builder() {
+      super(SCHEMA$, MODEL$);
+    }
+
+    /**
+     * Creates a Builder by copying an existing Builder.
+     * @param other The existing Builder to copy.
+     */
+    private Builder(#if ($schema.getNamespace())$this.mangle($schema.getNamespace()).#end${this.mangleTypeIdentifier($schema.getName())}.Builder other) {
+      super(other);
+#foreach ($field in $schema.getFields())
+      if (isValidValue(fields()[$field.pos()], other.${this.mangle($field.name(), $schema.isError())})) {
+        this.${this.mangle($field.name(), $schema.isError())} = data().deepCopy(fields()[$field.pos()].schema(), other.${this.mangle($field.name(), $schema.isError())});
+        fieldSetFlags()[$field.pos()] = other.fieldSetFlags()[$field.pos()];
+      }
+#if (${this.hasBuilder($field.schema())})
+      if (other.${this.generateHasBuilderMethod($schema, $field)}()) {
+        this.${this.mangle($field.name(), $schema.isError())}Builder = ${this.javaType($field.schema())}.newBuilder(other.${this.generateGetBuilderMethod($schema, $field)}());
+      }
+#end
+#end
+    }
+
+    /**
+     * Creates a Builder by copying an existing $this.mangleTypeIdentifier($schema.getName()) instance
+     * @param other The existing instance to copy.
+     */
+    private Builder(#if ($schema.getNamespace())$this.mangle($schema.getNamespace()).#end${this.mangleTypeIdentifier($schema.getName())} other) {
+#if ($schema.isError())      super(other)#else
+      super(SCHEMA$, MODEL$)#end;
+#foreach ($field in $schema.getFields())
+      if (isValidValue(fields()[$field.pos()], other.${this.mangle($field.name(), $schema.isError())})) {
+        this.${this.mangle($field.name(), $schema.isError())} = data().deepCopy(fields()[$field.pos()].schema(), other.${this.mangle($field.name(), $schema.isError())});
+        fieldSetFlags()[$field.pos()] = true;
+      }
+#if (${this.hasBuilder($field.schema())})
+      this.${this.mangle($field.name(), $schema.isError())}Builder = null;
+#end
+#end
+    }
+#if ($schema.isError())
+
+    @Override
+    public #if ($schema.getNamespace())$this.mangle($schema.getNamespace()).#end${this.mangleTypeIdentifier($schema.getName())}.Builder setValue(Object value) {
+      super.setValue(value);
+      return this;
+    }
+
+    @Override
+    public #if ($schema.getNamespace())$this.mangle($schema.getNamespace()).#end${this.mangleTypeIdentifier($schema.getName())}.Builder clearValue() {
+      super.clearValue();
+      return this;
+    }
+
+    @Override
+    public #if ($schema.getNamespace())$this.mangle($schema.getNamespace()).#end${this.mangleTypeIdentifier($schema.getName())}.Builder setCause(Throwable cause) {
+      super.setCause(cause);
+      return this;
+    }
+
+    @Override
+    public #if ($schema.getNamespace())$this.mangle($schema.getNamespace()).#end${this.mangleTypeIdentifier($schema.getName())}.Builder clearCause() {
+      super.clearCause();
+      return this;
+    }
+#end
+
+#foreach ($field in $schema.getFields())
+    /**
+      * Gets the value of the '${this.mangle($field.name(), $schema.isError())}' field.
+#if ($field.doc())      * $field.doc()
+#end
+      * @return The value.
+      */
+    public ${this.javaUnbox($field.schema(), false)} ${this.generateGetMethod($schema, $field)}() {
+      return ${this.mangle($field.name(), $schema.isError())};
+    }
+
+#if (${this.createOptionalGetters})
+    /**
+      * Gets the value of the '${this.mangle($field.name(), $schema.isError())}' field as an Optional&lt;${this.javaType($field.schema())}&gt;.
+#if ($field.doc())      * $field.doc()
+#end
+      * @return The value wrapped in an Optional&lt;${this.javaType($field.schema())}&gt;.
+      */
+    public Optional<${this.javaType($field.schema())}> ${this.generateGetOptionalMethod($schema, $field)}() {
+      return Optional.<${this.javaType($field.schema())}>ofNullable(${this.mangle($field.name(), $schema.isError())});
+    }
+#end
+
+    /**
+      * Sets the value of the '${this.mangle($field.name(), $schema.isError())}' field.
+#if ($field.doc())      * $field.doc()
+#end
+      * @param value The value of '${this.mangle($field.name(), $schema.isError())}'.
+      * @return This builder.
+      */
+    public #if ($schema.getNamespace())$this.mangle($schema.getNamespace()).#end${this.mangleTypeIdentifier($schema.getName())}.Builder ${this.generateSetMethod($schema, $field)}(${this.javaUnbox($field.schema(), false)} value) {
+      validate(fields()[$field.pos()], value);
+#if (${this.hasBuilder($field.schema())})
+      this.${this.mangle($field.name(), $schema.isError())}Builder = null;
+#end
+      ${this.generateSetterCode($field.schema(), ${this.mangle($field.name(), $schema.isError())}, "value")}
+      fieldSetFlags()[$field.pos()] = true;
+      return this;
+    }
+
+    /**
+      * Checks whether the '${this.mangle($field.name(), $schema.isError())}' field has been set.
+#if ($field.doc())      * $field.doc()
+#end
+      * @return True if the '${this.mangle($field.name(), $schema.isError())}' field has been set, false otherwise.
+      */
+    public boolean ${this.generateHasMethod($schema, $field)}() {
+      return fieldSetFlags()[$field.pos()];
+    }
+
+#if (${this.hasBuilder($field.schema())})
+    /**
+     * Gets the Builder instance for the '${this.mangle($field.name(), $schema.isError())}' field and creates one if it doesn't exist yet.
+#if ($field.doc())     * $field.doc()
+#end
+     * @return The Builder instance for the field.
+     */
+    public ${this.javaType($field.schema())}.Builder ${this.generateGetBuilderMethod($schema, $field)}() {
+      if (${this.mangle($field.name(), $schema.isError())}Builder == null) {
+        if (${this.generateHasMethod($schema, $field)}()) {
+          ${this.generateSetBuilderMethod($schema, $field)}(${this.javaType($field.schema())}.newBuilder(${this.mangle($field.name(), $schema.isError())}));
+        } else {
+          ${this.generateSetBuilderMethod($schema, $field)}(${this.javaType($field.schema())}.newBuilder());
+        }
+      }
+      return ${this.mangle($field.name(), $schema.isError())}Builder;
+    }
+
+    /**
+     * Sets the Builder instance for the '${this.mangle($field.name(), $schema.isError())}' field
+#if ($field.doc())     * $field.doc()
+#end
+     * @param value The builder instance that must be set.
+     * @return This builder.
+     */
+
+    public #if ($schema.getNamespace())$this.mangle($schema.getNamespace()).#end${this.mangleTypeIdentifier($schema.getName())}.Builder ${this.generateSetBuilderMethod($schema, $field)}(${this.javaUnbox($field.schema(), false)}.Builder value) {
+      ${this.generateClearMethod($schema, $field)}();
+      ${this.mangle($field.name(), $schema.isError())}Builder = value;
+      return this;
+    }
+
+    /**
+     * Checks whether the '${this.mangle($field.name(), $schema.isError())}' field has an active Builder instance
+#if ($field.doc())     * $field.doc()
+#end
+     * @return True if the '${this.mangle($field.name(), $schema.isError())}' field has an active Builder instance
+     */
+    public boolean ${this.generateHasBuilderMethod($schema, $field)}() {
+      return ${this.mangle($field.name(), $schema.isError())}Builder != null;
+    }
+#end
+
+    /**
+      * Clears the value of the '${this.mangle($field.name(), $schema.isError())}' field.
+#if ($field.doc())      * $field.doc()
+#end
+      * @return This builder.
+      */
+    public #if ($schema.getNamespace())$this.mangle($schema.getNamespace()).#end${this.mangleTypeIdentifier($schema.getName())}.Builder ${this.generateClearMethod($schema, $field)}() {
+#if (${this.isUnboxedJavaTypeNullable($field.schema())})
+      ${this.mangle($field.name(), $schema.isError())} = null;
+#end
+#if (${this.hasBuilder($field.schema())})
+      ${this.mangle($field.name(), $schema.isError())}Builder = null;
+#end
+      fieldSetFlags()[$field.pos()] = false;
+      return this;
+    }
+
+#end
+    @Override
+    @SuppressWarnings("unchecked")
+    public ${this.mangleTypeIdentifier($schema.getName())} build() {
+      try {
+        ${this.mangleTypeIdentifier($schema.getName())} record = new ${this.mangleTypeIdentifier($schema.getName())}(#if ($schema.isError())getValue(), getCause()#end);
+#foreach ($field in $schema.getFields())
+#if (${this.hasBuilder($field.schema())})
+        if (${this.mangle($field.name(), $schema.isError())}Builder != null) {
+          try {
+            record.${this.mangle($field.name(), $schema.isError())} = this.${this.mangle($field.name(), $schema.isError())}Builder.build();
+          } catch (org.apache.avro.AvroMissingFieldException e) {
+            e.addParentField(record.getSchema().getField("${this.mangle($field.name(), $schema.isError())}"));
+            throw e;
+          }
+        } else {
+          record.${this.mangle($field.name(), $schema.isError())} = fieldSetFlags()[$field.pos()] ? this.${this.mangle($field.name(), $schema.isError())} : #if(${this.javaType($field.schema())} != "java.lang.Object")(${this.javaType($field.schema())})#{end} defaultValue(fields()[$field.pos()]);
+        }
+#else
+        record.${this.mangle($field.name(), $schema.isError())} = fieldSetFlags()[$field.pos()] ? this.${this.mangle($field.name(), $schema.isError())} : #if(${this.javaType($field.schema())} != "java.lang.Object")(${this.javaType($field.schema())})#{end} defaultValue(fields()[$field.pos()]);
+#end
+#end
+        return record;
+      } catch (org.apache.avro.AvroMissingFieldException e) {
+        throw e;
+      } catch (java.lang.Exception e) {
+        throw new org.apache.avro.AvroRuntimeException(e);
+      }
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private static final org.apache.avro.io.DatumWriter<${this.mangleTypeIdentifier($schema.getName())}>
+    WRITER$ = (org.apache.avro.io.DatumWriter<${this.mangleTypeIdentifier($schema.getName())}>)MODEL$.createDatumWriter(SCHEMA$);
+
+  @Override public void writeExternal(java.io.ObjectOutput out)
+    throws java.io.IOException {
+    WRITER$.write(this, SpecificData.getEncoder(out));
+  }
+
+  @SuppressWarnings("unchecked")
+  private static final org.apache.avro.io.DatumReader<${this.mangleTypeIdentifier($schema.getName())}>
+    READER$ = (org.apache.avro.io.DatumReader<${this.mangleTypeIdentifier($schema.getName())}>)MODEL$.createDatumReader(SCHEMA$);
+
+  @Override public void readExternal(java.io.ObjectInput in)
+    throws java.io.IOException {
+    READER$.read(this, SpecificData.getDecoder(in));
+  }
+
+#if ($this.isCustomCodable($schema))
+  @Override protected boolean hasCustomCoders() { return true; }
+
+  @Override public void customEncode(org.apache.avro.io.Encoder out)
+    throws java.io.IOException
+  {
+#set ($nv = 0)## Counter to ensure unique var-names
+#set ($maxnv = 0)## Holds high-water mark during recursion
+#foreach ($field in $schema.getFields())
+#set ($n = $this.mangle($field.name(), $schema.isError()))
+#set ($s = $field.schema())
+#encodeVar(0 "this.${n}" $s)
+
+#set ($nv = $maxnv)
+#end
+  }
+
+  @Override public void customDecode(org.apache.avro.io.ResolvingDecoder in)
+    throws java.io.IOException
+  {
+    org.apache.avro.Schema.Field[] fieldOrder = in.readFieldOrderIfDiff();
+    if (fieldOrder == null) {
+## Common case: order of fields hasn't changed, so read them in a
+## fixed order according to reader's schema
+#set ($nv = 0)## Counter to ensure unique var-names
+#set ($maxnv = 0)## Holds high-water mark during recursion
+#foreach ($field in $schema.getFields())
+#set ($n = $this.mangle($field.name(), $schema.isError()))
+#set ($s = $field.schema())
+#set ($rs = "SCHEMA$.getField(""${n}"").schema()")
+#decodeVar(2 "this.${n}" $s $rs)
+
+#set ($nv = $maxnv)
+#end
+    } else {
+      for (int i = 0; i < $schema.getFields().size(); i++) {
+        switch (fieldOrder[i].pos()) {
+#set ($fieldno = 0)
+#set ($nv = 0)## Counter to ensure unique var-names
+#set ($maxnv = 0)## Holds high-water mark during recursion
+#foreach ($field in $schema.getFields())
+        case $fieldno:
+#set ($n = $this.mangle($field.name(), $schema.isError()))
+#set ($s = $field.schema())
+#set ($rs = "SCHEMA$.getField(""${n}"").schema()")
+#decodeVar(6 "this.${n}" $s $rs)
+          break;
+
+#set ($nv = $maxnv)
+#set ($fieldno = $fieldno + 1)
+#end
+        default:
+          throw new java.io.IOException("Corrupt ResolvingDecoder.");
+        }
+      }
+    }
+  }
+#end
+}
+
+#macro( encodeVar $indent $var $s )
+#set ($I = $this.indent($indent))## Indent prefix for the Java lines emitted below; dispatches on the schema type name to write the value to the Encoder "out"
+##### Compound types (array, map, and union) require calls
+##### that will recurse back into this encodeVar macro:
+#if ($s.Type.Name.equals("array"))
+#encodeArray($indent $var $s)
+#elseif ($s.Type.Name.equals("map"))
+#encodeMap($indent $var $s)
+#elseif ($s.Type.Name.equals("union"))
+#encodeUnion($indent $var $s)
+##### Use the generated "customEncode" method as a fast way to write
+##### (specific) record types:
+#elseif ($s.Type.Name.equals("record"))
+$I    ${var}.customEncode(out);
+##### For the rest of the cases, generate calls to out.writeXYZ:
+#elseif ($s.Type.Name.equals("null"))
+$I    out.writeNull();
+#elseif ($s.Type.Name.equals("boolean"))
+$I    out.writeBoolean(${var});
+#elseif ($s.Type.Name.equals("int"))
+$I    out.writeInt(${var});
+#elseif ($s.Type.Name.equals("long"))
+$I    out.writeLong(${var});
+#elseif ($s.Type.Name.equals("float"))
+$I    out.writeFloat(${var});
+#elseif ($s.Type.Name.equals("double"))
+$I    out.writeDouble(${var});
+#elseif ($s.Type.Name.equals("string"))
+#if ($this.isStringable($s))
+$I    out.writeString(${var}.toString());
+#else
+$I    out.writeString(${var});
+#end
+#elseif ($s.Type.Name.equals("bytes"))
+$I    out.writeBytes(${var});
+#elseif ($s.Type.Name.equals("fixed"))
+$I    out.writeFixed(${var}.bytes(), 0, ${s.FixedSize});
+#elseif ($s.Type.Name.equals("enum"))
+$I    out.writeEnum(${var}.ordinal());
+#else
+## TODO -- signal a code-gen-time error (unknown type names currently emit nothing)
+#end
+#end
+
+#macro( encodeArray $indent $var $s )
+#set ($I = $this.indent($indent))## Indent prefix for the emitted Java; this macro writes an array field element by element
+#set ($et = $this.javaType($s.ElementType))## Java type of one array element, used for the loop variable
+$I    long size${nv} = ${var}.size();
+$I    out.writeArrayStart();
+$I    out.setItemCount(size${nv});
+$I    long actualSize${nv} = 0;
+$I    for ($et e${nv}: ${var}) {
+$I      actualSize${nv}++;
+$I      out.startItem();
+#set ($var = "e${nv}")## The loop variable becomes the value handed to the nested encodeVar call
+#set ($nv = $nv + 1)## Bump the name counter so nested compound types get distinct variable names
+#set ($maxnv = $nv)## Remember the highest counter value reached during recursion
+#set ($indent = $indent + 2)## Nested statements sit one level deeper, inside the for loop
+#encodeVar($indent $var $s.ElementType)
+#set ($nv = $nv - 1)## Back to this level's counter so size/actualSize names match the declarations above
+#set ($indent = $indent - 2)## Restore this level's indent
+#set ($I = $this.indent($indent))## Re-derive the prefix for this level (the nested macro also assigns it)
+$I    }
+$I    out.writeArrayEnd();
+$I    if (actualSize${nv} != size${nv})
+$I      throw new java.util.ConcurrentModificationException("Array-size written was " + size${nv} + ", but element count was " + actualSize${nv} + ".");
+#end
+
+#macro( encodeMap $indent $var $s )
+#set ($I = $this.indent($indent))## Indent prefix for the emitted Java; this macro writes a map field entry by entry
+#set ($kt = $this.getStringType($s))## Java type of the map keys, used in the Map.Entry declaration
+#set ($vt = $this.javaType($s.ValueType))## Java type of the map values
+$I    long size${nv} = ${var}.size();
+$I    out.writeMapStart();
+$I    out.setItemCount(size${nv});
+$I    long actualSize${nv} = 0;
+$I    for (java.util.Map.Entry<$kt, $vt> e${nv}: ${var}.entrySet()) {
+$I      actualSize${nv}++;
+$I      out.startItem();
+#if ($this.isStringable($s))
+$I      out.writeString(e${nv}.getKey().toString());
+#else
+$I      out.writeString(e${nv}.getKey());
+#end
+$I      $vt v${nv} = e${nv}.getValue();
+#set ($var = "v${nv}")## The entry value becomes the value handed to the nested encodeVar call
+#set ($nv = $nv + 1)## Bump the name counter so nested compound types get distinct variable names
+#set ($maxnv = $nv)## Remember the highest counter value reached during recursion
+#set ($indent = $indent + 2)## Nested statements sit one level deeper, inside the for loop
+#encodeVar($indent $var $s.ValueType)
+#set ($nv = $nv - 1)## Back to this level's counter so size/actualSize names match the declarations above
+#set ($indent = $indent - 2)## Restore this level's indent
+#set ($I = $this.indent($indent))## Re-derive the prefix for this level (the nested macro also assigns it)
+$I    }
+$I    out.writeMapEnd();
+$I    if (actualSize${nv} != size${nv})
+$I      throw new java.util.ConcurrentModificationException("Map-size written was " + size${nv} + ", but element count was " + actualSize${nv} + ".");
+#end
+
+#macro( encodeUnion $indent $var $s )
+#set ($I = $this.indent($indent))## Indent prefix for the emitted Java; this macro writes a union as either its null branch or its non-null branch
+#set ($et = $this.javaType($s.Types.get($this.getNonNullIndex($s))))## NOTE(review): assigned but not referenced anywhere in this macro
+$I    if (${var} == null) {
+$I      out.writeIndex(#if($this.getNonNullIndex($s)==0)1#{else}0#end);
+$I      out.writeNull();
+$I    } else {
+$I      out.writeIndex(${this.getNonNullIndex($s)});
+#set ($indent = $indent + 2)## The non-null branch is emitted one level deeper, inside the else block
+#encodeVar($indent $var $s.Types.get($this.getNonNullIndex($s)))
+#set ($indent = $indent - 2)## Restore this level's indent
+#set ($I = $this.indent($indent))## Re-derive the prefix for this level (the nested macro also assigns it)
+$I    }
+#end
+
+
+#macro( decodeVar $indent $var $s $rs )
+#set ($I = $this.indent($indent))## Indent prefix for the Java lines emitted below; dispatches on the schema type name to read the value from the decoder "in"
+##### Compound types (array, map, and union) require calls
+##### that will recurse back into this decodeVar macro:
+#if ($s.Type.Name.equals("array"))
+#decodeArray($indent $var $s $rs)
+#elseif ($s.Type.Name.equals("map"))
+#decodeMap($indent $var $s $rs)
+#elseif ($s.Type.Name.equals("union"))
+#decodeUnion($indent $var $s $rs)
+##### Use the generated "customDecode" method as a fast way to read
+##### (specific) record types:
+#elseif ($s.Type.Name.equals("record"))
+$I    if (${var} == null) {
+$I      ${var} = new ${this.javaType($s)}();
+$I    }
+$I    ${var}.customDecode(in);
+##### For the rest of the cases, generate calls to in.readXYZ:
+#elseif ($s.Type.Name.equals("null"))
+$I    in.readNull();
+#elseif ($s.Type.Name.equals("boolean"))
+$I    $var = in.readBoolean();
+#elseif ($s.Type.Name.equals("int"))
+$I    $var = in.readInt();
+#elseif ($s.Type.Name.equals("long"))
+$I    $var = in.readLong();
+#elseif ($s.Type.Name.equals("float"))
+$I    $var = in.readFloat();
+#elseif ($s.Type.Name.equals("double"))
+$I    $var = in.readDouble();
+#elseif ($s.Type.Name.equals("string"))
+#decodeString( "$I" $var $s )
+#elseif ($s.Type.Name.equals("bytes"))
+$I    $var = in.readBytes(${var});
+#elseif ($s.Type.Name.equals("fixed"))
+$I    if (${var} == null) {
+$I      ${var} = new ${this.javaType($s)}();
+$I    }
+$I    in.readFixed(${var}.bytes(), 0, ${s.FixedSize});
+#elseif ($s.Type.Name.equals("enum"))
+$I    $var = ${this.javaType($s)}.values()[in.readEnum()];
+#else
+## TODO -- signal a code-gen-time error (unknown type names currently emit nothing)
+#end
+#end
+
+#macro( decodeString $II $var $s )
+#set ($st = ${this.getStringType($s)})## Java type that holds the decoded string; it selects which read form is emitted below
+#if ($this.isStringable($s))
+#if ($st.equals("java.net.URI"))
+$II    try {
+$II      ${var} = new ${st}(in.readString());
+$II    } catch (java.net.URISyntaxException e) {
+$II      throw new java.io.IOException(e.getMessage(), e);
+$II    }
+#elseif ($st.equals("java.net.URL"))
+$II    try {
+$II      ${var} = new ${st}(in.readString());
+$II    } catch (java.net.MalformedURLException e) {
+$II      throw new java.io.IOException(e.getMessage(), e);
+$II    }
+#else
+$II    ${var} = new ${st}(in.readString());
+#end
+#elseif ($st.equals("java.lang.String"))
+$II    $var = in.readString();
+#elseif ($st.equals("org.apache.avro.util.Utf8"))
+$II    $var = in.readString(${var});
+#else
+$II    $var = in.readString(${var} instanceof Utf8 ? (Utf8)${var} : null);
+#end
+#end
+
+#macro( decodeArray $indent $var $s $rs )
+#set ($I = $this.indent($indent))## Indent prefix for the emitted Java; this macro reads an array block by block into the target field
+#set ($t = $this.javaType($s))## Declared Java type of the array field
+#set ($et = $this.javaType($s.ElementType))## Java type of one array element
+#set ($gat = "SpecificData.Array<${et}>")## Concrete list type created when the field is null and probed for element reuse via peek()
+$I    long size${nv} = in.readArrayStart();
+## A fresh local variable name is needed here due to a limitation of the macro system
+$I    $t a${nv} = ${var};
+$I    if (a${nv} == null) {
+$I      a${nv} = new ${gat}((int)size${nv}, ${rs});
+$I      $var = a${nv};
+$I    } else a${nv}.clear();
+$I    $gat ga${nv} = (a${nv} instanceof SpecificData.Array ? (${gat})a${nv} : null);
+$I    for ( ; 0 < size${nv}; size${nv} = in.arrayNext()) {
+$I      for ( ; size${nv} != 0; size${nv}--) {
+$I        $et e${nv} = (ga${nv} != null ? ga${nv}.peek() : null);
+#set ($var = "e${nv}")## The element variable becomes the target of the nested decodeVar call
+#set ($nv = $nv + 1)## Bump the name counter so nested compound types get distinct variable names
+#set ($maxnv = $nv)## Remember the highest counter value reached during recursion
+#set ($indent = $indent + 4)## Nested statements sit two levels deeper, inside both for loops
+#decodeVar($indent $var $s.ElementType "${rs}.getElementType()")
+#set ($nv = $nv - 1)## Back to this level's counter so a/e names match the declarations above
+#set ($indent = $indent - 4)## Restore this level's indent
+#set ($I = $this.indent($indent))## Re-derive the prefix for this level (the nested macro also assigns it)
+$I        a${nv}.add(e${nv});
+$I      }
+$I    }
+#end
+
+#macro( decodeMap $indent $var $s $rs )
+#set ($I = $this.indent($indent))## Indent prefix for the emitted Java; this macro reads a map block by block into the target field
+#set ($t = $this.javaType($s))## Declared Java type of the map field
+#set ($kt = $this.getStringType($s))## Java type of the map keys
+#set ($vt = $this.javaType($s.ValueType))## Java type of the map values
+$I    long size${nv} = in.readMapStart();
+$I    $t m${nv} = ${var}; // Need fresh name due to limitation of macro system
+$I    if (m${nv} == null) {
+$I      m${nv} = new java.util.HashMap<${kt},${vt}>((int)size${nv});
+$I      $var = m${nv};
+$I    } else m${nv}.clear();
+$I    for ( ; 0 < size${nv}; size${nv} = in.mapNext()) {
+$I      for ( ; size${nv} != 0; size${nv}--) {
+$I        $kt k${nv} = null;
+#decodeString( "$I    " "k${nv}" $s )
+$I        $vt v${nv} = null;
+#set ($var = "v${nv}")## The value variable becomes the target of the nested decodeVar call
+#set ($nv = $nv + 1)## Bump the name counter so nested compound types get distinct variable names
+#set ($maxnv = $nv)## Remember the highest counter value reached during recursion
+#set ($indent = $indent + 4)## Nested statements sit two levels deeper, inside both for loops
+#decodeVar($indent $var $s.ValueType "${rs}.getValueType()")
+#set ($nv = $nv - 1)## Back to this level's counter so m/k/v names match the declarations above
+#set ($indent = $indent - 4)## Restore this level's indent
+#set ($I = $this.indent($indent))## Re-derive the prefix for this level (the nested macro also assigns it)
+$I        m${nv}.put(k${nv}, v${nv});
+$I      }
+$I    }
+#end
+
+#macro( decodeUnion $indent $var $s $rs )
+#set ($I = $this.indent($indent))## Indent prefix for the emitted Java; this macro reads a union as either its null branch or its non-null branch
+#set ($et = $this.javaType($s.Types.get($this.getNonNullIndex($s))))## NOTE(review): assigned but not referenced anywhere in this macro
+#set ($si = $this.getNonNullIndex($s))## Position of the non-null branch within the union's type list
+$I    if (in.readIndex() != ${si}) {
+$I      in.readNull();
+$I      ${var} = null;
+$I    } else {
+#set ($indent = $indent + 2)## The non-null branch is emitted one level deeper, inside the else block
+#decodeVar($indent $var $s.Types.get($si) "${rs}.getTypes().get(${si})")
+#set ($indent = $indent - 2)## Restore this level's indent
+#set ($I = $this.indent($indent))## Re-derive the prefix for this level (the nested macro also assigns it)
+$I    }
+#end
\ No newline at end of file
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/serializer/group/GroupsBusinessEventSerializer.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/serializer/AbstractBusinessEventSerializer.java
similarity index 54%
copy from fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/serializer/group/GroupsBusinessEventSerializer.java
copy to fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/serializer/AbstractBusinessEventSerializer.java
index b2b4a18c2..bc2c9abab 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/serializer/group/GroupsBusinessEventSerializer.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/serializer/AbstractBusinessEventSerializer.java
@@ -16,42 +16,29 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.fineract.infrastructure.event.external.service.serialization.serializer.group;
+package org.apache.fineract.infrastructure.event.external.service.serialization.serializer;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import lombok.RequiredArgsConstructor;
-import org.apache.avro.generic.GenericContainer;
-import org.apache.fineract.avro.generic.v1.CommandProcessingResultV1;
+import lombok.Setter;
+import org.apache.fineract.avro.generator.ByteBufferSerializable;
 import org.apache.fineract.infrastructure.event.business.domain.BusinessEvent;
-import org.apache.fineract.infrastructure.event.business.domain.group.GroupsBusinessEvent;
-import org.apache.fineract.infrastructure.event.external.service.serialization.mapper.generic.CommandProcessingResultMapper;
-import org.apache.fineract.infrastructure.event.external.service.serialization.serializer.BusinessEventSerializer;
 import org.apache.fineract.infrastructure.event.external.service.support.ByteBufferConverter;
-import org.springframework.stereotype.Component;
+import org.springframework.beans.factory.annotation.Autowired;
 
-@Component
 @RequiredArgsConstructor
-public class GroupsBusinessEventSerializer implements BusinessEventSerializer {
+public abstract class AbstractBusinessEventSerializer implements BusinessEventSerializer {
 
-    private final CommandProcessingResultMapper mapper;
-    private final ByteBufferConverter byteBufferConverter;
-
-    @Override
-    public <T> boolean canSerialize(BusinessEvent<T> event) {
-        return event instanceof GroupsBusinessEvent;
-    }
+    @Setter(onMethod = @__({ @Autowired }))
+    private ByteBufferConverter byteBufferConverter;
 
     @Override
     public <T> byte[] serialize(BusinessEvent<T> rawEvent) throws IOException {
-        GroupsBusinessEvent event = (GroupsBusinessEvent) rawEvent;
-        CommandProcessingResultV1 avroDto = mapper.map(event.get());
+        ByteBufferSerializable avroDto = toAvroDTO(rawEvent);
         ByteBuffer buffer = avroDto.toByteBuffer();
         return byteBufferConverter.convert(buffer);
     }
 
-    @Override
-    public Class<? extends GenericContainer> getSupportedSchema() {
-        return CommandProcessingResultV1.class;
-    }
+    protected abstract <T> ByteBufferSerializable toAvroDTO(BusinessEvent<T> rawEvent);
 }
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/serializer/client/ClientBusinessEventSerializer.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/serializer/client/ClientBusinessEventSerializer.java
index d53ae4452..1b9a98b05 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/serializer/client/ClientBusinessEventSerializer.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/serializer/client/ClientBusinessEventSerializer.java
@@ -18,27 +18,24 @@
  */
 package org.apache.fineract.infrastructure.event.external.service.serialization.serializer.client;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
 import lombok.RequiredArgsConstructor;
 import org.apache.avro.generic.GenericContainer;
 import org.apache.fineract.avro.client.v1.ClientDataV1;
+import org.apache.fineract.avro.generator.ByteBufferSerializable;
 import org.apache.fineract.infrastructure.event.business.domain.BusinessEvent;
 import org.apache.fineract.infrastructure.event.business.domain.client.ClientBusinessEvent;
 import org.apache.fineract.infrastructure.event.external.service.serialization.mapper.client.ClientDataMapper;
-import org.apache.fineract.infrastructure.event.external.service.serialization.serializer.BusinessEventSerializer;
-import org.apache.fineract.infrastructure.event.external.service.support.ByteBufferConverter;
+import org.apache.fineract.infrastructure.event.external.service.serialization.serializer.AbstractBusinessEventSerializer;
 import org.apache.fineract.portfolio.client.data.ClientData;
 import org.apache.fineract.portfolio.client.service.ClientReadPlatformService;
 import org.springframework.stereotype.Component;
 
 @Component
 @RequiredArgsConstructor
-public class ClientBusinessEventSerializer implements BusinessEventSerializer {
+public class ClientBusinessEventSerializer extends AbstractBusinessEventSerializer {
 
     private final ClientReadPlatformService service;
     private final ClientDataMapper mapper;
-    private final ByteBufferConverter byteBufferConverter;
 
     @Override
     public <T> boolean canSerialize(BusinessEvent<T> event) {
@@ -46,12 +43,10 @@ public class ClientBusinessEventSerializer implements BusinessEventSerializer {
     }
 
     @Override
-    public <T> byte[] serialize(BusinessEvent<T> rawEvent) throws IOException {
+    protected <T> ByteBufferSerializable toAvroDTO(BusinessEvent<T> rawEvent) {
         ClientBusinessEvent event = (ClientBusinessEvent) rawEvent;
         ClientData data = service.retrieveOne(event.get().getId());
-        ClientDataV1 avroDto = mapper.map(data);
-        ByteBuffer buffer = avroDto.toByteBuffer();
-        return byteBufferConverter.convert(buffer);
+        return mapper.map(data);
     }
 
     @Override
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/serializer/fixeddeposit/FixedDepositAccountBusinessEventSerializer.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/serializer/fixeddeposit/FixedDepositAccountBusinessEventSerializer.java
index 7abfb69bb..04ec6c132 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/serializer/fixeddeposit/FixedDepositAccountBusinessEventSerializer.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/serializer/fixeddeposit/FixedDepositAccountBusinessEventSerializer.java
@@ -18,16 +18,14 @@
  */
 package org.apache.fineract.infrastructure.event.external.service.serialization.serializer.fixeddeposit;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
 import lombok.RequiredArgsConstructor;
 import org.apache.avro.generic.GenericContainer;
 import org.apache.fineract.avro.fixeddeposit.v1.FixedDepositAccountDataV1;
+import org.apache.fineract.avro.generator.ByteBufferSerializable;
 import org.apache.fineract.infrastructure.event.business.domain.BusinessEvent;
 import org.apache.fineract.infrastructure.event.business.domain.deposit.FixedDepositAccountBusinessEvent;
 import org.apache.fineract.infrastructure.event.external.service.serialization.mapper.fixeddeposit.FixedDepositAccountDataMapper;
-import org.apache.fineract.infrastructure.event.external.service.serialization.serializer.BusinessEventSerializer;
-import org.apache.fineract.infrastructure.event.external.service.support.ByteBufferConverter;
+import org.apache.fineract.infrastructure.event.external.service.serialization.serializer.AbstractBusinessEventSerializer;
 import org.apache.fineract.portfolio.savings.DepositAccountType;
 import org.apache.fineract.portfolio.savings.data.FixedDepositAccountData;
 import org.apache.fineract.portfolio.savings.service.DepositAccountReadPlatformService;
@@ -35,11 +33,10 @@ import org.springframework.stereotype.Component;
 
 @Component
 @RequiredArgsConstructor
-public class FixedDepositAccountBusinessEventSerializer implements BusinessEventSerializer {
+public class FixedDepositAccountBusinessEventSerializer extends AbstractBusinessEventSerializer {
 
     private final DepositAccountReadPlatformService service;
     private final FixedDepositAccountDataMapper mapper;
-    private final ByteBufferConverter byteBufferConverter;
 
     @Override
     public <T> boolean canSerialize(BusinessEvent<T> event) {
@@ -47,12 +44,10 @@ public class FixedDepositAccountBusinessEventSerializer implements BusinessEvent
     }
 
     @Override
-    public <T> byte[] serialize(BusinessEvent<T> rawEvent) throws IOException {
+    protected <T> ByteBufferSerializable toAvroDTO(BusinessEvent<T> rawEvent) {
         FixedDepositAccountBusinessEvent event = (FixedDepositAccountBusinessEvent) rawEvent;
         FixedDepositAccountData data = (FixedDepositAccountData) service.retrieveOne(DepositAccountType.FIXED_DEPOSIT, event.get().getId());
-        FixedDepositAccountDataV1 avroDto = mapper.map(data);
-        ByteBuffer buffer = avroDto.toByteBuffer();
-        return byteBufferConverter.convert(buffer);
+        return mapper.map(data);
     }
 
     @Override
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/serializer/group/GroupsBusinessEventSerializer.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/serializer/group/GroupsBusinessEventSerializer.java
index b2b4a18c2..a28c4407d 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/serializer/group/GroupsBusinessEventSerializer.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/serializer/group/GroupsBusinessEventSerializer.java
@@ -18,24 +18,21 @@
  */
 package org.apache.fineract.infrastructure.event.external.service.serialization.serializer.group;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
 import lombok.RequiredArgsConstructor;
 import org.apache.avro.generic.GenericContainer;
+import org.apache.fineract.avro.generator.ByteBufferSerializable;
 import org.apache.fineract.avro.generic.v1.CommandProcessingResultV1;
 import org.apache.fineract.infrastructure.event.business.domain.BusinessEvent;
 import org.apache.fineract.infrastructure.event.business.domain.group.GroupsBusinessEvent;
 import org.apache.fineract.infrastructure.event.external.service.serialization.mapper.generic.CommandProcessingResultMapper;
-import org.apache.fineract.infrastructure.event.external.service.serialization.serializer.BusinessEventSerializer;
-import org.apache.fineract.infrastructure.event.external.service.support.ByteBufferConverter;
+import org.apache.fineract.infrastructure.event.external.service.serialization.serializer.AbstractBusinessEventSerializer;
 import org.springframework.stereotype.Component;
 
 @Component
 @RequiredArgsConstructor
-public class GroupsBusinessEventSerializer implements BusinessEventSerializer {
+public class GroupsBusinessEventSerializer extends AbstractBusinessEventSerializer {
 
     private final CommandProcessingResultMapper mapper;
-    private final ByteBufferConverter byteBufferConverter;
 
     @Override
     public <T> boolean canSerialize(BusinessEvent<T> event) {
@@ -43,11 +40,9 @@ public class GroupsBusinessEventSerializer implements BusinessEventSerializer {
     }
 
     @Override
-    public <T> byte[] serialize(BusinessEvent<T> rawEvent) throws IOException {
+    protected <T> ByteBufferSerializable toAvroDTO(BusinessEvent<T> rawEvent) {
         GroupsBusinessEvent event = (GroupsBusinessEvent) rawEvent;
-        CommandProcessingResultV1 avroDto = mapper.map(event.get());
-        ByteBuffer buffer = avroDto.toByteBuffer();
-        return byteBufferConverter.convert(buffer);
+        return mapper.map(event.get());
     }
 
     @Override
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/serializer/loan/LoanAdjustTransactionBusinessEventSerializer.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/serializer/loan/LoanAdjustTransactionBusinessEventSerializer.java
index db7ef7d20..4183a90f7 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/serializer/loan/LoanAdjustTransactionBusinessEventSerializer.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/serializer/loan/LoanAdjustTransactionBusinessEventSerializer.java
@@ -18,17 +18,15 @@
  */
 package org.apache.fineract.infrastructure.event.external.service.serialization.serializer.loan;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
 import lombok.RequiredArgsConstructor;
 import org.apache.avro.generic.GenericContainer;
+import org.apache.fineract.avro.generator.ByteBufferSerializable;
 import org.apache.fineract.avro.loan.v1.LoanTransactionAdjustmentDataV1;
 import org.apache.fineract.avro.loan.v1.LoanTransactionDataV1;
 import org.apache.fineract.infrastructure.event.business.domain.BusinessEvent;
 import org.apache.fineract.infrastructure.event.business.domain.loan.LoanAdjustTransactionBusinessEvent;
 import org.apache.fineract.infrastructure.event.external.service.serialization.mapper.loan.LoanTransactionDataMapper;
-import org.apache.fineract.infrastructure.event.external.service.serialization.serializer.BusinessEventSerializer;
-import org.apache.fineract.infrastructure.event.external.service.support.ByteBufferConverter;
+import org.apache.fineract.infrastructure.event.external.service.serialization.serializer.AbstractBusinessEventSerializer;
 import org.apache.fineract.portfolio.loanaccount.data.LoanTransactionData;
 import org.apache.fineract.portfolio.loanaccount.domain.LoanTransaction;
 import org.apache.fineract.portfolio.loanaccount.service.LoanReadPlatformService;
@@ -36,11 +34,10 @@ import org.springframework.stereotype.Component;
 
 @Component
 @RequiredArgsConstructor
-public class LoanAdjustTransactionBusinessEventSerializer implements BusinessEventSerializer {
+public class LoanAdjustTransactionBusinessEventSerializer extends AbstractBusinessEventSerializer {
 
     private final LoanReadPlatformService service;
     private final LoanTransactionDataMapper mapper;
-    private final ByteBufferConverter byteBufferConverter;
 
     @Override
     public <T> boolean canSerialize(BusinessEvent<T> event) {
@@ -48,7 +45,7 @@ public class LoanAdjustTransactionBusinessEventSerializer implements BusinessEve
     }
 
     @Override
-    public <T> byte[] serialize(BusinessEvent<T> rawEvent) throws IOException {
+    protected <T> ByteBufferSerializable toAvroDTO(BusinessEvent<T> rawEvent) {
         LoanAdjustTransactionBusinessEvent event = (LoanAdjustTransactionBusinessEvent) rawEvent;
         LoanTransaction transactionToAdjust = event.get().getTransactionToAdjust();
         LoanTransactionData transactionToAdjustData = service.retrieveLoanTransaction(transactionToAdjust.getLoan().getId(),
@@ -63,10 +60,7 @@ public class LoanAdjustTransactionBusinessEventSerializer implements BusinessEve
             newTransactionDetailAvroDto = mapper.map(newTransactionDetailData);
 
         }
-        LoanTransactionAdjustmentDataV1 avroDto = new LoanTransactionAdjustmentDataV1(transactionToAdjustAvroDto,
-                newTransactionDetailAvroDto);
-        ByteBuffer buffer = avroDto.toByteBuffer();
-        return byteBufferConverter.convert(buffer);
+        return new LoanTransactionAdjustmentDataV1(transactionToAdjustAvroDto, newTransactionDetailAvroDto);
     }
 
     @Override
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/serializer/loan/LoanBusinessEventSerializer.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/serializer/loan/LoanBusinessEventSerializer.java
index 8da6be97d..43ad6eacd 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/serializer/loan/LoanBusinessEventSerializer.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/serializer/loan/LoanBusinessEventSerializer.java
@@ -18,27 +18,24 @@
  */
 package org.apache.fineract.infrastructure.event.external.service.serialization.serializer.loan;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
 import lombok.RequiredArgsConstructor;
 import org.apache.avro.generic.GenericContainer;
+import org.apache.fineract.avro.generator.ByteBufferSerializable;
 import org.apache.fineract.avro.loan.v1.LoanAccountDataV1;
 import org.apache.fineract.infrastructure.event.business.domain.BusinessEvent;
 import org.apache.fineract.infrastructure.event.business.domain.loan.LoanBusinessEvent;
 import org.apache.fineract.infrastructure.event.external.service.serialization.mapper.loan.LoanAccountDataMapper;
-import org.apache.fineract.infrastructure.event.external.service.serialization.serializer.BusinessEventSerializer;
-import org.apache.fineract.infrastructure.event.external.service.support.ByteBufferConverter;
+import org.apache.fineract.infrastructure.event.external.service.serialization.serializer.AbstractBusinessEventSerializer;
 import org.apache.fineract.portfolio.loanaccount.data.LoanAccountData;
 import org.apache.fineract.portfolio.loanaccount.service.LoanReadPlatformService;
 import org.springframework.stereotype.Component;
 
 @Component
 @RequiredArgsConstructor
-public class LoanBusinessEventSerializer implements BusinessEventSerializer {
+public class LoanBusinessEventSerializer extends AbstractBusinessEventSerializer {
 
     private final LoanReadPlatformService service;
     private final LoanAccountDataMapper mapper;
-    private final ByteBufferConverter byteBufferConverter;
 
     @Override
     public <T> boolean canSerialize(BusinessEvent<T> event) {
@@ -46,12 +43,10 @@ public class LoanBusinessEventSerializer implements BusinessEventSerializer {
     }
 
     @Override
-    public <T> byte[] serialize(BusinessEvent<T> rawEvent) throws IOException {
+    protected <T> ByteBufferSerializable toAvroDTO(BusinessEvent<T> rawEvent) {
         LoanBusinessEvent event = (LoanBusinessEvent) rawEvent;
         LoanAccountData data = service.retrieveOne(event.get().getId());
-        LoanAccountDataV1 avroDto = mapper.map(data);
-        ByteBuffer buffer = avroDto.toByteBuffer();
-        return byteBufferConverter.convert(buffer);
+        return mapper.map(data);
     }
 
     @Override
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/serializer/loan/LoanChargeBusinessEventSerializer.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/serializer/loan/LoanChargeBusinessEventSerializer.java
index 8a1bf90b4..065e056e1 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/serializer/loan/LoanChargeBusinessEventSerializer.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/serializer/loan/LoanChargeBusinessEventSerializer.java
@@ -18,27 +18,24 @@
  */
 package org.apache.fineract.infrastructure.event.external.service.serialization.serializer.loan;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
 import lombok.RequiredArgsConstructor;
 import org.apache.avro.generic.GenericContainer;
+import org.apache.fineract.avro.generator.ByteBufferSerializable;
 import org.apache.fineract.avro.loan.v1.LoanChargeDataV1;
 import org.apache.fineract.infrastructure.event.business.domain.BusinessEvent;
 import org.apache.fineract.infrastructure.event.business.domain.loan.charge.LoanChargeBusinessEvent;
 import org.apache.fineract.infrastructure.event.external.service.serialization.mapper.loan.LoanChargeDataMapper;
-import org.apache.fineract.infrastructure.event.external.service.serialization.serializer.BusinessEventSerializer;
-import org.apache.fineract.infrastructure.event.external.service.support.ByteBufferConverter;
+import org.apache.fineract.infrastructure.event.external.service.serialization.serializer.AbstractBusinessEventSerializer;
 import org.apache.fineract.portfolio.loanaccount.data.LoanChargeData;
 import org.apache.fineract.portfolio.loanaccount.service.LoanChargeReadPlatformService;
 import org.springframework.stereotype.Component;
 
 @Component
 @RequiredArgsConstructor
-public class LoanChargeBusinessEventSerializer implements BusinessEventSerializer {
+public class LoanChargeBusinessEventSerializer extends AbstractBusinessEventSerializer {
 
     private final LoanChargeReadPlatformService service;
     private final LoanChargeDataMapper mapper;
-    private final ByteBufferConverter byteBufferConverter;
 
     @Override
     public <T> boolean canSerialize(BusinessEvent<T> event) {
@@ -46,12 +43,10 @@ public class LoanChargeBusinessEventSerializer implements BusinessEventSerialize
     }
 
     @Override
-    public <T> byte[] serialize(BusinessEvent<T> rawEvent) throws IOException {
+    protected <T> ByteBufferSerializable toAvroDTO(BusinessEvent<T> rawEvent) {
         LoanChargeBusinessEvent event = (LoanChargeBusinessEvent) rawEvent;
         LoanChargeData data = service.retrieveLoanChargeDetails(event.get().getId(), event.get().getLoan().getId());
-        LoanChargeDataV1 avroDto = mapper.map(data);
-        ByteBuffer buffer = avroDto.toByteBuffer();
-        return byteBufferConverter.convert(buffer);
+        return mapper.map(data);
     }
 
     @Override
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/serializer/loan/LoanChargeDeletedBusinessEventSerializer.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/serializer/loan/LoanChargeDeletedBusinessEventSerializer.java
index 1eafe0a64..330d8b3fc 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/serializer/loan/LoanChargeDeletedBusinessEventSerializer.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/serializer/loan/LoanChargeDeletedBusinessEventSerializer.java
@@ -18,14 +18,13 @@
  */
 package org.apache.fineract.infrastructure.event.external.service.serialization.serializer.loan;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
 import lombok.RequiredArgsConstructor;
 import org.apache.avro.generic.GenericContainer;
+import org.apache.fineract.avro.generator.ByteBufferSerializable;
 import org.apache.fineract.avro.loan.v1.LoanChargeDeletedV1;
 import org.apache.fineract.infrastructure.event.business.domain.BusinessEvent;
 import org.apache.fineract.infrastructure.event.business.domain.loan.charge.LoanDeleteChargeBusinessEvent;
-import org.apache.fineract.infrastructure.event.external.service.serialization.serializer.BusinessEventSerializer;
+import org.apache.fineract.infrastructure.event.external.service.serialization.serializer.AbstractBusinessEventSerializer;
 import org.apache.fineract.infrastructure.event.external.service.support.ByteBufferConverter;
 import org.apache.fineract.portfolio.loanaccount.domain.LoanCharge;
 import org.springframework.core.Ordered;
@@ -35,7 +34,7 @@ import org.springframework.stereotype.Component;
 @Component
 @RequiredArgsConstructor
 @Order(Ordered.LOWEST_PRECEDENCE - 1)
-public class LoanChargeDeletedBusinessEventSerializer implements BusinessEventSerializer {
+public class LoanChargeDeletedBusinessEventSerializer extends AbstractBusinessEventSerializer {
 
     private final ByteBufferConverter byteBufferConverter;
 
@@ -45,14 +44,12 @@ public class LoanChargeDeletedBusinessEventSerializer implements BusinessEventSe
     }
 
     @Override
-    public <T> byte[] serialize(BusinessEvent<T> rawEvent) throws IOException {
+    protected <T> ByteBufferSerializable toAvroDTO(BusinessEvent<T> rawEvent) {
         LoanDeleteChargeBusinessEvent event = (LoanDeleteChargeBusinessEvent) rawEvent;
         LoanCharge loanCharge = event.get();
         Long id = loanCharge.getId();
         Long chargeId = loanCharge.getCharge().getId();
-        LoanChargeDeletedV1 avroDto = new LoanChargeDeletedV1(id, chargeId);
-        ByteBuffer buffer = avroDto.toByteBuffer();
-        return byteBufferConverter.convert(buffer);
+        return new LoanChargeDeletedV1(id, chargeId);
     }
 
     @Override
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/serializer/loan/LoanProductBusinessEventSerializer.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/serializer/loan/LoanProductBusinessEventSerializer.java
index 645ec907d..70229d723 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/serializer/loan/LoanProductBusinessEventSerializer.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/serializer/loan/LoanProductBusinessEventSerializer.java
@@ -18,27 +18,24 @@
  */
 package org.apache.fineract.infrastructure.event.external.service.serialization.serializer.loan;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
 import lombok.RequiredArgsConstructor;
 import org.apache.avro.generic.GenericContainer;
+import org.apache.fineract.avro.generator.ByteBufferSerializable;
 import org.apache.fineract.avro.loan.v1.LoanProductDataV1;
 import org.apache.fineract.infrastructure.event.business.domain.BusinessEvent;
 import org.apache.fineract.infrastructure.event.business.domain.loan.product.LoanProductBusinessEvent;
 import org.apache.fineract.infrastructure.event.external.service.serialization.mapper.loan.LoanProductDataMapper;
-import org.apache.fineract.infrastructure.event.external.service.serialization.serializer.BusinessEventSerializer;
-import org.apache.fineract.infrastructure.event.external.service.support.ByteBufferConverter;
+import org.apache.fineract.infrastructure.event.external.service.serialization.serializer.AbstractBusinessEventSerializer;
 import org.apache.fineract.portfolio.loanproduct.data.LoanProductData;
 import org.apache.fineract.portfolio.loanproduct.service.LoanProductReadPlatformService;
 import org.springframework.stereotype.Component;
 
 @Component
 @RequiredArgsConstructor
-public class LoanProductBusinessEventSerializer implements BusinessEventSerializer {
+public class LoanProductBusinessEventSerializer extends AbstractBusinessEventSerializer {
 
     private final LoanProductReadPlatformService service;
     private final LoanProductDataMapper mapper;
-    private final ByteBufferConverter byteBufferConverter;
 
     @Override
     public <T> boolean canSerialize(BusinessEvent<T> event) {
@@ -46,12 +43,10 @@ public class LoanProductBusinessEventSerializer implements BusinessEventSerializ
     }
 
     @Override
-    public <T> byte[] serialize(BusinessEvent<T> rawEvent) throws IOException {
+    protected <T> ByteBufferSerializable toAvroDTO(BusinessEvent<T> rawEvent) {
         LoanProductBusinessEvent event = (LoanProductBusinessEvent) rawEvent;
         LoanProductData data = service.retrieveLoanProduct(event.get().getId());
-        LoanProductDataV1 avroDto = mapper.map(data);
-        ByteBuffer buffer = avroDto.toByteBuffer();
-        return byteBufferConverter.convert(buffer);
+        return mapper.map(data);
     }
 
     @Override
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/serializer/loan/LoanTransactionBusinessEventSerializer.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/serializer/loan/LoanTransactionBusinessEventSerializer.java
index 6394623fe..599565d23 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/serializer/loan/LoanTransactionBusinessEventSerializer.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/serializer/loan/LoanTransactionBusinessEventSerializer.java
@@ -18,27 +18,24 @@
  */
 package org.apache.fineract.infrastructure.event.external.service.serialization.serializer.loan;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
 import lombok.RequiredArgsConstructor;
 import org.apache.avro.generic.GenericContainer;
+import org.apache.fineract.avro.generator.ByteBufferSerializable;
 import org.apache.fineract.avro.loan.v1.LoanTransactionDataV1;
 import org.apache.fineract.infrastructure.event.business.domain.BusinessEvent;
 import org.apache.fineract.infrastructure.event.business.domain.loan.transaction.LoanTransactionBusinessEvent;
 import org.apache.fineract.infrastructure.event.external.service.serialization.mapper.loan.LoanTransactionDataMapper;
-import org.apache.fineract.infrastructure.event.external.service.serialization.serializer.BusinessEventSerializer;
-import org.apache.fineract.infrastructure.event.external.service.support.ByteBufferConverter;
+import org.apache.fineract.infrastructure.event.external.service.serialization.serializer.AbstractBusinessEventSerializer;
 import org.apache.fineract.portfolio.loanaccount.data.LoanTransactionData;
 import org.apache.fineract.portfolio.loanaccount.service.LoanReadPlatformService;
 import org.springframework.stereotype.Component;
 
 @Component
 @RequiredArgsConstructor
-public class LoanTransactionBusinessEventSerializer implements BusinessEventSerializer {
+public class LoanTransactionBusinessEventSerializer extends AbstractBusinessEventSerializer {
 
     private final LoanReadPlatformService service;
     private final LoanTransactionDataMapper loanTransactionMapper;
-    private final ByteBufferConverter byteBufferConverter;
 
     @Override
     public <T> boolean canSerialize(BusinessEvent<T> event) {
@@ -46,14 +43,12 @@ public class LoanTransactionBusinessEventSerializer implements BusinessEventSeri
     }
 
     @Override
-    public <T> byte[] serialize(BusinessEvent<T> rawEvent) throws IOException {
+    protected <T> ByteBufferSerializable toAvroDTO(BusinessEvent<T> rawEvent) {
         LoanTransactionBusinessEvent event = (LoanTransactionBusinessEvent) rawEvent;
         Long loanId = event.get().getLoan().getId();
         Long loanTransactionId = event.get().getId();
         LoanTransactionData transactionData = service.retrieveLoanTransaction(loanId, loanTransactionId);
-        LoanTransactionDataV1 avroDto = loanTransactionMapper.map(transactionData);
-        ByteBuffer buffer = avroDto.toByteBuffer();
-        return byteBufferConverter.convert(buffer);
+        return loanTransactionMapper.map(transactionData);
     }
 
     @Override
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/serializer/recurringdeposit/RecurringDepositAccountBusinessEventSerializer.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/serializer/recurringdeposit/RecurringDepositAccountBusinessEventSerializer.java
index f01ef6adc..3f291b45a 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/serializer/recurringdeposit/RecurringDepositAccountBusinessEventSerializer.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/serializer/recurringdeposit/RecurringDepositAccountBusinessEventSerializer.java
@@ -18,16 +18,14 @@
  */
 package org.apache.fineract.infrastructure.event.external.service.serialization.serializer.recurringdeposit;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
 import lombok.RequiredArgsConstructor;
 import org.apache.avro.generic.GenericContainer;
+import org.apache.fineract.avro.generator.ByteBufferSerializable;
 import org.apache.fineract.avro.recurringdeposit.v1.RecurringDepositAccountDataV1;
 import org.apache.fineract.infrastructure.event.business.domain.BusinessEvent;
 import org.apache.fineract.infrastructure.event.business.domain.deposit.RecurringDepositAccountBusinessEvent;
 import org.apache.fineract.infrastructure.event.external.service.serialization.mapper.recurringdeposit.RecurringDepositAccountDataMapper;
-import org.apache.fineract.infrastructure.event.external.service.serialization.serializer.BusinessEventSerializer;
-import org.apache.fineract.infrastructure.event.external.service.support.ByteBufferConverter;
+import org.apache.fineract.infrastructure.event.external.service.serialization.serializer.AbstractBusinessEventSerializer;
 import org.apache.fineract.portfolio.savings.DepositAccountType;
 import org.apache.fineract.portfolio.savings.data.RecurringDepositAccountData;
 import org.apache.fineract.portfolio.savings.service.DepositAccountReadPlatformService;
@@ -35,11 +33,10 @@ import org.springframework.stereotype.Component;
 
 @Component
 @RequiredArgsConstructor
-public class RecurringDepositAccountBusinessEventSerializer implements BusinessEventSerializer {
+public class RecurringDepositAccountBusinessEventSerializer extends AbstractBusinessEventSerializer {
 
     private final DepositAccountReadPlatformService service;
     private final RecurringDepositAccountDataMapper mapper;
-    private final ByteBufferConverter byteBufferConverter;
 
     @Override
     public <T> boolean canSerialize(BusinessEvent<T> event) {
@@ -47,13 +44,11 @@ public class RecurringDepositAccountBusinessEventSerializer implements BusinessE
     }
 
     @Override
-    public <T> byte[] serialize(BusinessEvent<T> rawEvent) throws IOException {
+    protected <T> ByteBufferSerializable toAvroDTO(BusinessEvent<T> rawEvent) {
         RecurringDepositAccountBusinessEvent event = (RecurringDepositAccountBusinessEvent) rawEvent;
         RecurringDepositAccountData data = (RecurringDepositAccountData) service.retrieveOne(DepositAccountType.RECURRING_DEPOSIT,
                 event.get().getId());
-        RecurringDepositAccountDataV1 avroDto = mapper.map(data);
-        ByteBuffer buffer = avroDto.toByteBuffer();
-        return byteBufferConverter.convert(buffer);
+        return mapper.map(data);
     }
 
     @Override
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/serializer/savings/SavingsAccountBusinessEventSerializer.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/serializer/savings/SavingsAccountBusinessEventSerializer.java
index 248b1d34e..eba534061 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/serializer/savings/SavingsAccountBusinessEventSerializer.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/serializer/savings/SavingsAccountBusinessEventSerializer.java
@@ -18,27 +18,24 @@
  */
 package org.apache.fineract.infrastructure.event.external.service.serialization.serializer.savings;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
 import lombok.RequiredArgsConstructor;
 import org.apache.avro.generic.GenericContainer;
+import org.apache.fineract.avro.generator.ByteBufferSerializable;
 import org.apache.fineract.avro.savings.v1.SavingsAccountDataV1;
 import org.apache.fineract.infrastructure.event.business.domain.BusinessEvent;
 import org.apache.fineract.infrastructure.event.business.domain.savings.SavingsAccountBusinessEvent;
 import org.apache.fineract.infrastructure.event.external.service.serialization.mapper.savings.SavingsAccountDataMapper;
-import org.apache.fineract.infrastructure.event.external.service.serialization.serializer.BusinessEventSerializer;
-import org.apache.fineract.infrastructure.event.external.service.support.ByteBufferConverter;
+import org.apache.fineract.infrastructure.event.external.service.serialization.serializer.AbstractBusinessEventSerializer;
 import org.apache.fineract.portfolio.savings.data.SavingsAccountData;
 import org.apache.fineract.portfolio.savings.service.SavingsAccountReadPlatformService;
 import org.springframework.stereotype.Component;
 
 @Component
 @RequiredArgsConstructor
-public class SavingsAccountBusinessEventSerializer implements BusinessEventSerializer {
+public class SavingsAccountBusinessEventSerializer extends AbstractBusinessEventSerializer {
 
     private final SavingsAccountReadPlatformService service;
     private final SavingsAccountDataMapper mapper;
-    private final ByteBufferConverter byteBufferConverter;
 
     @Override
     public <T> boolean canSerialize(BusinessEvent<T> event) {
@@ -46,12 +43,10 @@ public class SavingsAccountBusinessEventSerializer implements BusinessEventSeria
     }
 
     @Override
-    public <T> byte[] serialize(BusinessEvent<T> rawEvent) throws IOException {
+    protected <T> ByteBufferSerializable toAvroDTO(BusinessEvent<T> rawEvent) {
         SavingsAccountBusinessEvent event = (SavingsAccountBusinessEvent) rawEvent;
         SavingsAccountData data = service.retrieveOne(event.get().getId());
-        SavingsAccountDataV1 avroDto = mapper.map(data);
-        ByteBuffer buffer = avroDto.toByteBuffer();
-        return byteBufferConverter.convert(buffer);
+        return mapper.map(data);
     }
 
     @Override
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/serializer/savings/SavingsAccountTransactionBusinessEventSerializer.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/serializer/savings/SavingsAccountTransactionBusinessEventSerializer.java
index c9c3fd1c5..e07d38f55 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/serializer/savings/SavingsAccountTransactionBusinessEventSerializer.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/serializer/savings/SavingsAccountTransactionBusinessEventSerializer.java
@@ -18,16 +18,14 @@
  */
 package org.apache.fineract.infrastructure.event.external.service.serialization.serializer.savings;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
 import lombok.RequiredArgsConstructor;
 import org.apache.avro.generic.GenericContainer;
+import org.apache.fineract.avro.generator.ByteBufferSerializable;
 import org.apache.fineract.avro.savings.v1.SavingsAccountTransactionDataV1;
 import org.apache.fineract.infrastructure.event.business.domain.BusinessEvent;
 import org.apache.fineract.infrastructure.event.business.domain.savings.transaction.SavingsAccountTransactionBusinessEvent;
 import org.apache.fineract.infrastructure.event.external.service.serialization.mapper.savings.SavingsAccountTransactionDataMapper;
-import org.apache.fineract.infrastructure.event.external.service.serialization.serializer.BusinessEventSerializer;
-import org.apache.fineract.infrastructure.event.external.service.support.ByteBufferConverter;
+import org.apache.fineract.infrastructure.event.external.service.serialization.serializer.AbstractBusinessEventSerializer;
 import org.apache.fineract.portfolio.savings.data.SavingsAccountTransactionData;
 import org.apache.fineract.portfolio.savings.domain.SavingsAccountTransaction;
 import org.apache.fineract.portfolio.savings.service.SavingsAccountReadPlatformService;
@@ -35,11 +33,10 @@ import org.springframework.stereotype.Component;
 
 @Component
 @RequiredArgsConstructor
-public class SavingsAccountTransactionBusinessEventSerializer implements BusinessEventSerializer {
+public class SavingsAccountTransactionBusinessEventSerializer extends AbstractBusinessEventSerializer {
 
     private final SavingsAccountReadPlatformService service;
     private final SavingsAccountTransactionDataMapper mapper;
-    private final ByteBufferConverter byteBufferConverter;
 
     @Override
     public <T> boolean canSerialize(BusinessEvent<T> event) {
@@ -47,14 +44,12 @@ public class SavingsAccountTransactionBusinessEventSerializer implements Busines
     }
 
     @Override
-    public <T> byte[] serialize(BusinessEvent<T> rawEvent) throws IOException {
+    protected <T> ByteBufferSerializable toAvroDTO(BusinessEvent<T> rawEvent) {
         SavingsAccountTransactionBusinessEvent event = (SavingsAccountTransactionBusinessEvent) rawEvent;
         SavingsAccountTransaction tx = event.get();
         SavingsAccountTransactionData data = service.retrieveSavingsTransaction(tx.getSavingsAccount().getId(), tx.getId(),
                 tx.getSavingsAccount().depositAccountType());
-        SavingsAccountTransactionDataV1 avroDto = mapper.map(data);
-        ByteBuffer buffer = avroDto.toByteBuffer();
-        return byteBufferConverter.convert(buffer);
+        return mapper.map(data);
     }
 
     @Override
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/serializer/share/ShareAccountBusinessEventSerializer.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/serializer/share/ShareAccountBusinessEventSerializer.java
index 9d14a5905..138490131 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/serializer/share/ShareAccountBusinessEventSerializer.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/serializer/share/ShareAccountBusinessEventSerializer.java
@@ -18,27 +18,24 @@
  */
 package org.apache.fineract.infrastructure.event.external.service.serialization.serializer.share;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
 import lombok.RequiredArgsConstructor;
 import org.apache.avro.generic.GenericContainer;
+import org.apache.fineract.avro.generator.ByteBufferSerializable;
 import org.apache.fineract.avro.share.v1.ShareAccountDataV1;
 import org.apache.fineract.infrastructure.event.business.domain.BusinessEvent;
 import org.apache.fineract.infrastructure.event.business.domain.share.ShareAccountBusinessEvent;
 import org.apache.fineract.infrastructure.event.external.service.serialization.mapper.share.ShareAccountDataMapper;
-import org.apache.fineract.infrastructure.event.external.service.serialization.serializer.BusinessEventSerializer;
-import org.apache.fineract.infrastructure.event.external.service.support.ByteBufferConverter;
+import org.apache.fineract.infrastructure.event.external.service.serialization.serializer.AbstractBusinessEventSerializer;
 import org.apache.fineract.portfolio.shareaccounts.data.ShareAccountData;
 import org.apache.fineract.portfolio.shareaccounts.service.ShareAccountReadPlatformService;
 import org.springframework.stereotype.Component;
 
 @Component
 @RequiredArgsConstructor
-public class ShareAccountBusinessEventSerializer implements BusinessEventSerializer {
+public class ShareAccountBusinessEventSerializer extends AbstractBusinessEventSerializer {
 
     private final ShareAccountReadPlatformService service;
     private final ShareAccountDataMapper mapper;
-    private final ByteBufferConverter byteBufferConverter;
 
     @Override
     public <T> boolean canSerialize(BusinessEvent<T> event) {
@@ -46,12 +43,10 @@ public class ShareAccountBusinessEventSerializer implements BusinessEventSeriali
     }
 
     @Override
-    public <T> byte[] serialize(BusinessEvent<T> rawEvent) throws IOException {
+    protected <T> ByteBufferSerializable toAvroDTO(BusinessEvent<T> rawEvent) {
         ShareAccountBusinessEvent event = (ShareAccountBusinessEvent) rawEvent;
         ShareAccountData data = service.retrieveOne(event.get().getId(), false);
-        ShareAccountDataV1 avroDto = mapper.map(data);
-        ByteBuffer buffer = avroDto.toByteBuffer();
-        return byteBufferConverter.convert(buffer);
+        return mapper.map(data);
     }
 
     @Override
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/serializer/share/ShareProductDividentsCreateBusinessEventSerializer.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/serializer/share/ShareProductDividentsCreateBusinessEventSerializer.java
index dadf9c91c..3370333b2 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/serializer/share/ShareProductDividentsCreateBusinessEventSerializer.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/serializer/share/ShareProductDividentsCreateBusinessEventSerializer.java
@@ -18,27 +18,24 @@
  */
 package org.apache.fineract.infrastructure.event.external.service.serialization.serializer.share;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
 import lombok.RequiredArgsConstructor;
 import org.apache.avro.generic.GenericContainer;
+import org.apache.fineract.avro.generator.ByteBufferSerializable;
 import org.apache.fineract.avro.share.v1.ShareProductDataV1;
 import org.apache.fineract.infrastructure.event.business.domain.BusinessEvent;
 import org.apache.fineract.infrastructure.event.business.domain.share.ShareProductDividentsCreateBusinessEvent;
 import org.apache.fineract.infrastructure.event.external.service.serialization.mapper.share.ShareProductDataMapper;
-import org.apache.fineract.infrastructure.event.external.service.serialization.serializer.BusinessEventSerializer;
-import org.apache.fineract.infrastructure.event.external.service.support.ByteBufferConverter;
+import org.apache.fineract.infrastructure.event.external.service.serialization.serializer.AbstractBusinessEventSerializer;
 import org.apache.fineract.portfolio.products.service.ShareProductReadPlatformService;
 import org.apache.fineract.portfolio.shareproducts.data.ShareProductData;
 import org.springframework.stereotype.Component;
 
 @Component
 @RequiredArgsConstructor
-public class ShareProductDividentsCreateBusinessEventSerializer implements BusinessEventSerializer {
+public class ShareProductDividentsCreateBusinessEventSerializer extends AbstractBusinessEventSerializer {
 
     private final ShareProductReadPlatformService service;
     private final ShareProductDataMapper mapper;
-    private final ByteBufferConverter byteBufferConverter;
 
     @Override
     public <T> boolean canSerialize(BusinessEvent<T> event) {
@@ -46,12 +43,10 @@ public class ShareProductDividentsCreateBusinessEventSerializer implements Busin
     }
 
     @Override
-    public <T> byte[] serialize(BusinessEvent<T> rawEvent) throws IOException {
+    protected <T> ByteBufferSerializable toAvroDTO(BusinessEvent<T> rawEvent) {
         ShareProductDividentsCreateBusinessEvent event = (ShareProductDividentsCreateBusinessEvent) rawEvent;
         ShareProductData data = (ShareProductData) service.retrieveOne(event.get(), false);
-        ShareProductDataV1 avroDto = mapper.map(data);
-        ByteBuffer buffer = avroDto.toByteBuffer();
-        return byteBufferConverter.convert(buffer);
+        return mapper.map(data);
     }
 
     @Override