You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by li...@apache.org on 2019/01/14 03:18:14 UTC

[servicecomb-java-chassis] 08/16: [SCB-1071][WIP] refactor message schema

This is an automated email from the ASF dual-hosted git repository.

liubao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-java-chassis.git

commit b79a6adab54b4a7a162bd97db80bac305f207782
Author: wujimin <wu...@huawei.com>
AuthorDate: Sat Jan 5 20:38:11 2019 +0800

    [SCB-1071][WIP] refactor message schema
---
 .../java/io/protostuff/runtime/MessageSchema.java  | 268 ---------------------
 .../internal/schema/MessageAsFieldSchema.java      |  50 ++--
 .../schema/deserializer/MessageReadSchema.java     | 161 +++++++++++++
 .../schema/serializer/MessageWriteSchema.java      | 182 ++++++++++++++
 .../schema/serializer/PojoFieldSerializer.java     |  45 ----
 5 files changed, 378 insertions(+), 328 deletions(-)

diff --git a/foundations/foundation-protobuf/src/main/java/io/protostuff/runtime/MessageSchema.java b/foundations/foundation-protobuf/src/main/java/io/protostuff/runtime/MessageSchema.java
deleted file mode 100644
index 065b8e1..0000000
--- a/foundations/foundation-protobuf/src/main/java/io/protostuff/runtime/MessageSchema.java
+++ /dev/null
@@ -1,268 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.protostuff.runtime;
-
-import static io.protostuff.runtime.RuntimeSchema.MIN_TAG_FOR_HASH_FIELD_MAP;
-
-import java.io.IOException;
-import java.lang.reflect.Type;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.servicecomb.foundation.common.concurrent.ConcurrentHashMapEx;
-import org.apache.servicecomb.foundation.common.utils.bean.Getter;
-import org.apache.servicecomb.foundation.protobuf.ProtoMapper;
-import org.apache.servicecomb.foundation.protobuf.internal.bean.BeanDescriptor;
-import org.apache.servicecomb.foundation.protobuf.internal.bean.PropertyDescriptor;
-import org.apache.servicecomb.foundation.protobuf.internal.schema.FieldSchema;
-import org.apache.servicecomb.foundation.protobuf.internal.schema.serializer.PojoFieldSerializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import io.protostuff.Input;
-import io.protostuff.Output;
-import io.protostuff.Schema;
-import io.protostuff.compiler.model.Message;
-import io.protostuff.runtime.RuntimeEnv.Instantiator;
-
-public class MessageSchema implements Schema<Object>, FieldMap<Object> {
-  private static final Logger LOGGER = LoggerFactory.getLogger(MessageSchema.class);
-
-  private ProtoMapper protoMapper;
-
-  private FieldMap<Object> fieldMap;
-
-  private Class<Object> typeClass;
-
-  private Instantiator<Object> instantiator;
-
-  private Message message;
-
-  // one class can bind to different proto message (almost different version)
-  // so save the information only in message, not global
-  private final Map<Type, List<PojoFieldSerializer>> pojoFieldSerializers = new ConcurrentHashMapEx<>();
-
-  public void init(ProtoMapper protoMapper, Collection<Field<Object>> fields, Message message) {
-    init(protoMapper, null, fields, null, message);
-  }
-
-  public void init(ProtoMapper protoMapper, Class<Object> typeClass, Collection<Field<Object>> fields,
-      Instantiator<Object> instantiator,
-      Message message) {
-    this.protoMapper = protoMapper;
-    this.fieldMap = createFieldMap(fields);
-    this.instantiator = instantiator;
-    this.typeClass = typeClass;
-    this.message = message;
-  }
-
-  public Message getMessage() {
-    return message;
-  }
-
-  private FieldMap<Object> createFieldMap(Collection<Field<Object>> fields) {
-    int lastFieldNumber = 0;
-    for (Field<Object> field : fields) {
-      if (field.number > lastFieldNumber) {
-        lastFieldNumber = field.number;
-      }
-    }
-    if (preferHashFieldMap(fields, lastFieldNumber)) {
-      return new HashFieldMap<>(fields);
-    }
-    // array field map should be more efficient
-    return new ArrayFieldMap<>(fields, lastFieldNumber);
-  }
-
-  private boolean preferHashFieldMap(Collection<Field<Object>> fields, int lastFieldNumber) {
-    return lastFieldNumber > MIN_TAG_FOR_HASH_FIELD_MAP && lastFieldNumber >= 2 * fields.size();
-  }
-
-  @Override
-  public String getFieldName(int number) {
-    // only called on writes
-    final Field<Object> field = fieldMap.getFieldByNumber(number);
-    return field == null ? null : field.name;
-  }
-
-  @Override
-  public int getFieldNumber(String name) {
-    final Field<Object> field = fieldMap.getFieldByName(name);
-    return field == null ? null : field.number;
-  }
-
-  @Override
-  public boolean isInitialized(Object message) {
-    return true;
-  }
-
-  @Override
-  public Object newMessage() {
-    return instantiator.newInstance();
-  }
-
-  @Override
-  public String messageName() {
-    return message.getName();
-  }
-
-  @Override
-  public String messageFullName() {
-    return message.getCanonicalName();
-  }
-
-  @Override
-  public Class<? super Object> typeClass() {
-    return typeClass;
-  }
-
-  @Override
-  public void mergeFrom(Input input, Object message) throws IOException {
-    Field<Object> field = null;
-    try {
-      for (int n = input.readFieldNumber(this); n != 0; n = input.readFieldNumber(this)) {
-        field = fieldMap.getFieldByNumber(n);
-        if (field == null) {
-          input.handleUnknownField(n, this);
-        } else {
-          field.mergeFrom(input, message);
-        }
-      }
-    } catch (IOException e) {
-      logError((FieldSchema) field, "deserialize", e);
-      throw e;
-    } catch (RuntimeException e) {
-      logError((FieldSchema) field, "deserialize", e);
-      throw e;
-    }
-  }
-
-  protected void logError(FieldSchema fieldSchema, String action, Throwable e) {
-    if (fieldSchema == null) {
-      return;
-    }
-
-    io.protostuff.compiler.model.Field protoField = fieldSchema.getProtoField();
-    LOGGER.error("Failed to {}, field={}:{}, type={}",
-        action,
-        protoField.getType().getCanonicalName(),
-        protoField.getName(),
-        protoField.getTypeName(),
-        e.getMessage());
-  }
-
-  @SuppressWarnings("unchecked")
-  @Override
-  public void writeTo(Output output, Object message) throws IOException {
-    if (message == null) {
-      return;
-    }
-
-    if (message instanceof Map) {
-      writeFromMap(output, (Map<String, Object>) message);
-      return;
-    }
-
-    writeFromPojo(output, message);
-  }
-
-  /**
-   * <pre>
-   * when use with generic
-   * each time serialize the value field, will run with the real type
-   * so there is no problem
-   *
-   * eg: CustomeGeneric&lt;User&gt; someMethod(CustomGeneric&lt;People&gt; input)
-   *   input: {
-   *     People value
-   *   }
-   *   output: {
-   *     User value
-   *   }
-   * </pre>
-   * @param output
-   * @param value
-   * @throws IOException
-   */
-  protected void writeFromPojo(Output output, Object value) throws IOException {
-    List<PojoFieldSerializer> serializers = pojoFieldSerializers
-        .computeIfAbsent(value.getClass(), this::createFieldSerializers);
-    for (PojoFieldSerializer serializer : serializers) {
-      serializer.writeTo(output, value);
-    }
-  }
-
-  protected List<PojoFieldSerializer> createFieldSerializers(Type type) {
-    BeanDescriptor beanDescriptor = protoMapper.getBeanDescriptorManager().getOrCreateBeanDescriptor(type);
-    List<PojoFieldSerializer> pojoFieldSerializers = new ArrayList<>();
-    for (Field<Object> f : fieldMap.getFields()) {
-      PropertyDescriptor propertyDescriptor = beanDescriptor.getPropertyDescriptors().get(f.name);
-      if (propertyDescriptor == null) {
-        continue;
-      }
-
-      Getter getter = propertyDescriptor.getGetter();
-      if (getter == null) {
-        continue;
-      }
-
-      pojoFieldSerializers.add(new PojoFieldSerializer(getter, (FieldSchema) f));
-    }
-
-    return pojoFieldSerializers;
-  }
-
-  protected void writeFromMap(Output output, Map<String, Object> map) throws IOException {
-    for (Entry<String, Object> entry : map.entrySet()) {
-      if (entry.getValue() == null) {
-        continue;
-      }
-
-      Field<Object> field = fieldMap.getFieldByName(entry.getKey());
-      if (field == null) {
-        // not defined in proto, ignore it.
-        continue;
-      }
-
-      field.writeTo(output, entry.getValue());
-    }
-  }
-
-  @Override
-  public Field<Object> getFieldByNumber(int n) {
-    return fieldMap.getFieldByNumber(n);
-  }
-
-  @Override
-  public Field<Object> getFieldByName(String fieldName) {
-    return fieldMap.getFieldByName(fieldName);
-  }
-
-  @Override
-  public int getFieldCount() {
-    return fieldMap.getFieldCount();
-  }
-
-  @Override
-  public List<Field<Object>> getFields() {
-    return fieldMap.getFields();
-  }
-}
diff --git a/foundations/foundation-protobuf/src/main/java/org/apache/servicecomb/foundation/protobuf/internal/schema/MessageAsFieldSchema.java b/foundations/foundation-protobuf/src/main/java/org/apache/servicecomb/foundation/protobuf/internal/schema/MessageAsFieldSchema.java
index cb5df01..5f4fa99 100644
--- a/foundations/foundation-protobuf/src/main/java/org/apache/servicecomb/foundation/protobuf/internal/schema/MessageAsFieldSchema.java
+++ b/foundations/foundation-protobuf/src/main/java/org/apache/servicecomb/foundation/protobuf/internal/schema/MessageAsFieldSchema.java
@@ -18,33 +18,53 @@ package org.apache.servicecomb.foundation.protobuf.internal.schema;
 
 import java.io.IOException;
 
