You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by zh...@apache.org on 2019/12/15 12:13:36 UTC

[incubator-doris] branch master updated: Add classes related to "tag". (#2343)

This is an automated email from the ASF dual-hosted git repository.

zhaoc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new e65a645  Add classes related to "tag". (#2343)
e65a645 is described below

commit e65a6451383fc7c15aa0993f143b0cd20779ec48
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Sun Dec 15 20:13:29 2019 +0800

    Add classes related to "tag". (#2343)
    
    [Tag System]
    This CL includes 2 parts:
    
        Add classes related to "tag"
            Resource: is the collective name of the nodes that provide various service capabilities in Doris cluster.
            Tag: A Tag consists of type and name.
            TagSet: TagSet represents a set of tags.
            TagManager: maintains 2 indexes:
            one is from tag to resource.
            one is from resource to tags
    
        ISSUE #1723
    
        Using JSON as serialization methods of metadata
    
        Introduce GSON library to serialize the new classes mentioned above.
    
        ISSUE #2415 #2389
    
    GSON's version is updated to 2.8.6
---
 fe/checkstyle.xml                                  |   2 +
 fe/pom.xml                                         |   2 +-
 .../org/apache/doris/master/ReportHandler.java     |   2 +-
 .../org/apache/doris/persist/gson/GsonUtils.java   | 274 +++++++++++++
 .../persist/gson/RuntimeTypeAdapterFactory.java    | 324 +++++++++++++++
 .../java/org/apache/doris/qe/ConnectProcessor.java |   4 +-
 .../java/org/apache/doris/resource/Resource.java   |  59 +++
 .../main/java/org/apache/doris/resource/Tag.java   | 118 ++++++
 .../java/org/apache/doris/resource/TagManager.java | 209 ++++++++++
 .../java/org/apache/doris/resource/TagSet.java     | 209 ++++++++++
 .../doris/transaction/GlobalTransactionMgr.java    |   2 +-
 .../gson/GsonDerivedClassSerializationTest.java    | 155 ++++++++
 .../doris/persist/gson/GsonSerializationTest.java  | 433 +++++++++++++++++++++
 .../doris/resource/TagSerializationTest.java       | 113 ++++++
 .../java/org/apache/doris/resource/TagTest.java    | 126 ++++++
 15 files changed, 2027 insertions(+), 5 deletions(-)

diff --git a/fe/checkstyle.xml b/fe/checkstyle.xml
index 8ffb22f..b26f878 100644
--- a/fe/checkstyle.xml
+++ b/fe/checkstyle.xml
@@ -18,6 +18,8 @@
   ~ under the License.
   -->
 
+<!-- See https://checkstyle.org/ for details -->
+
 <!DOCTYPE module PUBLIC "-//Puppy Crawl//DTD Check Configuration 1.3//EN"
         "http://www.puppycrawl.com/dtds/configuration_1_3.dtd">
 <module name="Checker">
diff --git a/fe/pom.xml b/fe/pom.xml
index de663d7..e4c47a7 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -151,7 +151,7 @@ under the License.
         <dependency>
             <groupId>com.google.code.gson</groupId>
             <artifactId>gson</artifactId>
-            <version>2.2.4</version>
+            <version>2.8.6</version>
         </dependency>
 
         <!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
diff --git a/fe/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/src/main/java/org/apache/doris/master/ReportHandler.java
index fd27295..b631ce9 100644
--- a/fe/src/main/java/org/apache/doris/master/ReportHandler.java
+++ b/fe/src/main/java/org/apache/doris/master/ReportHandler.java
@@ -260,7 +260,7 @@ public class ReportHandler extends Daemon {
         Map<Long, TTabletInfo> foundTabletsWithInvalidSchema = new HashMap<Long, TTabletInfo>();
         // storage medium -> tablet id
         ListMultimap<TStorageMedium, Long> tabletMigrationMap = LinkedListMultimap.create();
-        
+
         // dbid -> txn id -> [partition info]
         Map<Long, ListMultimap<Long, TPartitionVersionInfo>> transactionsToPublish = Maps.newHashMap();
         ListMultimap<Long, Long> transactionsToClear = LinkedListMultimap.create();
diff --git a/fe/src/main/java/org/apache/doris/persist/gson/GsonUtils.java b/fe/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
new file mode 100644
index 0000000..f997f91
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
@@ -0,0 +1,274 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.persist.gson;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.LinkedHashMultimap;
+import com.google.common.collect.LinkedListMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Table;
+import com.google.common.reflect.TypeToken;
+import com.google.gson.ExclusionStrategy;
+import com.google.gson.FieldAttributes;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+import com.google.gson.annotations.SerializedName;
+
+import java.lang.reflect.Method;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/*
+ * Some utilities about Gson.
+ * User should get GSON instance from this class to do the serialization.
+ * 
+ *      GsonUtils.GSON.toJson(...)
+ *      GsonUtils.GSON.fromJson(...)
+ * 
+ * More example can be seen in unit test case: "org.apache.doris.common.util.GsonSerializationTest.java".
+ * 
+ * For inherited class serialization, see "org.apache.doris.common.util.GsonDerivedClassSerializationTest.java"
+ * 
+ * And developers may need to add other serialization adapters for custom complex java classes.
+ * You need implement a class to implements JsonSerializer and JsonDeserializer, and register it to GSON_BUILDER.
+ * See the following "GuavaTableAdapter" and "GuavaMultimapAdapter" for example.
+ */
+public class GsonUtils {
+
+    // the builder of GSON instance.
+    // Add any other adapters if necessary.
+    private static final GsonBuilder GSON_BUILDER = new GsonBuilder()
+            .addSerializationExclusionStrategy(new HiddenAnnotationExclusionStrategy())
+            .enableComplexMapKeySerialization()
+            .registerTypeHierarchyAdapter(Table.class, new GuavaTableAdapter())
+            .registerTypeHierarchyAdapter(Multimap.class, new GuavaMultimapAdapter());
+
+    // this instance is thread-safe.
+    public static final Gson GSON = GSON_BUILDER.create();
+
+    /*
+     * The exclusion strategy of GSON serialization.
+     * Any fields without "@SerializedName" annotation with be ignore with
+     * serializing and deserializing.
+     */
+    public static class HiddenAnnotationExclusionStrategy implements ExclusionStrategy {
+        public boolean shouldSkipField(FieldAttributes f) {
+            return f.getAnnotation(SerializedName.class) == null;
+        }
+
+        @Override
+        public boolean shouldSkipClass(Class<?> clazz) {
+            return false;
+        }
+    }
+
+    /*
+     * 
+     * The json adapter for Guava Table.
+     * Current support:
+     * 1. HashBasedTable
+     * 
+     * The RowKey, ColumnKey and Value classes in Table should also be serializable.
+     * 
+     * What is Adapter and Why we should implement it?
+     * 
+     * Adapter is mainly used to provide serialization and deserialization methods for some complex classes.
+     * Complex classes here usually refer to classes that are complex and cannot be modified. 
+     * These classes mainly include third-party library classes or some inherited classes.
+     */
+    private static class GuavaTableAdapter<R, C, V>
+            implements JsonSerializer<Table<R, C, V>>, JsonDeserializer<Table<R, C, V>> {
+        /*
+         * serialize Table<R, C, V> as:
+         * {
+         * "rowKeys": [ "rowKey1", "rowKey2", ...],
+         * "columnKeys": [ "colKey1", "colKey2", ...],
+         * "cells" : [[0, 0, value1], [0, 1, value2], ...]
+         * }
+         * 
+         * the [0, 0] .. in cells are the indexes of rowKeys array and columnKeys array.
+         * This serialization method can reduce the size of json string because it
+         * replace the same row key
+         * and column key to integer.
+         */
+        @Override
+        public JsonElement serialize(Table<R, C, V> src, Type typeOfSrc, JsonSerializationContext context) {
+            JsonArray rowKeysJsonArray = new JsonArray();
+            Map<R, Integer> rowKeyToIndex = new HashMap<>();
+            for (R rowKey : src.rowKeySet()) {
+                rowKeyToIndex.put(rowKey, rowKeyToIndex.size());
+                rowKeysJsonArray.add(context.serialize(rowKey));
+            }
+            JsonArray columnKeysJsonArray = new JsonArray();
+            Map<C, Integer> columnKeyToIndex = new HashMap<>();
+            for (C columnKey : src.columnKeySet()) {
+                columnKeyToIndex.put(columnKey, columnKeyToIndex.size());
+                columnKeysJsonArray.add(context.serialize(columnKey));
+            }
+            JsonArray cellsJsonArray = new JsonArray();
+            for (Table.Cell<R, C, V> cell : src.cellSet()) {
+                int rowIndex = rowKeyToIndex.get(cell.getRowKey());
+                int columnIndex = columnKeyToIndex.get(cell.getColumnKey());
+                cellsJsonArray.add(rowIndex);
+                cellsJsonArray.add(columnIndex);
+                cellsJsonArray.add(context.serialize(cell.getValue()));
+            }
+            JsonObject tableJsonObject = new JsonObject();
+            tableJsonObject.addProperty("clazz", src.getClass().getSimpleName());
+            tableJsonObject.add("rowKeys", rowKeysJsonArray);
+            tableJsonObject.add("columnKeys", columnKeysJsonArray);
+            tableJsonObject.add("cells", cellsJsonArray);
+            return tableJsonObject;
+        }
+
+        @Override
+        public Table<R, C, V> deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) {
+            Type typeOfR;
+            Type typeOfC;
+            Type typeOfV;
+            {
+                ParameterizedType parameterizedType = (ParameterizedType) typeOfT;
+                Type[] actualTypeArguments = parameterizedType.getActualTypeArguments();
+                typeOfR = actualTypeArguments[0];
+                typeOfC = actualTypeArguments[1];
+                typeOfV = actualTypeArguments[2];
+            }
+            JsonObject tableJsonObject = json.getAsJsonObject();
+            String tableClazz = tableJsonObject.get("clazz").getAsString();
+            JsonArray rowKeysJsonArray = tableJsonObject.getAsJsonArray("rowKeys");
+            Map<Integer, R> rowIndexToKey = new HashMap<>();
+            for (JsonElement jsonElement : rowKeysJsonArray) {
+                R rowKey = context.deserialize(jsonElement, typeOfR);
+                rowIndexToKey.put(rowIndexToKey.size(), rowKey);
+            }
+            JsonArray columnKeysJsonArray = tableJsonObject.getAsJsonArray("columnKeys");
+            Map<Integer, C> columnIndexToKey = new HashMap<>();
+            for (JsonElement jsonElement : columnKeysJsonArray) {
+                C columnKey = context.deserialize(jsonElement, typeOfC);
+                columnIndexToKey.put(columnIndexToKey.size(), columnKey);
+            }
+            JsonArray cellsJsonArray = tableJsonObject.getAsJsonArray("cells");
+            Table<R, C, V> table = null;
+            switch (tableClazz) {
+                case "HashBasedTable":
+                    table = HashBasedTable.create();
+                    break;
+                default:
+                    Preconditions.checkState(false, "unknown guava table class: " + tableClazz);
+                    break;
+            }
+            for (int i = 0; i < cellsJsonArray.size(); i = i + 3) {
+                // format is [rowIndex, columnIndex, value]
+                int rowIndex = cellsJsonArray.get(i).getAsInt();
+                int columnIndex = cellsJsonArray.get(i + 1).getAsInt();
+                R rowKey = rowIndexToKey.get(rowIndex);
+                C columnKey = columnIndexToKey.get(columnIndex);
+                V value = context.deserialize(cellsJsonArray.get(i + 2), typeOfV);
+                table.put(rowKey, columnKey, value);
+            }
+            return table;
+        }
+    }
+
+    /*
+     * The json adapter for Guava Multimap.
+     * Current support:
+     * 1. ArrayListMultimap
+     * 2. HashMultimap
+     * 3. LinkedListMultimap
+     * 4. LinkedHashMultimap
+     * 
+     * The key and value classes of multi map should also be json serializable.
+     */
+    private static class GuavaMultimapAdapter<K, V>
+            implements JsonSerializer<Multimap<K, V>>, JsonDeserializer<Multimap<K, V>> {
+
+        private static final Type asMapReturnType = getAsMapMethod().getGenericReturnType();
+
+        private static Type asMapType(Type multimapType) {
+            return TypeToken.of(multimapType).resolveType(asMapReturnType).getType();
+        }
+
+        private static Method getAsMapMethod() {
+            try {
+                return Multimap.class.getDeclaredMethod("asMap");
+            } catch (NoSuchMethodException e) {
+                throw new AssertionError(e);
+            }
+        }
+    
+        @Override
+        public JsonElement serialize(Multimap<K, V> map, Type typeOfSrc, JsonSerializationContext context) {
+            JsonObject jsonObject = new JsonObject();
+            jsonObject.addProperty("clazz", map.getClass().getSimpleName());
+            Map<K, Collection<V>> asMap = map.asMap();
+            Type type = asMapType(typeOfSrc);
+            JsonElement jsonElement = context.serialize(asMap, type);
+            jsonObject.add("map", jsonElement);
+            return jsonObject;
+        }
+    
+        @Override
+        public Multimap<K, V> deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context)
+                throws JsonParseException {
+            JsonObject jsonObject = json.getAsJsonObject();
+            String clazz = jsonObject.get("clazz").getAsString();
+
+            JsonElement mapElement = jsonObject.get("map");
+            Map<K, Collection<V>> asMap = context.deserialize(mapElement, asMapType(typeOfT));
+
+            Multimap<K, V> map = null;
+            switch (clazz) {
+                case "ArrayListMultimap":
+                    map = ArrayListMultimap.create();
+                    break;
+                case "HashMultimap":
+                    map = HashMultimap.create();
+                    break;
+                case "LinkedListMultimap":
+                    map = LinkedListMultimap.create();
+                    break;
+                case "LinkedHashMultimap":
+                    map = LinkedHashMultimap.create();
+                    break;
+                default:
+                    Preconditions.checkState(false, "unknown guava multi map class: " + clazz);
+                    break;
+            }
+
+            for (Map.Entry<K, Collection<V>> entry : asMap.entrySet()) {
+                map.putAll(entry.getKey(), entry.getValue());
+            }
+            return map;
+        }
+    }
+}
diff --git a/fe/src/main/java/org/apache/doris/persist/gson/RuntimeTypeAdapterFactory.java b/fe/src/main/java/org/apache/doris/persist/gson/RuntimeTypeAdapterFactory.java
new file mode 100644
index 0000000..924dc9d
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/persist/gson/RuntimeTypeAdapterFactory.java
@@ -0,0 +1,324 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Copyright (C) 2011 Google Inc.
+
+package org.apache.doris.persist.gson;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonPrimitive;
+import com.google.gson.TypeAdapter;
+import com.google.gson.TypeAdapterFactory;
+import com.google.gson.internal.Streams;
+import com.google.gson.reflect.TypeToken;
+import com.google.gson.stream.JsonReader;
+import com.google.gson.stream.JsonWriter;
+
+import java.io.IOException;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * Adapts values whose runtime type may differ from their declaration type. This
+ * is necessary when a field's type is not the same type that GSON should create
+ * when deserializing that field. For example, consider these types:
+ * 
+ * <pre>
+ * {
+ *     &#64;code
+ *     abstract class Shape {
+ *         int x;
+ *         int y;
+ *     }
+ *     class Circle extends Shape {
+ *         int radius;
+ *     }
+ *     class Rectangle extends Shape {
+ *         int width;
+ *         int height;
+ *     }
+ *     class Diamond extends Shape {
+ *         int width;
+ *         int height;
+ *     }
+ *     class Drawing {
+ *         Shape bottomShape;
+ *         Shape topShape;
+ *     }
+ * }
+ * </pre>
+ * <p>
+ * Without additional type information, the serialized JSON is ambiguous. Is
+ * the bottom shape in this drawing a rectangle or a diamond?
+ * 
+ * <pre>
+ *    {@code
+ *   {
+ *     "bottomShape": {
+ *       "width": 10,
+ *       "height": 5,
+ *       "x": 0,
+ *       "y": 0
+ *     },
+ *     "topShape": {
+ *       "radius": 2,
+ *       "x": 4,
+ *       "y": 1
+ *     }
+ *   }}
+ * </pre>
+ * 
+ * This class addresses this problem by adding type information to the
+ * serialized JSON and honoring that type information when the JSON is
+ * deserialized:
+ * 
+ * <pre>
+ *    {@code
+ *   {
+ *     "bottomShape": {
+ *       "type": "Diamond",
+ *       "width": 10,
+ *       "height": 5,
+ *       "x": 0,
+ *       "y": 0
+ *     },
+ *     "topShape": {
+ *       "type": "Circle",
+ *       "radius": 2,
+ *       "x": 4,
+ *       "y": 1
+ *     }
+ *   }}
+ * </pre>
+ * 
+ * Both the type field name ({@code "type"}) and the type labels ({@code
+ * "Rectangle"}) are configurable.
+ *
+ * <h3>Registering Types</h3>
+ * Create a {@code RuntimeTypeAdapterFactory} by passing the base type and type
+ * field
+ * name to the {@link #of} factory method. If you don't supply an explicit type
+ * field name, {@code "type"} will be used.
+ * 
+ * <pre>
+ * {
+ *     &#64;code
+ *     RuntimeTypeAdapterFactory<Shape> shapeAdapterFactory = RuntimeTypeAdapterFactory.of(Shape.class, "type");
+ * }
+ * </pre>
+ * 
+ * Next register all of your subtypes. Every subtype must be explicitly
+ * registered. This protects your application from injection attacks. If you
+ * don't supply an explicit type label, the type's simple name will be used.
+ * 
+ * <pre>
+ *    {@code
+ *   shapeAdapterFactory.registerSubtype(Rectangle.class, "Rectangle");
+ *   shapeAdapterFactory.registerSubtype(Circle.class, "Circle");
+ *   shapeAdapterFactory.registerSubtype(Diamond.class, "Diamond");
+ * }
+ * </pre>
+ * 
+ * Finally, register the type adapter factory in your application's GSON
+ * builder:
+ * 
+ * <pre>
+ * {
+ *     &#64;code
+ *     Gson gson = new GsonBuilder().registerTypeAdapterFactory(shapeAdapterFactory).create();
+ * }
+ * </pre>
+ * 
+ * Like {@code GsonBuilder}, this API supports chaining:
+ * 
+ * <pre>
+ * {
+ *     &#64;code
+ *     RuntimeTypeAdapterFactory<Shape> shapeAdapterFactory = RuntimeTypeAdapterFactory.of(Shape.class)
+ *             .registerSubtype(Rectangle.class).registerSubtype(Circle.class).registerSubtype(Diamond.class);
+ * }
+ * </pre>
+ *
+ * <h3>Serialization and deserialization</h3>
+ * In order to serialize and deserialize a polymorphic object,
+ * you must specify the base type explicitly.
+ * 
+ * <pre>
+ * {
+ *     &#64;code
+ *     Diamond diamond = new Diamond();
+ *     String json = gson.toJson(diamond, Shape.class);
+ * }
+ * </pre>
+ * 
+ * And then:
+ * 
+ * <pre>
+ * {
+ *     &#64;code
+ *     Shape shape = gson.fromJson(json, Shape.class);
+ * }
+ * </pre>
+ */
+public final class RuntimeTypeAdapterFactory<T> implements TypeAdapterFactory {
+    private final Class<?> baseType;
+    private final String typeFieldName;
+    private final Map<String, Class<?>> labelToSubtype = new LinkedHashMap<String, Class<?>>();
+    private final Map<Class<?>, String> subtypeToLabel = new LinkedHashMap<Class<?>, String>();
+    private final boolean maintainType;
+
+    private RuntimeTypeAdapterFactory(Class<?> baseType, String typeFieldName, boolean maintainType) {
+        if (typeFieldName == null || baseType == null) {
+            throw new NullPointerException();
+        }
+        this.baseType = baseType;
+        this.typeFieldName = typeFieldName;
+        this.maintainType = maintainType;
+    }
+
+    /**
+     * Creates a new runtime type adapter using for {@code baseType} using {@code
+     * typeFieldName} as the type field name. Type field names are case sensitive.
+     * {@code maintainType} flag decide if the type will be stored in pojo or not.
+     */
+    public static <T> RuntimeTypeAdapterFactory<T> of(Class<T> baseType, String typeFieldName, boolean maintainType) {
+        return new RuntimeTypeAdapterFactory<T>(baseType, typeFieldName, maintainType);
+    }
+
+    /**
+     * Creates a new runtime type adapter using for {@code baseType} using {@code
+     * typeFieldName} as the type field name. Type field names are case sensitive.
+     */
+    public static <T> RuntimeTypeAdapterFactory<T> of(Class<T> baseType, String typeFieldName) {
+        return new RuntimeTypeAdapterFactory<T>(baseType, typeFieldName, false);
+    }
+
+    /**
+     * Creates a new runtime type adapter for {@code baseType} using {@code "type"}
+     * as
+     * the type field name.
+     */
+    public static <T> RuntimeTypeAdapterFactory<T> of(Class<T> baseType) {
+        return new RuntimeTypeAdapterFactory<T>(baseType, "type", false);
+    }
+
+    /**
+     * Registers {@code type} identified by {@code label}. Labels are case
+     * sensitive.
+     *
+     * @throws IllegalArgumentException
+     *             if either {@code type} or {@code label}
+     *             have already been registered on this type adapter.
+     */
+    public RuntimeTypeAdapterFactory<T> registerSubtype(Class<? extends T> type, String label) {
+        if (type == null || label == null) {
+            throw new NullPointerException();
+        }
+        if (subtypeToLabel.containsKey(type) || labelToSubtype.containsKey(label)) {
+            throw new IllegalArgumentException("types and labels must be unique");
+        }
+        labelToSubtype.put(label, type);
+        subtypeToLabel.put(type, label);
+        return this;
+    }
+
+    /**
+     * Registers {@code type} identified by its {@link Class#getSimpleName simple
+     * name}. Labels are case sensitive.
+     *
+     * @throws IllegalArgumentException
+     *             if either {@code type} or its simple name
+     *             have already been registered on this type adapter.
+     */
+    public RuntimeTypeAdapterFactory<T> registerSubtype(Class<? extends T> type) {
+        return registerSubtype(type, type.getSimpleName());
+    }
+
+    public <R> TypeAdapter<R> create(Gson gson, TypeToken<R> type) {
+        if (type.getRawType() != baseType) {
+            return null;
+        }
+
+        final Map<String, TypeAdapter<?>> labelToDelegate = new LinkedHashMap<String, TypeAdapter<?>>();
+        final Map<Class<?>, TypeAdapter<?>> subtypeToDelegate = new LinkedHashMap<Class<?>, TypeAdapter<?>>();
+        for (Map.Entry<String, Class<?>> entry : labelToSubtype.entrySet()) {
+            TypeAdapter<?> delegate = gson.getDelegateAdapter(this, TypeToken.get(entry.getValue()));
+            labelToDelegate.put(entry.getKey(), delegate);
+            subtypeToDelegate.put(entry.getValue(), delegate);
+        }
+
+        return new TypeAdapter<R>() {
+            @Override
+            public R read(JsonReader in) throws IOException {
+                JsonElement jsonElement = Streams.parse(in);
+                JsonElement labelJsonElement;
+                if (maintainType) {
+                    labelJsonElement = jsonElement.getAsJsonObject().get(typeFieldName);
+                } else {
+                    labelJsonElement = jsonElement.getAsJsonObject().remove(typeFieldName);
+                }
+
+                if (labelJsonElement == null) {
+                    throw new JsonParseException("cannot deserialize " + baseType
+                            + " because it does not define a field named " + typeFieldName);
+                }
+                String label = labelJsonElement.getAsString();
+                @SuppressWarnings("unchecked") // registration requires that subtype extends T
+                TypeAdapter<R> delegate = (TypeAdapter<R>) labelToDelegate.get(label);
+                if (delegate == null) {
+                    throw new JsonParseException("cannot deserialize " + baseType + " subtype named " + label
+                            + "; did you forget to register a subtype?");
+                }
+                return delegate.fromJsonTree(jsonElement);
+            }
+
+            @Override
+            public void write(JsonWriter out, R value) throws IOException {
+                Class<?> srcType = value.getClass();
+                String label = subtypeToLabel.get(srcType);
+                @SuppressWarnings("unchecked") // registration requires that subtype extends T
+                TypeAdapter<R> delegate = (TypeAdapter<R>) subtypeToDelegate.get(srcType);
+                if (delegate == null) {
+                    throw new JsonParseException(
+                            "cannot serialize " + srcType.getName() + "; did you forget to register a subtype?");
+                }
+                JsonObject jsonObject = delegate.toJsonTree(value).getAsJsonObject();
+
+                if (maintainType) {
+                    Streams.write(jsonObject, out);
+                    return;
+                }
+
+                JsonObject clone = new JsonObject();
+
+                if (jsonObject.has(typeFieldName)) {
+                    throw new JsonParseException("cannot serialize " + srcType.getName()
+                            + " because it already defines a field named " + typeFieldName);
+                }
+                clone.add(typeFieldName, new JsonPrimitive(label));
+
+                for (Map.Entry<String, JsonElement> e : jsonObject.entrySet()) {
+                    clone.add(e.getKey(), e.getValue());
+                }
+                Streams.write(clone, out);
+            }
+        }.nullSafe();
+    }
+}
\ No newline at end of file
diff --git a/fe/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/src/main/java/org/apache/doris/qe/ConnectProcessor.java
index 9450b05..55b36c0 100644
--- a/fe/src/main/java/org/apache/doris/qe/ConnectProcessor.java
+++ b/fe/src/main/java/org/apache/doris/qe/ConnectProcessor.java
@@ -379,12 +379,12 @@ public class ConnectProcessor {
             ctx.getState().setError("Doris process failed: " + e.getMessage());
         } catch (Throwable e) {
             // Catch all throwable.
-            // If reach here, maybe palo bug.
+            // If reach here, maybe Doris bug.
             LOG.warn("Process one query failed because unknown reason: ", e);
             ctx.getState().setError("Unexpected exception: " + e.getMessage());
         }
         // no matter the master execute success or fail, the master must transfer the result to follower
-        // and tell the follwer the current jounalID.
+        // and tell the follower the current jounalID.
         TMasterOpResult result = new TMasterOpResult();
         result.setMaxJournalId(Catalog.getInstance().getMaxJournalId().longValue());
         result.setPacket(getResultPacket());
diff --git a/fe/src/main/java/org/apache/doris/resource/Resource.java b/fe/src/main/java/org/apache/doris/resource/Resource.java
new file mode 100644
index 0000000..cf07428
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/resource/Resource.java
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.resource;
+
+import com.google.gson.annotations.SerializedName;
+
+/*
+ * Resource is the collective name of the nodes that provide various service capabilities in Doris cluster.
+ * Each resource has a unique ID.
+ * A resource may have one or more tags that represent the functional properties or custom groupings of a resource, etc.
+ * eg:
+ *      Backend, Frontend, Broker, RemoteStorage
+ */
+public abstract class Resource {
+    @SerializedName(value = "id")
+    protected long id;
+    @SerializedName(value = "tagSet")
+    protected TagSet tagSet;
+
+    public Resource(long id, TagSet tagSet) {
+        this.tagSet = tagSet;
+    }
+
+    public long getId() {
+        return id;
+    }
+
+    public TagSet getTagSet() {
+        return tagSet;
+    }
+
+    public void addTag(Tag tag) {
+        tagSet.addTag(tag);
+    }
+
+    public void setTag(Tag tag) {
+        TagSet newTagSet = TagSet.create(tag);
+        tagSet.substituteMerge(newTagSet);
+    }
+
+    public void setTagSet(TagSet tagSet) {
+        this.tagSet = tagSet;
+    }
+}
diff --git a/fe/src/main/java/org/apache/doris/resource/Tag.java b/fe/src/main/java/org/apache/doris/resource/Tag.java
new file mode 100644
index 0000000..b146466
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/resource/Tag.java
@@ -0,0 +1,118 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.resource;
+
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.gson.GsonUtils;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.gson.annotations.SerializedName;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Objects;
+
+/*
+ * A Tag consists of type and value.
+ * Tag type and value are both case insensitive, and represented in lower case.
+ * Tag is printed as { "type": "value" }
+ * 
+ * Type is mainly used to categorize a tag. For example, users can customize a certain type of tag. 
+ * And these tags all use the same type. So user can quickly find this type of tags by the type.
+ * Doris reserves several built-in types:
+ *     ROLE: the role of resource, such as FRONTEND, BACKEND, BROKER
+ *     FUNCTION: the function of a tag, such as STORAGE, COMPUTATION
+ *     LOCATION: A type of tags representing location information.
+ *     
+ * Value is customized. And Doris also reserves several built-in values for built-in types:
+ *     FRONTEND, BACKEND, BROKER of type ROLE.
+ *     REMOTE_STORAGE, STORAGE, COMPUTATION for type FUNCTION.
+ * 
+ * A Tag is immutable once it being created.
+ */
+public class Tag implements Writable {
+
+    public static final String TYPE_ROLE = "role";
+    public static final String TYPE_FUNCATION = "function";
+    public static final String TYPE_LOCATION = "location";
+
+    public static final String VALUE_FRONTEND = "frontend";
+    public static final String VALUE_BACKEND = "backend";
+    public static final String VALUE_BROKER = "broker";
+    public static final String VALUE_REMOTE_STORAGE = "remote_storage";
+    public static final String VALUE_STORE = "store";
+    public static final String VALUE_COMPUTATION = "computation";
+    public static final String VALUE_DEFAULT_CLUSTER = "default_cluster";
+
+    public static final ImmutableSet<String> RESERVED_TAG_TYPE = ImmutableSet.of(
+            TYPE_ROLE, TYPE_FUNCATION, TYPE_LOCATION);
+    public static final ImmutableSet<String> RESERVED_TAG_VALUES = ImmutableSet.of(
+            VALUE_FRONTEND, VALUE_BACKEND, VALUE_BROKER, VALUE_REMOTE_STORAGE, VALUE_STORE, VALUE_COMPUTATION,
+            VALUE_DEFAULT_CLUSTER);
+    private static final String TAG_REGEX = "^[a-z][a-z0-9_]{0,32}$";
+
+    @SerializedName(value = "type")
+    public String type;
+    @SerializedName(value = "value")
+    public String value;
+
+    private Tag(String type, String val) {
+        this.type = type.toLowerCase();
+        this.value = val.toLowerCase();
+    }
+
+    public static Tag create(String type, String value) throws AnalysisException {
+        if (!type.matches(TAG_REGEX) || !value.matches(TAG_REGEX)) {
+            throw new AnalysisException("Invalid tag format: " + type + ":" + value);
+        }
+        return new Tag(type, value);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(type, value);
+    }
+    
+    @Override
+    public boolean equals(Object other) {
+        if (other == this) return true;
+        if (!(other instanceof Tag)) {
+            return false;
+        }
+        Tag otherTag = (Tag) other;
+        return type.equalsIgnoreCase(otherTag.type) && value.equalsIgnoreCase(otherTag.value);
+    }
+
+    @Override
+    public String toString() {
+        return "{\"" + type.toString() + "\" : \"" + value + "\"}";
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        Text.writeString(out, GsonUtils.GSON.toJson(this));
+    }
+
+    public static Tag read(DataInput in) throws IOException {
+        String json = Text.readString(in);
+        return GsonUtils.GSON.fromJson(json, Tag.class);
+    }
+}
diff --git a/fe/src/main/java/org/apache/doris/resource/TagManager.java b/fe/src/main/java/org/apache/doris/resource/TagManager.java
new file mode 100644
index 0000000..41bef76
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/resource/TagManager.java
@@ -0,0 +1,209 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.resource;
+
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.gson.GsonUtils;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.gson.annotations.SerializedName;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/*
+ * TagManager maintains 2 indexes:
+ *      one is from tag to resource.
+ *      one is from resource to tags 
+ * The caller can get a set of resources based on a given set of Tags
+ */
+public class TagManager implements Writable {
+    // tag -> set of resource id
+    private HashMultimap<Tag, Long> tagIndex = HashMultimap.create();
+
+    @SerializedName(value = "resourceIndex")
+    // resource id -> tag set
+    private Map<Long, TagSet> resourceIndex = Maps.newHashMap();
+
+    private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+    public TagManager() {
+        // TODO Auto-generated constructor stub
+    }
+
+    public boolean addResourceTag(Long resourceId, Tag tag) {
+        lock.writeLock().lock();
+        try {
+            if (resourceIndex.containsKey(resourceId)) {
+                resourceIndex.get(resourceId).addTag(tag);
+            } else {
+                resourceIndex.put(resourceId, TagSet.create(tag));
+            }
+
+            return tagIndex.put(tag, resourceId);
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    public void addResourceTags(Long resourceId, TagSet tagSet) {
+        if (tagSet.isEmpty()) {
+            return;
+        }
+        lock.writeLock().lock();
+        try {
+            TagSet existTagSet = resourceIndex.get(resourceId);
+            if (existTagSet == null) {
+                existTagSet = TagSet.create();
+                resourceIndex.put(resourceId, existTagSet);
+            }
+            existTagSet.union(tagSet);
+            for (Tag tag : tagSet.getAllTags()) {
+                tagIndex.put(tag, resourceId);
+            }
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    // remove a resource and all its corresponding tags.
+    // return true if resource exist.
+    public boolean removeResource(Long resourceId) {
+        lock.writeLock().lock();
+        try {
+            TagSet tagSet = resourceIndex.remove(resourceId);
+            if (tagSet != null) {
+                for (Tag tag : tagSet.getAllTags()) {
+                    tagIndex.remove(tag, resourceId);
+                }
+                return true;
+            }
+            return false;
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    // remove a tag from specified resource.
+    // return true if resource with this tag exist.
+    public boolean removeResourceTag(Long resourceId, Tag tag) {
+        lock.writeLock().lock();
+        try {
+            if (resourceIndex.containsKey(resourceId)) {
+                TagSet tagSet = resourceIndex.get(resourceId);
+                boolean res = tagSet.deleteTag(tag);
+                if (tagSet.isEmpty()) {
+                    resourceIndex.remove(resourceId);
+                }
+
+                tagIndex.remove(tag, resourceId);
+                return res;
+            }
+            return false;
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    // remove set of tags from specified resource.
+    public void removeResourceTags(Long resourceId, TagSet tagSet) {
+        lock.writeLock().lock();
+        try {
+            if (resourceIndex.containsKey(resourceId)) {
+                TagSet existingTagSet = resourceIndex.get(resourceId);
+                for (Tag tag : tagSet.getAllTags()) {
+                    existingTagSet.deleteTag(tag);
+                    tagIndex.remove(tag, resourceId);
+                }
+
+                if (tagSet.isEmpty()) {
+                    resourceIndex.remove(resourceId);
+                }
+            }
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    public Set<Long> getResourceIdsByTag(Tag tag) {
+        lock.readLock().lock();
+        try {
+            return Sets.newHashSet(tagIndex.get(tag));
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    // get resource ids by the given set of tags.
+    // The relationship between these tags is "AND".
+    // return a empty set if tag is empty or no resource meets requirement.
+    public Set<Long> getResourceIdsByTags(TagSet tagSet) {
+        if (tagSet.isEmpty()) {
+            return Sets.newHashSet();
+        }
+        lock.readLock().lock();
+        try {
+            Set<Long> res = null;
+            Set<Tag> tags = tagSet.getAllTags();
+            for (Tag tag : tags) {
+                if (res == null) {
+                    res = Sets.newHashSet(tagIndex.get(tag));
+                } else {
+                    res.retainAll(tagIndex.get(tag));
+                }
+                if (res.isEmpty()) {
+                    // if the result is already empty, break immediately
+                    break;
+                }
+            }
+            return res == null ? Sets.newHashSet() : res;
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    // when replayed from edit log, tagIndex need to be built based on resourceIndex
+    private void rebuildTagIndex() {
+        for (Map.Entry<Long, TagSet> entry : resourceIndex.entrySet()) {
+            long resourceId = entry.getKey();
+            for (Tag tag : entry.getValue().getAllTags()) {
+                tagIndex.put(tag, resourceId);
+            }
+        }
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        String json = GsonUtils.GSON.toJson(this);
+        Text.writeString(out, json);
+    }
+
+    public static TagManager read(DataInput in) throws IOException {
+        String json = Text.readString(in);
+        TagManager tagManager = GsonUtils.GSON.fromJson(json, TagManager.class);
+        tagManager.rebuildTagIndex();
+        return tagManager;
+    }
+}
diff --git a/fe/src/main/java/org/apache/doris/resource/TagSet.java b/fe/src/main/java/org/apache/doris/resource/TagSet.java
new file mode 100644
index 0000000..5e55bf4
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/resource/TagSet.java
@@ -0,0 +1,209 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.resource;
+
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.gson.GsonUtils;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.gson.Gson;
+import com.google.gson.annotations.SerializedName;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/*
+ * TagSet represents a set of tags.
+ * TagSet is printed as { "type1" : "value1,value2", "type2" : "value1" }
+ * TagSet is mutable and not thread safe
+ */
+public class TagSet implements Writable {
+    public static final TagSet EMPTY_TAGSET = new TagSet();
+
+    @SerializedName(value = "tags")
+    private Set<Tag> tags = Sets.newHashSet();
+
+    private TagSet() {
+    }
+
+    private TagSet(TagSet other) {
+        for (Tag tag : other.getAllTags()) {
+            tags.add(tag);
+        }
+    }
+
+    // create TagSet from tag map: { "type1" : "tag1,tag2", "type2" : "tag1" }
+    public static TagSet create(Map<String, String> tagsMap) throws AnalysisException {
+        TagSet tagSet = new TagSet();
+        for (Map.Entry<String, String> entry : tagsMap.entrySet()) {
+            String typeStr = entry.getKey();
+            String tagsStr = entry.getValue();
+            String[] tagParts = tagsStr.split(",");
+            for (String tagPart : tagParts) {
+                Tag tag = Tag.create(typeStr, tagPart.trim());
+                tagSet.addTag(tag);
+            }
+        }
+        return tagSet;
+    }
+
+    // create from a single tag
+    public static TagSet create(String type, String tagName) throws AnalysisException {
+        TagSet tagSet = new TagSet();
+        Tag tag = Tag.create(type, tagName.trim());
+        tagSet.addTag(tag);
+        return tagSet;
+    }
+
+    // create from multi tags
+    public static TagSet create(Tag... tags) {
+        TagSet tagSet = new TagSet();
+        for (Tag tag : tags) {
+            tagSet.addTag(tag);
+        }
+        return tagSet;
+    }
+
+    public static TagSet copyFrom(TagSet other) {
+        TagSet tagSet = new TagSet();
+        for (Tag tag : other.tags) {
+            tagSet.addTag(tag);
+        }
+        return tagSet;
+    }
+
+    // return true if tag doesn't exist
+    public boolean addTag(Tag tag) {
+        return this.tags.add(tag);
+    }
+
+    // return true if tag exist
+    public boolean deleteTag(Tag tag) {
+        return tags.remove(tag);
+    }
+
+    // get a set of tags by tag type
+    public TagSet getTagsByType(String type) {
+        type = type.toLowerCase();
+        TagSet tagSet = new TagSet();
+        for (Tag tag : tags) {
+            if (tag.type.equals(type)) {
+                tagSet.addTag(tag);
+            }
+        }
+        return tagSet;
+    }
+
+    public boolean containsTag(Tag tag) {
+        return tags.contains(tag);
+    }
+
+    // the result is the union of 2 sets.
+    public void union(TagSet other) {
+        for (Tag tag : other.tags) {
+            addTag(tag);
+        }
+    }
+
+    // return all types in this tag set
+    public Set<String> getTypes() {
+        Set<String> set = Sets.newHashSet();
+        for (Tag tag : tags) {
+            set.add(tag.type);
+        }
+        return set;
+    }
+
+    // delete all tags of specified type
+    private void deleteType(String type) {
+        final String lowerType = type.toLowerCase();
+        tags = tags.stream().filter(t -> !t.type.equals(lowerType)).collect(Collectors.toSet());
+    }
+
+    // merge 2 tag sets, but all types of tag in target tag set will be substituted by type in 'other' tagset
+    // eg:
+    // tagset A: { "type1" : "val1,val2", "type2" : "val1" }
+    // tagset B: { "type1" : "val3", "type3" : "val4" }
+    // result of A.substituteMerge(B): { "type1" : "val3", "type2" : "val1", "type3" : "val4" }
+    public void substituteMerge(TagSet other) {
+        Set<String> types = other.getTypes();
+        for (String type : types) {
+            deleteType(type);
+            union(other.getTagsByType(type));
+        }
+    }
+
+    public Set<Tag> getAllTags() {
+        return tags;
+    }
+
+    public boolean isEmpty() {
+        return tags.isEmpty();
+    }
+
+    // print as { "type1" : "tag1,tag2", "type2" : "tag1" }
+    @Override
+    public String toString() {
+        Map<String, String> map = Maps.newHashMap();
+        Gson gson = new Gson();
+        for (String type : getTypes()) {
+            TagSet tagSet = getTagsByType(type);
+            if (!tagSet.isEmpty()) {
+                map.put(type, Joiner.on(",").join(
+                        tagSet.getAllTags().stream().map(t -> t.value).collect(Collectors.toList())));
+            }
+        }
+        return gson.toJson(map);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(tags);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (!(obj instanceof TagSet)) {
+            return false;
+        }
+        TagSet tagSet = (TagSet) obj;
+        return tags.equals(tagSet.tags);
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        Text.writeString(out, GsonUtils.GSON.toJson(this));
+    }
+
+    public static TagSet read(DataInput in) throws IOException {
+        String json = Text.readString(in);
+        return GsonUtils.GSON.fromJson(json, TagSet.class);
+    }
+}
diff --git a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
index e57bfd8..f1b41b3 100644
--- a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
+++ b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
@@ -138,7 +138,7 @@ public class GlobalTransactionMgr implements Writable {
     public long beginTransaction(long dbId, String label, TUniqueId requestId,
             String coordinator, LoadJobSourceType sourceType, long listenerId, long timeoutSecond)
             throws AnalysisException, LabelAlreadyUsedException, BeginTransactionException, DuplicatedRequestException {
-        
+
         if (Config.disable_load_job) {
             throw new AnalysisException("disable_load_job is set to true, all load jobs are prevented");
         }
diff --git a/fe/src/test/java/org/apache/doris/persist/gson/GsonDerivedClassSerializationTest.java b/fe/src/test/java/org/apache/doris/persist/gson/GsonDerivedClassSerializationTest.java
new file mode 100644
index 0000000..825da16
--- /dev/null
+++ b/fe/src/test/java/org/apache/doris/persist/gson/GsonDerivedClassSerializationTest.java
@@ -0,0 +1,155 @@
+package org.apache.doris.persist.gson;
+
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.gson.GsonUtils.HiddenAnnotationExclusionStrategy;
+
+import com.google.common.collect.Maps;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.annotations.SerializedName;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Map;
+
+/*
+ * This unit test shows how to serialize and deserialize inherited class.
+ * 
+ * ParentClass is the parent class of 2 derived classes:
+ *      ChildClassA
+ *      ChildClassB
+ *      
+ * User need to create a RuntimeTypeAdapterFactory for ParentClass and
+ * register 2 derived classes to the factory. And then register the factory
+ * to the GsonBuilder to create GSON instance.
+ * 
+ * Notice that there is a special field "clazz" in ParentClass. This field is used
+ * to help the RuntimeTypeAdapterFactory to distinguish the kind of derived class.
+ * This field's name should be specified when creating the RuntimeTypeAdapterFactory.
+ * 
+ */
+public class GsonDerivedClassSerializationTest {
+    private static String fileName = "./GsonDerivedClassSerializationTest";
+    
+    @After
+    public void tearDown() {
+        File file = new File(fileName);
+        file.delete();
+    }
+
+    public static class ParentClass implements Writable {
+        @SerializedName(value = "flag")
+        public int flag = 0;
+        @SerializedName(value = "clazz")
+        public String clazz; // a specified field to save the type of derived classed
+
+        public ParentClass(int flag, String clazz) {
+            this.flag = flag;
+            this.clazz = clazz;
+        }
+
+        @Override
+        public void write(DataOutput out) throws IOException {
+            String json = TEST_GSON.toJson(this);
+            System.out.println("write: " + json);
+            Text.writeString(out, json);
+        }
+
+        public static ParentClass read(DataInput in) throws IOException {
+            String json = Text.readString(in);
+            System.out.println("read: " + json);
+            return TEST_GSON.fromJson(json, ParentClass.class);
+        }
+    }
+
+    public static class ChildClassA extends ParentClass {
+        @SerializedName(value = "tag")
+        public String tagA;
+
+        public ChildClassA(int flag, String tag) {
+            // pass "ChildClassA.class.getSimpleName()" to field "clazz"
+            super(flag, ChildClassA.class.getSimpleName());
+            this.tagA = tag;
+        }
+    }
+
+    public static class ChildClassB extends ParentClass {
+        @SerializedName(value = "mapB")
+        public Map<Long, String> mapB = Maps.newConcurrentMap();
+
+        public ChildClassB(int flag) {
+            // pass "ChildClassB.class.getSimpleName()" to field "clazz"
+            super(flag, ChildClassB.class.getSimpleName());
+            this.mapB.put(1L, "B1");
+            this.mapB.put(2L, "B2");
+        }
+    }
+
+    private static RuntimeTypeAdapterFactory<ParentClass> runtimeTypeAdapterFactory = RuntimeTypeAdapterFactory
+            // the "clazz" here is the name of "clazz" field in ParentClass.
+            .of(ParentClass.class, "clazz")
+            // register 2 derived classes, the second parameter must be same to the value of field "clazz"
+            .registerSubtype(ChildClassA.class, ChildClassA.class.getSimpleName())
+            .registerSubtype(ChildClassB.class, ChildClassB.class.getSimpleName());
+
+    private static Gson TEST_GSON = new GsonBuilder()
+            .addSerializationExclusionStrategy(new HiddenAnnotationExclusionStrategy())
+            .enableComplexMapKeySerialization()
+            // register the RuntimeTypeAdapterFactory
+            .registerTypeAdapterFactory(runtimeTypeAdapterFactory)
+            .create();
+
+    @Test
+    public void testDerivedClassA() throws IOException {
+        // 1. Write objects to file
+        File file = new File(fileName);
+        file.createNewFile();
+        DataOutputStream out = new DataOutputStream(new FileOutputStream(file));
+
+        ChildClassA childClassA = new ChildClassA(1, "A");
+        childClassA.write(out);
+        out.flush();
+        out.close();
+
+        // 2. Read objects from file
+        DataInputStream in = new DataInputStream(new FileInputStream(file));
+        ParentClass parentClass = ParentClass.read(in);
+        Assert.assertTrue(parentClass instanceof ChildClassA);
+        Assert.assertEquals(1, ((ChildClassA) parentClass).flag);
+        Assert.assertEquals("A", ((ChildClassA) parentClass).tagA);
+    }
+
+    @Test
+    public void testDerivedClassB() throws IOException {
+        // 1. Write objects to file
+        File file = new File(fileName);
+        file.createNewFile();
+        DataOutputStream out = new DataOutputStream(new FileOutputStream(file));
+
+        ChildClassB childClassB = new ChildClassB(2);
+        childClassB.write(out);
+        out.flush();
+        out.close();
+
+        // 2. Read objects from file
+        DataInputStream in = new DataInputStream(new FileInputStream(file));
+        ParentClass parentClass = ParentClass.read(in);
+        Assert.assertTrue(parentClass instanceof ChildClassB);
+        Assert.assertEquals(2, ((ChildClassB) parentClass).flag);
+        Assert.assertEquals(2, ((ChildClassB) parentClass).mapB.size());
+        Assert.assertEquals("B1", ((ChildClassB) parentClass).mapB.get(1L));
+        Assert.assertEquals("B2", ((ChildClassB) parentClass).mapB.get(2L));
+    }
+
+}
diff --git a/fe/src/test/java/org/apache/doris/persist/gson/GsonSerializationTest.java b/fe/src/test/java/org/apache/doris/persist/gson/GsonSerializationTest.java
new file mode 100644
index 0000000..c89709f
--- /dev/null
+++ b/fe/src/test/java/org/apache/doris/persist/gson/GsonSerializationTest.java
@@ -0,0 +1,433 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.persist.gson;
+
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.gson.GsonSerializationTest.Key.MyEnum;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+import com.google.common.collect.Table;
+import com.google.gson.annotations.SerializedName;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/*
+ * This unit test provides examples about how to make a class serializable.
+ * 
+ *    "OrigClassA" is a class includes user-defined class "InnerClassA".
+ *    And "InnerClassA" includes some collections which contain another user-defined class "InnerClassB".
+ *    
+ *    And there are 2 other classes "OriginClassADifferentMembers" and "OriginClassADifferentMemberName".
+ *    "OriginClassADifferentMembers" shows how to add/remove members of a serializable class.
+ *    "OriginClassADifferentMemberName" shows how to modify members' name of a serializable class.
+ *    
+ *    Every fields which need to be serialized should be with annotation @SerializedName.
+ *    @SerializedName has 2 attributes:
+ *      1. value(required): the name of this field in Json string.
+ *      2. alternate(optional): if we want to use new name for a field and its value in annotation, use alternate.
+ *    
+ */
+public class GsonSerializationTest {
+    private static String fileName = "./GsonSerializationTest";
+
+    public static class OrigClassA implements Writable {
+        @SerializedName(value = "classA1")
+        public InnerClassA classA1;
+        public InnerClassA ignoreClassA2;
+        @SerializedName(value = "flag")
+        public int flag = 0;
+
+        public OrigClassA(int flag) {
+            this.flag = flag;
+            classA1 = new InnerClassA(1);
+            ignoreClassA2 = new InnerClassA(2);
+        }
+
+        @Override
+        public void write(DataOutput out) throws IOException {
+            String json = GsonUtils.GSON.toJson(this);
+            System.out.println(json);
+            Text.writeString(out, json);
+        }
+
+        public static OrigClassA read(DataInput in) throws IOException {
+            String json = Text.readString(in);
+            System.out.println(json);
+            return GsonUtils.GSON.fromJson(json, OrigClassA.class);
+        }
+    }
+
+    public static class InnerClassA implements Writable {
+        @SerializedName(value = "list1")
+        public List<String> list1 = Lists.newArrayList();
+        @SerializedName(value = "map1")
+        public Map<Long, String> map1 = Maps.newHashMap();
+        @SerializedName(value = "map2")
+        public Map<Integer, InnerClassB> map2 = Maps.newHashMap();
+        @SerializedName(value = "set1")
+        public Set<String> set1 = Sets.newHashSet();
+        @SerializedName(value = "flag")
+        public int flag = 0;
+
+        public InnerClassA(int flag) {
+            list1.add("string1");
+            list1.add("string2");
+            
+            map1.put(1L, "value1");
+            map1.put(2L, "value2");
+            
+            map2.put(1, new InnerClassB(1));
+            map2.put(2, new InnerClassB(2));
+
+            set1.add("set1");
+            set1.add("set2");
+
+            this.flag = flag;
+        }
+
+        @Override
+        public void write(DataOutput out) throws IOException {
+            Text.writeString(out, GsonUtils.GSON.toJson(this));
+        }
+
+        public static InnerClassA read(DataInput in) throws IOException {
+            return GsonUtils.GSON.fromJson(Text.readString(in), InnerClassA.class);
+        }
+    }
+
+    public static class InnerClassB implements Writable {
+        @SerializedName(value = "flag")
+        public int flag = 0;
+        @SerializedName(value = "hashMultimap")
+        public Multimap<Long, String> hashMultimap = HashMultimap.create();
+        @SerializedName(value = "hashBasedTable")
+        public Table<Long, String, Long> hashBasedTable = HashBasedTable.create();
+        @SerializedName(value = "arrayListMultimap")
+        public Multimap<Long, String> arrayListMultimap = ArrayListMultimap.create();
+
+        public int ignoreField = 0;
+
+        public InnerClassB(int flag) {
+            this.flag = flag;
+
+            this.hashMultimap.put(1L, "string1");
+            this.hashMultimap.put(1L, "string2");
+            this.hashMultimap.put(2L, "string3");
+
+            this.hashBasedTable.put(1L, "col1", 1L);
+            this.hashBasedTable.put(2L, "col2", 2L);
+            
+            this.arrayListMultimap.put(1L, "value1");
+            this.arrayListMultimap.put(1L, "value2");
+
+            this.ignoreField = flag;
+        }
+
+        @Override
+        public void write(DataOutput out) throws IOException {
+            Text.writeString(out, GsonUtils.GSON.toJson(this));
+        }
+
+        public static InnerClassB read(DataInput in) throws IOException {
+            return GsonUtils.GSON.fromJson(Text.readString(in), InnerClassB.class);
+        }
+    }
+
+    // same as OriginClassA, but:
+    // 1. without member classA1;
+    // 2. add a new member classA3;
+    public static class OriginClassADifferentMembers implements Writable {
+        @SerializedName(value = "classA3")
+        public InnerClassA classA3;
+        public InnerClassA ignoreClassA2;
+        @SerializedName(value = "flag")
+        public int flag = 0;
+
+        public OriginClassADifferentMembers(int flag) {
+            this.flag = flag;
+            classA3 = new InnerClassA(3);
+            ignoreClassA2 = new InnerClassA(2);
+        }
+
+        @Override
+        public void write(DataOutput out) throws IOException {
+            String json = GsonUtils.GSON.toJson(this);
+            System.out.println(json);
+            Text.writeString(out, json);
+        }
+
+        public static OriginClassADifferentMembers read(DataInput in) throws IOException {
+            String json = Text.readString(in);
+            System.out.println(json);
+            return GsonUtils.GSON.fromJson(json, OriginClassADifferentMembers.class);
+        }
+    }
+
+    // same as OriginClassA, but:
+    // 1. change classA1's name to classA1ChangeName
+    // 2. change ignoreClassA2's name to ignoreClassA2ChangeName
+    // 3. change flag's name to flagChangeName, and also change its serialized name to flagChangeName
+    public static class OriginClassADifferentMemberName implements Writable {
+        @SerializedName(value = "classA1")
+        public InnerClassA classA1ChangeName;
+        public InnerClassA ignoreClassA2ChangeName;
+        @SerializedName(value = "flagChangeName", alternate = { "flag" })
+        public int flagChangeName = 0;
+
+        public OriginClassADifferentMemberName(int flag) {
+            this.flagChangeName = flag;
+            classA1ChangeName = new InnerClassA(1);
+            ignoreClassA2ChangeName = new InnerClassA(2);
+        }
+
+        @Override
+        public void write(DataOutput out) throws IOException {
+            String json = GsonUtils.GSON.toJson(this);
+            System.out.println(json);
+            Text.writeString(out, json);
+        }
+
+        public static OriginClassADifferentMemberName read(DataInput in) throws IOException {
+            String json = Text.readString(in);
+            System.out.println(json);
+            return GsonUtils.GSON.fromJson(json, OriginClassADifferentMemberName.class);
+        }
+    }
+    
+    @After
+    public void tearDown() {
+        File file = new File(fileName);
+        file.delete();
+    }
+
+    /*
+     * Test write read with same classes.
+     */
+    @Test
+    public void testNormal() throws IOException {
+        // 1. Write objects to file
+        File file = new File(fileName);
+        file.createNewFile();
+        DataOutputStream out = new DataOutputStream(new FileOutputStream(file));
+
+        OrigClassA classA = new OrigClassA(1);
+        classA.write(out);
+        out.flush();
+        out.close();
+
+        // 2. Read objects from file
+        DataInputStream in = new DataInputStream(new FileInputStream(file));
+
+        OrigClassA readClassA = OrigClassA.read(in);
+        Assert.assertEquals(1, readClassA.flag);
+        Assert.assertEquals(1, readClassA.classA1.flag);
+        Assert.assertNull(readClassA.ignoreClassA2);
+
+        Assert.assertEquals(Lists.newArrayList("string1", "string2"), readClassA.classA1.list1);
+        Assert.assertTrue(readClassA.classA1.map1.containsKey(1L));
+        Assert.assertTrue(readClassA.classA1.map1.containsKey(2L));
+        Assert.assertEquals("value1", readClassA.classA1.map1.get(1L));
+        Assert.assertEquals("value2", readClassA.classA1.map1.get(2L));
+
+        Assert.assertTrue(readClassA.classA1.map2.containsKey(1));
+        Assert.assertTrue(readClassA.classA1.map2.containsKey(2));
+        Assert.assertEquals(1, readClassA.classA1.map2.get(1).flag);
+        Assert.assertEquals(2, readClassA.classA1.map2.get(2).flag);
+        Assert.assertEquals(0, readClassA.classA1.map2.get(1).ignoreField);
+        Assert.assertEquals(0, readClassA.classA1.map2.get(2).ignoreField);
+        Assert.assertEquals(Sets.newHashSet("set1", "set2"), readClassA.classA1.set1);
+        
+        Table<Long, String, Long> hashBasedTable = readClassA.classA1.map2.get(1).hashBasedTable;
+        Assert.assertEquals("HashBasedTable", hashBasedTable.getClass().getSimpleName());
+        Multimap<Long, String> hashMultimap = readClassA.classA1.map2.get(1).hashMultimap;
+        Assert.assertEquals("HashMultimap", hashMultimap.getClass().getSimpleName());
+        Multimap<Long, String> arrayListMultimap = readClassA.classA1.map2.get(1).arrayListMultimap;
+        Assert.assertEquals("ArrayListMultimap", arrayListMultimap.getClass().getSimpleName());
+        Assert.assertEquals(Lists.newArrayList("value1", "value2"), arrayListMultimap.get(1L));
+
+        in.close();
+    }
+
+    /*
+     * Test write origin class, and read in new class with different members
+     */
+    @Test
+    public void testWithDifferentMembers() throws IOException {
+        // 1. Write objects to file
+        File file = new File(fileName);
+        file.createNewFile();
+        DataOutputStream out = new DataOutputStream(new FileOutputStream(file));
+
+        OrigClassA classA = new OrigClassA(1);
+        classA.write(out);
+        out.flush();
+        out.close();
+
+        // 2. Read objects from file
+        DataInputStream in = new DataInputStream(new FileInputStream(file));
+
+        OriginClassADifferentMembers readClassA = OriginClassADifferentMembers.read(in);
+        Assert.assertEquals(1, readClassA.flag);
+        Assert.assertNull(readClassA.classA3);
+        Assert.assertNull(readClassA.ignoreClassA2);
+        in.close();
+    }
+
+    /*
+     * Test write origin class, and read in new class with different member names
+     */
+    @Test
+    public void testWithDifferentMemberNames() throws IOException {
+        // 1. Write objects to file
+        File file = new File(fileName);
+        file.createNewFile();
+        DataOutputStream out = new DataOutputStream(new FileOutputStream(file));
+
+        OrigClassA classA = new OrigClassA(1);
+        classA.write(out);
+        out.flush();
+        out.close();
+
+        // 2. Read objects from file
+        DataInputStream in = new DataInputStream(new FileInputStream(file));
+
+        OriginClassADifferentMemberName readClassA = OriginClassADifferentMemberName.read(in);
+        Assert.assertEquals(1, readClassA.flagChangeName);
+        Assert.assertEquals(1, readClassA.classA1ChangeName.flag);
+        Assert.assertNull(readClassA.ignoreClassA2ChangeName);
+
+        Assert.assertEquals(Lists.newArrayList("string1", "string2"), readClassA.classA1ChangeName.list1);
+        Assert.assertTrue(readClassA.classA1ChangeName.map1.containsKey(1L));
+        Assert.assertTrue(readClassA.classA1ChangeName.map1.containsKey(2L));
+        Assert.assertEquals("value1", readClassA.classA1ChangeName.map1.get(1L));
+        Assert.assertEquals("value2", readClassA.classA1ChangeName.map1.get(2L));
+
+        Assert.assertTrue(readClassA.classA1ChangeName.map2.containsKey(1));
+        Assert.assertTrue(readClassA.classA1ChangeName.map2.containsKey(2));
+        Assert.assertEquals(1, readClassA.classA1ChangeName.map2.get(1).flag);
+        Assert.assertEquals(2, readClassA.classA1ChangeName.map2.get(2).flag);
+        Assert.assertEquals(0, readClassA.classA1ChangeName.map2.get(1).ignoreField);
+        Assert.assertEquals(0, readClassA.classA1ChangeName.map2.get(2).ignoreField);
+        Assert.assertEquals(Sets.newHashSet("set1", "set2"), readClassA.classA1ChangeName.set1);
+        in.close();
+    }
+
+    public static class MultiMapClassA implements Writable {
+        @SerializedName(value = "map")
+        public Multimap<Key, Long> map = HashMultimap.create();
+
+        public MultiMapClassA() {
+            map.put(new Key(MyEnum.TYPE_A, "key1"), 1L);
+            map.put(new Key(MyEnum.TYPE_B, "key2"), 2L);
+        }
+
+        @Override
+        public void write(DataOutput out) throws IOException {
+            String json = GsonUtils.GSON.toJson(this);
+            Text.writeString(out, json);
+        }
+
+        public static MultiMapClassA read(DataInput in) throws IOException {
+            String json = Text.readString(in);
+            MultiMapClassA classA = GsonUtils.GSON.fromJson(json, MultiMapClassA.class);
+            return classA;
+        }
+    }
+
+    public static class Key {
+        public enum MyEnum {
+            TYPE_A, TYPE_B
+        }
+
+        @SerializedName(value = "type")
+        public MyEnum type;
+        @SerializedName(value = "value")
+        public String value;
+
+        public Key(MyEnum type, String value) {
+            this.type = type;
+            this.value = value;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(type, value);
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (!(obj instanceof Key)) {
+                return false;
+            }
+
+            if (this == obj) {
+                return true;
+            }
+
+            Key other = (Key) obj;
+            return other.type == this.type && other.value.equals(this.value);
+        }
+
+        @Override
+        public String toString() {
+            return type + ":" + value;
+        }
+    }
+
+    @Test
+    public void testMultiMapWithCustomKey() throws IOException {
+        // 1. Write objects to file
+        File file = new File(fileName);
+        file.createNewFile();
+        DataOutputStream out = new DataOutputStream(new FileOutputStream(file));
+
+        MultiMapClassA classA = new MultiMapClassA();
+        classA.write(out);
+        out.flush();
+        out.close();
+
+        // 2. Read objects from file
+        DataInputStream in = new DataInputStream(new FileInputStream(file));
+
+        MultiMapClassA readClassA = MultiMapClassA.read(in);
+        Assert.assertEquals(Sets.newHashSet(new Key(MyEnum.TYPE_A, "key1"), new Key(MyEnum.TYPE_B, "key2")),
+                readClassA.map.keySet());
+    }
+}
diff --git a/fe/src/test/java/org/apache/doris/resource/TagSerializationTest.java b/fe/src/test/java/org/apache/doris/resource/TagSerializationTest.java
new file mode 100644
index 0000000..98af1d3
--- /dev/null
+++ b/fe/src/test/java/org/apache/doris/resource/TagSerializationTest.java
@@ -0,0 +1,113 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.resource;
+
+
+import org.apache.doris.common.AnalysisException;
+
+import com.google.common.collect.Sets;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+/*
+ * Author: Chenmingyu
+ * Date: Dec 9, 2019
+ */
+
+public class TagSerializationTest {
+
+    private static String fileName = "./TagSerializationTest";
+
+    @After
+    public void tearDown() {
+        File file = new File(fileName);
+        file.delete();
+    }
+
+    @Test
+    public void testSerializeTag() throws IOException, AnalysisException {
+        // 1. Write objects to file
+        File file = new File(fileName);
+        file.createNewFile();
+        DataOutputStream out = new DataOutputStream(new FileOutputStream(file));
+
+        Tag tag = Tag.create(Tag.TYPE_LOCATION, "rack1");
+        tag.write(out);
+        out.flush();
+        out.close();
+
+        // 2. Read objects from file
+        DataInputStream in = new DataInputStream(new FileInputStream(file));
+
+        Tag readTag = Tag.read(in);
+        Assert.assertEquals(tag, readTag);
+    }
+
+    @Test
+    public void testSerializeTagSet() throws IOException, AnalysisException {
+        // 1. Write objects to file
+        File file = new File(fileName);
+        file.createNewFile();
+        DataOutputStream out = new DataOutputStream(new FileOutputStream(file));
+
+        TagSet tagSet = TagSet.create(Tag.create(Tag.TYPE_LOCATION, "rack1"), Tag.create(Tag.TYPE_LOCATION, "rack2"),
+                Tag.create(Tag.TYPE_ROLE, "backend"));
+        tagSet.write(out);
+        out.flush();
+        out.close();
+
+        // 2. Read objects from file
+        DataInputStream in = new DataInputStream(new FileInputStream(file));
+
+        TagSet readTag = TagSet.read(in);
+        Assert.assertEquals(tagSet, readTag);
+    }
+
+    @Test
+    public void testSerializeTagManager() throws IOException, AnalysisException {
+        // 1. Write objects to file
+        File file = new File(fileName);
+        file.createNewFile();
+        DataOutputStream out = new DataOutputStream(new FileOutputStream(file));
+
+        TagManager tagManager = new TagManager();
+        tagManager.addResourceTag(1L, Tag.create(Tag.TYPE_LOCATION, "rack1"));
+        tagManager.addResourceTags(2L, TagSet.create( Tag.create(Tag.TYPE_LOCATION, "rack1"),  Tag.create(Tag.TYPE_LOCATION, "rack2")));
+        tagManager.write(out);
+        out.flush();
+        out.close();
+
+        // 2. Read objects from file
+        DataInputStream in = new DataInputStream(new FileInputStream(file));
+
+        TagManager readTagManager = TagManager.read(in);
+        Assert.assertEquals(Sets.newHashSet(1L, 2L), readTagManager.getResourceIdsByTag(Tag.create(Tag.TYPE_LOCATION, "rack1")));
+        Assert.assertEquals(Sets.newHashSet(2L), readTagManager.getResourceIdsByTags(TagSet.create(Tag.create(Tag.TYPE_LOCATION, "rack2"))));
+
+        in.close();
+    }
+}
diff --git a/fe/src/test/java/org/apache/doris/resource/TagTest.java b/fe/src/test/java/org/apache/doris/resource/TagTest.java
new file mode 100644
index 0000000..2fce1e2
--- /dev/null
+++ b/fe/src/test/java/org/apache/doris/resource/TagTest.java
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.resource;
+
+import org.apache.doris.common.AnalysisException;
+
+import com.google.common.collect.Maps;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Map;
+
+public class TagTest {
+
+    @Test(expected = AnalysisException.class)
+    public void testTagName1() throws AnalysisException {
+        Tag.create("location", "_tag1");
+    }
+
+    @Test(expected = AnalysisException.class)
+    public void testTagName2() throws AnalysisException {
+        Tag.create("location", "asdlajwdjdawhkjldjawlkdjawldjlkwasdasdsadasdd");
+    }
+
+    @Test
+    public void testTagName3() throws AnalysisException {
+        Tag.create("unknown", "test1");
+    }
+
+    @Test
+    public void testTagName4() throws AnalysisException {
+        Tag tag = Tag.create("location", "zone1");
+        Assert.assertEquals("{\"location\" : \"zone1\"}", tag.toString());
+    }
+
+    @Test
+    public void testTagSet1() throws AnalysisException {
+        Map<String, String> map = Maps.newHashMap();
+        map.put("location", "zone1, zone2");
+        map.put("unknown", "tag1, tag2");
+        TagSet tagSet = TagSet.create(map);
+    }
+
+    @Test(expected = AnalysisException.class)
+    public void testTagSet2() throws AnalysisException {
+        Map<String, String> map = Maps.newHashMap();
+        map.put("location", "zone1, zone2");
+        map.put("type", "tag1, _tag2");
+        TagSet tagSet = TagSet.create(map);
+    }
+
+    @Test
+    public void testTagSet3() throws AnalysisException {
+        Map<String, String> map = Maps.newHashMap();
+        map.put("location", "zone1, zone2");
+        map.put("type", "backend");
+        map.put("function", "store,computation");
+        TagSet tagSet = TagSet.create(map);
+        Assert.assertTrue(tagSet.containsTag(Tag.create("location", "zone1")));
+        Assert.assertTrue(tagSet.containsTag(Tag.create("location", "zone2")));
+        Assert.assertTrue(tagSet.containsTag(Tag.create("type", "backend")));
+        Assert.assertTrue(tagSet.containsTag(Tag.create("function", "store")));
+        Assert.assertTrue(tagSet.containsTag(Tag.create("function", "computation")));
+        Assert.assertFalse(tagSet.containsTag(Tag.create("function", "load")));
+
+        // test union
+        Map<String, String> map2 = Maps.newHashMap();
+        map2.put("function", "load");
+        TagSet tagSet2 = TagSet.create(map2);
+        tagSet.union(tagSet2);
+        Assert.assertTrue(tagSet.containsTag(Tag.create("function", "store")));
+        Assert.assertTrue(tagSet.containsTag(Tag.create("function", "computation")));
+        Assert.assertTrue(tagSet.containsTag(Tag.create("function", "load")));
+
+        // test substitute merge
+        tagSet.substituteMerge(tagSet2);
+        Assert.assertFalse(tagSet.containsTag(Tag.create("function", "store")));
+        Assert.assertFalse(tagSet.containsTag(Tag.create("function", "computation")));
+        Assert.assertTrue(tagSet.containsTag(Tag.create("function", "load")));
+    }
+
+    @Test
+    public void testTagManager() throws AnalysisException {
+        TagManager tagManager = new TagManager();
+        tagManager.addResourceTag(1L, Tag.create("location", "zone1"));
+        tagManager.addResourceTag(2L, Tag.create("location", "zone1"));
+        tagManager.addResourceTag(2L, Tag.create("location", "zone2"));
+        tagManager.addResourceTag(2L, Tag.create("function", "store"));
+
+        Assert.assertEquals(2, tagManager.getResourceIdsByTag(Tag.create("location", "zone1")).size());
+        Assert.assertEquals(0, tagManager.getResourceIdsByTag(Tag.create("location", "zone3")).size());
+        Map<String, String> map = Maps.newHashMap();
+        map.put("location", "zone1, zone2");
+        TagSet tagSet = TagSet.create(map);
+        Assert.assertEquals(1, tagManager.getResourceIdsByTags(tagSet).size());
+
+        tagManager.removeResourceTag(2L, Tag.create("location", "zone2"));
+        Assert.assertEquals(0, tagManager.getResourceIdsByTags(tagSet).size());
+
+        tagManager.addResourceTags(3L, tagSet);
+        tagManager.addResourceTags(4L, tagSet);
+        Assert.assertEquals(2, tagManager.getResourceIdsByTags(tagSet).size());
+
+        tagManager.removeResourceTags(4L, tagSet);
+        Assert.assertEquals(1, tagManager.getResourceIdsByTags(tagSet).size());
+
+        tagManager.removeResource(3L);
+        Assert.assertEquals(0, tagManager.getResourceIdsByTags(tagSet).size());
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org