You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2021/08/31 07:16:20 UTC

[skywalking-banyandb-java-client] 02/02: Initial project codebase.

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb-java-client.git

commit 4575ecf012c130fe17197f3bbed7b7a1836a38a9
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Tue Aug 31 15:16:13 2021 +0800

    Initial project codebase.
---
 .gitignore                                         |  23 ++
 .mvn/wrapper/MavenWrapperDownloader.java           | 117 +++++++
 .mvn/wrapper/maven-wrapper.properties              |   3 +
 HEADER                                             |  14 +
 LICENSE                                            | 202 ++++++++++++
 NOTICE                                             |   5 +
 apm-checkstyle/CHECKSTYLE_HEAD                     |  18 ++
 apm-checkstyle/checkStyle.xml                      | 133 ++++++++
 apm-checkstyle/importControl.xml                   |  28 ++
 mvnw                                               | 310 ++++++++++++++++++
 mvnw.cmd                                           | 182 +++++++++++
 pom.xml                                            | 357 +++++++++++++++++++++
 .../banyandb/commons/datacarrier/DataCarrier.java  | 166 ++++++++++
 .../banyandb/commons/datacarrier/EnvUtil.java      |  50 +++
 .../buffer/ArrayBlockingQueueBuffer.java           |  67 ++++
 .../commons/datacarrier/buffer/Buffer.java         |  76 +++++
 .../commons/datacarrier/buffer/BufferStrategy.java |  23 ++
 .../commons/datacarrier/buffer/Channels.java       |  93 ++++++
 .../commons/datacarrier/buffer/QueueBuffer.java    |  46 +++
 .../datacarrier/common/AtomicRangeInteger.java     |  76 +++++
 .../datacarrier/consumer/BulkConsumePool.java      | 118 +++++++
 .../datacarrier/consumer/ConsumeDriver.java        | 137 ++++++++
 .../consumer/ConsumerCannotBeCreatedException.java |  25 ++
 .../commons/datacarrier/consumer/ConsumerPool.java |  30 ++
 .../datacarrier/consumer/ConsumerPoolFactory.java  |  50 +++
 .../datacarrier/consumer/ConsumerThread.java       | 105 ++++++
 .../commons/datacarrier/consumer/IConsumer.java    |  40 +++
 .../commons/datacarrier/consumer/IDriver.java      |  32 ++
 .../consumer/MultipleChannelsConsumer.java         | 124 +++++++
 .../datacarrier/partition/IDataPartitioner.java    |  32 ++
 .../partition/ProducerThreadPartitioner.java       |  37 +++
 .../partition/SimpleRollingPartitioner.java        |  37 +++
 .../banyandb/v1/client/BanyanDBClient.java         | 197 ++++++++++++
 .../banyandb/v1/client/BulkWriteProcessor.java     | 129 ++++++++
 .../skywalking/banyandb/v1/client/Field.java       | 157 +++++++++
 .../banyandb/v1/client/FieldAndValue.java          | 101 ++++++
 .../skywalking/banyandb/v1/client/Options.java     |  43 +++
 .../banyandb/v1/client/PairQueryCondition.java     | 307 ++++++++++++++++++
 .../skywalking/banyandb/v1/client/RowEntity.java   |  59 ++++
 .../banyandb/v1/client/SerializableField.java      |  29 ++
 .../banyandb/v1/client/TimestampRange.java         |  53 +++
 .../v1/client/TraceBulkWriteProcessor.java         | 102 ++++++
 .../skywalking/banyandb/v1/client/TraceQuery.java  | 137 ++++++++
 .../banyandb/v1/client/TraceQueryResponse.java     |  45 +++
 .../skywalking/banyandb/v1/client/TraceWrite.java  |  81 +++++
 src/main/proto/banyandb/v1/banyandb-trace.proto    | 107 ++++++
 src/main/proto/banyandb/v1/banyandb.proto          | 136 ++++++++
 .../v1/client/BanyanDBClientQueryTest.java         | 245 ++++++++++++++
 .../v1/client/BanyanDBClientWriteTest.java         | 155 +++++++++
 49 files changed, 4839 insertions(+)

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..562e568
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,23 @@
+/build/
+target/
+.idea/
+*.iml
+.classpath
+.project
+.settings/
+.DS_Store
+*~
+packages/
+**/dependency-reduced-pom.xml
+**/dist/
+/docker/snapshot/*.gz
+.mvn/wrapper/*.jar
+.factorypath
+.vscode
+.checkstyle
+.externalToolBuilders
+/test/plugin/dist
+/test/plugin/workspace
+/test/jacoco/classes
+/test/jacoco/*.exec
+test/jacoco
\ No newline at end of file
diff --git a/.mvn/wrapper/MavenWrapperDownloader.java b/.mvn/wrapper/MavenWrapperDownloader.java
new file mode 100644
index 0000000..187216f
--- /dev/null
+++ b/.mvn/wrapper/MavenWrapperDownloader.java
@@ -0,0 +1,117 @@
+/*
+ * Copyright 2007-present the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.net.*;
+import java.io.*;
+import java.nio.channels.*;
+import java.util.Properties;
+
+public class MavenWrapperDownloader {
+
+    private static final String WRAPPER_VERSION = "0.5.5";
+    /**
+     * Default URL to download the maven-wrapper.jar from, if no 'downloadUrl' is provided.
+     */
+    private static final String DEFAULT_DOWNLOAD_URL = "https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/"
+        + WRAPPER_VERSION + "/maven-wrapper-" + WRAPPER_VERSION + ".jar";
+
+    /**
+     * Path to the maven-wrapper.properties file, which might contain a downloadUrl property to
+     * use instead of the default one.
+     */
+    private static final String MAVEN_WRAPPER_PROPERTIES_PATH =
+        ".mvn/wrapper/maven-wrapper.properties";
+
+    /**
+     * Path where the maven-wrapper.jar will be saved to.
+     */
+    private static final String MAVEN_WRAPPER_JAR_PATH =
+        ".mvn/wrapper/maven-wrapper.jar";
+
+    /**
+     * Name of the property which should be used to override the default download url for the wrapper.
+     */
+    private static final String PROPERTY_NAME_WRAPPER_URL = "wrapperUrl";
+
+    public static void main(String args[]) {
+        System.out.println("- Downloader started");
+        File baseDirectory = new File(args[0]);
+        System.out.println("- Using base directory: " + baseDirectory.getAbsolutePath());
+
+        // If the maven-wrapper.properties exists, read it and check if it contains a custom
+        // wrapperUrl parameter.
+        File mavenWrapperPropertyFile = new File(baseDirectory, MAVEN_WRAPPER_PROPERTIES_PATH);
+        String url = DEFAULT_DOWNLOAD_URL;
+        if(mavenWrapperPropertyFile.exists()) {
+            FileInputStream mavenWrapperPropertyFileInputStream = null;
+            try {
+                mavenWrapperPropertyFileInputStream = new FileInputStream(mavenWrapperPropertyFile);
+                Properties mavenWrapperProperties = new Properties();
+                mavenWrapperProperties.load(mavenWrapperPropertyFileInputStream);
+                url = mavenWrapperProperties.getProperty(PROPERTY_NAME_WRAPPER_URL, url);
+            } catch (IOException e) {
+                System.out.println("- ERROR loading '" + MAVEN_WRAPPER_PROPERTIES_PATH + "'");
+            } finally {
+                try {
+                    if(mavenWrapperPropertyFileInputStream != null) {
+                        mavenWrapperPropertyFileInputStream.close();
+                    }
+                } catch (IOException e) {
+                    // Ignore ...
+                }
+            }
+        }
+        System.out.println("- Downloading from: " + url);
+
+        File outputFile = new File(baseDirectory.getAbsolutePath(), MAVEN_WRAPPER_JAR_PATH);
+        if(!outputFile.getParentFile().exists()) {
+            if(!outputFile.getParentFile().mkdirs()) {
+                System.out.println(
+                    "- ERROR creating output directory '" + outputFile.getParentFile().getAbsolutePath() + "'");
+            }
+        }
+        System.out.println("- Downloading to: " + outputFile.getAbsolutePath());
+        try {
+            downloadFileFromURL(url, outputFile);
+            System.out.println("Done");
+            System.exit(0);
+        } catch (Throwable e) {
+            System.out.println("- Error downloading");
+            e.printStackTrace();
+            System.exit(1);
+        }
+    }
+
+    private static void downloadFileFromURL(String urlString, File destination) throws Exception {
+        if (System.getenv("MVNW_USERNAME") != null && System.getenv("MVNW_PASSWORD") != null) {
+            String username = System.getenv("MVNW_USERNAME");
+            char[] password = System.getenv("MVNW_PASSWORD").toCharArray();
+            Authenticator.setDefault(new Authenticator() {
+                @Override
+                protected PasswordAuthentication getPasswordAuthentication() {
+                    return new PasswordAuthentication(username, password);
+                }
+            });
+        }
+        URL website = new URL(urlString);
+        ReadableByteChannel rbc;
+        rbc = Channels.newChannel(website.openStream());
+        FileOutputStream fos = new FileOutputStream(destination);
+        fos.getChannel().transferFrom(rbc, 0, Long.MAX_VALUE);
+        fos.close();
+        rbc.close();
+    }
+
+}
diff --git a/.mvn/wrapper/maven-wrapper.properties b/.mvn/wrapper/maven-wrapper.properties
new file mode 100644
index 0000000..54ab0bc
--- /dev/null
+++ b/.mvn/wrapper/maven-wrapper.properties
@@ -0,0 +1,3 @@
+distributionUrl=https://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.6.1/apache-maven-3.6.1-bin.zip
+wrapperUrl=https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.5.5/maven-wrapper-0.5.5.jar
+
diff --git a/HEADER b/HEADER
new file mode 100644
index 0000000..1745cfe
--- /dev/null
+++ b/HEADER
@@ -0,0 +1,14 @@
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..8f71f43
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,202 @@
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "{}"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright {yyyy} {name of copyright owner}
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+
diff --git a/NOTICE b/NOTICE
new file mode 100644
index 0000000..2ca1b04
--- /dev/null
+++ b/NOTICE
@@ -0,0 +1,5 @@
+Apache SkyWalking
+Copyright 2017-2021 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
diff --git a/apm-checkstyle/CHECKSTYLE_HEAD b/apm-checkstyle/CHECKSTYLE_HEAD
new file mode 100755
index 0000000..00e3b09
--- /dev/null
+++ b/apm-checkstyle/CHECKSTYLE_HEAD
@@ -0,0 +1,18 @@
+^<\?xml version="1\.0" encoding="UTF-8"\?>$
+^<!--$
+^ *(/\*|#|~)$
+^ *(\*|#|~) +Licensed to the Apache Software Foundation \(ASF\) under one or more$
+^ *(\*|#|~) +contributor license agreements\.  See the NOTICE file distributed with$
+^ *(\*|#|~) +this work for additional information regarding copyright ownership\.$
+^ *(\*|#|~) +The ASF licenses this file to You under the Apache License, Version 2\.0$
+^ *(\*|#|~) +\(the "License"\); you may not use this file except in compliance with$
+^ *(\*|#|~) +the License\.  You may obtain a copy of the License at$
+^ *(\*|#|~)$
+^ *(\*|#|~) +http://www\.apache\.org/licenses/LICENSE-2\.0$
+^ *(\*|#|~)$
+^ *(\*|#|~) +Unless required by applicable law or agreed to in writing, software$
+^ *(\*|#|~) +distributed under the License is distributed on an "AS IS" BASIS,$
+^ *(\*|#|~) +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied\.$
+^ *(\*|#|~) +See the License for the specific language governing permissions and$
+^ *(\*|#|~) +limitations under the License\.$
+^ *(\*|#|~)$
diff --git a/apm-checkstyle/checkStyle.xml b/apm-checkstyle/checkStyle.xml
new file mode 100755
index 0000000..d51e0c7
--- /dev/null
+++ b/apm-checkstyle/checkStyle.xml
@@ -0,0 +1,133 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  ~
+  -->
+
+<!DOCTYPE module PUBLIC
+    "-//Puppy Crawl//DTD Check Configuration 1.3//EN"
+    "http://checkstyle.org/dtds/configuration_1_3.dtd">
+<module name="Checker">
+
+    <property name="localeLanguage" value="en"/>
+
+    <module name="FileTabCharacter">
+        <property name="eachLine" value="true"/>
+    </module>
+
+    <module name="RegexpHeader">
+        <property name="headerFile" value="${checkstyle.header.file}"/>
+        <property name="multiLines" value="1, 2, 3, 18"/>
+    </module>
+
+    <module name="RegexpSingleline">
+        <property name="format" value="System\.out\.println"/>
+        <property name="message" value="Prohibit invoking System.out.println in source code !"/>
+    </module>
+
+    <module name="RegexpSingleline">
+        <property name="format" value="^\s*\*\s*@author"/>
+        <property name="minimum" value="0"/>
+        <property name="maximum" value="0"/>
+        <property name="message" value="ASF project doesn't allow @author copyright."/>
+    </module>
+
+    <module name="RegexpSingleline">
+        <property name="format"
+                  value=".*[\u3400-\u4DB5\u4E00-\u9FA5\u9FA6-\u9FBB\uF900-\uFA2D\uFA30-\uFA6A\uFA70-\uFAD9\uFF00-\uFFEF\u2E80-\u2EFF\u3000-\u303F\u31C0-\u31EF]+.*"/>
+        <property name="message" value="Not allow chinese character !"/>
+    </module>
+
+    <module name="FileLength">
+        <property name="max" value="3000"/>
+    </module>
+
+    <module name="TreeWalker">
+
+        <module name="UnusedImports"/>
+        <module name="RedundantImport"/>
+        <module name="AvoidStarImport"/>
+
+        <module name="NonEmptyAtclauseDescription"/>
+
+        <!--Checks that classes that override equals() also override hashCode()-->
+        <module name="EqualsHashCode"/>
+        <!--Checks for over-complicated boolean expressions. Currently finds code like if (topic == true), topic || true, !false, etc.-->
+        <module name="SimplifyBooleanExpression"/>
+        <module name="OneStatementPerLine"/>
+        <module name="UnnecessaryParentheses"/>
+        <!--Checks for over-complicated boolean return statements. For example the following code-->
+        <module name="SimplifyBooleanReturn"/>
+
+        <!--Check that the default is after all the cases in producerGroup switch statement-->
+        <module name="DefaultComesLast"/>
+        <!--Detects empty statements (standalone ";" semicolon)-->
+        <module name="EmptyStatement"/>
+        <!--Checks that long constants are defined with an upper ell-->
+        <module name="UpperEll"/>
+        <module name="ConstantName">
+            <property name="format" value="(^[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$)"/>
+        </module>
+        <!--Checks that local, non-final variable names conform to producerGroup format specified by the format property-->
+        <module name="LocalVariableName"/>
+        <!--Validates identifiers for local, final variables, including catch parameters-->
+        <module name="LocalFinalVariableName"/>
+        <!--Validates identifiers for non-static fields-->
+        <module name="MemberName"/>
+        <!--Validates identifiers for class type parameters-->
+        <module name="ClassTypeParameterName">
+            <property name="format" value="(^[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$)"/>
+        </module>
+        <!--Validates identifiers for method type parameters-->
+        <module name="MethodTypeParameterName">
+            <property name="format" value="(^[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$)"/>
+        </module>
+        <module name="PackageName">
+            <property name="format" value="^(org|test)\.apache\.skywalking(\.[a-zA-Z][a-zA-Z0-9]*)+$"/>
+        </module>
+        <module name="ParameterName"/>
+        <module name="StaticVariableName">
+            <property name="format" value="(^[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$)"/>
+        </module>
+        <module name="TypeName">
+            <property name="format" value="(^[A-Z][a-zA-Z0-9]*$)|(^[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$)"/>
+        </module>
+        <module name="MissingOverride"/>
+
+        <!--whitespace-->
+        <module name="GenericWhitespace"/>
+        <module name="WhitespaceAfter"/>
+        <module name="WhitespaceAround"/>
+        <module name="MethodParamPad"/>
+        <module name="ParenPad"/>
+        <module name="TypecastParenPad"/>
+        <module name="EmptyLineSeparator">
+            <property name="allowNoEmptyLineBetweenFields" value="true"/>
+            <property name="allowMultipleEmptyLines" value="false"/>
+            <property name="allowMultipleEmptyLinesInsideClassMembers" value="false"/>
+        </module>
+
+        <module name="ImportControl">
+            <property name="file" value="${import.control}"/>
+            <property name="path" value="apm-sniffer/(apm-sdk-plugin|bootstrap-plugins|optional-plugins)/.+/src/main/.+Instrumentation.java$"/>
+        </module>
+
+        <module name="ImportControl">
+            <property name="file" value="${import.control}"/>
+            <property name="path" value="apm-sniffer/apm-toolkit-activation/.+/src/main/.+Activation.java$"/>
+        </module>
+    </module>
+</module>
diff --git a/apm-checkstyle/importControl.xml b/apm-checkstyle/importControl.xml
new file mode 100755
index 0000000..ed6c0a4
--- /dev/null
+++ b/apm-checkstyle/importControl.xml
@@ -0,0 +1,28 @@
+<?xml version="1.0"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  ~
+  -->
+
+<!DOCTYPE import-control PUBLIC
+        "-//Puppy Crawl//DTD Import Control 1.4//EN"
+        "https://checkstyle.org/dtds/import_control_1_4.dtd">
+
+<import-control pkg="org.apache.skywalking.apm.(toolkit|plugin)" regex="true">
+    <allow pkg="java"/>
+    <allow pkg="org.apache.skywalking"/>
+    <allow pkg="net.bytebuddy"/>
+</import-control>
diff --git a/mvnw b/mvnw
new file mode 100755
index 0000000..a3925bb
--- /dev/null
+++ b/mvnw
@@ -0,0 +1,310 @@
+#!/bin/sh
+# ----------------------------------------------------------------------------
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# ----------------------------------------------------------------------------
+
+# ----------------------------------------------------------------------------
+# Maven Start Up Batch script
+#
+# Required ENV vars:
+# ------------------
+#   JAVA_HOME - location of a JDK home dir
+#
+# Optional ENV vars
+# -----------------
+#   M2_HOME - location of maven2's installed home dir
+#   MAVEN_OPTS - parameters passed to the Java VM when running Maven
+#     e.g. to debug Maven itself, use
+#       set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000
+#   MAVEN_SKIP_RC - flag to disable loading of mavenrc files
+# ----------------------------------------------------------------------------
+
+if [ -z "$MAVEN_SKIP_RC" ] ; then
+
+  if [ -f /etc/mavenrc ] ; then
+    . /etc/mavenrc
+  fi
+
+  if [ -f "$HOME/.mavenrc" ] ; then
+    . "$HOME/.mavenrc"
+  fi
+
+fi
+
+# OS specific support.  $var _must_ be set to either true or false.
+cygwin=false;
+darwin=false;
+mingw=false
+case "`uname`" in
+  CYGWIN*) cygwin=true ;;
+  MINGW*) mingw=true;;
+  Darwin*) darwin=true
+    # Use /usr/libexec/java_home if available, otherwise fall back to /Library/Java/Home
+    # See https://developer.apple.com/library/mac/qa/qa1170/_index.html
+    if [ -z "$JAVA_HOME" ]; then
+      if [ -x "/usr/libexec/java_home" ]; then
+        export JAVA_HOME="`/usr/libexec/java_home`"
+      else
+        export JAVA_HOME="/Library/Java/Home"
+      fi
+    fi
+    ;;
+esac
+
+if [ -z "$JAVA_HOME" ] ; then
+  if [ -r /etc/gentoo-release ] ; then
+    JAVA_HOME=`java-config --jre-home`
+  fi
+fi
+
+if [ -z "$M2_HOME" ] ; then
+  ## resolve links - $0 may be a link to maven's home
+  PRG="$0"
+
+  # need this for relative symlinks
+  while [ -h "$PRG" ] ; do
+    ls=`ls -ld "$PRG"`
+    link=`expr "$ls" : '.*-> \(.*\)$'`
+    if expr "$link" : '/.*' > /dev/null; then
+      PRG="$link"
+    else
+      PRG="`dirname "$PRG"`/$link"
+    fi
+  done
+
+  saveddir=`pwd`
+
+  M2_HOME=`dirname "$PRG"`/..
+
+  # make it fully qualified
+  M2_HOME=`cd "$M2_HOME" && pwd`
+
+  cd "$saveddir"
+  # echo Using m2 at $M2_HOME
+fi
+
+# For Cygwin, ensure paths are in UNIX format before anything is touched
+if $cygwin ; then
+  [ -n "$M2_HOME" ] &&
+    M2_HOME=`cygpath --unix "$M2_HOME"`
+  [ -n "$JAVA_HOME" ] &&
+    JAVA_HOME=`cygpath --unix "$JAVA_HOME"`
+  [ -n "$CLASSPATH" ] &&
+    CLASSPATH=`cygpath --path --unix "$CLASSPATH"`
+fi
+
+# For Mingw, ensure paths are in UNIX format before anything is touched
+if $mingw ; then
+  [ -n "$M2_HOME" ] &&
+    M2_HOME="`(cd "$M2_HOME"; pwd)`"
+  [ -n "$JAVA_HOME" ] &&
+    JAVA_HOME="`(cd "$JAVA_HOME"; pwd)`"
+fi
+
+if [ -z "$JAVA_HOME" ]; then
+  javaExecutable="`which javac`"
+  if [ -n "$javaExecutable" ] && ! [ "`expr \"$javaExecutable\" : '\([^ ]*\)'`" = "no" ]; then
+    # readlink(1) is not available as standard on Solaris 10.
+    readLink=`which readlink`
+    if [ ! `expr "$readLink" : '\([^ ]*\)'` = "no" ]; then
+      if $darwin ; then
+        javaHome="`dirname \"$javaExecutable\"`"
+        javaExecutable="`cd \"$javaHome\" && pwd -P`/javac"
+      else
+        javaExecutable="`readlink -f \"$javaExecutable\"`"
+      fi
+      javaHome="`dirname \"$javaExecutable\"`"
+      javaHome=`expr "$javaHome" : '\(.*\)/bin'`
+      JAVA_HOME="$javaHome"
+      export JAVA_HOME
+    fi
+  fi
+fi
+
+if [ -z "$JAVACMD" ] ; then
+  if [ -n "$JAVA_HOME"  ] ; then
+    if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
+      # IBM's JDK on AIX uses strange locations for the executables
+      JAVACMD="$JAVA_HOME/jre/sh/java"
+    else
+      JAVACMD="$JAVA_HOME/bin/java"
+    fi
+  else
+    JAVACMD="`which java`"
+  fi
+fi
+
+if [ ! -x "$JAVACMD" ] ; then
+  echo "Error: JAVA_HOME is not defined correctly." >&2
+  echo "  We cannot execute $JAVACMD" >&2
+  exit 1
+fi
+
+if [ -z "$JAVA_HOME" ] ; then
+  echo "Warning: JAVA_HOME environment variable is not set."
+fi
+
+CLASSWORLDS_LAUNCHER=org.codehaus.plexus.classworlds.launcher.Launcher
+
+# traverses directory structure from process work directory to filesystem root
+# first directory with .mvn subdirectory is considered project base directory
+find_maven_basedir() {
+
+  if [ -z "$1" ]
+  then
+    echo "Path not specified to find_maven_basedir"
+    return 1
+  fi
+
+  basedir="$1"
+  wdir="$1"
+  while [ "$wdir" != '/' ] ; do
+    if [ -d "$wdir"/.mvn ] ; then
+      basedir=$wdir
+      break
+    fi
+    # workaround for JBEAP-8937 (on Solaris 10/Sparc)
+    if [ -d "${wdir}" ]; then
+      wdir=`cd "$wdir/.."; pwd`
+    fi
+    # end of workaround
+  done
+  echo "${basedir}"
+}
+
+# concatenates all lines of a file
+concat_lines() {
+  if [ -f "$1" ]; then
+    echo "$(tr -s '\n' ' ' < "$1")"
+  fi
+}
+
+BASE_DIR=`find_maven_basedir "$(pwd)"`
+if [ -z "$BASE_DIR" ]; then
+  exit 1;
+fi
+
+##########################################################################################
+# Extension to allow automatically downloading the maven-wrapper.jar from Maven-central
+# This allows using the maven wrapper in projects that prohibit checking in binary data.
+##########################################################################################
+if [ -r "$BASE_DIR/.mvn/wrapper/maven-wrapper.jar" ]; then
+    if [ "$MVNW_VERBOSE" = true ]; then
+      echo "Found .mvn/wrapper/maven-wrapper.jar"
+    fi
+else
+    if [ "$MVNW_VERBOSE" = true ]; then
+      echo "Couldn't find .mvn/wrapper/maven-wrapper.jar, downloading it ..."
+    fi
+    if [ -n "$MVNW_REPOURL" ]; then
+      jarUrl="$MVNW_REPOURL/io/takari/maven-wrapper/0.5.5/maven-wrapper-0.5.5.jar"
+    else
+      jarUrl="https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.5.5/maven-wrapper-0.5.5.jar"
+    fi
+    while IFS="=" read key value; do
+      case "$key" in (wrapperUrl) jarUrl="$value"; break ;;
+      esac
+    done < "$BASE_DIR/.mvn/wrapper/maven-wrapper.properties"
+    if [ "$MVNW_VERBOSE" = true ]; then
+      echo "Downloading from: $jarUrl"
+    fi
+    wrapperJarPath="$BASE_DIR/.mvn/wrapper/maven-wrapper.jar"
+    if $cygwin; then
+      wrapperJarPath=`cygpath --path --windows "$wrapperJarPath"`
+    fi
+
+    if command -v wget > /dev/null; then
+        if [ "$MVNW_VERBOSE" = true ]; then
+          echo "Found wget ... using wget"
+        fi
+        if [ -z "$MVNW_USERNAME" ] || [ -z "$MVNW_PASSWORD" ]; then
+            wget "$jarUrl" -O "$wrapperJarPath"
+        else
+            wget --http-user=$MVNW_USERNAME --http-password=$MVNW_PASSWORD "$jarUrl" -O "$wrapperJarPath"
+        fi
+    elif command -v curl > /dev/null; then
+        if [ "$MVNW_VERBOSE" = true ]; then
+          echo "Found curl ... using curl"
+        fi
+        if [ -z "$MVNW_USERNAME" ] || [ -z "$MVNW_PASSWORD" ]; then
+            curl -o "$wrapperJarPath" "$jarUrl" -f
+        else
+            curl --user $MVNW_USERNAME:$MVNW_PASSWORD -o "$wrapperJarPath" "$jarUrl" -f
+        fi
+
+    else
+        if [ "$MVNW_VERBOSE" = true ]; then
+          echo "Falling back to using Java to download"
+        fi
+        javaClass="$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.java"
+        # For Cygwin, switch paths to Windows format before running javac
+        if $cygwin; then
+          javaClass=`cygpath --path --windows "$javaClass"`
+        fi
+        if [ -e "$javaClass" ]; then
+            if [ ! -e "$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.class" ]; then
+                if [ "$MVNW_VERBOSE" = true ]; then
+                  echo " - Compiling MavenWrapperDownloader.java ..."
+                fi
+                # Compiling the Java class
+                ("$JAVA_HOME/bin/javac" "$javaClass")
+            fi
+            if [ -e "$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.class" ]; then
+                # Running the downloader
+                if [ "$MVNW_VERBOSE" = true ]; then
+                  echo " - Running MavenWrapperDownloader.java ..."
+                fi
+                ("$JAVA_HOME/bin/java" -cp .mvn/wrapper MavenWrapperDownloader "$MAVEN_PROJECTBASEDIR")
+            fi
+        fi
+    fi
+fi
+##########################################################################################
+# End of extension
+##########################################################################################
+
+export MAVEN_PROJECTBASEDIR=${MAVEN_BASEDIR:-"$BASE_DIR"}
+if [ "$MVNW_VERBOSE" = true ]; then
+  echo $MAVEN_PROJECTBASEDIR
+fi
+MAVEN_OPTS="$(concat_lines "$MAVEN_PROJECTBASEDIR/.mvn/jvm.config") $MAVEN_OPTS"
+
+# For Cygwin, switch paths to Windows format before running java
+if $cygwin; then
+  [ -n "$M2_HOME" ] &&
+    M2_HOME=`cygpath --path --windows "$M2_HOME"`
+  [ -n "$JAVA_HOME" ] &&
+    JAVA_HOME=`cygpath --path --windows "$JAVA_HOME"`
+  [ -n "$CLASSPATH" ] &&
+    CLASSPATH=`cygpath --path --windows "$CLASSPATH"`
+  [ -n "$MAVEN_PROJECTBASEDIR" ] &&
+    MAVEN_PROJECTBASEDIR=`cygpath --path --windows "$MAVEN_PROJECTBASEDIR"`
+fi
+
+# Provide a "standardized" way to retrieve the CLI args that will
+# work with both Windows and non-Windows executions.
+MAVEN_CMD_LINE_ARGS="$MAVEN_CONFIG $@"
+export MAVEN_CMD_LINE_ARGS
+
+WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
+
+exec "$JAVACMD" \
+  $MAVEN_OPTS \
+  -classpath "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.jar" \
+  "-Dmaven.home=${M2_HOME}" "-Dmaven.multiModuleProjectDirectory=${MAVEN_PROJECTBASEDIR}" \
+  ${WRAPPER_LAUNCHER} $MAVEN_CONFIG "$@"
diff --git a/mvnw.cmd b/mvnw.cmd
new file mode 100755
index 0000000..15123ea
--- /dev/null
+++ b/mvnw.cmd
@@ -0,0 +1,182 @@
+@REM ----------------------------------------------------------------------------
+@REM Licensed to the Apache Software Foundation (ASF) under one
+@REM or more contributor license agreements.  See the NOTICE file
+@REM distributed with this work for additional information
+@REM regarding copyright ownership.  The ASF licenses this file
+@REM to you under the Apache License, Version 2.0 (the
+@REM "License"); you may not use this file except in compliance
+@REM with the License.  You may obtain a copy of the License at
+@REM
+@REM    http://www.apache.org/licenses/LICENSE-2.0
+@REM
+@REM Unless required by applicable law or agreed to in writing,
+@REM software distributed under the License is distributed on an
+@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+@REM KIND, either express or implied.  See the License for the
+@REM specific language governing permissions and limitations
+@REM under the License.
+@REM ----------------------------------------------------------------------------
+
+@REM ----------------------------------------------------------------------------
+@REM Maven Start Up Batch script
+@REM
+@REM Required ENV vars:
+@REM JAVA_HOME - location of a JDK home dir
+@REM
+@REM Optional ENV vars
+@REM M2_HOME - location of maven2's installed home dir
+@REM MAVEN_BATCH_ECHO - set to 'on' to enable the echoing of the batch commands
+@REM MAVEN_BATCH_PAUSE - set to 'on' to wait for a keystroke before ending
+@REM MAVEN_OPTS - parameters passed to the Java VM when running Maven
+@REM     e.g. to debug Maven itself, use
+@REM set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000
+@REM MAVEN_SKIP_RC - flag to disable loading of mavenrc files
+@REM ----------------------------------------------------------------------------
+
+@REM Begin all REM lines with '@' in case MAVEN_BATCH_ECHO is 'on'
+@echo off
+@REM set title of command window
+title %0
+@REM enable echoing by setting MAVEN_BATCH_ECHO to 'on'
+@if "%MAVEN_BATCH_ECHO%" == "on"  echo %MAVEN_BATCH_ECHO%
+
+@REM set %HOME% to equivalent of $HOME
+if "%HOME%" == "" (set "HOME=%HOMEDRIVE%%HOMEPATH%")
+
+@REM Execute a user defined script before this one
+if not "%MAVEN_SKIP_RC%" == "" goto skipRcPre
+@REM check for pre script, once with legacy .bat ending and once with .cmd ending
+if exist "%HOME%\mavenrc_pre.bat" call "%HOME%\mavenrc_pre.bat"
+if exist "%HOME%\mavenrc_pre.cmd" call "%HOME%\mavenrc_pre.cmd"
+:skipRcPre
+
+@setlocal
+
+set ERROR_CODE=0
+
+@REM To isolate internal variables from possible post scripts, we use another setlocal
+@setlocal
+
+@REM ==== START VALIDATION ====
+if not "%JAVA_HOME%" == "" goto OkJHome
+
+echo.
+echo Error: JAVA_HOME not found in your environment. >&2
+echo Please set the JAVA_HOME variable in your environment to match the >&2
+echo location of your Java installation. >&2
+echo.
+goto error
+
+:OkJHome
+if exist "%JAVA_HOME%\bin\java.exe" goto init
+
+echo.
+echo Error: JAVA_HOME is set to an invalid directory. >&2
+echo JAVA_HOME = "%JAVA_HOME%" >&2
+echo Please set the JAVA_HOME variable in your environment to match the >&2
+echo location of your Java installation. >&2
+echo.
+goto error
+
+@REM ==== END VALIDATION ====
+
+:init
+
+@REM Find the project base dir, i.e. the directory that contains the folder ".mvn".
+@REM Fallback to current working directory if not found.
+
+set MAVEN_PROJECTBASEDIR=%MAVEN_BASEDIR%
+IF NOT "%MAVEN_PROJECTBASEDIR%"=="" goto endDetectBaseDir
+
+set EXEC_DIR=%CD%
+set WDIR=%EXEC_DIR%
+:findBaseDir
+IF EXIST "%WDIR%"\.mvn goto baseDirFound
+cd ..
+IF "%WDIR%"=="%CD%" goto baseDirNotFound
+set WDIR=%CD%
+goto findBaseDir
+
+:baseDirFound
+set MAVEN_PROJECTBASEDIR=%WDIR%
+cd "%EXEC_DIR%"
+goto endDetectBaseDir
+
+:baseDirNotFound
+set MAVEN_PROJECTBASEDIR=%EXEC_DIR%
+cd "%EXEC_DIR%"
+
+:endDetectBaseDir
+
+IF NOT EXIST "%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config" goto endReadAdditionalConfig
+
+@setlocal EnableExtensions EnableDelayedExpansion
+for /F "usebackq delims=" %%a in ("%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config") do set JVM_CONFIG_MAVEN_PROPS=!JVM_CONFIG_MAVEN_PROPS! %%a
+@endlocal & set JVM_CONFIG_MAVEN_PROPS=%JVM_CONFIG_MAVEN_PROPS%
+
+:endReadAdditionalConfig
+
+SET MAVEN_JAVA_EXE="%JAVA_HOME%\bin\java.exe"
+set WRAPPER_JAR="%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.jar"
+set WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
+
+set DOWNLOAD_URL="https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.5.5/maven-wrapper-0.5.5.jar"
+
+FOR /F "tokens=1,2 delims==" %%A IN ("%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.properties") DO (
+    IF "%%A"=="wrapperUrl" SET DOWNLOAD_URL=%%B
+)
+
+@REM Extension to allow automatically downloading the maven-wrapper.jar from Maven-central
+@REM This allows using the maven wrapper in projects that prohibit checking in binary data.
+if exist %WRAPPER_JAR% (
+    if "%MVNW_VERBOSE%" == "true" (
+        echo Found %WRAPPER_JAR%
+    )
+) else (
+    if not "%MVNW_REPOURL%" == "" (
+        SET DOWNLOAD_URL="%MVNW_REPOURL%/io/takari/maven-wrapper/0.5.5/maven-wrapper-0.5.5.jar"
+    )
+    if "%MVNW_VERBOSE%" == "true" (
+        echo Couldn't find %WRAPPER_JAR%, downloading it ...
+        echo Downloading from: %DOWNLOAD_URL%
+    )
+
+    powershell -Command "&{"^
+		"$webclient = new-object System.Net.WebClient;"^
+		"if (-not ([string]::IsNullOrEmpty('%MVNW_USERNAME%') -and [string]::IsNullOrEmpty('%MVNW_PASSWORD%'))) {"^
+		"$webclient.Credentials = new-object System.Net.NetworkCredential('%MVNW_USERNAME%', '%MVNW_PASSWORD%');"^
+		"}"^
+		"[Net.ServicePointManager]::SecurityProtocol = [Net.SecurityProtocolType]::Tls12; $webclient.DownloadFile('%DOWNLOAD_URL%', '%WRAPPER_JAR%')"^
+		"}"
+    if "%MVNW_VERBOSE%" == "true" (
+        echo Finished downloading %WRAPPER_JAR%
+    )
+)
+@REM End of extension
+
+@REM Provide a "standardized" way to retrieve the CLI args that will
+@REM work with both Windows and non-Windows executions.
+set MAVEN_CMD_LINE_ARGS=%*
+
+%MAVEN_JAVA_EXE% %JVM_CONFIG_MAVEN_PROPS% %MAVEN_OPTS% %MAVEN_DEBUG_OPTS% -classpath %WRAPPER_JAR% "-Dmaven.multiModuleProjectDirectory=%MAVEN_PROJECTBASEDIR%" %WRAPPER_LAUNCHER% %MAVEN_CONFIG% %*
+if ERRORLEVEL 1 goto error
+goto end
+
+:error
+set ERROR_CODE=1
+
+:end
+@endlocal & set ERROR_CODE=%ERROR_CODE%
+
+if not "%MAVEN_SKIP_RC%" == "" goto skipRcPost
+@REM check for post script, once with legacy .bat ending and once with .cmd ending
+if exist "%HOME%\mavenrc_post.bat" call "%HOME%\mavenrc_post.bat"
+if exist "%HOME%\mavenrc_post.cmd" call "%HOME%\mavenrc_post.cmd"
+:skipRcPost
+
+@REM pause the script if MAVEN_BATCH_PAUSE is set to 'on'
+if "%MAVEN_BATCH_PAUSE%" == "on" pause
+
+if "%MAVEN_TERMINATE_CMD%" == "on" exit %ERROR_CODE%
+
+exit /B %ERROR_CODE%
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..fb36c46
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,357 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  ~
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>org.apache.skywalking</groupId>
+    <artifactId>banyandb-java-client</artifactId>
+    <version>0.1.0-SNAPSHOT</version>
+
+    <parent>
+        <groupId>org.apache</groupId>
+        <artifactId>apache</artifactId>
+        <version>21</version>
+        <relativePath/>
+    </parent>
+
+    <packaging>jar</packaging>
+
+    <name>banyandb-java-client</name>
+    <url>https://github.com/apache/skywalking-banyandb-java-client</url>
+
+    <scm>
+        <url>https://github.com/apache/skywalking-banyandb-java-client</url>
+        <connection>scm:git:https://github.com/apache/skywalking-banyandb-java-client.git</connection>
+        <developerConnection>scm:git:https://github.com/apache/skywalking-banyandb-java-client.git</developerConnection>
+        <tag>HEAD</tag>
+    </scm>
+
+    <issueManagement>
+        <system>GitHub</system>
+        <url>https://github.com/apache/skywalking/issues</url>
+    </issueManagement>
+
+    <licenses>
+        <license>
+            <name>The Apache Software License, Version 2.0</name>
+            <url>https://www.apache.org/licenses/LICENSE-2.0.txt</url>
+        </license>
+    </licenses>
+
+    <mailingLists>
+        <mailingList>
+            <name>SkyWalking Developer List</name>
+            <post>dev@skywalking.apache.org</post>
+            <subscribe>dev-subscribe@skywalking.apache.org</subscribe>
+            <unsubscribe>dev-unsubscribe@skywalking.apache.org</unsubscribe>
+        </mailingList>
+        <mailingList>
+            <name>SkyWalking Commits</name>
+            <post>commits@skywalking.apache.org</post>
+            <subscribe>commits-subscribe@skywalking.apache.org</subscribe>
+            <unsubscribe>commits-unsubscribe@skywalking.apache.org</unsubscribe>
+        </mailingList>
+    </mailingLists>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <compiler.version>1.8</compiler.version>
+        <powermock.version>2.0.7</powermock.version>
+        <checkstyle.version>6.18</checkstyle.version>
+        <junit.version>4.12</junit.version>
+        <mockito-core.version>3.5.13</mockito-core.version>
+        <lombok.version>1.18.20</lombok.version>
+
+        <!-- core lib dependency -->
+        <bytebuddy.version>1.10.19</bytebuddy.version>
+        <grpc.version>1.32.1</grpc.version>
+        <gson.version>2.8.6</gson.version>
+        <os-maven-plugin.version>1.6.2</os-maven-plugin.version>
+        <protobuf-maven-plugin.version>0.6.1</protobuf-maven-plugin.version>
+        <com.google.protobuf.protoc.version>3.12.0</com.google.protobuf.protoc.version>
+        <protoc-gen-grpc-java.plugin.version>1.32.1</protoc-gen-grpc-java.plugin.version>
+        <netty-tcnative-boringssl-static.version>2.0.39.Final</netty-tcnative-boringssl-static.version>
+        <javax.annotation-api.version>1.3.2</javax.annotation-api.version>
+        <!-- necessary for Java 9+ -->
+        <org.apache.tomcat.annotations-api.version>6.0.53</org.apache.tomcat.annotations-api.version>
+        <slf4j.version>1.7.30</slf4j.version>
+
+        <!-- Plugin versions -->
+        <docker.plugin.version>0.4.13</docker.plugin.version>
+        <takari-maven-plugin.version>0.6.1</takari-maven-plugin.version>
+        <exec-maven-plugin.version>1.6.0</exec-maven-plugin.version>
+        <maven-antrun-plugin.version>1.8</maven-antrun-plugin.version>
+        <maven-dependency-plugin.version>2.10</maven-dependency-plugin.version>
+        <maven-deploy-plugin.version>2.8.2</maven-deploy-plugin.version>
+        <maven-assembly-plugin.version>3.1.0</maven-assembly-plugin.version>
+        <maven-failsafe-plugin.version>2.22.0</maven-failsafe-plugin.version>
+        <build-helper-maven-plugin.version>3.2.0</build-helper-maven-plugin.version>
+        <maven-jar-plugin.version>3.1.0</maven-jar-plugin.version>
+        <maven-enforcer-plugin.version>3.0.0-M2</maven-enforcer-plugin.version>
+        <maven-compiler-plugin.version>3.8.0</maven-compiler-plugin.version>
+        <maven-resource-plugin.version>3.1.0</maven-resource-plugin.version>
+        <maven-source-plugin.version>3.0.1</maven-source-plugin.version>
+        <versions-maven-plugin.version>2.5</versions-maven-plugin.version>
+        <coveralls-maven-plugin.version>4.3.0</coveralls-maven-plugin.version>
+        <maven-checkstyle-plugin.version>3.1.0</maven-checkstyle-plugin.version>
+        <jmh.version>1.21</jmh.version>
+        <checkstyle.fails.on.error>true</checkstyle.fails.on.error>
+
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>io.grpc</groupId>
+            <artifactId>grpc-netty-shaded</artifactId>
+            <version>${grpc.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.grpc</groupId>
+            <artifactId>grpc-protobuf</artifactId>
+            <version>${grpc.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.grpc</groupId>
+            <artifactId>grpc-stub</artifactId>
+            <version>${grpc.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-tcnative-boringssl-static</artifactId>
+            <version>${netty-tcnative-boringssl-static.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>${slf4j.version}</version>
+        </dependency>
+        <dependency> <!-- necessary for Java 9+ -->
+            <groupId>org.apache.tomcat</groupId>
+            <artifactId>annotations-api</artifactId>
+            <version>${org.apache.tomcat.annotations-api.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>io.grpc</groupId>
+            <artifactId>grpc-testing</artifactId>
+            <version>${grpc.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>${junit.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <version>${mockito-core.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-module-junit4</artifactId>
+            <version>${powermock.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-api-mockito2</artifactId>
+            <version>${powermock.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <version>${lombok.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>javax.annotation</groupId>
+            <artifactId>javax.annotation-api</artifactId>
+            <version>${javax.annotation-api.version}</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>kr.motd.maven</groupId>
+                <artifactId>os-maven-plugin</artifactId>
+                <version>${os-maven-plugin.version}</version>
+                <executions>
+                    <execution>
+                        <phase>initialize</phase>
+                        <goals>
+                            <goal>detect</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.xolstice.maven.plugins</groupId>
+                <artifactId>protobuf-maven-plugin</artifactId>
+                <version>${protobuf-maven-plugin.version}</version>
+                <configuration>
+                    <!--
+                      The version of protoc must match protobuf-java. If you don't depend on
+                      protobuf-java directly, you will be transitively depending on the
+                      protobuf-java version that grpc depends on.
+                    -->
+                    <protocArtifact>
+                        com.google.protobuf:protoc:${com.google.protobuf.protoc.version}:exe:${os.detected.classifier}
+                    </protocArtifact>
+                    <pluginId>grpc-java</pluginId>
+                    <pluginArtifact>
+                        io.grpc:protoc-gen-grpc-java:${protoc-gen-grpc-java.plugin.version}:exe:${os.detected.classifier}
+                    </pluginArtifact>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>grpc-build</id>
+                        <goals>
+                            <goal>compile</goal>
+                            <goal>compile-custom</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-enforcer-plugin</artifactId>
+                <version>${maven-enforcer-plugin.version}</version>
+                <executions>
+                    <execution>
+                        <id>enforce-java</id>
+                        <goals>
+                            <goal>enforce</goal>
+                        </goals>
+                        <phase>validate</phase>
+                        <configuration>
+                            <rules>
+                                <requireJavaVersion>
+                                    <!-- Build has not yet been updated for Java 9+ -->
+                                    <version>1.8</version>
+                                </requireJavaVersion>
+                                <requireMavenVersion>
+                                    <version>3.6</version>
+                                </requireMavenVersion>
+                            </rules>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>${maven-compiler-plugin.version}</version>
+                <configuration>
+                    <source>${compiler.version}</source>
+                    <target>${compiler.version}</target>
+                    <encoding>${project.build.sourceEncoding}</encoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-resources-plugin</artifactId>
+                <version>${maven-resource-plugin.version}</version>
+                <configuration>
+                    <encoding>${project.build.sourceEncoding}</encoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>com.spotify</groupId>
+                <artifactId>docker-maven-plugin</artifactId>
+                <version>${docker.plugin.version}</version>
+                <configuration>
+                    <skipDocker>true</skipDocker>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-source-plugin</artifactId>
+                <version>${maven-source-plugin.version}</version>
+                <executions>
+                    <execution>
+                        <id>attach-sources</id>
+                        <phase>none</phase>
+                        <goals>
+                            <goal>jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-checkstyle-plugin</artifactId>
+                <version>${maven-checkstyle-plugin.version}</version>
+                <configuration>
+                    <configLocation>${maven.multiModuleProjectDirectory}/apm-checkstyle/checkStyle.xml</configLocation>
+                    <headerLocation>${maven.multiModuleProjectDirectory}/apm-checkstyle/CHECKSTYLE_HEAD</headerLocation>
+                    <encoding>UTF-8</encoding>
+                    <consoleOutput>true</consoleOutput>
+                    <includeTestSourceDirectory>true</includeTestSourceDirectory>
+                    <failOnViolation>${checkstyle.fails.on.error}</failOnViolation>
+                    <sourceDirectories>
+                        <sourceDirectory>${project.build.sourceDirectory}</sourceDirectory>
+                        <sourceDirectory>${project.build.testSourceDirectory}</sourceDirectory>
+                    </sourceDirectories>
+                    <resourceIncludes>
+                        **/*.properties,
+                        **/*.sh,
+                        **/*.bat,
+                        **/*.yml,
+                        **/*.yaml,
+                        **/*.xml
+                    </resourceIncludes>
+                    <resourceExcludes>
+                        **/.asf.yaml,
+                        **/.github/**
+                    </resourceExcludes>
+                    <excludes>
+                        **/target/generated-test-sources/**,
+                        org/apache/skywalking/apm/network/register/v2/**/*.java,
+                        org/apache/skywalking/apm/network/common/**/*.java,
+                        org/apache/skywalking/apm/network/servicemesh/**/*.java,
+                        org/apache/skywalking/apm/network/language/**/*.java,
+                        org/apache/skywalking/oap/server/core/remote/grpc/proto/*.java,
+                        org/apache/skywalking/oal/rt/grammar/*.java,
+                        org/apache/skywalking/oap/server/exporter/grpc/*.java,
+                        org/apache/skywalking/oap/server/configuration/service/*.java,
+                        **/generated/*_jmhType*.java,
+                        **/generated/*_jmhTest.java
+                    </excludes>
+                    <propertyExpansion>
+                        import.control=${maven.multiModuleProjectDirectory}/apm-checkstyle/importControl.xml
+                    </propertyExpansion>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>validate</id>
+                        <phase>process-sources</phase>
+                        <goals>
+                            <goal>check</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/DataCarrier.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/DataCarrier.java
new file mode 100644
index 0000000..db2ed2d
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/DataCarrier.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.commons.datacarrier;
+
+import java.util.Properties;
+import org.apache.skywalking.banyandb.commons.datacarrier.buffer.BufferStrategy;
+import org.apache.skywalking.banyandb.commons.datacarrier.buffer.Channels;
+import org.apache.skywalking.banyandb.commons.datacarrier.consumer.ConsumeDriver;
+import org.apache.skywalking.banyandb.commons.datacarrier.consumer.ConsumerPool;
+import org.apache.skywalking.banyandb.commons.datacarrier.consumer.IConsumer;
+import org.apache.skywalking.banyandb.commons.datacarrier.consumer.IDriver;
+import org.apache.skywalking.banyandb.commons.datacarrier.partition.IDataPartitioner;
+import org.apache.skywalking.banyandb.commons.datacarrier.partition.SimpleRollingPartitioner;
+
+/**
+ * DataCarrier main class. use this instance to set Producer/Consumer Model.
+ */
+public class DataCarrier<T> {
+    private Channels<T> channels;
+    private IDriver driver;
+    private String name;
+
+    public DataCarrier(int channelSize, int bufferSize) {
+        this("DEFAULT", channelSize, bufferSize);
+    }
+
+    public DataCarrier(String name, int channelSize, int bufferSize) {
+        this(name, name, channelSize, bufferSize);
+    }
+
+    public DataCarrier(String name, String envPrefix, int channelSize, int bufferSize) {
+        this(name, envPrefix, channelSize, bufferSize, BufferStrategy.BLOCKING);
+    }
+
+    public DataCarrier(String name, String envPrefix, int channelSize, int bufferSize, BufferStrategy strategy) {
+        this.name = name;
+        bufferSize = EnvUtil.getInt(envPrefix + "_BUFFER_SIZE", bufferSize);
+        channelSize = EnvUtil.getInt(envPrefix + "_CHANNEL_SIZE", channelSize);
+        channels = new Channels<>(channelSize, bufferSize, new SimpleRollingPartitioner<T>(), strategy);
+    }
+
+    public DataCarrier(int channelSize, int bufferSize, BufferStrategy strategy) {
+        this("DEFAULT", "DEFAULT", channelSize, bufferSize, strategy);
+    }
+
+    /**
+     * set a new IDataPartitioner. It will cover the current one or default one.(Default is {@link
+     * SimpleRollingPartitioner}
+     *
+     * @param dataPartitioner to partition data into different channel by some rules.
+     * @return DataCarrier instance for chain
+     */
+    public DataCarrier setPartitioner(IDataPartitioner<T> dataPartitioner) {
+        this.channels.setPartitioner(dataPartitioner);
+        return this;
+    }
+
+    /**
+     * produce data to buffer, using the given {@link BufferStrategy}.
+     *
+     * @return false means produce data failure. The data will not be consumed.
+     */
+    public boolean produce(T data) {
+        if (driver != null) {
+            if (!driver.isRunning(channels)) {
+                return false;
+            }
+        }
+
+        return this.channels.save(data);
+    }
+
+    /**
+     * set consumeDriver to this Carrier. consumer begin to run when {@link DataCarrier#produce} begin to work.
+     *
+     * @param consumerClass class of consumer
+     * @param num           number of consumer threads
+     * @param properties    for initializing consumer.
+     */
+    public DataCarrier consume(Class<? extends IConsumer<T>> consumerClass,
+                               int num,
+                               long consumeCycle,
+                               Properties properties) {
+        if (driver != null) {
+            driver.close(channels);
+        }
+        driver = new ConsumeDriver<T>(this.name, this.channels, consumerClass, num, consumeCycle, properties);
+        driver.begin(channels);
+        return this;
+    }
+
+    /**
+     * set consumeDriver to this Carrier. consumer begin to run when {@link DataCarrier#produce} begin to work with 20
+     * millis consume cycle.
+     *
+     * @param consumerClass class of consumer
+     * @param num           number of consumer threads
+     */
+    public DataCarrier consume(Class<? extends IConsumer<T>> consumerClass, int num) {
+        return this.consume(consumerClass, num, 20, new Properties());
+    }
+
+    /**
+     * set consumeDriver to this Carrier. consumer begin to run when {@link DataCarrier#produce} begin to work.
+     *
+     * @param consumer single instance of consumer, all consumer threads will all use this instance.
+     * @param num      number of consumer threads
+     */
+    public DataCarrier consume(IConsumer<T> consumer, int num, long consumeCycle) {
+        if (driver != null) {
+            driver.close(channels);
+        }
+        driver = new ConsumeDriver<T>(this.name, this.channels, consumer, num, consumeCycle);
+        driver.begin(channels);
+        return this;
+    }
+
+    /**
+     * set consumeDriver to this Carrier. consumer begin to run when {@link DataCarrier#produce} begin to work with 20
+     * millis consume cycle.
+     *
+     * @param consumer single instance of consumer, all consumer threads will all use this instance.
+     * @param num      number of consumer threads
+     */
+    public DataCarrier consume(IConsumer<T> consumer, int num) {
+        return this.consume(consumer, num, 20);
+    }
+
+    /**
+     * Set a consumer pool to manage the channels of this DataCarrier. Then consumerPool could use its own consuming
+     * model to adjust the consumer thread and throughput.
+     */
+    public DataCarrier consume(ConsumerPool consumerPool, IConsumer<T> consumer) {
+        driver = consumerPool;
+        consumerPool.add(this.name, channels, consumer);
+        driver.begin(channels);
+        return this;
+    }
+
+    /**
+     * shutdown all consumer threads, if consumer threads are running. Notice {@link BufferStrategy}: if {@link
+     * BufferStrategy} == {@link BufferStrategy#BLOCKING}, shutdown consumeDriver maybe cause blocking when producing.
+     * Better way to change consumeDriver are use {@link DataCarrier#consume}
+     */
+    public void shutdownConsumers() {
+        if (driver != null) {
+            driver.close(channels);
+        }
+    }
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/EnvUtil.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/EnvUtil.java
new file mode 100644
index 0000000..36f0cb6
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/EnvUtil.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.commons.datacarrier;
+
+/**
+ * Read value from system env.
+ */
+public class EnvUtil {
+    public static int getInt(String envName, int defaultValue) {
+        int value = defaultValue;
+        String envValue = System.getenv(envName);
+        if (envValue != null) {
+            try {
+                value = Integer.parseInt(envValue);
+            } catch (NumberFormatException e) {
+
+            }
+        }
+        return value;
+    }
+
+    public static long getLong(String envName, long defaultValue) {
+        long value = defaultValue;
+        String envValue = System.getenv(envName);
+        if (envValue != null) {
+            try {
+                value = Long.parseLong(envValue);
+            } catch (NumberFormatException e) {
+
+            }
+        }
+        return value;
+    }
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/ArrayBlockingQueueBuffer.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/ArrayBlockingQueueBuffer.java
new file mode 100644
index 0000000..feb2a99
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/ArrayBlockingQueueBuffer.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.commons.datacarrier.buffer;
+
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+
+/**
+ * The buffer implementation based on JDK ArrayBlockingQueue.
+ * <p>
+ * This implementation has better performance in server side. We are still trying to research whether this is suitable
+ * for agent side, which is more sensitive about blocks.
+ */
+public class ArrayBlockingQueueBuffer<T> implements QueueBuffer<T> {
+    private BufferStrategy strategy;
+    private ArrayBlockingQueue<T> queue;
+    private int bufferSize;
+
+    ArrayBlockingQueueBuffer(int bufferSize, BufferStrategy strategy) {
+        this.strategy = strategy;
+        this.queue = new ArrayBlockingQueue<T>(bufferSize);
+        this.bufferSize = bufferSize;
+    }
+
+    @Override
+    public boolean save(T data) {
+        //only BufferStrategy.BLOCKING
+        try {
+            queue.put(data);
+        } catch (InterruptedException e) {
+            // Ignore the error
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    public void setStrategy(BufferStrategy strategy) {
+        this.strategy = strategy;
+    }
+
+    @Override
+    public void obtain(List<T> consumeList) {
+        queue.drainTo(consumeList);
+    }
+
+    @Override
+    public int getBufferSize() {
+        return bufferSize;
+    }
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/Buffer.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/Buffer.java
new file mode 100644
index 0000000..38e92af
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/Buffer.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.commons.datacarrier.buffer;
+
+import java.util.List;
+import org.apache.skywalking.banyandb.commons.datacarrier.common.AtomicRangeInteger;
+
+/**
+ * Self implementation ring queue.
+ */
+public class Buffer<T> implements QueueBuffer<T> {
+    private final Object[] buffer;
+    private BufferStrategy strategy;
+    private AtomicRangeInteger index;
+
+    Buffer(int bufferSize, BufferStrategy strategy) {
+        buffer = new Object[bufferSize];
+        this.strategy = strategy;
+        index = new AtomicRangeInteger(0, bufferSize);
+    }
+
+    @Override
+    public void setStrategy(BufferStrategy strategy) {
+        this.strategy = strategy;
+    }
+
+    @Override
+    public boolean save(T data) {
+        int i = index.getAndIncrement();
+        if (buffer[i] != null) {
+            switch (strategy) {
+                case IF_POSSIBLE:
+                    return false;
+                default:
+            }
+        }
+        buffer[i] = data;
+        return true;
+    }
+
+    @Override
+    public int getBufferSize() {
+        return buffer.length;
+    }
+
+    @Override
+    public void obtain(List<T> consumeList) {
+        this.obtain(consumeList, 0, buffer.length);
+    }
+
+    void obtain(List<T> consumeList, int start, int end) {
+        for (int i = start; i < end; i++) {
+            if (buffer[i] != null) {
+                consumeList.add((T) buffer[i]);
+                buffer[i] = null;
+            }
+        }
+    }
+
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/BufferStrategy.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/BufferStrategy.java
new file mode 100644
index 0000000..9dbc556
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/BufferStrategy.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.commons.datacarrier.buffer;
+
+public enum BufferStrategy {
+    BLOCKING, IF_POSSIBLE
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/Channels.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/Channels.java
new file mode 100644
index 0000000..46ce98f
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/Channels.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.commons.datacarrier.buffer;
+
+import org.apache.skywalking.banyandb.commons.datacarrier.partition.IDataPartitioner;
+
+/**
+ * Channels of Buffer It contains all buffer data which belongs to this channel. It supports several strategy when
+ * buffer is full. The Default is BLOCKING <p> Created by wusheng on 2016/10/25.
+ */
+public class Channels<T> {
+    private final QueueBuffer<T>[] bufferChannels;
+    private IDataPartitioner<T> dataPartitioner;
+    private final BufferStrategy strategy;
+    private final long size;
+
+    public Channels(int channelSize, int bufferSize, IDataPartitioner<T> partitioner, BufferStrategy strategy) {
+        this.dataPartitioner = partitioner;
+        this.strategy = strategy;
+        bufferChannels = new QueueBuffer[channelSize];
+        for (int i = 0; i < channelSize; i++) {
+            if (BufferStrategy.BLOCKING.equals(strategy)) {
+                bufferChannels[i] = new ArrayBlockingQueueBuffer<>(bufferSize, strategy);
+            } else {
+                bufferChannels[i] = new Buffer<>(bufferSize, strategy);
+            }
+        }
+        // noinspection PointlessArithmeticExpression
+        size = 1L * channelSize * bufferSize; // it's not pointless, it prevents numeric overflow before assigning an integer to a long
+    }
+
+    public boolean save(T data) {
+        int index = dataPartitioner.partition(bufferChannels.length, data);
+        int retryCountDown = 1;
+        if (BufferStrategy.IF_POSSIBLE.equals(strategy)) {
+            int maxRetryCount = dataPartitioner.maxRetryCount();
+            if (maxRetryCount > 1) {
+                retryCountDown = maxRetryCount;
+            }
+        }
+        for (; retryCountDown > 0; retryCountDown--) {
+            if (bufferChannels[index].save(data)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    public void setPartitioner(IDataPartitioner<T> dataPartitioner) {
+        this.dataPartitioner = dataPartitioner;
+    }
+
+    /**
+     * override the strategy at runtime. Notice, this will override several channels one by one. So, when running
+     * setStrategy, each channel may use different BufferStrategy
+     */
+    public void setStrategy(BufferStrategy strategy) {
+        for (QueueBuffer<T> buffer : bufferChannels) {
+            buffer.setStrategy(strategy);
+        }
+    }
+
+    /**
+     * get channelSize
+     */
+    public int getChannelSize() {
+        return this.bufferChannels.length;
+    }
+
+    public long size() {
+        return size;
+    }
+
+    public QueueBuffer<T> getBuffer(int index) {
+        return this.bufferChannels[index];
+    }
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/QueueBuffer.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/QueueBuffer.java
new file mode 100644
index 0000000..4baf83c
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/QueueBuffer.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.commons.datacarrier.buffer;
+
+import java.util.List;
+
+/**
+ * Queue buffer interface.
+ */
+public interface QueueBuffer<T> {
+    /**
+     * Save data into the queue;
+     *
+     * @param data to add.
+     * @return true if saved
+     */
+    boolean save(T data);
+
+    /**
+     * Set different strategy when queue is full.
+     */
+    void setStrategy(BufferStrategy strategy);
+
+    /**
+     * Obtain the existing data from the queue
+     */
+    void obtain(List<T> consumeList);
+
+    int getBufferSize();
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/common/AtomicRangeInteger.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/common/AtomicRangeInteger.java
new file mode 100644
index 0000000..cb2b4be
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/common/AtomicRangeInteger.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.commons.datacarrier.common;
+
+import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicIntegerArray;
+
+public class AtomicRangeInteger extends Number implements Serializable {
+    private static final long serialVersionUID = -4099792402691141643L;
+    private AtomicIntegerArray values;
+
+    private static final int VALUE_OFFSET = 15;
+
+    private int startValue;
+    private int endValue;
+
+    public AtomicRangeInteger(int startValue, int maxValue) {
+        this.values = new AtomicIntegerArray(31);
+        this.values.set(VALUE_OFFSET, startValue);
+        this.startValue = startValue;
+        this.endValue = maxValue - 1;
+    }
+
+    public final int getAndIncrement() {
+        int next;
+        do {
+            next = this.values.incrementAndGet(VALUE_OFFSET);
+            if (next > endValue && this.values.compareAndSet(VALUE_OFFSET, next, startValue)) {
+                return endValue;
+            }
+        }
+        while (next > endValue);
+
+        return next - 1;
+    }
+
+    public final int get() {
+        return this.values.get(VALUE_OFFSET);
+    }
+
+    @Override
+    public int intValue() {
+        return this.values.get(VALUE_OFFSET);
+    }
+
+    @Override
+    public long longValue() {
+        return this.values.get(VALUE_OFFSET);
+    }
+
+    @Override
+    public float floatValue() {
+        return this.values.get(VALUE_OFFSET);
+    }
+
+    @Override
+    public double doubleValue() {
+        return this.values.get(VALUE_OFFSET);
+    }
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/BulkConsumePool.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/BulkConsumePool.java
new file mode 100644
index 0000000..fd45499
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/BulkConsumePool.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.commons.datacarrier.consumer;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import org.apache.skywalking.banyandb.commons.datacarrier.EnvUtil;
+import org.apache.skywalking.banyandb.commons.datacarrier.buffer.Channels;
+
+/**
+ * BulkConsumePool works for consuming data from multiple channels(DataCarrier instances), with multiple {@link
+ * MultipleChannelsConsumer}s.
+ * <p>
+ * In typical case, the number of {@link MultipleChannelsConsumer} should be less than the number of channels.
+ */
+public class BulkConsumePool implements ConsumerPool {
+    private List<MultipleChannelsConsumer> allConsumers;
+    private volatile boolean isStarted = false;
+
+    public BulkConsumePool(String name, int size, long consumeCycle) {
+        size = EnvUtil.getInt(name + "_THREAD", size);
+        allConsumers = new ArrayList<MultipleChannelsConsumer>(size);
+        for (int i = 0; i < size; i++) {
+            MultipleChannelsConsumer multipleChannelsConsumer = new MultipleChannelsConsumer("DataCarrier." + name + ".BulkConsumePool." + i + ".Thread", consumeCycle);
+            multipleChannelsConsumer.setDaemon(true);
+            allConsumers.add(multipleChannelsConsumer);
+        }
+    }
+
+    @Override
+    synchronized public void add(String name, Channels channels, IConsumer consumer) {
+        MultipleChannelsConsumer multipleChannelsConsumer = getLowestPayload();
+        multipleChannelsConsumer.addNewTarget(channels, consumer);
+    }
+
+    /**
+     * Get the lowest payload consumer thread based on current allocate status.
+     *
+     * @return the lowest consumer.
+     */
+    private MultipleChannelsConsumer getLowestPayload() {
+        MultipleChannelsConsumer winner = allConsumers.get(0);
+        for (int i = 1; i < allConsumers.size(); i++) {
+            MultipleChannelsConsumer option = allConsumers.get(i);
+            if (option.size() < winner.size()) {
+                winner = option;
+            }
+        }
+        return winner;
+    }
+
+    /**
+     *
+     */
+    @Override
+    public boolean isRunning(Channels channels) {
+        return isStarted;
+    }
+
+    @Override
+    public void close(Channels channels) {
+        for (MultipleChannelsConsumer consumer : allConsumers) {
+            consumer.shutdown();
+        }
+    }
+
+    @Override
+    public void begin(Channels channels) {
+        if (isStarted) {
+            return;
+        }
+        for (MultipleChannelsConsumer consumer : allConsumers) {
+            consumer.start();
+        }
+        isStarted = true;
+    }
+
+    /**
+     * The creator for {@link BulkConsumePool}.
+     */
+    public static class Creator implements Callable<ConsumerPool> {
+        private String name;
+        private int size;
+        private long consumeCycle;
+
+        public Creator(String name, int poolSize, long consumeCycle) {
+            this.name = name;
+            this.size = poolSize;
+            this.consumeCycle = consumeCycle;
+        }
+
+        @Override
+        public ConsumerPool call() {
+            return new BulkConsumePool(name, size, consumeCycle);
+        }
+
+        public static int recommendMaxSize() {
+            return Runtime.getRuntime().availableProcessors() * 2;
+        }
+    }
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumeDriver.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumeDriver.java
new file mode 100644
index 0000000..4535a95
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumeDriver.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.commons.datacarrier.consumer;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.Properties;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.skywalking.banyandb.commons.datacarrier.buffer.Channels;
+
+/**
+ * Pool of consumers <p> Created by wusheng on 2016/10/25.
+ */
+public class ConsumeDriver<T> implements IDriver {
+    private boolean running;
+    private ConsumerThread[] consumerThreads;
+    private Channels<T> channels;
+    private ReentrantLock lock;
+
+    public ConsumeDriver(String name,
+                         Channels<T> channels, Class<? extends IConsumer<T>> consumerClass,
+                         int num,
+                         long consumeCycle,
+                         Properties properties) {
+        this(channels, num);
+        for (int i = 0; i < num; i++) {
+            consumerThreads[i] = new ConsumerThread(
+                "DataCarrier." + name + ".Consumer." + i + ".Thread", getNewConsumerInstance(consumerClass, properties),
+                consumeCycle
+            );
+            consumerThreads[i].setDaemon(true);
+        }
+    }
+
+    public ConsumeDriver(String name, Channels<T> channels, IConsumer<T> prototype, int num, long consumeCycle) {
+        this(channels, num);
+        prototype.init(new Properties());
+        for (int i = 0; i < num; i++) {
+            consumerThreads[i] = new ConsumerThread(
+                "DataCarrier." + name + ".Consumer." + i + ".Thread", prototype, consumeCycle);
+            consumerThreads[i].setDaemon(true);
+        }
+
+    }
+
+    private ConsumeDriver(Channels<T> channels, int num) {
+        running = false;
+        this.channels = channels;
+        consumerThreads = new ConsumerThread[num];
+        lock = new ReentrantLock();
+    }
+
+    private IConsumer<T> getNewConsumerInstance(Class<? extends IConsumer<T>> consumerClass, Properties properties) {
+        try {
+            IConsumer<T> inst = consumerClass.getDeclaredConstructor().newInstance();
+            inst.init(properties);
+            return inst;
+        } catch (InstantiationException e) {
+            throw new ConsumerCannotBeCreatedException(e);
+        } catch (IllegalAccessException e) {
+            throw new ConsumerCannotBeCreatedException(e);
+        } catch (NoSuchMethodException e) {
+            throw new ConsumerCannotBeCreatedException(e);
+        } catch (InvocationTargetException e) {
+            throw new ConsumerCannotBeCreatedException(e);
+        }
+    }
+
+    @Override
+    public void begin(Channels channels) {
+        if (running) {
+            return;
+        }
+        lock.lock();
+        try {
+            this.allocateBuffer2Thread();
+            for (ConsumerThread consumerThread : consumerThreads) {
+                consumerThread.start();
+            }
+            running = true;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    @Override
+    public boolean isRunning(Channels channels) {
+        return running;
+    }
+
+    private void allocateBuffer2Thread() {
+        int channelSize = this.channels.getChannelSize();
+        /**
+         * if consumerThreads.length < channelSize
+         * each consumer will process several channels.
+         *
+         * if consumerThreads.length == channelSize
+         * each consumer will process one channel.
+         *
+         * if consumerThreads.length > channelSize
+         * there will be some threads do nothing.
+         */
+        for (int channelIndex = 0; channelIndex < channelSize; channelIndex++) {
+            int consumerIndex = channelIndex % consumerThreads.length;
+            consumerThreads[consumerIndex].addDataSource(channels.getBuffer(channelIndex));
+        }
+
+    }
+
+    @Override
+    public void close(Channels channels) {
+        lock.lock();
+        try {
+            this.running = false;
+            for (ConsumerThread consumerThread : consumerThreads) {
+                consumerThread.shutdown();
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumerCannotBeCreatedException.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumerCannotBeCreatedException.java
new file mode 100644
index 0000000..0d0c8de
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumerCannotBeCreatedException.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.commons.datacarrier.consumer;
+
+public class ConsumerCannotBeCreatedException extends RuntimeException {
+    ConsumerCannotBeCreatedException(Throwable t) {
+        super(t);
+    }
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumerPool.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumerPool.java
new file mode 100644
index 0000000..fd93dfb
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumerPool.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.commons.datacarrier.consumer;
+
+import org.apache.skywalking.banyandb.commons.datacarrier.DataCarrier;
+import org.apache.skywalking.banyandb.commons.datacarrier.buffer.Channels;
+
+/**
+ * The Consumer pool could support data consumer from multiple {@link DataCarrier}s, by using different consume thread
+ * management models.
+ */
+public interface ConsumerPool extends IDriver {
+    void add(String name, Channels channels, IConsumer consumer);
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumerPoolFactory.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumerPoolFactory.java
new file mode 100644
index 0000000..8eb1581
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumerPoolFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.commons.datacarrier.consumer;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+/**
+ * Consumer Pool Factory provides global management for all Consumer Pool.
+ */
+public enum ConsumerPoolFactory {
+    INSTANCE;
+
+    private final Map<String, ConsumerPool> pools;
+
+    ConsumerPoolFactory() {
+        pools = new HashMap<>();
+    }
+
+    public synchronized boolean createIfAbsent(String poolName, Callable<ConsumerPool> creator) throws Exception {
+        if (pools.containsKey(poolName)) {
+            return false;
+        } else {
+            pools.put(poolName, creator.call());
+            return true;
+        }
+    }
+
+    public ConsumerPool get(String poolName) {
+        return pools.get(poolName);
+    }
+
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumerThread.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumerThread.java
new file mode 100644
index 0000000..b5f5478
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumerThread.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.commons.datacarrier.consumer;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.skywalking.banyandb.commons.datacarrier.buffer.Buffer;
+import org.apache.skywalking.banyandb.commons.datacarrier.buffer.QueueBuffer;
+
+public class ConsumerThread<T> extends Thread {
+    private volatile boolean running;
+    private IConsumer<T> consumer;
+    private List<DataSource> dataSources;
+    private long consumeCycle;
+
+    ConsumerThread(String threadName, IConsumer<T> consumer, long consumeCycle) {
+        super(threadName);
+        this.consumer = consumer;
+        running = false;
+        dataSources = new ArrayList<DataSource>(1);
+        this.consumeCycle = consumeCycle;
+    }
+
+    /**
+     * add whole buffer to consume
+     */
+    void addDataSource(QueueBuffer<T> sourceBuffer) {
+        this.dataSources.add(new DataSource(sourceBuffer));
+    }
+
+    @Override
+    public void run() {
+        running = true;
+
+        final List<T> consumeList = new ArrayList<T>(1500);
+        while (running) {
+            if (!consume(consumeList)) {
+                try {
+                    Thread.sleep(consumeCycle);
+                } catch (InterruptedException e) {
+                }
+            }
+        }
+
+        // consumer thread is going to stop
+        // consume the last time
+        consume(consumeList);
+
+        consumer.onExit();
+    }
+
+    private boolean consume(List<T> consumeList) {
+        for (DataSource dataSource : dataSources) {
+            dataSource.obtain(consumeList);
+        }
+
+        if (!consumeList.isEmpty()) {
+            try {
+                consumer.consume(consumeList);
+            } catch (Throwable t) {
+                consumer.onError(consumeList, t);
+            } finally {
+                consumeList.clear();
+            }
+            return true;
+        }
+        consumer.nothingToConsume();
+        return false;
+    }
+
+    void shutdown() {
+        running = false;
+    }
+
+    /**
+     * DataSource is a refer to {@link Buffer}.
+     */
+    class DataSource {
+        private QueueBuffer<T> sourceBuffer;
+
+        DataSource(QueueBuffer<T> sourceBuffer) {
+            this.sourceBuffer = sourceBuffer;
+        }
+
+        void obtain(List<T> consumeList) {
+            sourceBuffer.obtain(consumeList);
+        }
+    }
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/IConsumer.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/IConsumer.java
new file mode 100644
index 0000000..9d965e1
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/IConsumer.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.commons.datacarrier.consumer;
+
+import java.util.List;
+import java.util.Properties;
+
+public interface IConsumer<T> {
+    void init(final Properties properties);
+
+    void consume(List<T> data);
+
+    void onError(List<T> data, Throwable t);
+
+    void onExit();
+
+    /**
+     * Notify the implementation, if there is nothing fetched from the queue. This could be used as a timer to trigger
+     * reaction if the queue has no element.
+     */
+    default void nothingToConsume() {
+        return;
+    }
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/IDriver.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/IDriver.java
new file mode 100644
index 0000000..7011c4e
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/IDriver.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.commons.datacarrier.consumer;
+
+import org.apache.skywalking.banyandb.commons.datacarrier.buffer.Channels;
+
+/**
+ * The driver of consumer.
+ */
+public interface IDriver {
+    boolean isRunning(Channels channels);
+
+    void close(Channels channels);
+
+    void begin(Channels channels);
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/MultipleChannelsConsumer.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/MultipleChannelsConsumer.java
new file mode 100644
index 0000000..19969fa
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/MultipleChannelsConsumer.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.commons.datacarrier.consumer;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.skywalking.banyandb.commons.datacarrier.buffer.Channels;
+import org.apache.skywalking.banyandb.commons.datacarrier.buffer.QueueBuffer;
+
+/**
+ * MultipleChannelsConsumer represent a single consumer thread, but support multiple channels with their {@link
+ * IConsumer}s
+ */
+public class MultipleChannelsConsumer extends Thread {
+    private volatile boolean running;
+    private volatile ArrayList<Group> consumeTargets;
+    @SuppressWarnings("NonAtomicVolatileUpdate")
+    private volatile long size;
+    private final long consumeCycle;
+
+    public MultipleChannelsConsumer(String threadName, long consumeCycle) {
+        super(threadName);
+        this.consumeTargets = new ArrayList<Group>();
+        this.consumeCycle = consumeCycle;
+    }
+
+    @Override
+    public void run() {
+        running = true;
+
+        final List consumeList = new ArrayList(2000);
+        while (running) {
+            boolean hasData = false;
+            for (Group target : consumeTargets) {
+                boolean consume = consume(target, consumeList);
+                hasData = hasData || consume;
+            }
+
+            if (!hasData) {
+                try {
+                    Thread.sleep(consumeCycle);
+                } catch (InterruptedException e) {
+                }
+            }
+        }
+
+        // consumer thread is going to stop
+        // consume the last time
+        for (Group target : consumeTargets) {
+            consume(target, consumeList);
+
+            target.consumer.onExit();
+        }
+    }
+
+    private boolean consume(Group target, List consumeList) {
+        for (int i = 0; i < target.channels.getChannelSize(); i++) {
+            QueueBuffer buffer = target.channels.getBuffer(i);
+            buffer.obtain(consumeList);
+        }
+
+        if (!consumeList.isEmpty()) {
+            try {
+                target.consumer.consume(consumeList);
+            } catch (Throwable t) {
+                target.consumer.onError(consumeList, t);
+            } finally {
+                consumeList.clear();
+            }
+            return true;
+        }
+        target.consumer.nothingToConsume();
+        return false;
+    }
+
+    /**
+     * Add a new target channels.
+     */
+    public void addNewTarget(Channels channels, IConsumer consumer) {
+        Group group = new Group(channels, consumer);
+        // Recreate the new list to avoid change list while the list is used in consuming.
+        ArrayList<Group> newList = new ArrayList<Group>();
+        for (Group target : consumeTargets) {
+            newList.add(target);
+        }
+        newList.add(group);
+        consumeTargets = newList;
+        size += channels.size();
+    }
+
+    public long size() {
+        return size;
+    }
+
+    void shutdown() {
+        running = false;
+    }
+
+    private static class Group {
+        private Channels channels;
+        private IConsumer consumer;
+
+        public Group(Channels channels, IConsumer consumer) {
+            this.channels = channels;
+            this.consumer = consumer;
+        }
+    }
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/partition/IDataPartitioner.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/partition/IDataPartitioner.java
new file mode 100644
index 0000000..4b6c576
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/partition/IDataPartitioner.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.commons.datacarrier.partition;
+
+import org.apache.skywalking.banyandb.commons.datacarrier.buffer.BufferStrategy;
+
+public interface IDataPartitioner<T> {
+    int partition(int total, T data);
+
+    /**
+     * @return an integer represents how many times should retry when {@link BufferStrategy#IF_POSSIBLE}.
+     * <p>
+     * Less or equal 1, means not support retry.
+     */
+    int maxRetryCount();
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/partition/ProducerThreadPartitioner.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/partition/ProducerThreadPartitioner.java
new file mode 100644
index 0000000..b83fd1f
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/partition/ProducerThreadPartitioner.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.commons.datacarrier.partition;
+
+/**
+ * use threadid % total to partition
+ */
+public class ProducerThreadPartitioner<T> implements IDataPartitioner<T> {
+    public ProducerThreadPartitioner() {
+    }
+
+    @Override
+    public int partition(int total, T data) {
+        return (int) Thread.currentThread().getId() % total;
+    }
+
+    @Override
+    public int maxRetryCount() {
+        return 1;
+    }
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/partition/SimpleRollingPartitioner.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/partition/SimpleRollingPartitioner.java
new file mode 100644
index 0000000..e22f92f
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/partition/SimpleRollingPartitioner.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.commons.datacarrier.partition;
+
+/**
+ * use normal int to rolling.
+ */
+public class SimpleRollingPartitioner<T> implements IDataPartitioner<T> {
+    @SuppressWarnings("NonAtomicVolatileUpdate")
+    private volatile int i = 0;
+
+    @Override
+    public int partition(int total, T data) {
+        return Math.abs(i++ % total);
+    }
+
+    @Override
+    public int maxRetryCount() {
+        return 3;
+    }
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java
new file mode 100644
index 0000000..ea12319
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client;
+
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.NameResolverRegistry;
+import io.grpc.internal.DnsNameResolverProvider;
+import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.banyandb.v1.trace.BanyandbTrace;
+import org.apache.skywalking.banyandb.v1.trace.TraceServiceGrpc;
+
+/**
+ * BanyanDBClient represents a client instance interacting with BanyanDB server. This is built on the top of BanyanDB v1
+ * gRPC APIs.
+ */
+@Slf4j
+public class BanyanDBClient implements Closeable {
+    /**
+     * The hostname of BanyanDB server.
+     */
+    private final String host;
+    /**
+     * The port of BanyanDB server.
+     */
+    private final int port;
+    /**
+     * The instance name.
+     */
+    private final String group;
+    /**
+     * Options for server connection.
+     */
+    private Options options;
+    /**
+     * Managed gRPC connection.
+     */
+    private volatile ManagedChannel managedChannel;
+    /**
+     * gRPC client stub
+     */
+    private volatile TraceServiceGrpc.TraceServiceStub traceServiceStub;
+    /**
+     * gRPC blocking stub.
+     */
+    private volatile TraceServiceGrpc.TraceServiceBlockingStub traceServiceBlockingStub;
+    /**
+     * The connection status.
+     */
+    private volatile boolean isConnected = false;
+    /**
+     * A lock to control the race condition in establishing and disconnecting network connection.
+     */
+    private volatile ReentrantLock connectionEstablishLock;
+
+    /**
+     * Create a BanyanDB client instance
+     *
+     * @param host  IP or domain name
+     * @param port  Server port
+     * @param group Database instance name
+     */
+    public BanyanDBClient(final String host, final int port, final String group) {
+        this(host, port, group, new Options());
+    }
+
+    /**
+     * Create a BanyanDB client instance with custom options
+     *
+     * @param host    IP or domain name
+     * @param port    Server port
+     * @param group   Database instance name
+     * @param options for database connection
+     */
+    public BanyanDBClient(final String host,
+                          final int port,
+                          final String group,
+                          final Options options) {
+        this.host = host;
+        this.port = port;
+        this.group = group;
+        this.options = options;
+        this.connectionEstablishLock = new ReentrantLock();
+
+        NameResolverRegistry.getDefaultRegistry().register(new DnsNameResolverProvider());
+    }
+
+    /**
+     * Connect to the server.
+     *
+     * @throws RuntimeException if server is not reachable.
+     */
+    public void connect() {
+        connectionEstablishLock.lock();
+        try {
+            if (!isConnected) {
+                final ManagedChannelBuilder<?> nettyChannelBuilder = NettyChannelBuilder.forAddress(host, port).usePlaintext();
+                nettyChannelBuilder.maxInboundMessageSize(options.getMaxInboundMessageSize());
+
+                managedChannel = nettyChannelBuilder.build();
+                traceServiceStub = TraceServiceGrpc.newStub(managedChannel);
+                traceServiceBlockingStub = TraceServiceGrpc.newBlockingStub(
+                        managedChannel);
+                isConnected = true;
+            }
+        } finally {
+            connectionEstablishLock.unlock();
+        }
+    }
+
+    /**
+     * Connect to the mock server.
+     * Created for testing purpose.
+     *
+     * @param channel the channel used for communication.
+     *                For tests, it is normally an in-process channel.
+     */
+    void connect(ManagedChannel channel) {
+        connectionEstablishLock.lock();
+        try {
+            if (!isConnected) {
+                traceServiceStub = TraceServiceGrpc.newStub(channel);
+                traceServiceBlockingStub = TraceServiceGrpc.newBlockingStub(
+                        channel);
+                isConnected = true;
+            }
+        } finally {
+            connectionEstablishLock.unlock();
+        }
+    }
+
+    /**
+     * Create a build process for trace write.
+     *
+     * @param maxBulkSize   the max bulk size for the flush operation
+     * @param flushInterval if given maxBulkSize is not reached in this period, the flush would be trigger
+     *                      automatically. Unit is second
+     * @param concurrency   the number of concurrency would run for the flush max
+     * @return trace bulk write processor
+     */
+    public TraceBulkWriteProcessor buildTraceWriteProcessor(int maxBulkSize, int flushInterval, int concurrency) {
+        return new TraceBulkWriteProcessor(group, traceServiceStub, maxBulkSize, flushInterval, concurrency);
+    }
+
+    /**
+     * Query trace according to given conditions
+     *
+     * @param traceQuery condition for query
+     * @return hint traces.
+     */
+    public TraceQueryResponse queryTraces(TraceQuery traceQuery) {
+        final BanyandbTrace.QueryResponse response = traceServiceBlockingStub
+                .withDeadlineAfter(options.getDeadline(), TimeUnit.SECONDS)
+                .query(traceQuery.build(group));
+        return new TraceQueryResponse(response);
+    }
+
+    @Override
+    public void close() throws IOException {
+        connectionEstablishLock.lock();
+        try {
+            if (isConnected) {
+                this.managedChannel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
+                isConnected = false;
+            }
+        } catch (InterruptedException interruptedException) {
+            log.warn("fail to wait for channel termination, shutdown now!", interruptedException);
+            this.managedChannel.shutdownNow();
+            isConnected = false;
+        } finally {
+            connectionEstablishLock.unlock();
+        }
+    }
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/BulkWriteProcessor.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/BulkWriteProcessor.java
new file mode 100644
index 0000000..2fda151
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/BulkWriteProcessor.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.skywalking.banyandb.commons.datacarrier.DataCarrier;
+import org.apache.skywalking.banyandb.commons.datacarrier.consumer.IConsumer;
+
+/**
+ * BulkWriteProcessor is a timeline and size dual driven processor.
+ * <p>
+ * It includes an internal queue and timer, and accept the data sequentially. With the given thresholds of time and
+ * size, it could activate flush to continue the process to the next step.
+ */
+public abstract class BulkWriteProcessor implements Closeable {
+    protected final int flushInterval;
+    protected DataCarrier buffer;
+
+    /**
+     * Create the processor.
+     *
+     * @param maxBulkSize   the max bulk size for the flush operation
+     * @param flushInterval if given maxBulkSize is not reached in this period, the flush would be trigger
+     *                      automatically. Unit is second.
+     * @param concurrency   the number of concurrency would run for the flush max.
+     */
+    protected BulkWriteProcessor(String processorName, int maxBulkSize, int flushInterval, int concurrency) {
+        this.flushInterval = flushInterval;
+        this.buffer = new DataCarrier(processorName, maxBulkSize, concurrency);
+        Properties properties = new Properties();
+        properties.put("maxBulkSize", maxBulkSize);
+        properties.put("flushInterval", flushInterval);
+        properties.put("BulkWriteProcessor", this);
+        buffer.consume(QueueWatcher.class, concurrency, 20, properties);
+    }
+
+    @Override
+    public void close() throws IOException {
+        this.buffer.shutdownConsumers();
+    }
+
+    /**
+     * The internal queue consumer for build process.
+     */
+    public static class QueueWatcher implements IConsumer {
+        private long lastFlushTimestamp;
+        private int maxBulkSize;
+        private int flushInterval;
+        private List cachedData;
+        private BulkWriteProcessor bulkWriteProcessor;
+
+        public QueueWatcher() {
+        }
+
+        @Override
+        public void init(Properties properties) {
+            lastFlushTimestamp = System.currentTimeMillis();
+            maxBulkSize = (Integer) properties.get("maxBulkSize");
+            flushInterval = (Integer) properties.get("flushInterval") * 1000;
+            cachedData = new ArrayList(maxBulkSize);
+            bulkWriteProcessor = (BulkWriteProcessor) properties.get("BulkWriteProcessor");
+        }
+
+        @Override
+        public void consume(final List data) {
+            if (data.size() >= maxBulkSize) {
+                // The data#size actually wouldn't over the maxBulkSize due to the DataCarrier channel's max size.
+                // This is just to preventing unexpected case and avoid confusion about dropping into else section.
+                bulkWriteProcessor.flush(data);
+                lastFlushTimestamp = System.currentTimeMillis();
+            } else {
+                data.forEach(element -> {
+                    cachedData.add(element);
+                    if (cachedData.size() >= maxBulkSize) {
+                        // Flush and re-init.
+                        bulkWriteProcessor.flush(cachedData);
+                        cachedData = new ArrayList(maxBulkSize);
+                        lastFlushTimestamp = System.currentTimeMillis();
+                    }
+                });
+            }
+        }
+
+        @Override
+        public void onError(final List data, final Throwable t) {
+
+        }
+
+        @Override
+        public void onExit() {
+
+        }
+
+        @Override
+        public void nothingToConsume() {
+            if (System.currentTimeMillis() - lastFlushTimestamp > flushInterval) {
+                bulkWriteProcessor.flush(cachedData);
+                cachedData = new ArrayList(maxBulkSize);
+                lastFlushTimestamp = System.currentTimeMillis();
+            }
+        }
+    }
+
+    /**
+     * @param data to be flush.
+     */
+    protected abstract void flush(List data);
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/Field.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/Field.java
new file mode 100644
index 0000000..cf2c04b
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/Field.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client;
+
+import java.util.List;
+
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import org.apache.skywalking.banyandb.v1.Banyandb;
+
+import static com.google.protobuf.NullValue.NULL_VALUE;
+
+/**
+ * Field represents a value of column/field in the write-op or response.
+ */
+@EqualsAndHashCode
+public abstract class Field<T> {
+    @Getter
+    protected final T value;
+
+    protected Field(T value) {
+        this.value = value;
+    }
+
+    /**
+     * NullField is a value which can be converted to {@link com.google.protobuf.NullValue}.
+     * Users should use the singleton instead of create a new instance everytime.
+     */
+    public static class NullField extends Field<Object> implements SerializableField {
+        private static final NullField INSTANCE = new NullField();
+
+        private NullField() {
+            super(null);
+        }
+
+        @Override
+        public Banyandb.Field toField() {
+            return Banyandb.Field.newBuilder().setNull(NULL_VALUE).build();
+        }
+    }
+
+    /**
+     * The value of a String type field.
+     */
+    public static class StringField extends Field<String> implements SerializableField {
+        private StringField(String value) {
+            super(value);
+        }
+
+        @Override
+        public Banyandb.Field toField() {
+            return Banyandb.Field.newBuilder().setStr(Banyandb.Str.newBuilder().setValue(value)).build();
+        }
+    }
+
+    /**
+     * The value of a String array type field.
+     */
+    public static class StringArrayField extends Field<List<String>> implements SerializableField {
+        private StringArrayField(List<String> value) {
+            super(value);
+        }
+
+        @Override
+        public Banyandb.Field toField() {
+            return Banyandb.Field.newBuilder().setStrArray(Banyandb.StrArray.newBuilder().addAllValue(value)).build();
+        }
+    }
+
+    /**
+     * The value of an int64(Long) type field.
+     */
+    public static class LongField extends Field<Long> implements SerializableField {
+        private LongField(Long value) {
+            super(value);
+        }
+
+        @Override
+        public Banyandb.Field toField() {
+            return Banyandb.Field.newBuilder().setInt(Banyandb.Int.newBuilder().setValue(value)).build();
+        }
+    }
+
+    /**
+     * The value of an int64(Long) array type field.
+     */
+    public static class LongArrayField extends Field<List<Long>> implements SerializableField {
+        private LongArrayField(List<Long> value) {
+            super(value);
+        }
+
+        @Override
+        public Banyandb.Field toField() {
+            return Banyandb.Field.newBuilder().setIntArray(Banyandb.IntArray.newBuilder().addAllValue(value)).build();
+        }
+    }
+
+    /**
+     * Construct a string field
+     *
+     * @param val payload
+     * @return Anonymous field with String payload
+     */
+    public static SerializableField stringField(String val) {
+        return new StringField(val);
+    }
+
+    /**
+     * Construct a numeric field
+     *
+     * @param val payload
+     * @return Anonymous field with numeric payload
+     */
+    public static SerializableField longField(long val) {
+        return new LongField(val);
+    }
+
+    /**
+     * Construct a string array field
+     *
+     * @param val payload
+     * @return Anonymous field with string array payload
+     */
+    public static SerializableField stringArrayField(List<String> val) {
+        return new StringArrayField(val);
+    }
+
+    /**
+     * Construct a long array field
+     *
+     * @param val payload
+     * @return Anonymous field with numeric array payload
+     */
+    public static SerializableField longArrayField(List<Long> val) {
+        return new LongArrayField(val);
+    }
+
+    public static SerializableField nullField() {
+        return NullField.INSTANCE;
+    }
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/FieldAndValue.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/FieldAndValue.java
new file mode 100644
index 0000000..5182544
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/FieldAndValue.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client;
+
+import java.util.List;
+
+import lombok.EqualsAndHashCode;
+import org.apache.skywalking.banyandb.v1.Banyandb;
+
+/**
+ * FieldAndValue represents a value of column in the response
+ */
+@EqualsAndHashCode(callSuper = true)
+public abstract class FieldAndValue<T> extends Field<T> {
+    protected final String fieldName;
+
+    protected FieldAndValue(String fieldName, T value) {
+        super(value);
+        this.fieldName = fieldName;
+    }
+
+    /**
+     * @return field name
+     */
+    public String getFieldName() {
+        return this.fieldName;
+    }
+
+    /**
+     * @return true if value is null;
+     */
+    public boolean isNull() {
+        return this.value == null;
+    }
+
+    static FieldAndValue<?> build(Banyandb.TypedPair typedPair) {
+        if (typedPair.hasNullPair()) {
+            switch (typedPair.getNullPair().getType()) {
+                case FIELD_TYPE_INT:
+                    return new LongFieldPair(typedPair.getKey(), null);
+                case FIELD_TYPE_INT_ARRAY:
+                    return new LongArrayFieldPair(typedPair.getKey(), null);
+                case FIELD_TYPE_STRING:
+                    return new StringFieldPair(typedPair.getKey(), null);
+                case FIELD_TYPE_STRING_ARRAY:
+                    return new StringArrayFieldPair(typedPair.getKey(), null);
+                default:
+                    throw new IllegalArgumentException("Unrecognized NullType, " + typedPair.getNullPair().getType());
+            }
+        } else if (typedPair.hasIntPair()) {
+            return new LongFieldPair(typedPair.getKey(), typedPair.getIntPair().getValue());
+        } else if (typedPair.hasStrPair()) {
+            return new StringFieldPair(typedPair.getKey(), typedPair.getStrPair().getValue());
+        } else if (typedPair.hasIntArrayPair()) {
+            return new LongArrayFieldPair(typedPair.getKey(), typedPair.getIntArrayPair().getValueList());
+        } else if (typedPair.hasStrArrayPair()) {
+            return new StringArrayFieldPair(typedPair.getKey(), typedPair.getStrArrayPair().getValueList());
+        }
+        throw new IllegalArgumentException("Unrecognized TypedPair, " + typedPair);
+    }
+
+    public static class StringFieldPair extends FieldAndValue<String> {
+        StringFieldPair(final String fieldName, final String value) {
+            super(fieldName, value);
+        }
+    }
+
+    public static class StringArrayFieldPair extends FieldAndValue<List<String>> {
+        StringArrayFieldPair(final String fieldName, final List<String> value) {
+            super(fieldName, value);
+        }
+    }
+
+    public static class LongFieldPair extends FieldAndValue<Long> {
+        LongFieldPair(final String fieldName, final Long value) {
+            super(fieldName, value);
+        }
+    }
+
+    public static class LongArrayFieldPair extends FieldAndValue<List<Long>> {
+        LongArrayFieldPair(final String fieldName, final List<Long> value) {
+            super(fieldName, value);
+        }
+    }
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/Options.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/Options.java
new file mode 100644
index 0000000..89b8038
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/Options.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client;
+
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.Setter;
+
+/**
+ * Client connection options.
+ */
+@Setter
+@Getter(AccessLevel.PACKAGE)
+public class Options {
+    /**
+     * Max inbound message size
+     */
+    private int maxInboundMessageSize = 1024 * 1024 * 50;
+    /**
+     * Threshold of gRPC blocking query, unit is second
+     */
+    private int deadline = 30;
+
+    Options() {
+    }
+
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/PairQueryCondition.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/PairQueryCondition.java
new file mode 100644
index 0000000..78f714f
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/PairQueryCondition.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client;
+
+import org.apache.skywalking.banyandb.v1.Banyandb;
+
+import java.util.List;
+
+/**
+ * PairQuery represents a query condition, including field name, operator, and value(s);
+ */
+public abstract class PairQueryCondition<T> extends FieldAndValue<T> {
+    protected final Banyandb.PairQuery.BinaryOp op;
+
+    private PairQueryCondition(String fieldName, Banyandb.PairQuery.BinaryOp op, T value) {
+        super(fieldName, value);
+        this.op = op;
+    }
+
+    Banyandb.PairQuery build() {
+        return Banyandb.PairQuery.newBuilder()
+                .setOp(this.op)
+                .setCondition(buildTypedPair()).build();
+    }
+
+    /**
+     * The various implementations should build different TypedPair
+     *
+     * @return typedPair to be included
+     */
+    protected abstract Banyandb.TypedPair buildTypedPair();
+
+    /**
+     * LongQueryCondition represents `Field(Long) $op value` condition.
+     */
+    public static class LongQueryCondition extends PairQueryCondition<Long> {
+        private LongQueryCondition(String fieldName, Banyandb.PairQuery.BinaryOp op, Long value) {
+            super(fieldName, op, value);
+        }
+
+        @Override
+        protected Banyandb.TypedPair buildTypedPair() {
+            return Banyandb.TypedPair.newBuilder()
+                    .setKey(fieldName)
+                    .setIntPair(Banyandb.Int.newBuilder()
+                            .setValue(value)).build();
+        }
+
+        /**
+         * Build a query condition for {@link Long} type
+         * and {@link Banyandb.PairQuery.BinaryOp#BINARY_OP_EQ} as the relation
+         *
+         * @param fieldName name of the field
+         * @param val       value of the field
+         * @return a query that `Long == value`
+         */
+        public static PairQueryCondition<Long> eq(String fieldName, Long val) {
+            return new LongQueryCondition(fieldName, Banyandb.PairQuery.BinaryOp.BINARY_OP_EQ, val);
+        }
+
+        /**
+         * Build a query condition for {@link Long} type
+         * and {@link Banyandb.PairQuery.BinaryOp#BINARY_OP_NE} as the relation
+         *
+         * @param fieldName name of the field
+         * @param val       value of the field
+         * @return a query that `Long != value`
+         */
+        public static PairQueryCondition<Long> ne(String fieldName, Long val) {
+            return new LongQueryCondition(fieldName, Banyandb.PairQuery.BinaryOp.BINARY_OP_NE, val);
+        }
+
+        /**
+         * Build a query condition for {@link Long} type
+         * and {@link Banyandb.PairQuery.BinaryOp#BINARY_OP_GT} as the relation
+         *
+         * @param fieldName name of the field
+         * @param val       value of the field
+         * @return a query that `Long &gt; value`
+         */
+        public static PairQueryCondition<Long> gt(String fieldName, Long val) {
+            return new LongQueryCondition(fieldName, Banyandb.PairQuery.BinaryOp.BINARY_OP_GT, val);
+        }
+
+        /**
+         * Build a query condition for {@link Long} type
+         * and {@link Banyandb.PairQuery.BinaryOp#BINARY_OP_GE} as the relation
+         *
+         * @param fieldName name of the field
+         * @param val       value of the field
+         * @return a query that `Long &ge; value`
+         */
+        public static PairQueryCondition<Long> ge(String fieldName, Long val) {
+            return new LongQueryCondition(fieldName, Banyandb.PairQuery.BinaryOp.BINARY_OP_GE, val);
+        }
+
+        /**
+         * Build a query condition for {@link Long} type
+         * and {@link Banyandb.PairQuery.BinaryOp#BINARY_OP_LT} as the relation
+         *
+         * @param fieldName name of the field
+         * @param val       value of the field
+         * @return a query that `Long &lt; value`
+         */
+        public static PairQueryCondition<Long> lt(String fieldName, Long val) {
+            return new LongQueryCondition(fieldName, Banyandb.PairQuery.BinaryOp.BINARY_OP_LT, val);
+        }
+
+        /**
+         * Build a query condition for {@link Long} type
+         * and {@link Banyandb.PairQuery.BinaryOp#BINARY_OP_LE} as the relation
+         *
+         * @param fieldName name of the field
+         * @param val       value of the field
+         * @return a query that `Long &le; value`
+         */
+        public static PairQueryCondition<Long> le(String fieldName, Long val) {
+            return new LongQueryCondition(fieldName, Banyandb.PairQuery.BinaryOp.BINARY_OP_LE, val);
+        }
+    }
+
+    /**
+     * StringQueryCondition represents `Field(String) $op value` condition.
+     */
+    public static class StringQueryCondition extends PairQueryCondition<String> {
+        private StringQueryCondition(String fieldName, Banyandb.PairQuery.BinaryOp op, String value) {
+            super(fieldName, op, value);
+        }
+
+        @Override
+        protected Banyandb.TypedPair buildTypedPair() {
+            return Banyandb.TypedPair.newBuilder()
+                    .setKey(fieldName)
+                    .setStrPair(Banyandb.Str.newBuilder().setValue(value)).build();
+        }
+
+        /**
+         * Build a query condition for {@link String} type
+         * and {@link Banyandb.PairQuery.BinaryOp#BINARY_OP_EQ} as the relation
+         *
+         * @param fieldName name of the field
+         * @param val       value of the field
+         * @return a query that `String == value`
+         */
+        public static PairQueryCondition<String> eq(String fieldName, String val) {
+            return new StringQueryCondition(fieldName, Banyandb.PairQuery.BinaryOp.BINARY_OP_EQ, val);
+        }
+
+        /**
+         * Build a query condition for {@link String} type
+         * and {@link Banyandb.PairQuery.BinaryOp#BINARY_OP_NE} as the relation
+         *
+         * @param fieldName name of the field
+         * @param val       value of the field
+         * @return a query that `String != value`
+         */
+        public static PairQueryCondition<String> ne(String fieldName, String val) {
+            return new StringQueryCondition(fieldName, Banyandb.PairQuery.BinaryOp.BINARY_OP_NE, val);
+        }
+    }
+
+    /**
+     * StringArrayQueryCondition represents `Field(List of String) $op value` condition.
+     */
+    public static class StringArrayQueryCondition extends PairQueryCondition<List<String>> {
+        private StringArrayQueryCondition(String fieldName, Banyandb.PairQuery.BinaryOp op, List<String> value) {
+            super(fieldName, op, value);
+        }
+
+        @Override
+        protected Banyandb.TypedPair buildTypedPair() {
+            return Banyandb.TypedPair.newBuilder()
+                    .setKey(fieldName)
+                    .setStrArrayPair(Banyandb.StrArray.newBuilder()
+                            .addAllValue(value)).build();
+        }
+
+        /**
+         * Build a query condition for {@link List} of {@link String} as the type
+         * and {@link Banyandb.PairQuery.BinaryOp#BINARY_OP_EQ} as the relation
+         *
+         * @param fieldName name of the field
+         * @param val       value of the field
+         * @return a query that `[String] == values`
+         */
+        public static PairQueryCondition<List<String>> eq(String fieldName, List<String> val) {
+            return new StringArrayQueryCondition(fieldName, Banyandb.PairQuery.BinaryOp.BINARY_OP_EQ, val);
+        }
+
+        /**
+         * Build a query condition for {@link List} of {@link String} as the type
+         * and {@link Banyandb.PairQuery.BinaryOp#BINARY_OP_NE} as the relation
+         *
+         * @param fieldName name of the field
+         * @param val       value of the field
+         * @return a query that `[String] != values`
+         */
+        public static PairQueryCondition<List<String>> ne(String fieldName, List<String> val) {
+            return new StringArrayQueryCondition(fieldName, Banyandb.PairQuery.BinaryOp.BINARY_OP_NE, val);
+        }
+
+        /**
+         * Build a query condition for {@link List} of {@link String} as the type
+         * and {@link Banyandb.PairQuery.BinaryOp#BINARY_OP_HAVING} as the relation
+         *
+         * @param fieldName name of the field
+         * @param val       value of the field
+         * @return a query that `[String] having values`
+         */
+        public static PairQueryCondition<List<String>> having(String fieldName, List<String> val) {
+            return new StringArrayQueryCondition(fieldName, Banyandb.PairQuery.BinaryOp.BINARY_OP_HAVING, val);
+        }
+
+        /**
+         * Build a query condition for {@link List} of {@link String} as the type
+         * and {@link Banyandb.PairQuery.BinaryOp#BINARY_OP_NOT_HAVING} as the relation
+         *
+         * @param fieldName name of the field
+         * @param val       value of the field
+         * @return a query that `[String] not having values`
+         */
+        public static PairQueryCondition<List<String>> notHaving(String fieldName, List<String> val) {
+            return new StringArrayQueryCondition(fieldName, Banyandb.PairQuery.BinaryOp.BINARY_OP_NOT_HAVING, val);
+        }
+    }
+
+    /**
+     * LongArrayQueryCondition represents `Field(List of Long) $op value` condition.
+     */
+    public static class LongArrayQueryCondition extends PairQueryCondition<List<Long>> {
+        private LongArrayQueryCondition(String fieldName, Banyandb.PairQuery.BinaryOp op, List<Long> value) {
+            super(fieldName, op, value);
+        }
+
+        @Override
+        protected Banyandb.TypedPair buildTypedPair() {
+            return Banyandb.TypedPair.newBuilder()
+                    .setKey(fieldName)
+                    .setIntArrayPair(Banyandb.IntArray.newBuilder()
+                            .addAllValue(value)).build();
+        }
+
+        /**
+         * Build a query condition for {@link List} of {@link Long} as the type
+         * and {@link Banyandb.PairQuery.BinaryOp#BINARY_OP_EQ} as the relation
+         *
+         * @param fieldName name of the field
+         * @param val       value of the field
+         * @return a query that `[Long] == value`
+         */
+        public static PairQueryCondition<List<Long>> eq(String fieldName, List<Long> val) {
+            return new LongArrayQueryCondition(fieldName, Banyandb.PairQuery.BinaryOp.BINARY_OP_EQ, val);
+        }
+
+        /**
+         * Build a query condition for {@link List} of {@link Long} as the type
+         * and {@link Banyandb.PairQuery.BinaryOp#BINARY_OP_NE} as the relation
+         *
+         * @param fieldName name of the field
+         * @param val       value of the field
+         * @return a query that `[Long] != value`
+         */
+        public static PairQueryCondition<List<Long>> ne(String fieldName, List<Long> val) {
+            return new LongArrayQueryCondition(fieldName, Banyandb.PairQuery.BinaryOp.BINARY_OP_NE, val);
+        }
+
+        /**
+         * Build a query condition for {@link List} of {@link Long} as the type
+         * and {@link Banyandb.PairQuery.BinaryOp#BINARY_OP_HAVING} as the relation
+         *
+         * @param fieldName name of the field
+         * @param val       value of the field
+         * @return a query that `[Long] having values`
+         */
+        public static PairQueryCondition<List<Long>> having(String fieldName, List<Long> val) {
+            return new LongArrayQueryCondition(fieldName, Banyandb.PairQuery.BinaryOp.BINARY_OP_HAVING, val);
+        }
+
+        /**
+         * Build a query condition for {@link List} of {@link Long} as the type
+         * and {@link Banyandb.PairQuery.BinaryOp#BINARY_OP_NOT_HAVING} as the relation
+         *
+         * @param fieldName name of the field
+         * @param val       value of the field
+         * @return a query that `[Long] not having values`
+         */
+        public static PairQueryCondition<List<Long>> notHaving(String fieldName, List<Long> val) {
+            return new LongArrayQueryCondition(fieldName, Banyandb.PairQuery.BinaryOp.BINARY_OP_NOT_HAVING, val);
+        }
+    }
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/RowEntity.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/RowEntity.java
new file mode 100644
index 0000000..6b9f7ad
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/RowEntity.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client;
+
+import java.util.ArrayList;
+import java.util.List;
+import lombok.Getter;
+import org.apache.skywalking.banyandb.v1.trace.BanyandbTrace;
+
+/**
+ * RowEntity represents an entity of BanyanDB entity.
+ */
+@Getter
+public class RowEntity {
+    /**
+     * identity of the entity.
+     * For a trace entity, it is the spanID of a Span or the segmentId of a segment in Skywalking.
+     */
+    private final String id;
+
+    /**
+     * timestamp of the entity in the timeunit of milliseconds.
+     */
+    private final long timestamp;
+
+    /**
+     * binary part of the entity
+     */
+    private final byte[] binary;
+
+    /**
+     * fields are indexed-fields that are searchable in BanyanBD
+     */
+    private final List<FieldAndValue<?>> fields;
+
+    RowEntity(BanyandbTrace.Entity entity) {
+        id = entity.getEntityId();
+        timestamp = entity.getTimestamp().getSeconds() * 1000 + entity.getTimestamp().getNanos() / 1_000_000;
+        binary = entity.getDataBinary().toByteArray();
+        fields = new ArrayList<>(entity.getFieldsCount());
+        entity.getFieldsList().forEach(field -> fields.add(FieldAndValue.build(field)));
+    }
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/SerializableField.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/SerializableField.java
new file mode 100644
index 0000000..2deef2d
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/SerializableField.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client;
+
+import org.apache.skywalking.banyandb.v1.Banyandb;
+
+/**
+ * An interface that represents an object which can be converted to the protobuf representation
+ * of BanyanDB.Field. BanyanDB.Field is used for writing entities to the database.
+ */
+public interface SerializableField {
+    Banyandb.Field toField();
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/TimestampRange.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/TimestampRange.java
new file mode 100644
index 0000000..e1f1f14
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/TimestampRange.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client;
+
+import com.google.protobuf.Timestamp;
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.skywalking.banyandb.v1.Banyandb;
+
+@RequiredArgsConstructor
+@Getter(AccessLevel.PROTECTED)
+public class TimestampRange {
+    /**
+     * start timestamp in timeunit of milliseconds. inclusive.
+     */
+    private final long begin;
+
+    /**
+     * end timestamp in timeunit of milliseconds. inclusive.
+     */
+    private final long end;
+
+    /**
+     * @return TimeRange accordingly.
+     */
+    Banyandb.TimeRange build() {
+        final Banyandb.TimeRange.Builder builder = Banyandb.TimeRange.newBuilder();
+        builder.setBegin(Timestamp.newBuilder()
+                                  .setSeconds(begin / 1000)
+                                  .setNanos((int) (begin % 1000 * 1_000_000)));
+        builder.setEnd(Timestamp.newBuilder()
+                                  .setSeconds(end / 1000)
+                                  .setNanos((int) (end % 1000 * 1_000_000)));
+        return builder.build();
+    }
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceBulkWriteProcessor.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceBulkWriteProcessor.java
new file mode 100644
index 0000000..5bab418
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceBulkWriteProcessor.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client;
+
+import io.grpc.stub.StreamObserver;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.banyandb.v1.trace.BanyandbTrace;
+import org.apache.skywalking.banyandb.v1.trace.TraceServiceGrpc;
+
+/**
+ * TraceWriteProcessor works for trace flush.
+ */
+@Slf4j
+public class TraceBulkWriteProcessor extends BulkWriteProcessor {
+    /**
+     * The BanyanDB instance name.
+     */
+    private final String group;
+    private TraceServiceGrpc.TraceServiceStub traceServiceStub;
+
+    /**
+     * Create the processor.
+     *
+     * @param traceServiceStub stub for gRPC call.
+     * @param maxBulkSize      the max bulk size for the flush operation
+     * @param flushInterval    if given maxBulkSize is not reached in this period, the flush would be trigger
+     *                         automatically. Unit is second.
+     * @param concurrency      the number of concurrency would run for the flush max.
+     */
+    protected TraceBulkWriteProcessor(final String group,
+                                      final TraceServiceGrpc.TraceServiceStub traceServiceStub,
+                                      final int maxBulkSize,
+                                      final int flushInterval,
+                                      final int concurrency) {
+        super("TraceBulkWriteProcessor", maxBulkSize, flushInterval, concurrency);
+        this.group = group;
+        this.traceServiceStub = traceServiceStub;
+    }
+
+    /**
+     * Add the trace to the bulk processor.
+     *
+     * @param traceWrite to add.
+     */
+    public void add(TraceWrite traceWrite) {
+        this.buffer.produce(traceWrite);
+    }
+
+    @Override
+    protected void flush(final List data) {
+        final StreamObserver<BanyandbTrace.WriteRequest> writeRequestStreamObserver
+            = traceServiceStub.withDeadlineAfter(
+                                  flushInterval, TimeUnit.SECONDS)
+                              .write(
+                                  new StreamObserver<BanyandbTrace.WriteResponse>() {
+                                      @Override
+                                      public void onNext(
+                                          BanyandbTrace.WriteResponse writeResponse) {
+                                      }
+
+                                      @Override
+                                      public void onError(
+                                          Throwable throwable) {
+                                          log.error(
+                                              "Error occurs in flushing traces.",
+                                              throwable
+                                          );
+                                      }
+
+                                      @Override
+                                      public void onCompleted() {
+                                      }
+                                  });
+        try {
+            data.forEach(write -> {
+                final TraceWrite traceWrite = (TraceWrite) write;
+                BanyandbTrace.WriteRequest request = traceWrite.build(group);
+                writeRequestStreamObserver.onNext(request);
+            });
+        } finally {
+            writeRequestStreamObserver.onCompleted();
+        }
+    }
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceQuery.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceQuery.java
new file mode 100644
index 0000000..850ddda
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceQuery.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import lombok.RequiredArgsConstructor;
+import lombok.Setter;
+import org.apache.skywalking.banyandb.v1.Banyandb;
+import org.apache.skywalking.banyandb.v1.trace.BanyandbTrace;
+
+/**
+ * TraceQuery is the high-level query API for the trace model.
+ */
+@Setter
+public class TraceQuery {
+    /**
+     * Owner name current entity
+     */
+    private final String name;
+    /**
+     * The time range for query.
+     */
+    private final TimestampRange timestampRange;
+    /**
+     * The projections of query result. These should have defined in the schema.
+     */
+    private final List<String> projections;
+    /**
+     * Query conditions.
+     */
+    private final List<PairQueryCondition<?>> conditions;
+    /**
+     * The starting row id of the query. Default value is 0.
+     */
+    private int offset;
+    /**
+     * The limit size of the query. Default value is 20.
+     */
+    private int limit;
+    /**
+     * One order condition is supported and optional.
+     */
+    private OrderBy orderBy;
+    /**
+     * Whether to fetch data_binary for the query
+     */
+    private boolean dataBinary;
+
+    public TraceQuery(final String name, final TimestampRange timestampRange, final List<String> projections) {
+        this.name = name;
+        this.timestampRange = timestampRange;
+        this.projections = projections;
+        this.conditions = new ArrayList<>(10);
+        this.offset = 0;
+        this.limit = 20;
+        this.dataBinary = false;
+    }
+
+    public TraceQuery(final String name, final List<String> projections) {
+        this(name, null, projections);
+    }
+
+    /**
+     * Fluent API for appending query condition
+     *
+     * @param condition the query condition to be appended
+     */
+    public TraceQuery appendCondition(PairQueryCondition<?> condition) {
+        this.conditions.add(condition);
+        return this;
+    }
+
+    /**
+     * @param group The instance name.
+     * @return QueryRequest for gRPC level query.
+     */
+    BanyandbTrace.QueryRequest build(String group) {
+        final BanyandbTrace.QueryRequest.Builder builder = BanyandbTrace.QueryRequest.newBuilder();
+        builder.setMetadata(Banyandb.Metadata.newBuilder()
+                .setGroup(group)
+                .setName(name)
+                .build());
+        if (timestampRange != null) {
+            builder.setTimeRange(timestampRange.build());
+        }
+        builder.setProjection(Banyandb.Projection.newBuilder().setDataBinary(this.dataBinary).addAllKeyNames(projections).build());
+        conditions.forEach(pairQueryCondition -> builder.addFields(pairQueryCondition.build()));
+        builder.setOffset(offset);
+        builder.setLimit(limit);
+        if (orderBy != null) {
+            builder.setOrderBy(orderBy.build());
+        }
+        return builder.build();
+    }
+
+    @RequiredArgsConstructor
+    public static class OrderBy {
+        /**
+         * The field name for ordering.
+         */
+        private final String fieldName;
+        /**
+         * The type of ordering.
+         */
+        private final Type type;
+
+        private Banyandb.QueryOrder build() {
+            final Banyandb.QueryOrder.Builder builder = Banyandb.QueryOrder.newBuilder();
+            builder.setKeyName(fieldName);
+            builder.setSort(
+                    Type.DESC.equals(type) ? Banyandb.QueryOrder.Sort.SORT_DESC : Banyandb.QueryOrder.Sort.SORT_ASC);
+            return builder.build();
+        }
+
+        public enum Type {
+            ASC, DESC
+        }
+    }
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceQueryResponse.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceQueryResponse.java
new file mode 100644
index 0000000..49d0881
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceQueryResponse.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client;
+
+import java.util.ArrayList;
+import java.util.List;
+import lombok.Getter;
+import org.apache.skywalking.banyandb.v1.trace.BanyandbTrace;
+
+/**
+ * TraceQueryResponse represents the trace query result.
+ */
+public class TraceQueryResponse {
+    @Getter
+    private final List<RowEntity> entities;
+
+    TraceQueryResponse(BanyandbTrace.QueryResponse response) {
+        final List<BanyandbTrace.Entity> entitiesList = response.getEntitiesList();
+        entities = new ArrayList<>(entitiesList.size());
+        entitiesList.forEach(entity -> entities.add(new RowEntity(entity)));
+    }
+
+    /**
+     * @return size of the response set.
+     */
+    public int size() {
+        return entities.size();
+    }
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceWrite.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceWrite.java
new file mode 100644
index 0000000..0e4008a
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceWrite.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Timestamp;
+import java.util.List;
+import lombok.AccessLevel;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.Singular;
+import org.apache.skywalking.banyandb.v1.Banyandb;
+import org.apache.skywalking.banyandb.v1.trace.BanyandbTrace;
+
+/**
+ * TraceWrite represents a write operation, including necessary fields, for {@link
+ * BanyanDBClient#buildTraceWriteProcessor}.
+ */
+@Builder
+@Getter(AccessLevel.PROTECTED)
+public class TraceWrite {
+    /**
+     * Owner name current entity
+     */
+    private final String name;
+    /**
+     * ID of current entity
+     */
+    private final String entityId;
+    /**
+     * Timestamp represents the time of current trace or trace segment
+     * in the timeunit of milliseconds.
+     */
+    private final long timestamp;
+    /**
+     * The binary raw data represents the whole object of current trace or trace segment. It could be organized by
+     * different serialization formats. Natively, SkyWalking uses protobuf, but it is not required. The BanyanDB server
+     * wouldn't deserialize this. So, no format requirement.
+     */
+    private final byte[] binary;
+    /**
+     * The values of fields, which are defined by the schema. In the bulk write process, BanyanDB client doesn't require
+     * field names anymore.
+     */
+    @Singular
+    private final List<SerializableField> fields;
+
+    /**
+     * @param group of the BanyanDB client connected.
+     * @return {@link BanyandbTrace.WriteRequest} for the bulk process.
+     */
+    BanyandbTrace.WriteRequest build(String group) {
+        final BanyandbTrace.WriteRequest.Builder builder = BanyandbTrace.WriteRequest.newBuilder();
+        builder.setMetadata(Banyandb.Metadata.newBuilder().setGroup(group).setName(name).build());
+        final BanyandbTrace.EntityValue.Builder entityBuilder = BanyandbTrace.EntityValue.newBuilder();
+        entityBuilder.setEntityId(entityId);
+        entityBuilder.setTimestamp(Timestamp.newBuilder()
+                                            .setSeconds(timestamp / 1000)
+                                            .setNanos((int) (timestamp % 1000 * 1_000_000)));
+        entityBuilder.setDataBinary(ByteString.copyFrom(binary));
+        fields.forEach(writeField -> entityBuilder.addFields(writeField.toField()));
+        builder.setEntity(entityBuilder.build());
+        return builder.build();
+    }
+}
diff --git a/src/main/proto/banyandb/v1/banyandb-trace.proto b/src/main/proto/banyandb/v1/banyandb-trace.proto
new file mode 100644
index 0000000..44d98c1
--- /dev/null
+++ b/src/main/proto/banyandb/v1/banyandb-trace.proto
@@ -0,0 +1,107 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+syntax = "proto3";
+
+option java_package = "org.apache.skywalking.banyandb.v1.trace";
+
+package banyandb.trace.v1;
+
+import "google/protobuf/timestamp.proto";
+import "google/protobuf/struct.proto";
+import "banyandb/v1/banyandb.proto";
+
+service TraceService {
+  rpc Query(banyandb.trace.v1.QueryRequest) returns (banyandb.trace.v1.QueryResponse);
+  rpc Write(stream banyandb.trace.v1.WriteRequest) returns (stream banyandb.trace.v1.WriteResponse);
+}
+
+// QueryRequest is the request contract for query.
+message QueryRequest {
+  // metadata is required
+  banyandb.v1.Metadata metadata = 1;
+  // time_range is a range query with begin/end time of entities in the timeunit of nanoseconds.
+  // In the context of Trace, it represents the range of the `startTime` for spans/segments,
+  // while in the context of Log, it means the range of the timestamp(s) for logs.
+  // it is always recommended to specify time range for performance reason
+  banyandb.v1.TimeRange time_range = 2;
+  // offset is used to support pagination, together with the following limit
+  uint32 offset = 3;
+  // limit is used to impose a boundary on the number of records being returned
+  uint32 limit = 4;
+  // order_by is given to specify the sort for a field. So far, only fields in the type of Integer are supported
+  banyandb.v1.QueryOrder order_by = 5;
+  // fields are indexed. Some typical fields are listed below,
+  // - trace_id: if given, it takes precedence over other fields and will be used to retrieve entities before other conditions are imposed
+  // - duration: typical for trace context
+  repeated banyandb.v1.PairQuery fields = 6;
+  // projection can be used to select the key names of the entities in the response
+  banyandb.v1.Projection projection = 7;
+}
+
+// QueryResponse is the response for a query to the Query module.
+message QueryResponse {
+  // entities are the actual data returned
+  repeated Entity entities = 1;
+}
+
+// Entity represents
+// (Trace context) a Span defined in Google Dapper paper or equivalently a Segment in Skywalking.
+// (Log context) a log
+message Entity {
+  // entity_id could be span_id of a Span or segment_id of a Segment in the context of Trace
+  string entity_id = 1;
+  // timestamp represents
+  // 1) either the start time of a Span/Segment,
+  // 2) or the timestamp of a log
+  google.protobuf.Timestamp timestamp = 2;
+  // data_binary contains all un-indexed Tags and other key-value pairs
+  bytes data_binary = 3;
+  // fields contains all indexed Field. Some typical names,
+  // - trace_id
+  // - duration
+  // - service_name
+  // - service_instance_id
+  // - end_time_nanoseconds
+  repeated banyandb.v1.TypedPair fields = 4;
+}
+
+
+message WriteRequest {
+  // the metadata is only required in the first write.
+  banyandb.v1.Metadata metadata = 1;
+  // the entity is required.
+  EntityValue entity = 2;
+}
+
+message WriteResponse {}
+
+message EntityValue {
+  // entity_id could be span_id of a Span or segment_id of a Segment in the context of Trace
+  string entity_id = 1;
+  // timestamp_nanoseconds is in the timeunit of nanoseconds. It represents
+  // 1) either the start time of a Span/Segment,
+  // 2) or the timestamp of a log
+  google.protobuf.Timestamp timestamp = 2;
+  // binary representation of segments, including tags, spans...
+  bytes data_binary = 3;
+  // support all of indexed fields in the fields.
+  // Pair only has value, as the value of PairValue match with the key
+  // by the index rules and index rule bindings of Metadata group.
+  // indexed fields of multiple entities are compression in the fields.
+  repeated banyandb.v1.Field fields = 4;
+}
\ No newline at end of file
diff --git a/src/main/proto/banyandb/v1/banyandb.proto b/src/main/proto/banyandb/v1/banyandb.proto
new file mode 100644
index 0000000..e86186e
--- /dev/null
+++ b/src/main/proto/banyandb/v1/banyandb.proto
@@ -0,0 +1,136 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+syntax = "proto3";
+
+option java_package = "org.apache.skywalking.banyandb.v1";
+
+package banyandb.v1;
+
+import "google/protobuf/timestamp.proto";
+import "google/protobuf/struct.proto";
+
+// Metadata is for multi-tenant, multi-model use
+message Metadata {
+  // group contains a set of options, like retention policy, max
+  string group = 1;
+  // name of the entity
+  string name = 2;
+}
+
+enum FieldType {
+  FIELD_TYPE_UNSPECIFIED = 0;
+  FIELD_TYPE_STRING = 1;
+  FIELD_TYPE_INT = 2;
+  FIELD_TYPE_STRING_ARRAY = 3;
+  FIELD_TYPE_INT_ARRAY = 4;
+}
+
+// Pair is the building block of a record which is equivalent to a key-value pair.
+// In the context of Trace, it could be metadata of a trace such as service_name, service_instance, etc.
+// Besides, other fields/tags are organized in key-value pair in the underlying storage layer.
+// One should notice that the values can be a multi-value.
+message TypedPair {
+  string key = 1;
+  oneof typed {
+    NullWithType null_pair = 2;
+    Int int_pair = 3;
+    Str str_pair = 4;
+    IntArray int_array_pair = 5;
+    StrArray str_array_pair = 6;
+  }
+
+  message NullWithType {
+      FieldType type = 1;
+  }
+}
+
+// PairQuery consists of the query condition with a single binary operator to be imposed
+// For 1:1 BinaryOp, values in condition must be an array with length = 1,
+// while for 1:N BinaryOp, values can be an array with length >= 1.
+message PairQuery {
+  // BinaryOp specifies the operation imposed to the given query condition
+  // For EQ, NE, LT, GT, LE and GE, only one operand should be given, i.e. one-to-one relationship.
+  // HAVING and NOT_HAVING allow multi-value to be the operand such as array/vector, i.e. one-to-many relationship.
+  // For example, "keyA" contains "valueA" **and** "valueB"
+  enum BinaryOp {
+    BINARY_OP_UNSPECIFIED = 0;
+    BINARY_OP_EQ = 1;
+    BINARY_OP_NE = 2;
+    BINARY_OP_LT = 3;
+    BINARY_OP_GT = 4;
+    BINARY_OP_LE = 5;
+    BINARY_OP_GE = 6;
+    BINARY_OP_HAVING = 7;
+    BINARY_OP_NOT_HAVING = 8;
+  }
+  BinaryOp op = 1;
+  TypedPair condition = 2;
+}
+
+// QueryOrder means a Sort operation to be done for a given field.
+// The key_name refers to the key of a Pair.
+message QueryOrder {
+  string key_name = 1;
+  enum Sort {
+    SORT_UNSPECIFIED = 0;
+    SORT_DESC = 1;
+    SORT_ASC = 2;
+  }
+  Sort sort = 2;
+}
+
+// Projection is used to select the names of keys to be returned.
+message Projection {
+  // whether binary part is needed
+  bool data_binary = 1;
+  // The key_name refers to the key(s) of Pair(s).
+  repeated string key_names = 2;
+}
+
+// TimeRange is a range query for uint64,
+// the range here follows left-inclusive and right-exclusive rule, i.e. [begin, end) if both edges exist
+message TimeRange {
+  google.protobuf.Timestamp begin = 1;
+  google.protobuf.Timestamp end = 2;
+}
+
+message Str {
+  string value = 1;
+}
+
+message Int {
+  int64 value = 1;
+}
+
+message StrArray {
+  repeated string value = 1;
+}
+
+message IntArray {
+  repeated int64 value = 1;
+}
+
+message Field {
+  oneof value_type {
+    google.protobuf.NullValue null = 1;
+    Str str = 2;
+    StrArray str_array = 3;
+    Int int = 4;
+    IntArray int_array = 5;
+  }
+}
\ No newline at end of file
diff --git a/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientQueryTest.java b/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientQueryTest.java
new file mode 100644
index 0000000..1667254
--- /dev/null
+++ b/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientQueryTest.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Timestamp;
+import io.grpc.ManagedChannel;
+import io.grpc.Server;
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.inprocess.InProcessServerBuilder;
+import io.grpc.stub.StreamObserver;
+import io.grpc.testing.GrpcCleanupRule;
+import org.apache.skywalking.banyandb.v1.Banyandb;
+import org.apache.skywalking.banyandb.v1.trace.BanyandbTrace;
+import org.apache.skywalking.banyandb.v1.trace.TraceServiceGrpc;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.ArgumentMatchers;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.AdditionalAnswers.delegatesTo;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.mock;
+
+@RunWith(PowerMockRunner.class)
+@PowerMockIgnore("javax.management.*")
+public class BanyanDBClientQueryTest {
+    @Rule
+    public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
+
+    private final TraceServiceGrpc.TraceServiceImplBase serviceImpl =
+            mock(TraceServiceGrpc.TraceServiceImplBase.class, delegatesTo(
+                    new TraceServiceGrpc.TraceServiceImplBase() {
+                        @Override
+                        public void query(BanyandbTrace.QueryRequest request, StreamObserver<BanyandbTrace.QueryResponse> responseObserver) {
+                            responseObserver.onNext(BanyandbTrace.QueryResponse.newBuilder().build());
+                            responseObserver.onCompleted();
+                        }
+                    }));
+
+    private BanyanDBClient client;
+
+    @Before
+    public void setUp() throws IOException {
+        // Generate a unique in-process server name.
+        String serverName = InProcessServerBuilder.generateName();
+
+        // Create a server, add service, start, and register for automatic graceful shutdown.
+        Server server = InProcessServerBuilder
+                .forName(serverName).directExecutor().addService(serviceImpl).build();
+        grpcCleanup.register(server.start());
+
+        // Create a client channel and register for automatic graceful shutdown.
+        ManagedChannel channel = grpcCleanup.register(
+                InProcessChannelBuilder.forName(serverName).directExecutor().build());
+        client = new BanyanDBClient("127.0.0.1", server.getPort(), "default");
+
+        client.connect(channel);
+    }
+
+    @Test
+    public void testNonNull() {
+        Assert.assertNotNull(this.client);
+    }
+
+    @Test
+    public void testQuery_tableScan() {
+        ArgumentCaptor<BanyandbTrace.QueryRequest> requestCaptor = ArgumentCaptor.forClass(BanyandbTrace.QueryRequest.class);
+
+        Instant end = Instant.now();
+        Instant begin = end.minus(15, ChronoUnit.MINUTES);
+        TraceQuery query = new TraceQuery("sw",
+                new TimestampRange(begin.toEpochMilli(), end.toEpochMilli()),
+                Arrays.asList("state", "start_time", "duration", "trace_id"));
+        // search for all states
+        query.appendCondition(PairQueryCondition.LongQueryCondition.eq("state", 0L));
+        query.setOrderBy(new TraceQuery.OrderBy("duration", TraceQuery.OrderBy.Type.DESC));
+        client.queryTraces(query);
+
+        verify(serviceImpl).query(requestCaptor.capture(), ArgumentMatchers.any());
+
+        final BanyandbTrace.QueryRequest request = requestCaptor.getValue();
+        // assert metadata
+        Assert.assertEquals("sw", request.getMetadata().getName());
+        Assert.assertEquals("default", request.getMetadata().getGroup());
+        // assert timeRange, both seconds and the nanos
+        Assert.assertEquals(begin.toEpochMilli() / 1000, request.getTimeRange().getBegin().getSeconds());
+        Assert.assertEquals(TimeUnit.MILLISECONDS.toNanos(begin.toEpochMilli() % 1000), request.getTimeRange().getBegin().getNanos());
+        Assert.assertEquals(end.toEpochMilli() / 1000, request.getTimeRange().getEnd().getSeconds());
+        Assert.assertEquals(TimeUnit.MILLISECONDS.toNanos(end.toEpochMilli() % 1000), request.getTimeRange().getEnd().getNanos());
+        // assert fields, we only have state as a condition which should be state
+        Assert.assertEquals(1, request.getFieldsCount());
+        // assert orderBy, by default DESC
+        Assert.assertEquals(Banyandb.QueryOrder.Sort.SORT_DESC, request.getOrderBy().getSort());
+        Assert.assertEquals("duration", request.getOrderBy().getKeyName());
+        // assert state
+        Assert.assertEquals(Banyandb.PairQuery.BinaryOp.BINARY_OP_EQ, request.getFields(0).getOp());
+        Assert.assertEquals(0, request.getFields(0).getCondition().getIntPair().getValue());
+        // assert projections
+        assertCollectionEqual(Lists.newArrayList("duration", "state", "start_time", "trace_id"), request.getProjection().getKeyNamesList());
+    }
+
+    @Test
+    public void testQuery_indexScan() {
+        ArgumentCaptor<BanyandbTrace.QueryRequest> requestCaptor = ArgumentCaptor.forClass(BanyandbTrace.QueryRequest.class);
+        Instant begin = Instant.now().minus(5, ChronoUnit.MINUTES);
+        Instant end = Instant.now();
+        String serviceId = "service_id_b";
+        String serviceInstanceId = "service_id_b_1";
+        String endpointId = "/check_0";
+        long minDuration = 10;
+        long maxDuration = 100;
+
+        TraceQuery query = new TraceQuery("sw",
+                new TimestampRange(begin.toEpochMilli(), end.toEpochMilli()),
+                Arrays.asList("state", "start_time", "duration", "trace_id"));
+        // search for the successful states
+        query.appendCondition(PairQueryCondition.LongQueryCondition.eq("state", 1L))
+                .appendCondition(PairQueryCondition.StringQueryCondition.eq("service_id", serviceId))
+                .appendCondition(PairQueryCondition.StringQueryCondition.eq("service_instance_id", serviceInstanceId))
+                .appendCondition(PairQueryCondition.StringQueryCondition.eq("endpoint_id", endpointId))
+                .appendCondition(PairQueryCondition.LongQueryCondition.ge("duration", minDuration))
+                .appendCondition(PairQueryCondition.LongQueryCondition.le("duration", maxDuration))
+                .setOrderBy(new TraceQuery.OrderBy("start_time", TraceQuery.OrderBy.Type.ASC));
+
+        client.queryTraces(query);
+
+        verify(serviceImpl).query(requestCaptor.capture(), ArgumentMatchers.any());
+        final BanyandbTrace.QueryRequest request = requestCaptor.getValue();
+        // assert metadata
+        Assert.assertEquals("sw", request.getMetadata().getName());
+        Assert.assertEquals("default", request.getMetadata().getGroup());
+        // assert timeRange
+        Assert.assertEquals(begin.getEpochSecond(), request.getTimeRange().getBegin().getSeconds());
+        Assert.assertEquals(end.getEpochSecond(), request.getTimeRange().getEnd().getSeconds());
+        // assert fields, we only have state as a condition
+        Assert.assertEquals(6, request.getFieldsCount());
+        // assert orderBy, by default DESC
+        Assert.assertEquals(Banyandb.QueryOrder.Sort.SORT_ASC, request.getOrderBy().getSort());
+        Assert.assertEquals("start_time", request.getOrderBy().getKeyName());
+        // assert projections
+        assertCollectionEqual(Lists.newArrayList("duration", "state", "start_time", "trace_id"), request.getProjection().getKeyNamesList());
+        // assert fields
+        assertCollectionEqual(request.getFieldsList(), ImmutableList.of(
+                PairQueryCondition.LongQueryCondition.ge("duration", minDuration).build(), // 1 -> duration >= minDuration
+                PairQueryCondition.LongQueryCondition.le("duration", maxDuration).build(), // 2 -> duration <= maxDuration
+                PairQueryCondition.StringQueryCondition.eq("service_id", serviceId).build(), // 3 -> service_id
+                PairQueryCondition.StringQueryCondition.eq("service_instance_id", serviceInstanceId).build(), // 4 -> service_instance_id
+                PairQueryCondition.StringQueryCondition.eq("endpoint_id", endpointId).build(), // 5 -> endpoint_id
+                PairQueryCondition.LongQueryCondition.eq("state", 1L).build() // 7 -> state
+        ));
+    }
+
+    @Test
+    public void testQuery_traceIDFetch() {
+        ArgumentCaptor<BanyandbTrace.QueryRequest> requestCaptor = ArgumentCaptor.forClass(BanyandbTrace.QueryRequest.class);
+        String traceId = "1111.222.333";
+
+        TraceQuery query = new TraceQuery("sw", Arrays.asList("state", "start_time", "duration", "trace_id"));
+        query.appendCondition(PairQueryCondition.StringQueryCondition.eq("trace_id", traceId));
+
+        client.queryTraces(query);
+
+        verify(serviceImpl).query(requestCaptor.capture(), ArgumentMatchers.any());
+        final BanyandbTrace.QueryRequest request = requestCaptor.getValue();
+        // assert metadata
+        Assert.assertEquals("sw", request.getMetadata().getName());
+        Assert.assertEquals("default", request.getMetadata().getGroup());
+        Assert.assertEquals(1, request.getFieldsCount());
+        // assert fields
+        assertCollectionEqual(request.getFieldsList(), ImmutableList.of(
+                PairQueryCondition.StringQueryCondition.eq("trace_id", traceId).build()
+        ));
+    }
+
+    @Test
+    public void testQuery_responseConversion() {
+        final byte[] binaryData = new byte[]{13};
+        final String segmentId = "1231.dfd.123123ssf";
+        final String traceId = "trace_id-xxfff.111323";
+        final long duration = 200L;
+        final Instant now = Instant.now();
+        final BanyandbTrace.QueryResponse responseObj = BanyandbTrace.QueryResponse.newBuilder()
+                .addEntities(BanyandbTrace.Entity.newBuilder()
+                        .setDataBinary(ByteString.copyFrom(binaryData))
+                        .setEntityId(segmentId)
+                        .setTimestamp(Timestamp.newBuilder()
+                                .setSeconds(now.toEpochMilli() / 1000)
+                                .setNanos((int) TimeUnit.MILLISECONDS.toNanos(now.toEpochMilli() % 1000))
+                                .build())
+                        .addFields(Banyandb.TypedPair.newBuilder()
+                                .setKey("trace_id")
+                                .setStrPair(Banyandb.Str.newBuilder().setValue(traceId).build()).build())
+                        .addFields(Banyandb.TypedPair.newBuilder()
+                                .setKey("duration")
+                                .setIntPair(Banyandb.Int.newBuilder().setValue(duration).build()).build())
+                        .addFields(Banyandb.TypedPair.newBuilder()
+                                .setKey("mq.broker")
+                                .setNullPair(Banyandb.TypedPair.NullWithType.newBuilder().setType(Banyandb.FieldType.FIELD_TYPE_STRING).build()).build())
+                        .build())
+                .build();
+        TraceQueryResponse resp = new TraceQueryResponse(responseObj);
+        Assert.assertNotNull(resp);
+        Assert.assertEquals(1, resp.getEntities().size());
+        Assert.assertEquals(3, resp.getEntities().get(0).getFields().size());
+        Assert.assertEquals(3, resp.getEntities().get(0).getFields().size());
+        Assert.assertEquals(new FieldAndValue.StringFieldPair("trace_id", traceId), resp.getEntities().get(0).getFields().get(0));
+        Assert.assertEquals(new FieldAndValue.LongFieldPair("duration", duration), resp.getEntities().get(0).getFields().get(1));
+        Assert.assertEquals(new FieldAndValue.StringFieldPair("mq.broker", null), resp.getEntities().get(0).getFields().get(2));
+    }
+
+    static <T> void assertCollectionEqual(Collection<T> c1, Collection<T> c2) {
+        Assert.assertTrue(c1.size() == c2.size() && c1.containsAll(c2) && c2.containsAll(c1));
+    }
+}
diff --git a/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientWriteTest.java b/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientWriteTest.java
new file mode 100644
index 0000000..731f560
--- /dev/null
+++ b/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientWriteTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client;
+
+import io.grpc.ManagedChannel;
+import io.grpc.Server;
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.inprocess.InProcessServerBuilder;
+import io.grpc.stub.StreamObserver;
+import io.grpc.testing.GrpcCleanupRule;
+import io.grpc.util.MutableHandlerRegistry;
+import org.apache.skywalking.banyandb.v1.Banyandb;
+import org.apache.skywalking.banyandb.v1.trace.BanyandbTrace;
+import org.apache.skywalking.banyandb.v1.trace.TraceServiceGrpc;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+public class BanyanDBClientWriteTest {
+    @Rule
+    public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
+
+    private final MutableHandlerRegistry serviceRegistry = new MutableHandlerRegistry();
+
+    private BanyanDBClient client;
+    private TraceBulkWriteProcessor traceBulkWriteProcessor;
+
+    @Before
+    public void setUp() throws IOException {
+        String serverName = InProcessServerBuilder.generateName();
+
+        Server server = InProcessServerBuilder
+                .forName(serverName).fallbackHandlerRegistry(serviceRegistry).directExecutor().build();
+        grpcCleanup.register(server.start());
+
+        ManagedChannel channel = grpcCleanup.register(
+                InProcessChannelBuilder.forName(serverName).directExecutor().build());
+
+        client = new BanyanDBClient("127.0.0.1", server.getPort(), "default");
+        client.connect(channel);
+        traceBulkWriteProcessor = client.buildTraceWriteProcessor(1000, 1, 1);
+    }
+
+    @After
+    public void shutdown() throws IOException {
+        traceBulkWriteProcessor.close();
+    }
+
+    @Test
+    public void testWrite() throws Exception {
+        final CountDownLatch allRequestsDelivered = new CountDownLatch(1);
+        final List<BanyandbTrace.WriteRequest> writeRequestDelivered = new ArrayList<>();
+
+        // implement the fake service
+        final TraceServiceGrpc.TraceServiceImplBase serviceImpl =
+                new TraceServiceGrpc.TraceServiceImplBase() {
+                    @Override
+                    public StreamObserver<BanyandbTrace.WriteRequest> write(StreamObserver<BanyandbTrace.WriteResponse> responseObserver) {
+                        return new StreamObserver<BanyandbTrace.WriteRequest>() {
+                            @Override
+                            public void onNext(BanyandbTrace.WriteRequest value) {
+                                writeRequestDelivered.add(value);
+                                responseObserver.onNext(BanyandbTrace.WriteResponse.newBuilder().build());
+                            }
+
+                            @Override
+                            public void onError(Throwable t) {
+                            }
+
+                            @Override
+                            public void onCompleted() {
+                                responseObserver.onCompleted();
+                                allRequestsDelivered.countDown();
+                            }
+                        };
+                    }
+                };
+        serviceRegistry.addService(serviceImpl);
+
+        String segmentId = "1231.dfd.123123ssf";
+        String traceId = "trace_id-xxfff.111323";
+        String serviceId = "webapp_id";
+        String serviceInstanceId = "10.0.0.1_id";
+        String endpointId = "home_id";
+        int latency = 200;
+        int state = 1;
+        Instant now = Instant.now();
+        byte[] byteData = new byte[]{14};
+        String broker = "172.16.10.129:9092";
+        String topic = "topic_1";
+        String queue = "queue_2";
+        String httpStatusCode = "200";
+        String dbType = "SQL";
+        String dbInstance = "127.0.0.1:3306";
+
+        TraceWrite traceWrite = TraceWrite.builder()
+                .entityId(segmentId)
+                .binary(byteData)
+                .timestamp(now.toEpochMilli())
+                .name("sw")
+                .field(Field.stringField(traceId)) // 0
+                .field(Field.stringField(serviceId))
+                .field(Field.stringField(serviceInstanceId))
+                .field(Field.stringField(endpointId))
+                .field(Field.longField(latency)) // 4
+                .field(Field.longField(state))
+                .field(Field.stringField(httpStatusCode))
+                .field(Field.nullField()) // 7
+                .field(Field.stringField(dbType))
+                .field(Field.stringField(dbInstance))
+                .field(Field.stringField(broker))
+                .field(Field.stringField(topic))
+                .field(Field.stringField(queue)) // 12
+                .build();
+
+        traceBulkWriteProcessor.add(traceWrite);
+
+        if (allRequestsDelivered.await(5, TimeUnit.SECONDS)) {
+            Assert.assertEquals(1, writeRequestDelivered.size());
+            final BanyandbTrace.WriteRequest request = writeRequestDelivered.get(0);
+            Assert.assertEquals(13, request.getEntity().getFieldsCount());
+            Assert.assertEquals(traceId, request.getEntity().getFields(0).getStr().getValue());
+            Assert.assertEquals(latency, request.getEntity().getFields(4).getInt().getValue());
+            Assert.assertEquals(request.getEntity().getFields(7).getValueTypeCase(), Banyandb.Field.ValueTypeCase.NULL);
+            Assert.assertEquals(queue, request.getEntity().getFields(12).getStr().getValue());
+        } else {
+            Assert.fail();
+        }
+    }
+}