-import io.protostuff.Input;
-import io.protostuff.Output;
-import io.protostuff.Schema;
+import org.apache.servicecomb.foundation.common.utils.bean.Getter;
+import org.apache.servicecomb.foundation.common.utils.bean.Setter;
+import org.apache.servicecomb.foundation.protobuf.internal.bean.PropertyDescriptor;
+
+import io.protostuff.InputEx;
+import io.protostuff.OutputEx;
+import io.protostuff.SchemaEx;
 import io.protostuff.compiler.model.Field;
+import io.protostuff.runtime.FieldSchema;
+
+public class MessageAsFieldSchema<T> extends FieldSchema<T> {
+  protected final SchemaEx<Object> schema;
 
-public class MessageAsFieldSchema extends FieldSchema {
-  protected Schema<Object> schema;
+  protected final Getter<T, Object> getter;
 
-  public MessageAsFieldSchema(Field protoField, Schema<Object> schema) {
-    super(protoField);
+  protected final Setter<T, Object> setter;
+
+  public MessageAsFieldSchema(Field protoField, PropertyDescriptor propertyDescriptor, SchemaEx<Object> schema) {
+    super(protoField, propertyDescriptor.getJavaType());
     this.schema = schema;
+    this.getter = propertyDescriptor.getGetter();
+    this.setter = propertyDescriptor.getSetter();
   }
 
   @Override
-  public void writeTo(Output output, Object value) throws IOException {
-    output.writeObject(number, value, schema, false);
+  public final void getAndWriteTo(OutputEx output, T message) throws IOException {
+    Object value = getter.get(message);
+    if (value == null) {
+      return;
+    }
+
+    output.writeObject(tag, tagSize, value, schema);
   }
 
   @Override
-  public Object readFrom(Input input) throws IOException {
-    return input.mergeObject(null, schema);
+  public final void writeTo(OutputEx output, Object value) throws IOException {
+    output.writeObject(tag, tagSize, value, schema);
   }
 
   @Override
-  public void mergeFrom(Input input, Object message) throws IOException {
-    Object existing = getter.get(message);
-    Object fieldValue = input.mergeObject(existing, schema);
-    setter.set(message, fieldValue);
+  public final int mergeFrom(InputEx input, T message) throws IOException {
+    Object value = getter.get(message);
+    if (value == null) {
+      value = schema.newMessage();
+      setter.set(message, value);
+    }
+    input.mergeObject(value, schema);
+    return input.readFieldNumber();
   }
 }
diff --git a/foundations/foundation-protobuf/src/main/java/org/apache/servicecomb/foundation/protobuf/internal/schema/deserializer/MessageReadSchema.java b/foundations/foundation-protobuf/src/main/java/org/apache/servicecomb/foundation/protobuf/internal/schema/deserializer/MessageReadSchema.java
new file mode 100644
index 0000000..c7d32d4
--- /dev/null
+++ b/foundations/foundation-protobuf/src/main/java/org/apache/servicecomb/foundation/protobuf/internal/schema/deserializer/MessageReadSchema.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.foundation.protobuf.internal.schema.deserializer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.servicecomb.foundation.protobuf.ProtoMapper;
+import org.apache.servicecomb.foundation.protobuf.internal.ProtoConst;
+import org.apache.servicecomb.foundation.protobuf.internal.bean.BeanDescriptor;
+import org.apache.servicecomb.foundation.protobuf.internal.bean.PropertyDescriptor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.JavaType;
+
+import io.protostuff.InputEx;
+import io.protostuff.OutputEx;
+import io.protostuff.SchemaEx;
+import io.protostuff.compiler.model.Field;
+import io.protostuff.compiler.model.Message;
+import io.protostuff.runtime.FieldMapEx;
+import io.protostuff.runtime.FieldSchema;
+import io.protostuff.runtime.RuntimeEnv;
+import io.protostuff.runtime.RuntimeEnv.Instantiator;
+
+/**
+ * <pre>
+ * map.put("user", new User())
+ * the root is written from a map, but user should be written from a pojo
+ * so one schema should support dynamic and concrete logic at the same time
+ * </pre>
+ */
+public class MessageReadSchema<T> implements SchemaEx<T> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(MessageReadSchema.class);
+
+  protected ProtoMapper protoMapper;
+
+  protected Message message;
+
+  private FieldMapEx<T> fieldMap;
+
+  private Instantiator<T> instantiator;
+
+  private JavaType javaType;
+
+  @SuppressWarnings("unchecked")
+  public MessageReadSchema(ProtoMapper protoMapper, Message message, JavaType javaType) {
+    this.protoMapper = protoMapper;
+    this.message = message;
+    this.javaType = javaType;
+    if (javaType.isJavaLangObject() || Map.class.isAssignableFrom(javaType.getRawClass())) {
+      javaType = ProtoConst.MAP_TYPE;
+    }
+    this.instantiator = RuntimeEnv.newInstantiator((Class<T>) javaType.getRawClass());
+  }
+
+  public Message getMessage() {
+    return message;
+  }
+
+  @Override
+  public T newMessage() {
+    return instantiator.newInstance();
+  }
+
+  @Override
+  public String messageName() {
+    return message.getName();
+  }
+
+  public FieldMapEx<T> getFieldMap() {
+    return fieldMap;
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void init() {
+    if (Map.class.isAssignableFrom(javaType.getRawClass())) {
+      this.fieldMap = (FieldMapEx<T>) protoMapper.getDeserializerSchemaManager()
+          .createMapFields(message);
+      return;
+    }
+
+    this.createFieldMap();
+  }
+
+  private void createFieldMap() {
+    DeserializerSchemaManager deserializerSchemaManager = protoMapper.getDeserializerSchemaManager();
+    BeanDescriptor beanDescriptor = protoMapper.getBeanDescriptorManager().getOrCreateBeanDescriptor(javaType);
+
+    List<FieldSchema<T>> fieldSchemas = new ArrayList<>();
+    for (PropertyDescriptor propertyDescriptor : beanDescriptor.getPropertyDescriptors().values()) {
+      Field protoField = message.getField(propertyDescriptor.getName());
+      if (protoField == null) {
+        LOGGER.info("java field {}:{} not exist in proto message {}, ignore it.",
+            beanDescriptor.getJavaType().getRawClass().getName(),
+            propertyDescriptor.getName(), message.getCanonicalName());
+        continue;
+      }
+      if (propertyDescriptor.getSetter() == null) {
+        LOGGER.info("no setter for java field {}:{} in proto message {}, ignore it.",
+            beanDescriptor.getJavaType().getRawClass().getName(),
+            propertyDescriptor.getName(), message.getCanonicalName());
+        continue;
+      }
+
+      FieldSchema<T> fieldSchema = deserializerSchemaManager.createSchemaField(protoField, propertyDescriptor);
+      fieldSchemas.add(fieldSchema);
+    }
+
+    this.fieldMap = FieldMapEx.createFieldMap(fieldSchemas);
+  }
+
+  @Override
+  public void mergeFrom(InputEx input, T message) throws IOException {
+    FieldSchema<T> fieldSchema = null;
+    try {
+      for (int n = input.readFieldNumber(); n != 0; ) {
+        fieldSchema = fieldMap.getFieldByNumber(n);
+        if (fieldSchema != null) {
+          n = fieldSchema.mergeFrom(input, message);
+          continue;
+        }
+
+        input.handleUnknownField(n);
+        n = input.readFieldNumber();
+      }
+    } catch (Throwable e) {
+      Field protoField = fieldSchema.getProtoField();
+      LOGGER.error("Failed to mergeFrom, field={}:{}, type={}",
+          protoField.getType().getCanonicalName(),
+          protoField.getName(),
+          protoField.getTypeName(),
+          e.getMessage());
+      throw e;
+    }
+  }
+
+  @Override
+  public void writeTo(OutputEx output, Object value) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+}
diff --git a/foundations/foundation-protobuf/src/main/java/org/apache/servicecomb/foundation/protobuf/internal/schema/serializer/MessageWriteSchema.java b/foundations/foundation-protobuf/src/main/java/org/apache/servicecomb/foundation/protobuf/internal/schema/serializer/MessageWriteSchema.java
new file mode 100644
index 0000000..0b20cf8
--- /dev/null
+++ b/foundations/foundation-protobuf/src/main/java/org/apache/servicecomb/foundation/protobuf/internal/schema/serializer/MessageWriteSchema.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.foundation.protobuf.internal.schema.serializer;
+
+import java.io.IOException;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.servicecomb.foundation.common.concurrent.ConcurrentHashMapEx;
+import org.apache.servicecomb.foundation.protobuf.ProtoMapper;
+import org.apache.servicecomb.foundation.protobuf.internal.bean.BeanDescriptor;
+import org.apache.servicecomb.foundation.protobuf.internal.bean.PropertyDescriptor;
+
+import com.fasterxml.jackson.databind.JavaType;
+
+import io.protostuff.InputEx;
+import io.protostuff.OutputEx;
+import io.protostuff.SchemaEx;
+import io.protostuff.compiler.model.Field;
+import io.protostuff.compiler.model.Message;
+import io.protostuff.runtime.FieldMapEx;
+import io.protostuff.runtime.FieldSchema;
+
+/**
+ * <pre>
+ * map.put("user", new User())
+ * the root is written from a map, but user should be written from a pojo
+ * so one schema should support dynamic and concrete logic at the same time
+ * </pre>
+ */
+public class MessageWriteSchema<T> implements SchemaEx<T> {
+  protected ProtoMapper protoMapper;
+
+  protected Message message;
+
+  private JavaType javaType;
+
+  // in most cases, one message relates to only one pojo
+  private final Class<T> mainPojoCls;
+
+  private FieldMapEx<T> mainPojoFieldMaps;
+
+  private FieldMapEx<Map<Object, Object>> mapFieldMaps;
+
+  // if the value's class is not mainPojoCls, its field map is looked up in pojoFieldMaps
+  private final Map<Class<?>, FieldMapEx<?>> pojoFieldMaps = new ConcurrentHashMapEx<>();
+
+  @SuppressWarnings("unchecked")
+  public MessageWriteSchema(ProtoMapper protoMapper, Message message, JavaType javaType) {
+    this.protoMapper = protoMapper;
+    this.message = message;
+    this.javaType = javaType;
+    this.mainPojoCls = (Class<T>) javaType.getRawClass();
+  }
+
+  public Message getMessage() {
+    return message;
+  }
+
+  @Override
+  public T newMessage() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public String messageName() {
+    return message.getName();
+  }
+
+  public JavaType getJavaType() {
+    return javaType;
+  }
+
+  public Class<T> getMainPojoCls() {
+    return mainPojoCls;
+  }
+
+  public FieldMapEx<T> getMainPojoFieldMaps() {
+    return mainPojoFieldMaps;
+  }
+
+  @Override
+  public void init() {
+    this.mainPojoFieldMaps = createPojoFields(javaType);
+  }
+
+  private FieldMapEx<T> createPojoFields(Type type) {
+    SerializerSchemaManager serializerSchemaManager = protoMapper.getSerializerSchemaManager();
+    BeanDescriptor beanDescriptor = protoMapper.getBeanDescriptorManager().getOrCreateBeanDescriptor(type);
+
+    List<FieldSchema<T>> fieldSchemas = new ArrayList<>();
+    for (Field protoField : message.getFields()) {
+      PropertyDescriptor propertyDescriptor = beanDescriptor.getPropertyDescriptors().get(protoField.getName());
+      if (propertyDescriptor == null) {
+        continue;
+      }
+
+      Object getter = propertyDescriptor.getGetter();
+      if (getter == null) {
+        continue;
+      }
+
+      FieldSchema<T> fieldSchema = serializerSchemaManager.createSchemaField(protoField, propertyDescriptor);
+      fieldSchemas.add(fieldSchema);
+    }
+
+    return FieldMapEx.createFieldMap(fieldSchemas);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void writeTo(OutputEx output, Object value) throws IOException {
+    if (value instanceof Map) {
+      writeFromMap(output, (Map<String, Object>) value);
+      return;
+    }
+
+    if (mainPojoCls == value.getClass()) {
+      writeFromMainPojo(output, (T) value);
+      return;
+    }
+
+    writeDynamicPojo(output, value);
+  }
+
+  private void writeFromMainPojo(OutputEx output, T value) throws IOException {
+    for (FieldSchema<T> fieldSchema : mainPojoFieldMaps.getFields()) {
+      fieldSchema.getAndWriteTo(output, value);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private <T> void writeDynamicPojo(OutputEx output, Object dynamicValue) throws IOException {
+    FieldMapEx<T> fieldMapEx = (FieldMapEx<T>) this.pojoFieldMaps
+        .computeIfAbsent(dynamicValue.getClass(), this::createPojoFields);
+
+    T value = (T) dynamicValue;
+    for (FieldSchema<T> fieldSchema : fieldMapEx.getFields()) {
+      fieldSchema.getAndWriteTo(output, value);
+    }
+  }
+
+  protected final void writeFromMap(OutputEx output, Map<String, Object> map) throws IOException {
+    if (mapFieldMaps == null) {
+      mapFieldMaps = protoMapper.getSerializerSchemaManager().createMapFields(message);
+    }
+
+    for (Entry<String, Object> entry : map.entrySet()) {
+      if (entry.getValue() == null) {
+        continue;
+      }
+
+      FieldSchema<Map<Object, Object>> fieldSchema = mapFieldMaps.getFieldByName(entry.getKey());
+      if (fieldSchema != null) {
+        fieldSchema.writeTo(output, entry.getValue());
+      }
+    }
+  }
+
+  @Override
+  public void mergeFrom(InputEx input, T message) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+}
diff --git a/foundations/foundation-protobuf/src/main/java/org/apache/servicecomb/foundation/protobuf/internal/schema/serializer/PojoFieldSerializer.java b/foundations/foundation-protobuf/src/main/java/org/apache/servicecomb/foundation/protobuf/internal/schema/serializer/PojoFieldSerializer.java
deleted file mode 100644
index baa2b9c..0000000
--- a/foundations/foundation-protobuf/src/main/java/org/apache/servicecomb/foundation/protobuf/internal/schema/serializer/PojoFieldSerializer.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.servicecomb.foundation.protobuf.internal.schema.serializer;
-
-import java.io.IOException;
-
-import org.apache.servicecomb.foundation.common.utils.bean.Getter;
-import org.apache.servicecomb.foundation.protobuf.internal.schema.FieldSchema;
-
-import io.protostuff.Output;
-
-public class PojoFieldSerializer {
-  private final Getter getter;
-
-  private final FieldSchema fieldSchema;
-
-  public PojoFieldSerializer(Getter getter, FieldSchema fieldSchema) {
-    this.getter = getter;
-    this.fieldSchema = fieldSchema;
-  }
-
-
-  public void writeTo(Output output, Object instance) throws IOException {
-    Object value = getter.get(instance);
-    if (value == null) {
-      return;
-    }
-
-    fieldSchema.writeTo(output, value);
-  }
-}