Posted to notifications@skywalking.apache.org by wu...@apache.org on 2021/08/31 07:16:18 UTC

[skywalking-banyandb-java-client] branch main created (now 4575ecf)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb-java-client.git.


      at 4575ecf  Initial project codebase.

This branch includes the following new commits:

     new 11fe818  first commit
     new 4575ecf  Initial project codebase.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[skywalking-banyandb-java-client] 02/02: Initial project codebase.

Posted by wu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb-java-client.git

commit 4575ecf012c130fe17197f3bbed7b7a1836a38a9
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Tue Aug 31 15:16:13 2021 +0800

    Initial project codebase.
---
 .gitignore                                         |  23 ++
 .mvn/wrapper/MavenWrapperDownloader.java           | 117 +++++++
 .mvn/wrapper/maven-wrapper.properties              |   3 +
 HEADER                                             |  14 +
 LICENSE                                            | 202 ++++++++++++
 NOTICE                                             |   5 +
 apm-checkstyle/CHECKSTYLE_HEAD                     |  18 ++
 apm-checkstyle/checkStyle.xml                      | 133 ++++++++
 apm-checkstyle/importControl.xml                   |  28 ++
 mvnw                                               | 310 ++++++++++++++++++
 mvnw.cmd                                           | 182 +++++++++++
 pom.xml                                            | 357 +++++++++++++++++++++
 .../banyandb/commons/datacarrier/DataCarrier.java  | 166 ++++++++++
 .../banyandb/commons/datacarrier/EnvUtil.java      |  50 +++
 .../buffer/ArrayBlockingQueueBuffer.java           |  67 ++++
 .../commons/datacarrier/buffer/Buffer.java         |  76 +++++
 .../commons/datacarrier/buffer/BufferStrategy.java |  23 ++
 .../commons/datacarrier/buffer/Channels.java       |  93 ++++++
 .../commons/datacarrier/buffer/QueueBuffer.java    |  46 +++
 .../datacarrier/common/AtomicRangeInteger.java     |  76 +++++
 .../datacarrier/consumer/BulkConsumePool.java      | 118 +++++++
 .../datacarrier/consumer/ConsumeDriver.java        | 137 ++++++++
 .../consumer/ConsumerCannotBeCreatedException.java |  25 ++
 .../commons/datacarrier/consumer/ConsumerPool.java |  30 ++
 .../datacarrier/consumer/ConsumerPoolFactory.java  |  50 +++
 .../datacarrier/consumer/ConsumerThread.java       | 105 ++++++
 .../commons/datacarrier/consumer/IConsumer.java    |  40 +++
 .../commons/datacarrier/consumer/IDriver.java      |  32 ++
 .../consumer/MultipleChannelsConsumer.java         | 124 +++++++
 .../datacarrier/partition/IDataPartitioner.java    |  32 ++
 .../partition/ProducerThreadPartitioner.java       |  37 +++
 .../partition/SimpleRollingPartitioner.java        |  37 +++
 .../banyandb/v1/client/BanyanDBClient.java         | 197 ++++++++++++
 .../banyandb/v1/client/BulkWriteProcessor.java     | 129 ++++++++
 .../skywalking/banyandb/v1/client/Field.java       | 157 +++++++++
 .../banyandb/v1/client/FieldAndValue.java          | 101 ++++++
 .../skywalking/banyandb/v1/client/Options.java     |  43 +++
 .../banyandb/v1/client/PairQueryCondition.java     | 307 ++++++++++++++++++
 .../skywalking/banyandb/v1/client/RowEntity.java   |  59 ++++
 .../banyandb/v1/client/SerializableField.java      |  29 ++
 .../banyandb/v1/client/TimestampRange.java         |  53 +++
 .../v1/client/TraceBulkWriteProcessor.java         | 102 ++++++
 .../skywalking/banyandb/v1/client/TraceQuery.java  | 137 ++++++++
 .../banyandb/v1/client/TraceQueryResponse.java     |  45 +++
 .../skywalking/banyandb/v1/client/TraceWrite.java  |  81 +++++
 src/main/proto/banyandb/v1/banyandb-trace.proto    | 107 ++++++
 src/main/proto/banyandb/v1/banyandb.proto          | 136 ++++++++
 .../v1/client/BanyanDBClientQueryTest.java         | 245 ++++++++++++++
 .../v1/client/BanyanDBClientWriteTest.java         | 155 +++++++++
 49 files changed, 4839 insertions(+)
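
The files under src/main/java/.../banyandb/v1/client in the summary above make up the public surface of the new client (BanyanDBClient, Options, Field, TraceWrite, TraceQuery, TraceQueryResponse, TimestampRange, plus the bulk-write processors). As a rough orientation only, a write-then-query round trip with these classes might look like the sketch below; the constructors and method names shown are assumptions inferred from the file names, not signatures verified against this commit. For the authoritative API, see BanyanDBClient.java, TraceWrite.java and TraceQuery.java in the file list above.

    // Hypothetical usage sketch only. Class names come from the file list above;
    // every constructor and method signature below is an assumption, not taken
    // from this commit.
    import java.util.Arrays;

    import org.apache.skywalking.banyandb.v1.client.BanyanDBClient;
    import org.apache.skywalking.banyandb.v1.client.Field;
    import org.apache.skywalking.banyandb.v1.client.TimestampRange;
    import org.apache.skywalking.banyandb.v1.client.TraceQuery;
    import org.apache.skywalking.banyandb.v1.client.TraceQueryResponse;
    import org.apache.skywalking.banyandb.v1.client.TraceWrite;

    public class ClientRoundTripSketch {
        public static void main(String[] args) throws Exception {
            // Assumed: host, gRPC port and group name, with an explicit connect step.
            BanyanDBClient client = new BanyanDBClient("127.0.0.1", 17912, "default");
            client.connect();

            long now = System.currentTimeMillis();

            // Assumed builder-style write: one trace entity with a binary payload
            // plus its indexed/searchable fields.
            TraceWrite write = TraceWrite.builder()
                    .name("sw")
                    .entityId("segment-1")
                    .timestamp(now)
                    .binary(new byte[] {0})
                    .fields(Arrays.asList(Field.stringField("service-a"),
                                          Field.longField(200L)))
                    .build();
            client.write(write);

            // Assumed query shape: data set name, time range and projected fields.
            TraceQuery query = new TraceQuery("sw",
                    new TimestampRange(now - 3_600_000L, now),
                    Arrays.asList("trace_id", "duration"));
            TraceQueryResponse response = client.queryTraces(query);
            // The response would expose the matching RowEntity values.
        }
    }
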

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..562e568
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,23 @@
+/build/
+target/
+.idea/
+*.iml
+.classpath
+.project
+.settings/
+.DS_Store
+*~
+packages/
+**/dependency-reduced-pom.xml
+**/dist/
+/docker/snapshot/*.gz
+.mvn/wrapper/*.jar
+.factorypath
+.vscode
+.checkstyle
+.externalToolBuilders
+/test/plugin/dist
+/test/plugin/workspace
+/test/jacoco/classes
+/test/jacoco/*.exec
+test/jacoco
\ No newline at end of file
diff --git a/.mvn/wrapper/MavenWrapperDownloader.java b/.mvn/wrapper/MavenWrapperDownloader.java
new file mode 100644
index 0000000..187216f
--- /dev/null
+++ b/.mvn/wrapper/MavenWrapperDownloader.java
@@ -0,0 +1,117 @@
+/*
+ * Copyright 2007-present the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.net.*;
+import java.io.*;
+import java.nio.channels.*;
+import java.util.Properties;
+
+public class MavenWrapperDownloader {
+
+    private static final String WRAPPER_VERSION = "0.5.5";
+    /**
+     * Default URL to download the maven-wrapper.jar from, if no 'downloadUrl' is provided.
+     */
+    private static final String DEFAULT_DOWNLOAD_URL = "https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/"
+        + WRAPPER_VERSION + "/maven-wrapper-" + WRAPPER_VERSION + ".jar";
+
+    /**
+     * Path to the maven-wrapper.properties file, which might contain a downloadUrl property to
+     * use instead of the default one.
+     */
+    private static final String MAVEN_WRAPPER_PROPERTIES_PATH =
+        ".mvn/wrapper/maven-wrapper.properties";
+
+    /**
+     * Path where the maven-wrapper.jar will be saved to.
+     */
+    private static final String MAVEN_WRAPPER_JAR_PATH =
+        ".mvn/wrapper/maven-wrapper.jar";
+
+    /**
+     * Name of the property which should be used to override the default download url for the wrapper.
+     */
+    private static final String PROPERTY_NAME_WRAPPER_URL = "wrapperUrl";
+
+    public static void main(String args[]) {
+        System.out.println("- Downloader started");
+        File baseDirectory = new File(args[0]);
+        System.out.println("- Using base directory: " + baseDirectory.getAbsolutePath());
+
+        // If the maven-wrapper.properties exists, read it and check if it contains a custom
+        // wrapperUrl parameter.
+        File mavenWrapperPropertyFile = new File(baseDirectory, MAVEN_WRAPPER_PROPERTIES_PATH);
+        String url = DEFAULT_DOWNLOAD_URL;
+        if(mavenWrapperPropertyFile.exists()) {
+            FileInputStream mavenWrapperPropertyFileInputStream = null;
+            try {
+                mavenWrapperPropertyFileInputStream = new FileInputStream(mavenWrapperPropertyFile);
+                Properties mavenWrapperProperties = new Properties();
+                mavenWrapperProperties.load(mavenWrapperPropertyFileInputStream);
+                url = mavenWrapperProperties.getProperty(PROPERTY_NAME_WRAPPER_URL, url);
+            } catch (IOException e) {
+                System.out.println("- ERROR loading '" + MAVEN_WRAPPER_PROPERTIES_PATH + "'");
+            } finally {
+                try {
+                    if(mavenWrapperPropertyFileInputStream != null) {
+                        mavenWrapperPropertyFileInputStream.close();
+                    }
+                } catch (IOException e) {
+                    // Ignore ...
+                }
+            }
+        }
+        System.out.println("- Downloading from: " + url);
+
+        File outputFile = new File(baseDirectory.getAbsolutePath(), MAVEN_WRAPPER_JAR_PATH);
+        if(!outputFile.getParentFile().exists()) {
+            if(!outputFile.getParentFile().mkdirs()) {
+                System.out.println(
+                    "- ERROR creating output directory '" + outputFile.getParentFile().getAbsolutePath() + "'");
+            }
+        }
+        System.out.println("- Downloading to: " + outputFile.getAbsolutePath());
+        try {
+            downloadFileFromURL(url, outputFile);
+            System.out.println("Done");
+            System.exit(0);
+        } catch (Throwable e) {
+            System.out.println("- Error downloading");
+            e.printStackTrace();
+            System.exit(1);
+        }
+    }
+
+    private static void downloadFileFromURL(String urlString, File destination) throws Exception {
+        if (System.getenv("MVNW_USERNAME") != null && System.getenv("MVNW_PASSWORD") != null) {
+            String username = System.getenv("MVNW_USERNAME");
+            char[] password = System.getenv("MVNW_PASSWORD").toCharArray();
+            Authenticator.setDefault(new Authenticator() {
+                @Override
+                protected PasswordAuthentication getPasswordAuthentication() {
+                    return new PasswordAuthentication(username, password);
+                }
+            });
+        }
+        URL website = new URL(urlString);
+        ReadableByteChannel rbc;
+        rbc = Channels.newChannel(website.openStream());
+        FileOutputStream fos = new FileOutputStream(destination);
+        fos.getChannel().transferFrom(rbc, 0, Long.MAX_VALUE);
+        fos.close();
+        rbc.close();
+    }
+
+}
diff --git a/.mvn/wrapper/maven-wrapper.properties b/.mvn/wrapper/maven-wrapper.properties
new file mode 100644
index 0000000..54ab0bc
--- /dev/null
+++ b/.mvn/wrapper/maven-wrapper.properties
@@ -0,0 +1,3 @@
+distributionUrl=https://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.6.1/apache-maven-3.6.1-bin.zip
+wrapperUrl=https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.5.5/maven-wrapper-0.5.5.jar
+
diff --git a/HEADER b/HEADER
new file mode 100644
index 0000000..1745cfe
--- /dev/null
+++ b/HEADER
@@ -0,0 +1,14 @@
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..8f71f43
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,202 @@
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "{}"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright {yyyy} {name of copyright owner}
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+
diff --git a/NOTICE b/NOTICE
new file mode 100644
index 0000000..2ca1b04
--- /dev/null
+++ b/NOTICE
@@ -0,0 +1,5 @@
+Apache SkyWalking
+Copyright 2017-2021 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
diff --git a/apm-checkstyle/CHECKSTYLE_HEAD b/apm-checkstyle/CHECKSTYLE_HEAD
new file mode 100755
index 0000000..00e3b09
--- /dev/null
+++ b/apm-checkstyle/CHECKSTYLE_HEAD
@@ -0,0 +1,18 @@
+^<\?xml version="1\.0" encoding="UTF-8"\?>$
+^<!--$
+^ *(/\*|#|~)$
+^ *(\*|#|~) +Licensed to the Apache Software Foundation \(ASF\) under one or more$
+^ *(\*|#|~) +contributor license agreements\.  See the NOTICE file distributed with$
+^ *(\*|#|~) +this work for additional information regarding copyright ownership\.$
+^ *(\*|#|~) +The ASF licenses this file to You under the Apache License, Version 2\.0$
+^ *(\*|#|~) +\(the "License"\); you may not use this file except in compliance with$
+^ *(\*|#|~) +the License\.  You may obtain a copy of the License at$
+^ *(\*|#|~)$
+^ *(\*|#|~) +http://www\.apache\.org/licenses/LICENSE-2\.0$
+^ *(\*|#|~)$
+^ *(\*|#|~) +Unless required by applicable law or agreed to in writing, software$
+^ *(\*|#|~) +distributed under the License is distributed on an "AS IS" BASIS,$
+^ *(\*|#|~) +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied\.$
+^ *(\*|#|~) +See the License for the specific language governing permissions and$
+^ *(\*|#|~) +limitations under the License\.$
+^ *(\*|#|~)$
diff --git a/apm-checkstyle/checkStyle.xml b/apm-checkstyle/checkStyle.xml
new file mode 100755
index 0000000..d51e0c7
--- /dev/null
+++ b/apm-checkstyle/checkStyle.xml
@@ -0,0 +1,133 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  ~
+  -->
+
+<!DOCTYPE module PUBLIC
+    "-//Puppy Crawl//DTD Check Configuration 1.3//EN"
+    "http://checkstyle.org/dtds/configuration_1_3.dtd">
+<module name="Checker">
+
+    <property name="localeLanguage" value="en"/>
+
+    <module name="FileTabCharacter">
+        <property name="eachLine" value="true"/>
+    </module>
+
+    <module name="RegexpHeader">
+        <property name="headerFile" value="${checkstyle.header.file}"/>
+        <property name="multiLines" value="1, 2, 3, 18"/>
+    </module>
+
+    <module name="RegexpSingleline">
+        <property name="format" value="System\.out\.println"/>
+        <property name="message" value="Prohibit invoking System.out.println in source code !"/>
+    </module>
+
+    <module name="RegexpSingleline">
+        <property name="format" value="^\s*\*\s*@author"/>
+        <property name="minimum" value="0"/>
+        <property name="maximum" value="0"/>
+        <property name="message" value="ASF project doesn't allow @author copyright."/>
+    </module>
+
+    <module name="RegexpSingleline">
+        <property name="format"
+                  value=".*[\u3400-\u4DB5\u4E00-\u9FA5\u9FA6-\u9FBB\uF900-\uFA2D\uFA30-\uFA6A\uFA70-\uFAD9\uFF00-\uFFEF\u2E80-\u2EFF\u3000-\u303F\u31C0-\u31EF]+.*"/>
+        <property name="message" value="Not allow chinese character !"/>
+    </module>
+
+    <module name="FileLength">
+        <property name="max" value="3000"/>
+    </module>
+
+    <module name="TreeWalker">
+
+        <module name="UnusedImports"/>
+        <module name="RedundantImport"/>
+        <module name="AvoidStarImport"/>
+
+        <module name="NonEmptyAtclauseDescription"/>
+
+        <!--Checks that classes that override equals() also override hashCode()-->
+        <module name="EqualsHashCode"/>
+        <!--Checks for over-complicated boolean expressions. Currently finds code like if (topic == true), topic || true, !false, etc.-->
+        <module name="SimplifyBooleanExpression"/>
+        <module name="OneStatementPerLine"/>
+        <module name="UnnecessaryParentheses"/>
+        <!--Checks for over-complicated boolean return statements. For example the following code-->
+        <module name="SimplifyBooleanReturn"/>
+
+        <!--Check that the default is after all the cases in producerGroup switch statement-->
+        <module name="DefaultComesLast"/>
+        <!--Detects empty statements (standalone ";" semicolon)-->
+        <module name="EmptyStatement"/>
+        <!--Checks that long constants are defined with an upper ell-->
+        <module name="UpperEll"/>
+        <module name="ConstantName">
+            <property name="format" value="(^[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$)"/>
+        </module>
+        <!--Checks that local, non-final variable names conform to producerGroup format specified by the format property-->
+        <module name="LocalVariableName"/>
+        <!--Validates identifiers for local, final variables, including catch parameters-->
+        <module name="LocalFinalVariableName"/>
+        <!--Validates identifiers for non-static fields-->
+        <module name="MemberName"/>
+        <!--Validates identifiers for class type parameters-->
+        <module name="ClassTypeParameterName">
+            <property name="format" value="(^[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$)"/>
+        </module>
+        <!--Validates identifiers for method type parameters-->
+        <module name="MethodTypeParameterName">
+            <property name="format" value="(^[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$)"/>
+        </module>
+        <module name="PackageName">
+            <property name="format" value="^(org|test)\.apache\.skywalking(\.[a-zA-Z][a-zA-Z0-9]*)+$"/>
+        </module>
+        <module name="ParameterName"/>
+        <module name="StaticVariableName">
+            <property name="format" value="(^[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$)"/>
+        </module>
+        <module name="TypeName">
+            <property name="format" value="(^[A-Z][a-zA-Z0-9]*$)|(^[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$)"/>
+        </module>
+        <module name="MissingOverride"/>
+
+        <!--whitespace-->
+        <module name="GenericWhitespace"/>
+        <module name="WhitespaceAfter"/>
+        <module name="WhitespaceAround"/>
+        <module name="MethodParamPad"/>
+        <module name="ParenPad"/>
+        <module name="TypecastParenPad"/>
+        <module name="EmptyLineSeparator">
+            <property name="allowNoEmptyLineBetweenFields" value="true"/>
+            <property name="allowMultipleEmptyLines" value="false"/>
+            <property name="allowMultipleEmptyLinesInsideClassMembers" value="false"/>
+        </module>
+
+        <module name="ImportControl">
+            <property name="file" value="${import.control}"/>
+            <property name="path" value="apm-sniffer/(apm-sdk-plugin|bootstrap-plugins|optional-plugins)/.+/src/main/.+Instrumentation.java$"/>
+        </module>
+
+        <module name="ImportControl">
+            <property name="file" value="${import.control}"/>
+            <property name="path" value="apm-sniffer/apm-toolkit-activation/.+/src/main/.+Activation.java$"/>
+        </module>
+    </module>
+</module>
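
Read together, the modules above require the ASF license header (RegexpHeader against CHECKSTYLE_HEAD), forbid System.out.println, @author tags, Chinese characters and star imports, and enforce UPPER_SNAKE_CASE constants and an org.apache.skywalking package prefix. A minimal, purely illustrative class (not part of this commit) that would satisfy those checks:

    /*
     * Licensed to the Apache Software Foundation (ASF) under one or more
     * contributor license agreements.  See the NOTICE file distributed with
     * this work for additional information regarding copyright ownership.
     * The ASF licenses this file to You under the Apache License, Version 2.0
     * (the "License"); you may not use this file except in compliance with
     * the License.  You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    // PackageName: must start with org.apache.skywalking.
    package org.apache.skywalking.banyandb.v1.client;

    // AvoidStarImport: imports are explicit, no java.util.*.
    import java.util.concurrent.TimeUnit;

    // Hypothetical example class, shown only to illustrate the rules above.
    public class StyleCompliantExample {
        // ConstantName: constants are UPPER_SNAKE_CASE.
        private static final long DEFAULT_TIMEOUT_SECONDS = 30L;

        // MemberName: fields are lower camel case.
        private long timeoutSeconds = DEFAULT_TIMEOUT_SECONDS;

        public long timeoutMillis() {
            // RegexpSingleline would reject any System.out.println here.
            return TimeUnit.SECONDS.toMillis(timeoutSeconds);
        }
    }
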
diff --git a/apm-checkstyle/importControl.xml b/apm-checkstyle/importControl.xml
new file mode 100755
index 0000000..ed6c0a4
--- /dev/null
+++ b/apm-checkstyle/importControl.xml
@@ -0,0 +1,28 @@
+<?xml version="1.0"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  ~
+  -->
+
+<!DOCTYPE import-control PUBLIC
+        "-//Puppy Crawl//DTD Import Control 1.4//EN"
+        "https://checkstyle.org/dtds/import_control_1_4.dtd">
+
+<import-control pkg="org.apache.skywalking.apm.(toolkit|plugin)" regex="true">
+    <allow pkg="java"/>
+    <allow pkg="org.apache.skywalking"/>
+    <allow pkg="net.bytebuddy"/>
+</import-control>
diff --git a/mvnw b/mvnw
new file mode 100755
index 0000000..a3925bb
--- /dev/null
+++ b/mvnw
@@ -0,0 +1,310 @@
+#!/bin/sh
+# ----------------------------------------------------------------------------
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# ----------------------------------------------------------------------------
+
+# ----------------------------------------------------------------------------
+# Maven Start Up Batch script
+#
+# Required ENV vars:
+# ------------------
+#   JAVA_HOME - location of a JDK home dir
+#
+# Optional ENV vars
+# -----------------
+#   M2_HOME - location of maven2's installed home dir
+#   MAVEN_OPTS - parameters passed to the Java VM when running Maven
+#     e.g. to debug Maven itself, use
+#       set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000
+#   MAVEN_SKIP_RC - flag to disable loading of mavenrc files
+# ----------------------------------------------------------------------------
+
+if [ -z "$MAVEN_SKIP_RC" ] ; then
+
+  if [ -f /etc/mavenrc ] ; then
+    . /etc/mavenrc
+  fi
+
+  if [ -f "$HOME/.mavenrc" ] ; then
+    . "$HOME/.mavenrc"
+  fi
+
+fi
+
+# OS specific support.  $var _must_ be set to either true or false.
+cygwin=false;
+darwin=false;
+mingw=false
+case "`uname`" in
+  CYGWIN*) cygwin=true ;;
+  MINGW*) mingw=true;;
+  Darwin*) darwin=true
+    # Use /usr/libexec/java_home if available, otherwise fall back to /Library/Java/Home
+    # See https://developer.apple.com/library/mac/qa/qa1170/_index.html
+    if [ -z "$JAVA_HOME" ]; then
+      if [ -x "/usr/libexec/java_home" ]; then
+        export JAVA_HOME="`/usr/libexec/java_home`"
+      else
+        export JAVA_HOME="/Library/Java/Home"
+      fi
+    fi
+    ;;
+esac
+
+if [ -z "$JAVA_HOME" ] ; then
+  if [ -r /etc/gentoo-release ] ; then
+    JAVA_HOME=`java-config --jre-home`
+  fi
+fi
+
+if [ -z "$M2_HOME" ] ; then
+  ## resolve links - $0 may be a link to maven's home
+  PRG="$0"
+
+  # need this for relative symlinks
+  while [ -h "$PRG" ] ; do
+    ls=`ls -ld "$PRG"`
+    link=`expr "$ls" : '.*-> \(.*\)$'`
+    if expr "$link" : '/.*' > /dev/null; then
+      PRG="$link"
+    else
+      PRG="`dirname "$PRG"`/$link"
+    fi
+  done
+
+  saveddir=`pwd`
+
+  M2_HOME=`dirname "$PRG"`/..
+
+  # make it fully qualified
+  M2_HOME=`cd "$M2_HOME" && pwd`
+
+  cd "$saveddir"
+  # echo Using m2 at $M2_HOME
+fi
+
+# For Cygwin, ensure paths are in UNIX format before anything is touched
+if $cygwin ; then
+  [ -n "$M2_HOME" ] &&
+    M2_HOME=`cygpath --unix "$M2_HOME"`
+  [ -n "$JAVA_HOME" ] &&
+    JAVA_HOME=`cygpath --unix "$JAVA_HOME"`
+  [ -n "$CLASSPATH" ] &&
+    CLASSPATH=`cygpath --path --unix "$CLASSPATH"`
+fi
+
+# For Mingw, ensure paths are in UNIX format before anything is touched
+if $mingw ; then
+  [ -n "$M2_HOME" ] &&
+    M2_HOME="`(cd "$M2_HOME"; pwd)`"
+  [ -n "$JAVA_HOME" ] &&
+    JAVA_HOME="`(cd "$JAVA_HOME"; pwd)`"
+fi
+
+if [ -z "$JAVA_HOME" ]; then
+  javaExecutable="`which javac`"
+  if [ -n "$javaExecutable" ] && ! [ "`expr \"$javaExecutable\" : '\([^ ]*\)'`" = "no" ]; then
+    # readlink(1) is not available as standard on Solaris 10.
+    readLink=`which readlink`
+    if [ ! `expr "$readLink" : '\([^ ]*\)'` = "no" ]; then
+      if $darwin ; then
+        javaHome="`dirname \"$javaExecutable\"`"
+        javaExecutable="`cd \"$javaHome\" && pwd -P`/javac"
+      else
+        javaExecutable="`readlink -f \"$javaExecutable\"`"
+      fi
+      javaHome="`dirname \"$javaExecutable\"`"
+      javaHome=`expr "$javaHome" : '\(.*\)/bin'`
+      JAVA_HOME="$javaHome"
+      export JAVA_HOME
+    fi
+  fi
+fi
+
+if [ -z "$JAVACMD" ] ; then
+  if [ -n "$JAVA_HOME"  ] ; then
+    if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
+      # IBM's JDK on AIX uses strange locations for the executables
+      JAVACMD="$JAVA_HOME/jre/sh/java"
+    else
+      JAVACMD="$JAVA_HOME/bin/java"
+    fi
+  else
+    JAVACMD="`which java`"
+  fi
+fi
+
+if [ ! -x "$JAVACMD" ] ; then
+  echo "Error: JAVA_HOME is not defined correctly." >&2
+  echo "  We cannot execute $JAVACMD" >&2
+  exit 1
+fi
+
+if [ -z "$JAVA_HOME" ] ; then
+  echo "Warning: JAVA_HOME environment variable is not set."
+fi
+
+CLASSWORLDS_LAUNCHER=org.codehaus.plexus.classworlds.launcher.Launcher
+
+# traverses directory structure from process work directory to filesystem root
+# first directory with .mvn subdirectory is considered project base directory
+find_maven_basedir() {
+
+  if [ -z "$1" ]
+  then
+    echo "Path not specified to find_maven_basedir"
+    return 1
+  fi
+
+  basedir="$1"
+  wdir="$1"
+  while [ "$wdir" != '/' ] ; do
+    if [ -d "$wdir"/.mvn ] ; then
+      basedir=$wdir
+      break
+    fi
+    # workaround for JBEAP-8937 (on Solaris 10/Sparc)
+    if [ -d "${wdir}" ]; then
+      wdir=`cd "$wdir/.."; pwd`
+    fi
+    # end of workaround
+  done
+  echo "${basedir}"
+}
+
+# concatenates all lines of a file
+concat_lines() {
+  if [ -f "$1" ]; then
+    echo "$(tr -s '\n' ' ' < "$1")"
+  fi
+}
+
+BASE_DIR=`find_maven_basedir "$(pwd)"`
+if [ -z "$BASE_DIR" ]; then
+  exit 1;
+fi
+
+##########################################################################################
+# Extension to allow automatically downloading the maven-wrapper.jar from Maven-central
+# This allows using the maven wrapper in projects that prohibit checking in binary data.
+##########################################################################################
+if [ -r "$BASE_DIR/.mvn/wrapper/maven-wrapper.jar" ]; then
+    if [ "$MVNW_VERBOSE" = true ]; then
+      echo "Found .mvn/wrapper/maven-wrapper.jar"
+    fi
+else
+    if [ "$MVNW_VERBOSE" = true ]; then
+      echo "Couldn't find .mvn/wrapper/maven-wrapper.jar, downloading it ..."
+    fi
+    if [ -n "$MVNW_REPOURL" ]; then
+      jarUrl="$MVNW_REPOURL/io/takari/maven-wrapper/0.5.5/maven-wrapper-0.5.5.jar"
+    else
+      jarUrl="https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.5.5/maven-wrapper-0.5.5.jar"
+    fi
+    while IFS="=" read key value; do
+      case "$key" in (wrapperUrl) jarUrl="$value"; break ;;
+      esac
+    done < "$BASE_DIR/.mvn/wrapper/maven-wrapper.properties"
+    if [ "$MVNW_VERBOSE" = true ]; then
+      echo "Downloading from: $jarUrl"
+    fi
+    wrapperJarPath="$BASE_DIR/.mvn/wrapper/maven-wrapper.jar"
+    if $cygwin; then
+      wrapperJarPath=`cygpath --path --windows "$wrapperJarPath"`
+    fi
+
+    if command -v wget > /dev/null; then
+        if [ "$MVNW_VERBOSE" = true ]; then
+          echo "Found wget ... using wget"
+        fi
+        if [ -z "$MVNW_USERNAME" ] || [ -z "$MVNW_PASSWORD" ]; then
+            wget "$jarUrl" -O "$wrapperJarPath"
+        else
+            wget --http-user=$MVNW_USERNAME --http-password=$MVNW_PASSWORD "$jarUrl" -O "$wrapperJarPath"
+        fi
+    elif command -v curl > /dev/null; then
+        if [ "$MVNW_VERBOSE" = true ]; then
+          echo "Found curl ... using curl"
+        fi
+        if [ -z "$MVNW_USERNAME" ] || [ -z "$MVNW_PASSWORD" ]; then
+            curl -o "$wrapperJarPath" "$jarUrl" -f
+        else
+            curl --user $MVNW_USERNAME:$MVNW_PASSWORD -o "$wrapperJarPath" "$jarUrl" -f
+        fi
+
+    else
+        if [ "$MVNW_VERBOSE" = true ]; then
+          echo "Falling back to using Java to download"
+        fi
+        javaClass="$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.java"
+        # For Cygwin, switch paths to Windows format before running javac
+        if $cygwin; then
+          javaClass=`cygpath --path --windows "$javaClass"`
+        fi
+        if [ -e "$javaClass" ]; then
+            if [ ! -e "$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.class" ]; then
+                if [ "$MVNW_VERBOSE" = true ]; then
+                  echo " - Compiling MavenWrapperDownloader.java ..."
+                fi
+                # Compiling the Java class
+                ("$JAVA_HOME/bin/javac" "$javaClass")
+            fi
+            if [ -e "$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.class" ]; then
+                # Running the downloader
+                if [ "$MVNW_VERBOSE" = true ]; then
+                  echo " - Running MavenWrapperDownloader.java ..."
+                fi
+                ("$JAVA_HOME/bin/java" -cp .mvn/wrapper MavenWrapperDownloader "$MAVEN_PROJECTBASEDIR")
+            fi
+        fi
+    fi
+fi
+##########################################################################################
+# End of extension
+##########################################################################################
+
+export MAVEN_PROJECTBASEDIR=${MAVEN_BASEDIR:-"$BASE_DIR"}
+if [ "$MVNW_VERBOSE" = true ]; then
+  echo $MAVEN_PROJECTBASEDIR
+fi
+MAVEN_OPTS="$(concat_lines "$MAVEN_PROJECTBASEDIR/.mvn/jvm.config") $MAVEN_OPTS"
+
+# For Cygwin, switch paths to Windows format before running java
+if $cygwin; then
+  [ -n "$M2_HOME" ] &&
+    M2_HOME=`cygpath --path --windows "$M2_HOME"`
+  [ -n "$JAVA_HOME" ] &&
+    JAVA_HOME=`cygpath --path --windows "$JAVA_HOME"`
+  [ -n "$CLASSPATH" ] &&
+    CLASSPATH=`cygpath --path --windows "$CLASSPATH"`
+  [ -n "$MAVEN_PROJECTBASEDIR" ] &&
+    MAVEN_PROJECTBASEDIR=`cygpath --path --windows "$MAVEN_PROJECTBASEDIR"`
+fi
+
+# Provide a "standardized" way to retrieve the CLI args that will
+# work with both Windows and non-Windows executions.
+MAVEN_CMD_LINE_ARGS="$MAVEN_CONFIG $@"
+export MAVEN_CMD_LINE_ARGS
+
+WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
+
+exec "$JAVACMD" \
+  $MAVEN_OPTS \
+  -classpath "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.jar" \
+  "-Dmaven.home=${M2_HOME}" "-Dmaven.multiModuleProjectDirectory=${MAVEN_PROJECTBASEDIR}" \
+  ${WRAPPER_LAUNCHER} $MAVEN_CONFIG "$@"
diff --git a/mvnw.cmd b/mvnw.cmd
new file mode 100755
index 0000000..15123ea
--- /dev/null
+++ b/mvnw.cmd
@@ -0,0 +1,182 @@
+@REM ----------------------------------------------------------------------------
+@REM Licensed to the Apache Software Foundation (ASF) under one
+@REM or more contributor license agreements.  See the NOTICE file
+@REM distributed with this work for additional information
+@REM regarding copyright ownership.  The ASF licenses this file
+@REM to you under the Apache License, Version 2.0 (the
+@REM "License"); you may not use this file except in compliance
+@REM with the License.  You may obtain a copy of the License at
+@REM
+@REM    http://www.apache.org/licenses/LICENSE-2.0
+@REM
+@REM Unless required by applicable law or agreed to in writing,
+@REM software distributed under the License is distributed on an
+@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+@REM KIND, either express or implied.  See the License for the
+@REM specific language governing permissions and limitations
+@REM under the License.
+@REM ----------------------------------------------------------------------------
+
+@REM ----------------------------------------------------------------------------
+@REM Maven Start Up Batch script
+@REM
+@REM Required ENV vars:
+@REM JAVA_HOME - location of a JDK home dir
+@REM
+@REM Optional ENV vars
+@REM M2_HOME - location of maven2's installed home dir
+@REM MAVEN_BATCH_ECHO - set to 'on' to enable the echoing of the batch commands
+@REM MAVEN_BATCH_PAUSE - set to 'on' to wait for a keystroke before ending
+@REM MAVEN_OPTS - parameters passed to the Java VM when running Maven
+@REM     e.g. to debug Maven itself, use
+@REM set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000
+@REM MAVEN_SKIP_RC - flag to disable loading of mavenrc files
+@REM ----------------------------------------------------------------------------
+
+@REM Begin all REM lines with '@' in case MAVEN_BATCH_ECHO is 'on'
+@echo off
+@REM set title of command window
+title %0
+@REM enable echoing by setting MAVEN_BATCH_ECHO to 'on'
+@if "%MAVEN_BATCH_ECHO%" == "on"  echo %MAVEN_BATCH_ECHO%
+
+@REM set %HOME% to equivalent of $HOME
+if "%HOME%" == "" (set "HOME=%HOMEDRIVE%%HOMEPATH%")
+
+@REM Execute a user defined script before this one
+if not "%MAVEN_SKIP_RC%" == "" goto skipRcPre
+@REM check for pre script, once with legacy .bat ending and once with .cmd ending
+if exist "%HOME%\mavenrc_pre.bat" call "%HOME%\mavenrc_pre.bat"
+if exist "%HOME%\mavenrc_pre.cmd" call "%HOME%\mavenrc_pre.cmd"
+:skipRcPre
+
+@setlocal
+
+set ERROR_CODE=0
+
+@REM To isolate internal variables from possible post scripts, we use another setlocal
+@setlocal
+
+@REM ==== START VALIDATION ====
+if not "%JAVA_HOME%" == "" goto OkJHome
+
+echo.
+echo Error: JAVA_HOME not found in your environment. >&2
+echo Please set the JAVA_HOME variable in your environment to match the >&2
+echo location of your Java installation. >&2
+echo.
+goto error
+
+:OkJHome
+if exist "%JAVA_HOME%\bin\java.exe" goto init
+
+echo.
+echo Error: JAVA_HOME is set to an invalid directory. >&2
+echo JAVA_HOME = "%JAVA_HOME%" >&2
+echo Please set the JAVA_HOME variable in your environment to match the >&2
+echo location of your Java installation. >&2
+echo.
+goto error
+
+@REM ==== END VALIDATION ====
+
+:init
+
+@REM Find the project base dir, i.e. the directory that contains the folder ".mvn".
+@REM Fallback to current working directory if not found.
+
+set MAVEN_PROJECTBASEDIR=%MAVEN_BASEDIR%
+IF NOT "%MAVEN_PROJECTBASEDIR%"=="" goto endDetectBaseDir
+
+set EXEC_DIR=%CD%
+set WDIR=%EXEC_DIR%
+:findBaseDir
+IF EXIST "%WDIR%"\.mvn goto baseDirFound
+cd ..
+IF "%WDIR%"=="%CD%" goto baseDirNotFound
+set WDIR=%CD%
+goto findBaseDir
+
+:baseDirFound
+set MAVEN_PROJECTBASEDIR=%WDIR%
+cd "%EXEC_DIR%"
+goto endDetectBaseDir
+
+:baseDirNotFound
+set MAVEN_PROJECTBASEDIR=%EXEC_DIR%
+cd "%EXEC_DIR%"
+
+:endDetectBaseDir
+
+IF NOT EXIST "%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config" goto endReadAdditionalConfig
+
+@setlocal EnableExtensions EnableDelayedExpansion
+for /F "usebackq delims=" %%a in ("%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config") do set JVM_CONFIG_MAVEN_PROPS=!JVM_CONFIG_MAVEN_PROPS! %%a
+@endlocal & set JVM_CONFIG_MAVEN_PROPS=%JVM_CONFIG_MAVEN_PROPS%
+
+:endReadAdditionalConfig
+
+SET MAVEN_JAVA_EXE="%JAVA_HOME%\bin\java.exe"
+set WRAPPER_JAR="%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.jar"
+set WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
+
+set DOWNLOAD_URL="https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.5.5/maven-wrapper-0.5.5.jar"
+
+FOR /F "tokens=1,2 delims==" %%A IN ("%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.properties") DO (
+    IF "%%A"=="wrapperUrl" SET DOWNLOAD_URL=%%B
+)
+
+@REM Extension to allow automatically downloading the maven-wrapper.jar from Maven-central
+@REM This allows using the maven wrapper in projects that prohibit checking in binary data.
+if exist %WRAPPER_JAR% (
+    if "%MVNW_VERBOSE%" == "true" (
+        echo Found %WRAPPER_JAR%
+    )
+) else (
+    if not "%MVNW_REPOURL%" == "" (
+        SET DOWNLOAD_URL="%MVNW_REPOURL%/io/takari/maven-wrapper/0.5.5/maven-wrapper-0.5.5.jar"
+    )
+    if "%MVNW_VERBOSE%" == "true" (
+        echo Couldn't find %WRAPPER_JAR%, downloading it ...
+        echo Downloading from: %DOWNLOAD_URL%
+    )
+
+    powershell -Command "&{"^
+		"$webclient = new-object System.Net.WebClient;"^
+		"if (-not ([string]::IsNullOrEmpty('%MVNW_USERNAME%') -and [string]::IsNullOrEmpty('%MVNW_PASSWORD%'))) {"^
+		"$webclient.Credentials = new-object System.Net.NetworkCredential('%MVNW_USERNAME%', '%MVNW_PASSWORD%');"^
+		"}"^
+		"[Net.ServicePointManager]::SecurityProtocol = [Net.SecurityProtocolType]::Tls12; $webclient.DownloadFile('%DOWNLOAD_URL%', '%WRAPPER_JAR%')"^
+		"}"
+    if "%MVNW_VERBOSE%" == "true" (
+        echo Finished downloading %WRAPPER_JAR%
+    )
+)
+@REM End of extension
+
+@REM Provide a "standardized" way to retrieve the CLI args that will
+@REM work with both Windows and non-Windows executions.
+set MAVEN_CMD_LINE_ARGS=%*
+
+%MAVEN_JAVA_EXE% %JVM_CONFIG_MAVEN_PROPS% %MAVEN_OPTS% %MAVEN_DEBUG_OPTS% -classpath %WRAPPER_JAR% "-Dmaven.multiModuleProjectDirectory=%MAVEN_PROJECTBASEDIR%" %WRAPPER_LAUNCHER% %MAVEN_CONFIG% %*
+if ERRORLEVEL 1 goto error
+goto end
+
+:error
+set ERROR_CODE=1
+
+:end
+@endlocal & set ERROR_CODE=%ERROR_CODE%
+
+if not "%MAVEN_SKIP_RC%" == "" goto skipRcPost
+@REM check for post script, once with legacy .bat ending and once with .cmd ending
+if exist "%HOME%\mavenrc_post.bat" call "%HOME%\mavenrc_post.bat"
+if exist "%HOME%\mavenrc_post.cmd" call "%HOME%\mavenrc_post.cmd"
+:skipRcPost
+
+@REM pause the script if MAVEN_BATCH_PAUSE is set to 'on'
+if "%MAVEN_BATCH_PAUSE%" == "on" pause
+
+if "%MAVEN_TERMINATE_CMD%" == "on" exit %ERROR_CODE%
+
+exit /B %ERROR_CODE%
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..fb36c46
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,357 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  ~
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>org.apache.skywalking</groupId>
+    <artifactId>banyandb-java-client</artifactId>
+    <version>0.1.0-SNAPSHOT</version>
+
+    <parent>
+        <groupId>org.apache</groupId>
+        <artifactId>apache</artifactId>
+        <version>21</version>
+        <relativePath/>
+    </parent>
+
+    <packaging>jar</packaging>
+
+    <name>banyandb-java-client</name>
+    <url>https://github.com/apache/skywalking-banyandb-java-client</url>
+
+    <scm>
+        <url>https://github.com/apache/skywalking-banyandb-java-client</url>
+        <connection>scm:git:https://github.com/apache/skywalking-banyandb-java-client.git</connection>
+        <developerConnection>scm:git:https://github.com/apache/skywalking-banyandb-java-client.git</developerConnection>
+        <tag>HEAD</tag>
+    </scm>
+
+    <issueManagement>
+        <system>GitHub</system>
+        <url>https://github.com/apache/skywalking/issues</url>
+    </issueManagement>
+
+    <licenses>
+        <license>
+            <name>The Apache Software License, Version 2.0</name>
+            <url>https://www.apache.org/licenses/LICENSE-2.0.txt</url>
+        </license>
+    </licenses>
+
+    <mailingLists>
+        <mailingList>
+            <name>SkyWalking Developer List</name>
+            <post>dev@skywalking.apache.org</post>
+            <subscribe>dev-subscribe@skywalking.apache.org</subscribe>
+            <unsubscribe>dev-unsubscribe@skywalking.apache.org</unsubscribe>
+        </mailingList>
+        <mailingList>
+            <name>SkyWalking Commits</name>
+            <post>commits@skywalking.apache.org</post>
+            <subscribe>commits-subscribe@skywalking.apache.org</subscribe>
+            <unsubscribe>commits-unsubscribe@skywalking.apache.org</unsubscribe>
+        </mailingList>
+    </mailingLists>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <compiler.version>1.8</compiler.version>
+        <powermock.version>2.0.7</powermock.version>
+        <checkstyle.version>6.18</checkstyle.version>
+        <junit.version>4.12</junit.version>
+        <mockito-core.version>3.5.13</mockito-core.version>
+        <lombok.version>1.18.20</lombok.version>
+
+        <!-- core lib dependency -->
+        <bytebuddy.version>1.10.19</bytebuddy.version>
+        <grpc.version>1.32.1</grpc.version>
+        <gson.version>2.8.6</gson.version>
+        <os-maven-plugin.version>1.6.2</os-maven-plugin.version>
+        <protobuf-maven-plugin.version>0.6.1</protobuf-maven-plugin.version>
+        <com.google.protobuf.protoc.version>3.12.0</com.google.protobuf.protoc.version>
+        <protoc-gen-grpc-java.plugin.version>1.32.1</protoc-gen-grpc-java.plugin.version>
+        <netty-tcnative-boringssl-static.version>2.0.39.Final</netty-tcnative-boringssl-static.version>
+        <javax.annotation-api.version>1.3.2</javax.annotation-api.version>
+        <!-- necessary for Java 9+ -->
+        <org.apache.tomcat.annotations-api.version>6.0.53</org.apache.tomcat.annotations-api.version>
+        <slf4j.version>1.7.30</slf4j.version>
+
+        <!-- Plugin versions -->
+        <docker.plugin.version>0.4.13</docker.plugin.version>
+        <takari-maven-plugin.version>0.6.1</takari-maven-plugin.version>
+        <exec-maven-plugin.version>1.6.0</exec-maven-plugin.version>
+        <maven-antrun-plugin.version>1.8</maven-antrun-plugin.version>
+        <maven-dependency-plugin.version>2.10</maven-dependency-plugin.version>
+        <maven-deploy-plugin.version>2.8.2</maven-deploy-plugin.version>
+        <maven-assembly-plugin.version>3.1.0</maven-assembly-plugin.version>
+        <maven-failsafe-plugin.version>2.22.0</maven-failsafe-plugin.version>
+        <build-helper-maven-plugin.version>3.2.0</build-helper-maven-plugin.version>
+        <maven-jar-plugin.version>3.1.0</maven-jar-plugin.version>
+        <maven-enforcer-plugin.version>3.0.0-M2</maven-enforcer-plugin.version>
+        <maven-compiler-plugin.version>3.8.0</maven-compiler-plugin.version>
+        <maven-resource-plugin.version>3.1.0</maven-resource-plugin.version>
+        <maven-source-plugin.version>3.0.1</maven-source-plugin.version>
+        <versions-maven-plugin.version>2.5</versions-maven-plugin.version>
+        <coveralls-maven-plugin.version>4.3.0</coveralls-maven-plugin.version>
+        <maven-checkstyle-plugin.version>3.1.0</maven-checkstyle-plugin.version>
+        <jmh.version>1.21</jmh.version>
+        <checkstyle.fails.on.error>true</checkstyle.fails.on.error>
+
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>io.grpc</groupId>
+            <artifactId>grpc-netty-shaded</artifactId>
+            <version>${grpc.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.grpc</groupId>
+            <artifactId>grpc-protobuf</artifactId>
+            <version>${grpc.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.grpc</groupId>
+            <artifactId>grpc-stub</artifactId>
+            <version>${grpc.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-tcnative-boringssl-static</artifactId>
+            <version>${netty-tcnative-boringssl-static.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>${slf4j.version}</version>
+        </dependency>
+        <dependency> <!-- necessary for Java 9+ -->
+            <groupId>org.apache.tomcat</groupId>
+            <artifactId>annotations-api</artifactId>
+            <version>${org.apache.tomcat.annotations-api.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>io.grpc</groupId>
+            <artifactId>grpc-testing</artifactId>
+            <version>${grpc.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>${junit.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <version>${mockito-core.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-module-junit4</artifactId>
+            <version>${powermock.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-api-mockito2</artifactId>
+            <version>${powermock.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <version>${lombok.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>javax.annotation</groupId>
+            <artifactId>javax.annotation-api</artifactId>
+            <version>${javax.annotation-api.version}</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>kr.motd.maven</groupId>
+                <artifactId>os-maven-plugin</artifactId>
+                <version>${os-maven-plugin.version}</version>
+                <executions>
+                    <execution>
+                        <phase>initialize</phase>
+                        <goals>
+                            <goal>detect</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.xolstice.maven.plugins</groupId>
+                <artifactId>protobuf-maven-plugin</artifactId>
+                <version>${protobuf-maven-plugin.version}</version>
+                <configuration>
+                    <!--
+                      The version of protoc must match protobuf-java. If you don't depend on
+                      protobuf-java directly, you will be transitively depending on the
+                      protobuf-java version that grpc depends on.
+                    -->
+                    <protocArtifact>
+                        com.google.protobuf:protoc:${com.google.protobuf.protoc.version}:exe:${os.detected.classifier}
+                    </protocArtifact>
+                    <pluginId>grpc-java</pluginId>
+                    <pluginArtifact>
+                        io.grpc:protoc-gen-grpc-java:${protoc-gen-grpc-java.plugin.version}:exe:${os.detected.classifier}
+                    </pluginArtifact>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>grpc-build</id>
+                        <goals>
+                            <goal>compile</goal>
+                            <goal>compile-custom</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-enforcer-plugin</artifactId>
+                <version>${maven-enforcer-plugin.version}</version>
+                <executions>
+                    <execution>
+                        <id>enforce-java</id>
+                        <goals>
+                            <goal>enforce</goal>
+                        </goals>
+                        <phase>validate</phase>
+                        <configuration>
+                            <rules>
+                                <requireJavaVersion>
+                                    <!-- Build has not yet been updated for Java 9+ -->
+                                    <version>1.8</version>
+                                </requireJavaVersion>
+                                <requireMavenVersion>
+                                    <version>3.6</version>
+                                </requireMavenVersion>
+                            </rules>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>${maven-compiler-plugin.version}</version>
+                <configuration>
+                    <source>${compiler.version}</source>
+                    <target>${compiler.version}</target>
+                    <encoding>${project.build.sourceEncoding}</encoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-resources-plugin</artifactId>
+                <version>${maven-resource-plugin.version}</version>
+                <configuration>
+                    <encoding>${project.build.sourceEncoding}</encoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>com.spotify</groupId>
+                <artifactId>docker-maven-plugin</artifactId>
+                <version>${docker.plugin.version}</version>
+                <configuration>
+                    <skipDocker>true</skipDocker>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-source-plugin</artifactId>
+                <version>${maven-source-plugin.version}</version>
+                <executions>
+                    <execution>
+                        <id>attach-sources</id>
+                        <phase>none</phase>
+                        <goals>
+                            <goal>jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-checkstyle-plugin</artifactId>
+                <version>${maven-checkstyle-plugin.version}</version>
+                <configuration>
+                    <configLocation>${maven.multiModuleProjectDirectory}/apm-checkstyle/checkStyle.xml</configLocation>
+                    <headerLocation>${maven.multiModuleProjectDirectory}/apm-checkstyle/CHECKSTYLE_HEAD</headerLocation>
+                    <encoding>UTF-8</encoding>
+                    <consoleOutput>true</consoleOutput>
+                    <includeTestSourceDirectory>true</includeTestSourceDirectory>
+                    <failOnViolation>${checkstyle.fails.on.error}</failOnViolation>
+                    <sourceDirectories>
+                        <sourceDirectory>${project.build.sourceDirectory}</sourceDirectory>
+                        <sourceDirectory>${project.build.testSourceDirectory}</sourceDirectory>
+                    </sourceDirectories>
+                    <resourceIncludes>
+                        **/*.properties,
+                        **/*.sh,
+                        **/*.bat,
+                        **/*.yml,
+                        **/*.yaml,
+                        **/*.xml
+                    </resourceIncludes>
+                    <resourceExcludes>
+                        **/.asf.yaml,
+                        **/.github/**
+                    </resourceExcludes>
+                    <excludes>
+                        **/target/generated-test-sources/**,
+                        org/apache/skywalking/apm/network/register/v2/**/*.java,
+                        org/apache/skywalking/apm/network/common/**/*.java,
+                        org/apache/skywalking/apm/network/servicemesh/**/*.java,
+                        org/apache/skywalking/apm/network/language/**/*.java,
+                        org/apache/skywalking/oap/server/core/remote/grpc/proto/*.java,
+                        org/apache/skywalking/oal/rt/grammar/*.java,
+                        org/apache/skywalking/oap/server/exporter/grpc/*.java,
+                        org/apache/skywalking/oap/server/configuration/service/*.java,
+                        **/generated/*_jmhType*.java,
+                        **/generated/*_jmhTest.java
+                    </excludes>
+                    <propertyExpansion>
+                        import.control=${maven.multiModuleProjectDirectory}/apm-checkstyle/importControl.xml
+                    </propertyExpansion>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>validate</id>
+                        <phase>process-sources</phase>
+                        <goals>
+                            <goal>check</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/DataCarrier.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/DataCarrier.java
new file mode 100644
index 0000000..db2ed2d
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/DataCarrier.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.commons.datacarrier;
+
+import java.util.Properties;
+import org.apache.skywalking.banyandb.commons.datacarrier.buffer.BufferStrategy;
+import org.apache.skywalking.banyandb.commons.datacarrier.buffer.Channels;
+import org.apache.skywalking.banyandb.commons.datacarrier.consumer.ConsumeDriver;
+import org.apache.skywalking.banyandb.commons.datacarrier.consumer.ConsumerPool;
+import org.apache.skywalking.banyandb.commons.datacarrier.consumer.IConsumer;
+import org.apache.skywalking.banyandb.commons.datacarrier.consumer.IDriver;
+import org.apache.skywalking.banyandb.commons.datacarrier.partition.IDataPartitioner;
+import org.apache.skywalking.banyandb.commons.datacarrier.partition.SimpleRollingPartitioner;
+
+/**
+ * DataCarrier main class. Use this instance to set up the producer/consumer model.
+ */
+public class DataCarrier<T> {
+    private Channels<T> channels;
+    private IDriver driver;
+    private String name;
+
+    public DataCarrier(int channelSize, int bufferSize) {
+        this("DEFAULT", channelSize, bufferSize);
+    }
+
+    public DataCarrier(String name, int channelSize, int bufferSize) {
+        this(name, name, channelSize, bufferSize);
+    }
+
+    public DataCarrier(String name, String envPrefix, int channelSize, int bufferSize) {
+        this(name, envPrefix, channelSize, bufferSize, BufferStrategy.BLOCKING);
+    }
+
+    public DataCarrier(String name, String envPrefix, int channelSize, int bufferSize, BufferStrategy strategy) {
+        this.name = name;
+        bufferSize = EnvUtil.getInt(envPrefix + "_BUFFER_SIZE", bufferSize);
+        channelSize = EnvUtil.getInt(envPrefix + "_CHANNEL_SIZE", channelSize);
+        channels = new Channels<>(channelSize, bufferSize, new SimpleRollingPartitioner<T>(), strategy);
+    }
+
+    public DataCarrier(int channelSize, int bufferSize, BufferStrategy strategy) {
+        this("DEFAULT", "DEFAULT", channelSize, bufferSize, strategy);
+    }
+
+    /**
+     * Set a new IDataPartitioner. It replaces the current (or default) one. The default is {@link
+     * SimpleRollingPartitioner}.
+     *
+     * @param dataPartitioner to partition data into different channels by some rule.
+     * @return this DataCarrier instance, for chaining
+     */
+    public DataCarrier setPartitioner(IDataPartitioner<T> dataPartitioner) {
+        this.channels.setPartitioner(dataPartitioner);
+        return this;
+    }
+
+    /**
+     * Produce data to the buffer, using the given {@link BufferStrategy}.
+     *
+     * @return false if producing the data failed; in that case the data will not be consumed.
+     */
+    public boolean produce(T data) {
+        if (driver != null) {
+            if (!driver.isRunning(channels)) {
+                return false;
+            }
+        }
+
+        return this.channels.save(data);
+    }
+
+    /**
+     * Set the consumeDriver of this carrier. Consumers begin to run when {@link DataCarrier#produce} begins to work.
+     *
+     * @param consumerClass class of the consumer
+     * @param num           number of consumer threads
+     * @param properties    properties for initializing the consumer
+     */
+    public DataCarrier consume(Class<? extends IConsumer<T>> consumerClass,
+                               int num,
+                               long consumeCycle,
+                               Properties properties) {
+        if (driver != null) {
+            driver.close(channels);
+        }
+        driver = new ConsumeDriver<T>(this.name, this.channels, consumerClass, num, consumeCycle, properties);
+        driver.begin(channels);
+        return this;
+    }
+
+    /**
+     * Set the consumeDriver of this carrier. Consumers begin to run when {@link DataCarrier#produce} begins to work,
+     * with a 20 millisecond consume cycle.
+     *
+     * @param consumerClass class of the consumer
+     * @param num           number of consumer threads
+     */
+    public DataCarrier consume(Class<? extends IConsumer<T>> consumerClass, int num) {
+        return this.consume(consumerClass, num, 20, new Properties());
+    }
+
+    /**
+     * Set the consumeDriver of this carrier. Consumers begin to run when {@link DataCarrier#produce} begins to work.
+     *
+     * @param consumer single consumer instance shared by all consumer threads
+     * @param num      number of consumer threads
+     */
+    public DataCarrier consume(IConsumer<T> consumer, int num, long consumeCycle) {
+        if (driver != null) {
+            driver.close(channels);
+        }
+        driver = new ConsumeDriver<T>(this.name, this.channels, consumer, num, consumeCycle);
+        driver.begin(channels);
+        return this;
+    }
+
+    /**
+     * Set the consumeDriver of this carrier. Consumers begin to run when {@link DataCarrier#produce} begins to work,
+     * with a 20 millisecond consume cycle.
+     *
+     * @param consumer single consumer instance shared by all consumer threads
+     * @param num      number of consumer threads
+     */
+    public DataCarrier consume(IConsumer<T> consumer, int num) {
+        return this.consume(consumer, num, 20);
+    }
+
+    /**
+     * Set a consumer pool to manage the channels of this DataCarrier. The consumer pool can then use its own consuming
+     * model to adjust the consumer threads and throughput.
+     */
+    public DataCarrier consume(ConsumerPool consumerPool, IConsumer<T> consumer) {
+        driver = consumerPool;
+        consumerPool.add(this.name, channels, consumer);
+        driver.begin(channels);
+        return this;
+    }
+
+    /**
+     * Shut down all consumer threads, if they are running. Notice the {@link BufferStrategy}: if it is {@link
+     * BufferStrategy#BLOCKING}, shutting down the consumeDriver may block producers.
+     * The better way to change the consumeDriver is to use {@link DataCarrier#consume}.
+     */
+    public void shutdownConsumers() {
+        if (driver != null) {
+            driver.close(channels);
+        }
+    }
+}
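
For reference, a minimal usage sketch of the DataCarrier API above. The demo class name,
channel/buffer sizes, and the inline consumer are illustrative only, not part of the codebase:

    import java.util.List;
    import java.util.Properties;
    import org.apache.skywalking.banyandb.commons.datacarrier.DataCarrier;
    import org.apache.skywalking.banyandb.commons.datacarrier.consumer.IConsumer;

    public class DataCarrierDemo {
        public static void main(String[] args) {
            DataCarrier<String> carrier = new DataCarrier<>("demo", 2, 128);
            carrier.consume(new IConsumer<String>() {
                @Override
                public void init(final Properties properties) {
                    // no-op
                }

                @Override
                public void consume(List<String> data) {
                    // invoked with a batch drained from the channels
                    data.forEach(System.out::println);
                }

                @Override
                public void onError(List<String> data, Throwable t) {
                    t.printStackTrace();
                }

                @Override
                public void onExit() {
                    // no-op
                }
            }, 1); // one consumer thread, 20ms default consume cycle
            carrier.produce("hello"); // returns false if the data cannot be buffered
            carrier.shutdownConsumers();
        }
    }
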
diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/EnvUtil.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/EnvUtil.java
new file mode 100644
index 0000000..36f0cb6
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/EnvUtil.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.commons.datacarrier;
+
+/**
+ * Read values from system environment variables.
+ */
+public class EnvUtil {
+    public static int getInt(String envName, int defaultValue) {
+        int value = defaultValue;
+        String envValue = System.getenv(envName);
+        if (envValue != null) {
+            try {
+                value = Integer.parseInt(envValue);
+            } catch (NumberFormatException e) {
+                // Ignore the malformed value and fall back to the default.
+            }
+        }
+        return value;
+    }
+
+    public static long getLong(String envName, long defaultValue) {
+        long value = defaultValue;
+        String envValue = System.getenv(envName);
+        if (envValue != null) {
+            try {
+                value = Long.parseLong(envValue);
+            } catch (NumberFormatException e) {
+                // Ignore the malformed value and fall back to the default.
+            }
+        }
+        return value;
+    }
+}
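
A small sketch of how EnvUtil is meant to be used; the environment variable names, defaults,
and demo class below are illustrative only:

    import org.apache.skywalking.banyandb.commons.datacarrier.EnvUtil;

    public class EnvUtilDemo {
        public static void main(String[] args) {
            // Falls back to the defaults when the variables are unset or malformed.
            int bufferSize = EnvUtil.getInt("DEMO_BUFFER_SIZE", 1024);
            long consumeCycle = EnvUtil.getLong("DEMO_CONSUME_CYCLE", 20L);
            System.out.println(bufferSize + " / " + consumeCycle);
        }
    }
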
diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/ArrayBlockingQueueBuffer.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/ArrayBlockingQueueBuffer.java
new file mode 100644
index 0000000..feb2a99
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/ArrayBlockingQueueBuffer.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.commons.datacarrier.buffer;
+
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+
+/**
+ * The buffer implementation based on JDK ArrayBlockingQueue.
+ * <p>
+ * This implementation has better performance on the server side. We are still researching whether it is suitable
+ * for the agent side, which is more sensitive to blocking.
+ */
+public class ArrayBlockingQueueBuffer<T> implements QueueBuffer<T> {
+    private BufferStrategy strategy;
+    private ArrayBlockingQueue<T> queue;
+    private int bufferSize;
+
+    ArrayBlockingQueueBuffer(int bufferSize, BufferStrategy strategy) {
+        this.strategy = strategy;
+        this.queue = new ArrayBlockingQueue<T>(bufferSize);
+        this.bufferSize = bufferSize;
+    }
+
+    @Override
+    public boolean save(T data) {
+        // Only BufferStrategy.BLOCKING is supported here; put() blocks when the queue is full.
+        try {
+            queue.put(data);
+        } catch (InterruptedException e) {
+            // Ignore the error
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    public void setStrategy(BufferStrategy strategy) {
+        this.strategy = strategy;
+    }
+
+    @Override
+    public void obtain(List<T> consumeList) {
+        queue.drainTo(consumeList);
+    }
+
+    @Override
+    public int getBufferSize() {
+        return bufferSize;
+    }
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/Buffer.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/Buffer.java
new file mode 100644
index 0000000..38e92af
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/Buffer.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.commons.datacarrier.buffer;
+
+import java.util.List;
+import org.apache.skywalking.banyandb.commons.datacarrier.common.AtomicRangeInteger;
+
+/**
+ * A self-implemented ring queue.
+ */
+public class Buffer<T> implements QueueBuffer<T> {
+    private final Object[] buffer;
+    private BufferStrategy strategy;
+    private AtomicRangeInteger index;
+
+    Buffer(int bufferSize, BufferStrategy strategy) {
+        buffer = new Object[bufferSize];
+        this.strategy = strategy;
+        index = new AtomicRangeInteger(0, bufferSize);
+    }
+
+    @Override
+    public void setStrategy(BufferStrategy strategy) {
+        this.strategy = strategy;
+    }
+
+    @Override
+    public boolean save(T data) {
+        int i = index.getAndIncrement();
+        if (buffer[i] != null) {
+            switch (strategy) {
+                case IF_POSSIBLE:
+                    return false;
+                default:
+            }
+        }
+        buffer[i] = data;
+        return true;
+    }
+
+    @Override
+    public int getBufferSize() {
+        return buffer.length;
+    }
+
+    @Override
+    public void obtain(List<T> consumeList) {
+        this.obtain(consumeList, 0, buffer.length);
+    }
+
+    void obtain(List<T> consumeList, int start, int end) {
+        for (int i = start; i < end; i++) {
+            if (buffer[i] != null) {
+                consumeList.add((T) buffer[i]);
+                buffer[i] = null;
+            }
+        }
+    }
+
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/BufferStrategy.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/BufferStrategy.java
new file mode 100644
index 0000000..9dbc556
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/BufferStrategy.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.commons.datacarrier.buffer;
+
+public enum BufferStrategy {
+    BLOCKING, IF_POSSIBLE
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/Channels.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/Channels.java
new file mode 100644
index 0000000..46ce98f
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/Channels.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.commons.datacarrier.buffer;
+
+import org.apache.skywalking.banyandb.commons.datacarrier.partition.IDataPartitioner;
+
+/**
+ * Channels of buffers. It contains all buffer data belonging to this DataCarrier. It supports several strategies when
+ * the buffer is full. The default is BLOCKING. <p> Created by wusheng on 2016/10/25.
+ */
+public class Channels<T> {
+    private final QueueBuffer<T>[] bufferChannels;
+    private IDataPartitioner<T> dataPartitioner;
+    private final BufferStrategy strategy;
+    private final long size;
+
+    public Channels(int channelSize, int bufferSize, IDataPartitioner<T> partitioner, BufferStrategy strategy) {
+        this.dataPartitioner = partitioner;
+        this.strategy = strategy;
+        bufferChannels = new QueueBuffer[channelSize];
+        for (int i = 0; i < channelSize; i++) {
+            if (BufferStrategy.BLOCKING.equals(strategy)) {
+                bufferChannels[i] = new ArrayBlockingQueueBuffer<>(bufferSize, strategy);
+            } else {
+                bufferChannels[i] = new Buffer<>(bufferSize, strategy);
+            }
+        }
+        // noinspection PointlessArithmeticExpression
+        size = 1L * channelSize * bufferSize; // not pointless: the 1L forces long multiplication and prevents int overflow
+    }
+
+    public boolean save(T data) {
+        int index = dataPartitioner.partition(bufferChannels.length, data);
+        int retryCountDown = 1;
+        if (BufferStrategy.IF_POSSIBLE.equals(strategy)) {
+            int maxRetryCount = dataPartitioner.maxRetryCount();
+            if (maxRetryCount > 1) {
+                retryCountDown = maxRetryCount;
+            }
+        }
+        for (; retryCountDown > 0; retryCountDown--) {
+            if (bufferChannels[index].save(data)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    public void setPartitioner(IDataPartitioner<T> dataPartitioner) {
+        this.dataPartitioner = dataPartitioner;
+    }
+
+    /**
+     * Override the strategy at runtime. Note that this overrides the channels one by one, so while setStrategy is
+     * running, different channels may temporarily use different BufferStrategy values.
+     */
+    public void setStrategy(BufferStrategy strategy) {
+        for (QueueBuffer<T> buffer : bufferChannels) {
+            buffer.setStrategy(strategy);
+        }
+    }
+
+    /**
+     * Get the number of channels.
+     */
+    public int getChannelSize() {
+        return this.bufferChannels.length;
+    }
+
+    public long size() {
+        return size;
+    }
+
+    public QueueBuffer<T> getBuffer(int index) {
+        return this.bufferChannels[index];
+    }
+}
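
A minimal sketch of the IF_POSSIBLE behaviour described above; the sizes and the demo class
are illustrative only. With a single two-slot channel and no consumer draining it, the third
save fails instead of blocking:

    import org.apache.skywalking.banyandb.commons.datacarrier.buffer.BufferStrategy;
    import org.apache.skywalking.banyandb.commons.datacarrier.buffer.Channels;
    import org.apache.skywalking.banyandb.commons.datacarrier.partition.SimpleRollingPartitioner;

    public class ChannelsDemo {
        public static void main(String[] args) {
            Channels<String> channels = new Channels<>(
                1, 2, new SimpleRollingPartitioner<String>(), BufferStrategy.IF_POSSIBLE);
            System.out.println(channels.save("a")); // true
            System.out.println(channels.save("b")); // true
            System.out.println(channels.save("c")); // false: both ring-buffer slots are occupied
        }
    }
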
diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/QueueBuffer.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/QueueBuffer.java
new file mode 100644
index 0000000..4baf83c
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/QueueBuffer.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.commons.datacarrier.buffer;
+
+import java.util.List;
+
+/**
+ * Queue buffer interface.
+ */
+public interface QueueBuffer<T> {
+    /**
+     * Save data into the queue.
+     *
+     * @param data to add.
+     * @return true if saved
+     */
+    boolean save(T data);
+
+    /**
+     * Set different strategy when queue is full.
+     */
+    void setStrategy(BufferStrategy strategy);
+
+    /**
+     * Obtain the existing data from the queue
+     */
+    void obtain(List<T> consumeList);
+
+    int getBufferSize();
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/common/AtomicRangeInteger.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/common/AtomicRangeInteger.java
new file mode 100644
index 0000000..cb2b4be
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/common/AtomicRangeInteger.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.commons.datacarrier.common;
+
+import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicIntegerArray;
+
+public class AtomicRangeInteger extends Number implements Serializable {
+    private static final long serialVersionUID = -4099792402691141643L;
+    private AtomicIntegerArray values;
+
+    private static final int VALUE_OFFSET = 15;
+
+    private int startValue;
+    private int endValue;
+
+    public AtomicRangeInteger(int startValue, int maxValue) {
+        this.values = new AtomicIntegerArray(31);
+        this.values.set(VALUE_OFFSET, startValue);
+        this.startValue = startValue;
+        this.endValue = maxValue - 1;
+    }
+
+    public final int getAndIncrement() {
+        int next;
+        do {
+            next = this.values.incrementAndGet(VALUE_OFFSET);
+            if (next > endValue && this.values.compareAndSet(VALUE_OFFSET, next, startValue)) {
+                return endValue;
+            }
+        }
+        while (next > endValue);
+
+        return next - 1;
+    }
+
+    public final int get() {
+        return this.values.get(VALUE_OFFSET);
+    }
+
+    @Override
+    public int intValue() {
+        return this.values.get(VALUE_OFFSET);
+    }
+
+    @Override
+    public long longValue() {
+        return this.values.get(VALUE_OFFSET);
+    }
+
+    @Override
+    public float floatValue() {
+        return this.values.get(VALUE_OFFSET);
+    }
+
+    @Override
+    public double doubleValue() {
+        return this.values.get(VALUE_OFFSET);
+    }
+}
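
A small sketch illustrating the wrap-around behaviour of this counter; the bounds and the demo
class are illustrative only:

    import org.apache.skywalking.banyandb.commons.datacarrier.common.AtomicRangeInteger;

    public class AtomicRangeIntegerDemo {
        public static void main(String[] args) {
            AtomicRangeInteger index = new AtomicRangeInteger(0, 3);
            // Rotates through [0, 3): prints 0 1 2 0 1 2
            for (int i = 0; i < 6; i++) {
                System.out.print(index.getAndIncrement() + " ");
            }
        }
    }
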
diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/BulkConsumePool.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/BulkConsumePool.java
new file mode 100644
index 0000000..fd45499
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/BulkConsumePool.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.commons.datacarrier.consumer;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import org.apache.skywalking.banyandb.commons.datacarrier.EnvUtil;
+import org.apache.skywalking.banyandb.commons.datacarrier.buffer.Channels;
+
+/**
+ * BulkConsumePool consumes data from multiple channels (DataCarrier instances) with multiple {@link
+ * MultipleChannelsConsumer}s.
+ * <p>
+ * In the typical case, the number of {@link MultipleChannelsConsumer}s should be less than the number of channels.
+ */
+public class BulkConsumePool implements ConsumerPool {
+    private List<MultipleChannelsConsumer> allConsumers;
+    private volatile boolean isStarted = false;
+
+    public BulkConsumePool(String name, int size, long consumeCycle) {
+        size = EnvUtil.getInt(name + "_THREAD", size);
+        allConsumers = new ArrayList<MultipleChannelsConsumer>(size);
+        for (int i = 0; i < size; i++) {
+            MultipleChannelsConsumer multipleChannelsConsumer = new MultipleChannelsConsumer("DataCarrier." + name + ".BulkConsumePool." + i + ".Thread", consumeCycle);
+            multipleChannelsConsumer.setDaemon(true);
+            allConsumers.add(multipleChannelsConsumer);
+        }
+    }
+
+    @Override
+    public synchronized void add(String name, Channels channels, IConsumer consumer) {
+        MultipleChannelsConsumer multipleChannelsConsumer = getLowestPayload();
+        multipleChannelsConsumer.addNewTarget(channels, consumer);
+    }
+
+    /**
+     * Get the consumer thread with the lowest payload, based on the current allocation status.
+     *
+     * @return the consumer with the lowest payload.
+     */
+    private MultipleChannelsConsumer getLowestPayload() {
+        MultipleChannelsConsumer winner = allConsumers.get(0);
+        for (int i = 1; i < allConsumers.size(); i++) {
+            MultipleChannelsConsumer option = allConsumers.get(i);
+            if (option.size() < winner.size()) {
+                winner = option;
+            }
+        }
+        return winner;
+    }
+
+    /**
+     * @return true if this pool has been started.
+     */
+    @Override
+    public boolean isRunning(Channels channels) {
+        return isStarted;
+    }
+
+    @Override
+    public void close(Channels channels) {
+        for (MultipleChannelsConsumer consumer : allConsumers) {
+            consumer.shutdown();
+        }
+    }
+
+    @Override
+    public void begin(Channels channels) {
+        if (isStarted) {
+            return;
+        }
+        for (MultipleChannelsConsumer consumer : allConsumers) {
+            consumer.start();
+        }
+        isStarted = true;
+    }
+
+    /**
+     * The creator for {@link BulkConsumePool}.
+     */
+    public static class Creator implements Callable<ConsumerPool> {
+        private String name;
+        private int size;
+        private long consumeCycle;
+
+        public Creator(String name, int poolSize, long consumeCycle) {
+            this.name = name;
+            this.size = poolSize;
+            this.consumeCycle = consumeCycle;
+        }
+
+        @Override
+        public ConsumerPool call() {
+            return new BulkConsumePool(name, size, consumeCycle);
+        }
+
+        public static int recommendMaxSize() {
+            return Runtime.getRuntime().availableProcessors() * 2;
+        }
+    }
+}
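
A sketch of sharing one BulkConsumePool across DataCarrier instances via
DataCarrier#consume(ConsumerPool, IConsumer); the pool name, sizes, and inline consumer are
illustrative only:

    import java.util.List;
    import java.util.Properties;
    import org.apache.skywalking.banyandb.commons.datacarrier.DataCarrier;
    import org.apache.skywalking.banyandb.commons.datacarrier.consumer.BulkConsumePool;
    import org.apache.skywalking.banyandb.commons.datacarrier.consumer.IConsumer;

    public class BulkConsumePoolDemo {
        public static void main(String[] args) {
            // One pool, shared by many DataCarrier instances; size defaults to 2x available processors.
            BulkConsumePool pool = new BulkConsumePool(
                "demo-pool", BulkConsumePool.Creator.recommendMaxSize(), 20);
            DataCarrier<String> carrier = new DataCarrier<>("demo", 2, 128);
            carrier.consume(pool, new IConsumer<String>() {
                @Override
                public void init(final Properties properties) {
                }

                @Override
                public void consume(List<String> data) {
                    System.out.println("batch of " + data.size());
                }

                @Override
                public void onError(List<String> data, Throwable t) {
                    t.printStackTrace();
                }

                @Override
                public void onExit() {
                }
            });
            carrier.produce("hello");
        }
    }
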
diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumeDriver.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumeDriver.java
new file mode 100644
index 0000000..4535a95
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumeDriver.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.commons.datacarrier.consumer;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.Properties;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.skywalking.banyandb.commons.datacarrier.buffer.Channels;
+
+/**
+ * Pool of consumers. <p> Created by wusheng on 2016/10/25.
+ */
+public class ConsumeDriver<T> implements IDriver {
+    private boolean running;
+    private ConsumerThread[] consumerThreads;
+    private Channels<T> channels;
+    private ReentrantLock lock;
+
+    public ConsumeDriver(String name,
+                         Channels<T> channels, Class<? extends IConsumer<T>> consumerClass,
+                         int num,
+                         long consumeCycle,
+                         Properties properties) {
+        this(channels, num);
+        for (int i = 0; i < num; i++) {
+            consumerThreads[i] = new ConsumerThread(
+                "DataCarrier." + name + ".Consumer." + i + ".Thread", getNewConsumerInstance(consumerClass, properties),
+                consumeCycle
+            );
+            consumerThreads[i].setDaemon(true);
+        }
+    }
+
+    public ConsumeDriver(String name, Channels<T> channels, IConsumer<T> prototype, int num, long consumeCycle) {
+        this(channels, num);
+        prototype.init(new Properties());
+        for (int i = 0; i < num; i++) {
+            consumerThreads[i] = new ConsumerThread(
+                "DataCarrier." + name + ".Consumer." + i + ".Thread", prototype, consumeCycle);
+            consumerThreads[i].setDaemon(true);
+        }
+
+    }
+
+    private ConsumeDriver(Channels<T> channels, int num) {
+        running = false;
+        this.channels = channels;
+        consumerThreads = new ConsumerThread[num];
+        lock = new ReentrantLock();
+    }
+
+    private IConsumer<T> getNewConsumerInstance(Class<? extends IConsumer<T>> consumerClass, Properties properties) {
+        try {
+            IConsumer<T> inst = consumerClass.getDeclaredConstructor().newInstance();
+            inst.init(properties);
+            return inst;
+        } catch (InstantiationException | IllegalAccessException
+            | NoSuchMethodException | InvocationTargetException e) {
+            throw new ConsumerCannotBeCreatedException(e);
+        }
+    }
+
+    @Override
+    public void begin(Channels channels) {
+        if (running) {
+            return;
+        }
+        lock.lock();
+        try {
+            this.allocateBuffer2Thread();
+            for (ConsumerThread consumerThread : consumerThreads) {
+                consumerThread.start();
+            }
+            running = true;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    @Override
+    public boolean isRunning(Channels channels) {
+        return running;
+    }
+
+    private void allocateBuffer2Thread() {
+        int channelSize = this.channels.getChannelSize();
+        /*
+         * If consumerThreads.length < channelSize,
+         * each consumer will process several channels.
+         *
+         * If consumerThreads.length == channelSize,
+         * each consumer will process one channel.
+         *
+         * If consumerThreads.length > channelSize,
+         * some threads will do nothing.
+         */
+        for (int channelIndex = 0; channelIndex < channelSize; channelIndex++) {
+            int consumerIndex = channelIndex % consumerThreads.length;
+            consumerThreads[consumerIndex].addDataSource(channels.getBuffer(channelIndex));
+        }
+
+    }
+
+    @Override
+    public void close(Channels channels) {
+        lock.lock();
+        try {
+            this.running = false;
+            for (ConsumerThread consumerThread : consumerThreads) {
+                consumerThread.shutdown();
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumerCannotBeCreatedException.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumerCannotBeCreatedException.java
new file mode 100644
index 0000000..0d0c8de
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumerCannotBeCreatedException.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.commons.datacarrier.consumer;
+
+public class ConsumerCannotBeCreatedException extends RuntimeException {
+    ConsumerCannotBeCreatedException(Throwable t) {
+        super(t);
+    }
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumerPool.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumerPool.java
new file mode 100644
index 0000000..fd93dfb
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumerPool.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.commons.datacarrier.consumer;
+
+import org.apache.skywalking.banyandb.commons.datacarrier.DataCarrier;
+import org.apache.skywalking.banyandb.commons.datacarrier.buffer.Channels;
+
+/**
+ * The consumer pool supports consuming data from multiple {@link DataCarrier}s, by using different consume thread
+ * management models.
+ */
+public interface ConsumerPool extends IDriver {
+    void add(String name, Channels channels, IConsumer consumer);
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumerPoolFactory.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumerPoolFactory.java
new file mode 100644
index 0000000..8eb1581
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumerPoolFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.commons.datacarrier.consumer;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+/**
+ * Consumer Pool Factory provides global management for all consumer pools.
+ */
+public enum ConsumerPoolFactory {
+    INSTANCE;
+
+    private final Map<String, ConsumerPool> pools;
+
+    ConsumerPoolFactory() {
+        pools = new HashMap<>();
+    }
+
+    public synchronized boolean createIfAbsent(String poolName, Callable<ConsumerPool> creator) throws Exception {
+        if (pools.containsKey(poolName)) {
+            return false;
+        } else {
+            pools.put(poolName, creator.call());
+            return true;
+        }
+    }
+
+    public ConsumerPool get(String poolName) {
+        return pools.get(poolName);
+    }
+
+}
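
A sketch of the intended factory usage: register a pool once with a Creator, then look it up
wherever a DataCarrier needs it. The pool name and sizes are illustrative only:

    import org.apache.skywalking.banyandb.commons.datacarrier.consumer.BulkConsumePool;
    import org.apache.skywalking.banyandb.commons.datacarrier.consumer.ConsumerPool;
    import org.apache.skywalking.banyandb.commons.datacarrier.consumer.ConsumerPoolFactory;

    public class ConsumerPoolFactoryDemo {
        public static void main(String[] args) throws Exception {
            ConsumerPoolFactory.INSTANCE.createIfAbsent(
                "shared-pool",
                new BulkConsumePool.Creator("shared-pool", BulkConsumePool.Creator.recommendMaxSize(), 20));
            ConsumerPool pool = ConsumerPoolFactory.INSTANCE.get("shared-pool");
            // The pool can now be handed to DataCarrier#consume(ConsumerPool, IConsumer).
            System.out.println(pool != null);
        }
    }
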
diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumerThread.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumerThread.java
new file mode 100644
index 0000000..b5f5478
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumerThread.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.commons.datacarrier.consumer;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.skywalking.banyandb.commons.datacarrier.buffer.Buffer;
+import org.apache.skywalking.banyandb.commons.datacarrier.buffer.QueueBuffer;
+
+public class ConsumerThread<T> extends Thread {
+    private volatile boolean running;
+    private IConsumer<T> consumer;
+    private List<DataSource> dataSources;
+    private long consumeCycle;
+
+    ConsumerThread(String threadName, IConsumer<T> consumer, long consumeCycle) {
+        super(threadName);
+        this.consumer = consumer;
+        running = false;
+        dataSources = new ArrayList<DataSource>(1);
+        this.consumeCycle = consumeCycle;
+    }
+
+    /**
+     * Add a whole buffer as a consume source.
+     */
+    void addDataSource(QueueBuffer<T> sourceBuffer) {
+        this.dataSources.add(new DataSource(sourceBuffer));
+    }
+
+    @Override
+    public void run() {
+        running = true;
+
+        final List<T> consumeList = new ArrayList<T>(1500);
+        while (running) {
+            if (!consume(consumeList)) {
+                try {
+                    Thread.sleep(consumeCycle);
+                } catch (InterruptedException e) {
+                    // Ignore the interrupt and continue the consume loop.
+                }
+            }
+        }
+
+        // consumer thread is going to stop
+        // consume the last time
+        consume(consumeList);
+
+        consumer.onExit();
+    }
+
+    private boolean consume(List<T> consumeList) {
+        for (DataSource dataSource : dataSources) {
+            dataSource.obtain(consumeList);
+        }
+
+        if (!consumeList.isEmpty()) {
+            try {
+                consumer.consume(consumeList);
+            } catch (Throwable t) {
+                consumer.onError(consumeList, t);
+            } finally {
+                consumeList.clear();
+            }
+            return true;
+        }
+        consumer.nothingToConsume();
+        return false;
+    }
+
+    void shutdown() {
+        running = false;
+    }
+
+    /**
+     * DataSource is a reference to a {@link QueueBuffer}.
+     */
+    class DataSource {
+        private QueueBuffer<T> sourceBuffer;
+
+        DataSource(QueueBuffer<T> sourceBuffer) {
+            this.sourceBuffer = sourceBuffer;
+        }
+
+        void obtain(List<T> consumeList) {
+            sourceBuffer.obtain(consumeList);
+        }
+    }
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/IConsumer.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/IConsumer.java
new file mode 100644
index 0000000..9d965e1
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/IConsumer.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.commons.datacarrier.consumer;
+
+import java.util.List;
+import java.util.Properties;
+
+public interface IConsumer<T> {
+    void init(final Properties properties);
+
+    void consume(List<T> data);
+
+    void onError(List<T> data, Throwable t);
+
+    void onExit();
+
+    /**
+     * Notify the implementation if nothing was fetched from the queue. This could be used as a timer to trigger a
+     * reaction when the queue has no elements.
+     */
+    default void nothingToConsume() {
+        return;
+    }
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/IDriver.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/IDriver.java
new file mode 100644
index 0000000..7011c4e
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/IDriver.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.commons.datacarrier.consumer;
+
+import org.apache.skywalking.banyandb.commons.datacarrier.buffer.Channels;
+
+/**
+ * The driver of consumer.
+ */
+public interface IDriver {
+    boolean isRunning(Channels channels);
+
+    void close(Channels channels);
+
+    void begin(Channels channels);
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/MultipleChannelsConsumer.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/MultipleChannelsConsumer.java
new file mode 100644
index 0000000..19969fa
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/MultipleChannelsConsumer.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.commons.datacarrier.consumer;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.skywalking.banyandb.commons.datacarrier.buffer.Channels;
+import org.apache.skywalking.banyandb.commons.datacarrier.buffer.QueueBuffer;
+
+/**
+ * MultipleChannelsConsumer represents a single consumer thread that supports multiple channels, each with its own
+ * {@link IConsumer}.
+ */
+public class MultipleChannelsConsumer extends Thread {
+    private volatile boolean running;
+    private volatile ArrayList<Group> consumeTargets;
+    @SuppressWarnings("NonAtomicVolatileUpdate")
+    private volatile long size;
+    private final long consumeCycle;
+
+    public MultipleChannelsConsumer(String threadName, long consumeCycle) {
+        super(threadName);
+        this.consumeTargets = new ArrayList<Group>();
+        this.consumeCycle = consumeCycle;
+    }
+
+    @Override
+    public void run() {
+        running = true;
+
+        final List consumeList = new ArrayList(2000);
+        while (running) {
+            boolean hasData = false;
+            for (Group target : consumeTargets) {
+                boolean consume = consume(target, consumeList);
+                hasData = hasData || consume;
+            }
+
+            if (!hasData) {
+                try {
+                    Thread.sleep(consumeCycle);
+                } catch (InterruptedException e) {
+                }
+            }
+        }
+
+        // consumer thread is going to stop
+        // consume the last time
+        for (Group target : consumeTargets) {
+            consume(target, consumeList);
+
+            target.consumer.onExit();
+        }
+    }
+
+    private boolean consume(Group target, List consumeList) {
+        for (int i = 0; i < target.channels.getChannelSize(); i++) {
+            QueueBuffer buffer = target.channels.getBuffer(i);
+            buffer.obtain(consumeList);
+        }
+
+        if (!consumeList.isEmpty()) {
+            try {
+                target.consumer.consume(consumeList);
+            } catch (Throwable t) {
+                target.consumer.onError(consumeList, t);
+            } finally {
+                consumeList.clear();
+            }
+            return true;
+        }
+        target.consumer.nothingToConsume();
+        return false;
+    }
+
+    /**
+     * Add a new consume target, binding the given channels with their consumer.
+     */
+    public void addNewTarget(Channels channels, IConsumer consumer) {
+        Group group = new Group(channels, consumer);
+        // Create a new list to avoid mutating the list while it is being used for consuming.
+        ArrayList<Group> newList = new ArrayList<Group>();
+        for (Group target : consumeTargets) {
+            newList.add(target);
+        }
+        newList.add(group);
+        consumeTargets = newList;
+        size += channels.size();
+    }
+
+    public long size() {
+        return size;
+    }
+
+    void shutdown() {
+        running = false;
+    }
+
+    private static class Group {
+        private Channels channels;
+        private IConsumer consumer;
+
+        public Group(Channels channels, IConsumer consumer) {
+            this.channels = channels;
+            this.consumer = consumer;
+        }
+    }
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/partition/IDataPartitioner.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/partition/IDataPartitioner.java
new file mode 100644
index 0000000..4b6c576
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/partition/IDataPartitioner.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.commons.datacarrier.partition;
+
+import org.apache.skywalking.banyandb.commons.datacarrier.buffer.BufferStrategy;
+
+public interface IDataPartitioner<T> {
+    int partition(int total, T data);
+
+    /**
+     * @return an integer representing how many times to retry when {@link BufferStrategy#IF_POSSIBLE} is used.
+     * <p>
+     * Less than or equal to 1 means retry is not supported.
+     */
+    int maxRetryCount();
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/partition/ProducerThreadPartitioner.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/partition/ProducerThreadPartitioner.java
new file mode 100644
index 0000000..b83fd1f
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/partition/ProducerThreadPartitioner.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.commons.datacarrier.partition;
+
+/**
+ * Partition by the producer thread id modulo the total partition count.
+ */
+public class ProducerThreadPartitioner<T> implements IDataPartitioner<T> {
+    public ProducerThreadPartitioner() {
+    }
+
+    @Override
+    public int partition(int total, T data) {
+        return (int) Thread.currentThread().getId() % total;
+    }
+
+    @Override
+    public int maxRetryCount() {
+        return 1;
+    }
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/partition/SimpleRollingPartitioner.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/partition/SimpleRollingPartitioner.java
new file mode 100644
index 0000000..e22f92f
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/partition/SimpleRollingPartitioner.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.commons.datacarrier.partition;
+
+/**
+ * Use a plain rolling int counter to spread data across partitions.
+ */
+public class SimpleRollingPartitioner<T> implements IDataPartitioner<T> {
+    @SuppressWarnings("NonAtomicVolatileUpdate")
+    private volatile int i = 0;
+
+    @Override
+    public int partition(int total, T data) {
+        return Math.abs(i++ % total);
+    }
+
+    @Override
+    public int maxRetryCount() {
+        return 3;
+    }
+}
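
To illustrate how the two partitioners spread data, a small sketch follows; the 4-partition setup and class name are hypothetical and not part of this commit. SimpleRollingPartitioner cycles through partitions, while ProducerThreadPartitioner keeps every element from one producer thread in the same partition.

import org.apache.skywalking.banyandb.commons.datacarrier.partition.IDataPartitioner;
import org.apache.skywalking.banyandb.commons.datacarrier.partition.ProducerThreadPartitioner;
import org.apache.skywalking.banyandb.commons.datacarrier.partition.SimpleRollingPartitioner;

public class PartitionerSketch {
    public static void main(String[] args) {
        IDataPartitioner<String> rolling = new SimpleRollingPartitioner<>();
        IDataPartitioner<String> byThread = new ProducerThreadPartitioner<>();
        for (int i = 0; i < 8; i++) {
            // Rolling cycles 0,1,2,3,...; the thread partitioner is constant per producer thread.
            System.out.println(rolling.partition(4, "e" + i) + " / " + byThread.partition(4, "e" + i));
        }
    }
}
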
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java
new file mode 100644
index 0000000..ea12319
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client;
+
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.NameResolverRegistry;
+import io.grpc.internal.DnsNameResolverProvider;
+import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.banyandb.v1.trace.BanyandbTrace;
+import org.apache.skywalking.banyandb.v1.trace.TraceServiceGrpc;
+
+/**
+ * BanyanDBClient represents a client instance interacting with a BanyanDB server. This is built on top of the
+ * BanyanDB v1 gRPC APIs.
+ */
+@Slf4j
+public class BanyanDBClient implements Closeable {
+    /**
+     * The hostname of BanyanDB server.
+     */
+    private final String host;
+    /**
+     * The port of BanyanDB server.
+     */
+    private final int port;
+    /**
+     * The database instance (group) name.
+     */
+    private final String group;
+    /**
+     * Options for server connection.
+     */
+    private Options options;
+    /**
+     * Managed gRPC connection.
+     */
+    private volatile ManagedChannel managedChannel;
+    /**
+     * gRPC client stub
+     */
+    private volatile TraceServiceGrpc.TraceServiceStub traceServiceStub;
+    /**
+     * gRPC blocking stub.
+     */
+    private volatile TraceServiceGrpc.TraceServiceBlockingStub traceServiceBlockingStub;
+    /**
+     * The connection status.
+     */
+    private volatile boolean isConnected = false;
+    /**
+     * A lock to avoid race conditions when establishing and closing the network connection.
+     */
+    private volatile ReentrantLock connectionEstablishLock;
+
+    /**
+     * Create a BanyanDB client instance
+     *
+     * @param host  IP or domain name
+     * @param port  Server port
+     * @param group Database instance name
+     */
+    public BanyanDBClient(final String host, final int port, final String group) {
+        this(host, port, group, new Options());
+    }
+
+    /**
+     * Create a BanyanDB client instance with custom options
+     *
+     * @param host    IP or domain name
+     * @param port    Server port
+     * @param group   Database instance name
+     * @param options for database connection
+     */
+    public BanyanDBClient(final String host,
+                          final int port,
+                          final String group,
+                          final Options options) {
+        this.host = host;
+        this.port = port;
+        this.group = group;
+        this.options = options;
+        this.connectionEstablishLock = new ReentrantLock();
+
+        NameResolverRegistry.getDefaultRegistry().register(new DnsNameResolverProvider());
+    }
+
+    /**
+     * Connect to the server.
+     *
+     * @throws RuntimeException if server is not reachable.
+     */
+    public void connect() {
+        connectionEstablishLock.lock();
+        try {
+            if (!isConnected) {
+                final ManagedChannelBuilder<?> nettyChannelBuilder = NettyChannelBuilder.forAddress(host, port).usePlaintext();
+                nettyChannelBuilder.maxInboundMessageSize(options.getMaxInboundMessageSize());
+
+                managedChannel = nettyChannelBuilder.build();
+                traceServiceStub = TraceServiceGrpc.newStub(managedChannel);
+                traceServiceBlockingStub = TraceServiceGrpc.newBlockingStub(
+                        managedChannel);
+                isConnected = true;
+            }
+        } finally {
+            connectionEstablishLock.unlock();
+        }
+    }
+
+    /**
+     * Connect to the mock server.
+     * Created for testing purpose.
+     *
+     * @param channel the channel used for communication.
+     *                For tests, it is normally an in-process channel.
+     */
+    void connect(ManagedChannel channel) {
+        connectionEstablishLock.lock();
+        try {
+            if (!isConnected) {
+                traceServiceStub = TraceServiceGrpc.newStub(channel);
+                traceServiceBlockingStub = TraceServiceGrpc.newBlockingStub(
+                        channel);
+                isConnected = true;
+            }
+        } finally {
+            connectionEstablishLock.unlock();
+        }
+    }
+
+    /**
+     * Create a bulk processor for trace writes.
+     *
+     * @param maxBulkSize   the max bulk size for the flush operation
+     * @param flushInterval if the given maxBulkSize is not reached in this period, the flush would be triggered
+     *                      automatically. Unit is second
+     * @param concurrency   the max number of concurrent flush operations
+     * @return trace bulk write processor
+     */
+    public TraceBulkWriteProcessor buildTraceWriteProcessor(int maxBulkSize, int flushInterval, int concurrency) {
+        return new TraceBulkWriteProcessor(group, traceServiceStub, maxBulkSize, flushInterval, concurrency);
+    }
+
+    /**
+     * Query traces according to the given conditions
+     *
+     * @param traceQuery conditions for the query
+     * @return hit traces.
+     */
+    public TraceQueryResponse queryTraces(TraceQuery traceQuery) {
+        final BanyandbTrace.QueryResponse response = traceServiceBlockingStub
+                .withDeadlineAfter(options.getDeadline(), TimeUnit.SECONDS)
+                .query(traceQuery.build(group));
+        return new TraceQueryResponse(response);
+    }
+
+    @Override
+    public void close() throws IOException {
+        connectionEstablishLock.lock();
+        try {
+            if (isConnected) {
+                this.managedChannel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
+                isConnected = false;
+            }
+        } catch (InterruptedException interruptedException) {
+            log.warn("fail to wait for channel termination, shutdown now!", interruptedException);
+            this.managedChannel.shutdownNow();
+            isConnected = false;
+        } finally {
+            connectionEstablishLock.unlock();
+        }
+    }
+}
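
Putting the client API above together, a rough end-to-end usage sketch might read as follows. The address, port, group, schema name and field names are placeholder values, and TraceQuery, TraceQueryResponse and PairQueryCondition are defined later in this commit.

import java.io.IOException;
import java.util.Arrays;

import org.apache.skywalking.banyandb.v1.client.BanyanDBClient;
import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
import org.apache.skywalking.banyandb.v1.client.TraceBulkWriteProcessor;
import org.apache.skywalking.banyandb.v1.client.TraceQuery;
import org.apache.skywalking.banyandb.v1.client.TraceQueryResponse;

public class ClientUsageSketch {
    public static void main(String[] args) throws IOException {
        // Host, port and group are example values only.
        BanyanDBClient client = new BanyanDBClient("127.0.0.1", 17912, "default");
        client.connect();

        // Bulk writer: flush once 1000 entities are buffered or every 10 seconds, 2 concurrent flushes at most.
        TraceBulkWriteProcessor writeProcessor = client.buildTraceWriteProcessor(1000, 10, 2);

        // Query: "sw" and the projected/queried field names are placeholder schema entries.
        TraceQuery query = new TraceQuery("sw", Arrays.asList("trace_id", "duration"));
        query.appendCondition(PairQueryCondition.StringQueryCondition.eq("trace_id", "1a2b3c"));
        TraceQueryResponse response = client.queryTraces(query);
        System.out.println(response);

        writeProcessor.close();
        client.close();
    }
}
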
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/BulkWriteProcessor.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/BulkWriteProcessor.java
new file mode 100644
index 0000000..2fda151
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/BulkWriteProcessor.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.skywalking.banyandb.commons.datacarrier.DataCarrier;
+import org.apache.skywalking.banyandb.commons.datacarrier.consumer.IConsumer;
+
+/**
+ * BulkWriteProcessor is a processor driven by both time and size thresholds.
+ * <p>
+ * It includes an internal queue and timer, and accepts data sequentially. Once either the size or the time
+ * threshold is reached, it triggers a flush to hand the buffered data to the next step.
+ */
+public abstract class BulkWriteProcessor implements Closeable {
+    protected final int flushInterval;
+    protected DataCarrier buffer;
+
+    /**
+     * Create the processor.
+     *
+     * @param maxBulkSize   the max bulk size for the flush operation
+     * @param flushInterval if the given maxBulkSize is not reached in this period, the flush would be triggered
+     *                      automatically. Unit is second.
+     * @param concurrency   the max number of concurrent flush operations.
+     */
+    protected BulkWriteProcessor(String processorName, int maxBulkSize, int flushInterval, int concurrency) {
+        this.flushInterval = flushInterval;
+        this.buffer = new DataCarrier(processorName, maxBulkSize, concurrency);
+        Properties properties = new Properties();
+        properties.put("maxBulkSize", maxBulkSize);
+        properties.put("flushInterval", flushInterval);
+        properties.put("BulkWriteProcessor", this);
+        buffer.consume(QueueWatcher.class, concurrency, 20, properties);
+    }
+
+    @Override
+    public void close() throws IOException {
+        this.buffer.shutdownConsumers();
+    }
+
+    /**
+     * The internal queue consumer driving the bulk process.
+     */
+    public static class QueueWatcher implements IConsumer {
+        private long lastFlushTimestamp;
+        private int maxBulkSize;
+        private int flushInterval;
+        private List cachedData;
+        private BulkWriteProcessor bulkWriteProcessor;
+
+        public QueueWatcher() {
+        }
+
+        @Override
+        public void init(Properties properties) {
+            lastFlushTimestamp = System.currentTimeMillis();
+            maxBulkSize = (Integer) properties.get("maxBulkSize");
+            flushInterval = (Integer) properties.get("flushInterval") * 1000;
+            cachedData = new ArrayList(maxBulkSize);
+            bulkWriteProcessor = (BulkWriteProcessor) properties.get("BulkWriteProcessor");
+        }
+
+        @Override
+        public void consume(final List data) {
+            if (data.size() >= maxBulkSize) {
+                // The data size actually wouldn't exceed maxBulkSize due to the DataCarrier channel's max size.
+                // This is just to prevent unexpected cases and avoid confusion about falling into the else branch.
+                bulkWriteProcessor.flush(data);
+                lastFlushTimestamp = System.currentTimeMillis();
+            } else {
+                data.forEach(element -> {
+                    cachedData.add(element);
+                    if (cachedData.size() >= maxBulkSize) {
+                        // Flush and re-init.
+                        bulkWriteProcessor.flush(cachedData);
+                        cachedData = new ArrayList(maxBulkSize);
+                        lastFlushTimestamp = System.currentTimeMillis();
+                    }
+                });
+            }
+        }
+
+        @Override
+        public void onError(final List data, final Throwable t) {
+
+        }
+
+        @Override
+        public void onExit() {
+
+        }
+
+        @Override
+        public void nothingToConsume() {
+            if (System.currentTimeMillis() - lastFlushTimestamp > flushInterval) {
+                bulkWriteProcessor.flush(cachedData);
+                cachedData = new ArrayList(maxBulkSize);
+                lastFlushTimestamp = System.currentTimeMillis();
+            }
+        }
+    }
+
+    /**
+     * @param data to be flushed.
+     */
+    protected abstract void flush(List data);
+}
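
A concrete processor only needs to supply flush(List); a trivial, illustrative subclass (not part of this commit) could look like the following. With maxBulkSize=1000 and flushInterval=10, a flush fires when 1000 elements accumulate, or, once the queue is drained, when more than 10 seconds have passed since the last flush.

import java.util.List;

import org.apache.skywalking.banyandb.v1.client.BulkWriteProcessor;

// Illustrative only: a processor that just reports each flushed batch.
public class PrintingBulkWriteProcessor extends BulkWriteProcessor {
    public PrintingBulkWriteProcessor(int maxBulkSize, int flushInterval, int concurrency) {
        super("PrintingBulkWriteProcessor", maxBulkSize, flushInterval, concurrency);
    }

    @Override
    protected void flush(List data) {
        System.out.println("flushing " + data.size() + " element(s)");
    }
}
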
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/Field.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/Field.java
new file mode 100644
index 0000000..cf2c04b
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/Field.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client;
+
+import java.util.List;
+
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import org.apache.skywalking.banyandb.v1.Banyandb;
+
+import static com.google.protobuf.NullValue.NULL_VALUE;
+
+/**
+ * Field represents a value of column/field in the write-op or response.
+ */
+@EqualsAndHashCode
+public abstract class Field<T> {
+    @Getter
+    protected final T value;
+
+    protected Field(T value) {
+        this.value = value;
+    }
+
+    /**
+     * NullField is a value which can be converted to {@link com.google.protobuf.NullValue}.
+     * Users should use the singleton instead of creating a new instance every time.
+     */
+    public static class NullField extends Field<Object> implements SerializableField {
+        private static final NullField INSTANCE = new NullField();
+
+        private NullField() {
+            super(null);
+        }
+
+        @Override
+        public Banyandb.Field toField() {
+            return Banyandb.Field.newBuilder().setNull(NULL_VALUE).build();
+        }
+    }
+
+    /**
+     * The value of a String type field.
+     */
+    public static class StringField extends Field<String> implements SerializableField {
+        private StringField(String value) {
+            super(value);
+        }
+
+        @Override
+        public Banyandb.Field toField() {
+            return Banyandb.Field.newBuilder().setStr(Banyandb.Str.newBuilder().setValue(value)).build();
+        }
+    }
+
+    /**
+     * The value of a String array type field.
+     */
+    public static class StringArrayField extends Field<List<String>> implements SerializableField {
+        private StringArrayField(List<String> value) {
+            super(value);
+        }
+
+        @Override
+        public Banyandb.Field toField() {
+            return Banyandb.Field.newBuilder().setStrArray(Banyandb.StrArray.newBuilder().addAllValue(value)).build();
+        }
+    }
+
+    /**
+     * The value of an int64(Long) type field.
+     */
+    public static class LongField extends Field<Long> implements SerializableField {
+        private LongField(Long value) {
+            super(value);
+        }
+
+        @Override
+        public Banyandb.Field toField() {
+            return Banyandb.Field.newBuilder().setInt(Banyandb.Int.newBuilder().setValue(value)).build();
+        }
+    }
+
+    /**
+     * The value of an int64(Long) array type field.
+     */
+    public static class LongArrayField extends Field<List<Long>> implements SerializableField {
+        private LongArrayField(List<Long> value) {
+            super(value);
+        }
+
+        @Override
+        public Banyandb.Field toField() {
+            return Banyandb.Field.newBuilder().setIntArray(Banyandb.IntArray.newBuilder().addAllValue(value)).build();
+        }
+    }
+
+    /**
+     * Construct a string field
+     *
+     * @param val payload
+     * @return Anonymous field with String payload
+     */
+    public static SerializableField stringField(String val) {
+        return new StringField(val);
+    }
+
+    /**
+     * Construct a numeric field
+     *
+     * @param val payload
+     * @return Anonymous field with numeric payload
+     */
+    public static SerializableField longField(long val) {
+        return new LongField(val);
+    }
+
+    /**
+     * Construct a string array field
+     *
+     * @param val payload
+     * @return Anonymous field with string array payload
+     */
+    public static SerializableField stringArrayField(List<String> val) {
+        return new StringArrayField(val);
+    }
+
+    /**
+     * Construct a long array field
+     *
+     * @param val payload
+     * @return Anonymous field with numeric array payload
+     */
+    public static SerializableField longArrayField(List<Long> val) {
+        return new LongArrayField(val);
+    }
+
+    public static SerializableField nullField() {
+        return NullField.INSTANCE;
+    }
+}
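
The static factory methods above are the intended entry points for building write values; a short, illustrative sketch follows (the field values and class name are placeholders).

import java.util.Arrays;
import java.util.List;

import org.apache.skywalking.banyandb.v1.Banyandb;
import org.apache.skywalking.banyandb.v1.client.Field;
import org.apache.skywalking.banyandb.v1.client.SerializableField;

public class FieldSketch {
    public static void main(String[] args) {
        List<SerializableField> fields = Arrays.asList(
                Field.stringField("order-service"),                  // string
                Field.longField(1500L),                              // int64
                Field.stringArrayField(Arrays.asList("k1", "k2")),   // string array
                Field.longArrayField(Arrays.asList(1L, 2L)),         // int64 array
                Field.nullField());                                  // shared null singleton
        for (SerializableField field : fields) {
            // Each value serializes into the protobuf Banyandb.Field used by write requests.
            Banyandb.Field pb = field.toField();
            System.out.println(pb);
        }
    }
}
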
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/FieldAndValue.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/FieldAndValue.java
new file mode 100644
index 0000000..5182544
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/FieldAndValue.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client;
+
+import java.util.List;
+
+import lombok.EqualsAndHashCode;
+import org.apache.skywalking.banyandb.v1.Banyandb;
+
+/**
+ * FieldAndValue represents the value of a column in the response.
+ */
+@EqualsAndHashCode(callSuper = true)
+public abstract class FieldAndValue<T> extends Field<T> {
+    protected final String fieldName;
+
+    protected FieldAndValue(String fieldName, T value) {
+        super(value);
+        this.fieldName = fieldName;
+    }
+
+    /**
+     * @return field name
+     */
+    public String getFieldName() {
+        return this.fieldName;
+    }
+
+    /**
+     * @return true if the value is null.
+     */
+    public boolean isNull() {
+        return this.value == null;
+    }
+
+    static FieldAndValue<?> build(Banyandb.TypedPair typedPair) {
+        if (typedPair.hasNullPair()) {
+            switch (typedPair.getNullPair().getType()) {
+                case FIELD_TYPE_INT:
+                    return new LongFieldPair(typedPair.getKey(), null);
+                case FIELD_TYPE_INT_ARRAY:
+                    return new LongArrayFieldPair(typedPair.getKey(), null);
+                case FIELD_TYPE_STRING:
+                    return new StringFieldPair(typedPair.getKey(), null);
+                case FIELD_TYPE_STRING_ARRAY:
+                    return new StringArrayFieldPair(typedPair.getKey(), null);
+                default:
+                    throw new IllegalArgumentException("Unrecognized NullType, " + typedPair.getNullPair().getType());
+            }
+        } else if (typedPair.hasIntPair()) {
+            return new LongFieldPair(typedPair.getKey(), typedPair.getIntPair().getValue());
+        } else if (typedPair.hasStrPair()) {
+            return new StringFieldPair(typedPair.getKey(), typedPair.getStrPair().getValue());
+        } else if (typedPair.hasIntArrayPair()) {
+            return new LongArrayFieldPair(typedPair.getKey(), typedPair.getIntArrayPair().getValueList());
+        } else if (typedPair.hasStrArrayPair()) {
+            return new StringArrayFieldPair(typedPair.getKey(), typedPair.getStrArrayPair().getValueList());
+        }
+        throw new IllegalArgumentException("Unrecognized TypedPair, " + typedPair);
+    }
+
+    public static class StringFieldPair extends FieldAndValue<String> {
+        StringFieldPair(final String fieldName, final String value) {
+            super(fieldName, value);
+        }
+    }
+
+    public static class StringArrayFieldPair extends FieldAndValue<List<String>> {
+        StringArrayFieldPair(final String fieldName, final List<String> value) {
+            super(fieldName, value);
+        }
+    }
+
+    public static class LongFieldPair extends FieldAndValue<Long> {
+        LongFieldPair(final String fieldName, final Long value) {
+            super(fieldName, value);
+        }
+    }
+
+    public static class LongArrayFieldPair extends FieldAndValue<List<Long>> {
+        LongArrayFieldPair(final String fieldName, final List<Long> value) {
+            super(fieldName, value);
+        }
+    }
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/Options.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/Options.java
new file mode 100644
index 0000000..89b8038
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/Options.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client;
+
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.Setter;
+
+/**
+ * Client connection options.
+ */
+@Setter
+@Getter(AccessLevel.PACKAGE)
+public class Options {
+    /**
+     * Max inbound message size
+     */
+    private int maxInboundMessageSize = 1024 * 1024 * 50;
+    /**
+     * Deadline of gRPC blocking queries, unit is second
+     */
+    private int deadline = 30;
+
+    Options() {
+    }
+
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/PairQueryCondition.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/PairQueryCondition.java
new file mode 100644
index 0000000..78f714f
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/PairQueryCondition.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client;
+
+import org.apache.skywalking.banyandb.v1.Banyandb;
+
+import java.util.List;
+
+/**
+ * PairQueryCondition represents a query condition, including field name, operator, and value(s).
+ */
+public abstract class PairQueryCondition<T> extends FieldAndValue<T> {
+    protected final Banyandb.PairQuery.BinaryOp op;
+
+    private PairQueryCondition(String fieldName, Banyandb.PairQuery.BinaryOp op, T value) {
+        super(fieldName, value);
+        this.op = op;
+    }
+
+    Banyandb.PairQuery build() {
+        return Banyandb.PairQuery.newBuilder()
+                .setOp(this.op)
+                .setCondition(buildTypedPair()).build();
+    }
+
+    /**
+     * The various implementations should build different TypedPairs.
+     *
+     * @return the typedPair to be included in the query
+     */
+    protected abstract Banyandb.TypedPair buildTypedPair();
+
+    /**
+     * LongQueryCondition represents `Field(Long) $op value` condition.
+     */
+    public static class LongQueryCondition extends PairQueryCondition<Long> {
+        private LongQueryCondition(String fieldName, Banyandb.PairQuery.BinaryOp op, Long value) {
+            super(fieldName, op, value);
+        }
+
+        @Override
+        protected Banyandb.TypedPair buildTypedPair() {
+            return Banyandb.TypedPair.newBuilder()
+                    .setKey(fieldName)
+                    .setIntPair(Banyandb.Int.newBuilder()
+                            .setValue(value)).build();
+        }
+
+        /**
+         * Build a query condition for {@link Long} type
+         * and {@link Banyandb.PairQuery.BinaryOp#BINARY_OP_EQ} as the relation
+         *
+         * @param fieldName name of the field
+         * @param val       value of the field
+         * @return a query that `Long == value`
+         */
+        public static PairQueryCondition<Long> eq(String fieldName, Long val) {
+            return new LongQueryCondition(fieldName, Banyandb.PairQuery.BinaryOp.BINARY_OP_EQ, val);
+        }
+
+        /**
+         * Build a query condition for {@link Long} type
+         * and {@link Banyandb.PairQuery.BinaryOp#BINARY_OP_NE} as the relation
+         *
+         * @param fieldName name of the field
+         * @param val       value of the field
+         * @return a query that `Long != value`
+         */
+        public static PairQueryCondition<Long> ne(String fieldName, Long val) {
+            return new LongQueryCondition(fieldName, Banyandb.PairQuery.BinaryOp.BINARY_OP_NE, val);
+        }
+
+        /**
+         * Build a query condition for {@link Long} type
+         * and {@link Banyandb.PairQuery.BinaryOp#BINARY_OP_GT} as the relation
+         *
+         * @param fieldName name of the field
+         * @param val       value of the field
+         * @return a query that `Long &gt; value`
+         */
+        public static PairQueryCondition<Long> gt(String fieldName, Long val) {
+            return new LongQueryCondition(fieldName, Banyandb.PairQuery.BinaryOp.BINARY_OP_GT, val);
+        }
+
+        /**
+         * Build a query condition for {@link Long} type
+         * and {@link Banyandb.PairQuery.BinaryOp#BINARY_OP_GE} as the relation
+         *
+         * @param fieldName name of the field
+         * @param val       value of the field
+         * @return a query that `Long &ge; value`
+         */
+        public static PairQueryCondition<Long> ge(String fieldName, Long val) {
+            return new LongQueryCondition(fieldName, Banyandb.PairQuery.BinaryOp.BINARY_OP_GE, val);
+        }
+
+        /**
+         * Build a query condition for {@link Long} type
+         * and {@link Banyandb.PairQuery.BinaryOp#BINARY_OP_LT} as the relation
+         *
+         * @param fieldName name of the field
+         * @param val       value of the field
+         * @return a query that `Long &lt; value`
+         */
+        public static PairQueryCondition<Long> lt(String fieldName, Long val) {
+            return new LongQueryCondition(fieldName, Banyandb.PairQuery.BinaryOp.BINARY_OP_LT, val);
+        }
+
+        /**
+         * Build a query condition for {@link Long} type
+         * and {@link Banyandb.PairQuery.BinaryOp#BINARY_OP_LE} as the relation
+         *
+         * @param fieldName name of the field
+         * @param val       value of the field
+         * @return a query that `Long &le; value`
+         */
+        public static PairQueryCondition<Long> le(String fieldName, Long val) {
+            return new LongQueryCondition(fieldName, Banyandb.PairQuery.BinaryOp.BINARY_OP_LE, val);
+        }
+    }
+
+    /**
+     * StringQueryCondition represents `Field(String) $op value` condition.
+     */
+    public static class StringQueryCondition extends PairQueryCondition<String> {
+        private StringQueryCondition(String fieldName, Banyandb.PairQuery.BinaryOp op, String value) {
+            super(fieldName, op, value);
+        }
+
+        @Override
+        protected Banyandb.TypedPair buildTypedPair() {
+            return Banyandb.TypedPair.newBuilder()
+                    .setKey(fieldName)
+                    .setStrPair(Banyandb.Str.newBuilder().setValue(value)).build();
+        }
+
+        /**
+         * Build a query condition for {@link String} type
+         * and {@link Banyandb.PairQuery.BinaryOp#BINARY_OP_EQ} as the relation
+         *
+         * @param fieldName name of the field
+         * @param val       value of the field
+         * @return a query that `String == value`
+         */
+        public static PairQueryCondition<String> eq(String fieldName, String val) {
+            return new StringQueryCondition(fieldName, Banyandb.PairQuery.BinaryOp.BINARY_OP_EQ, val);
+        }
+
+        /**
+         * Build a query condition for {@link String} type
+         * and {@link Banyandb.PairQuery.BinaryOp#BINARY_OP_NE} as the relation
+         *
+         * @param fieldName name of the field
+         * @param val       value of the field
+         * @return a query that `String != value`
+         */
+        public static PairQueryCondition<String> ne(String fieldName, String val) {
+            return new StringQueryCondition(fieldName, Banyandb.PairQuery.BinaryOp.BINARY_OP_NE, val);
+        }
+    }
+
+    /**
+     * StringArrayQueryCondition represents `Field(List of String) $op value` condition.
+     */
+    public static class StringArrayQueryCondition extends PairQueryCondition<List<String>> {
+        private StringArrayQueryCondition(String fieldName, Banyandb.PairQuery.BinaryOp op, List<String> value) {
+            super(fieldName, op, value);
+        }
+
+        @Override
+        protected Banyandb.TypedPair buildTypedPair() {
+            return Banyandb.TypedPair.newBuilder()
+                    .setKey(fieldName)
+                    .setStrArrayPair(Banyandb.StrArray.newBuilder()
+                            .addAllValue(value)).build();
+        }
+
+        /**
+         * Build a query condition for {@link List} of {@link String} as the type
+         * and {@link Banyandb.PairQuery.BinaryOp#BINARY_OP_EQ} as the relation
+         *
+         * @param fieldName name of the field
+         * @param val       value of the field
+         * @return a query that `[String] == values`
+         */
+        public static PairQueryCondition<List<String>> eq(String fieldName, List<String> val) {
+            return new StringArrayQueryCondition(fieldName, Banyandb.PairQuery.BinaryOp.BINARY_OP_EQ, val);
+        }
+
+        /**
+         * Build a query condition for {@link List} of {@link String} as the type
+         * and {@link Banyandb.PairQuery.BinaryOp#BINARY_OP_NE} as the relation
+         *
+         * @param fieldName name of the field
+         * @param val       value of the field
+         * @return a query that `[String] != values`
+         */
+        public static PairQueryCondition<List<String>> ne(String fieldName, List<String> val) {
+            return new StringArrayQueryCondition(fieldName, Banyandb.PairQuery.BinaryOp.BINARY_OP_NE, val);
+        }
+
+        /**
+         * Build a query condition for {@link List} of {@link String} as the type
+         * and {@link Banyandb.PairQuery.BinaryOp#BINARY_OP_HAVING} as the relation
+         *
+         * @param fieldName name of the field
+         * @param val       value of the field
+         * @return a query that `[String] having values`
+         */
+        public static PairQueryCondition<List<String>> having(String fieldName, List<String> val) {
+            return new StringArrayQueryCondition(fieldName, Banyandb.PairQuery.BinaryOp.BINARY_OP_HAVING, val);
+        }
+
+        /**
+         * Build a query condition for {@link List} of {@link String} as the type
+         * and {@link Banyandb.PairQuery.BinaryOp#BINARY_OP_NOT_HAVING} as the relation
+         *
+         * @param fieldName name of the field
+         * @param val       value of the field
+         * @return a query that `[String] not having values`
+         */
+        public static PairQueryCondition<List<String>> notHaving(String fieldName, List<String> val) {
+            return new StringArrayQueryCondition(fieldName, Banyandb.PairQuery.BinaryOp.BINARY_OP_NOT_HAVING, val);
+        }
+    }
+
+    /**
+     * LongArrayQueryCondition represents `Field(List of Long) $op value` condition.
+     */
+    public static class LongArrayQueryCondition extends PairQueryCondition<List<Long>> {
+        private LongArrayQueryCondition(String fieldName, Banyandb.PairQuery.BinaryOp op, List<Long> value) {
+            super(fieldName, op, value);
+        }
+
+        @Override
+        protected Banyandb.TypedPair buildTypedPair() {
+            return Banyandb.TypedPair.newBuilder()
+                    .setKey(fieldName)
+                    .setIntArrayPair(Banyandb.IntArray.newBuilder()
+                            .addAllValue(value)).build();
+        }
+
+        /**
+         * Build a query condition for {@link List} of {@link Long} as the type
+         * and {@link Banyandb.PairQuery.BinaryOp#BINARY_OP_EQ} as the relation
+         *
+         * @param fieldName name of the field
+         * @param val       value of the field
+         * @return a query that `[Long] == value`
+         */
+        public static PairQueryCondition<List<Long>> eq(String fieldName, List<Long> val) {
+            return new LongArrayQueryCondition(fieldName, Banyandb.PairQuery.BinaryOp.BINARY_OP_EQ, val);
+        }
+
+        /**
+         * Build a query condition for {@link List} of {@link Long} as the type
+         * and {@link Banyandb.PairQuery.BinaryOp#BINARY_OP_NE} as the relation
+         *
+         * @param fieldName name of the field
+         * @param val       value of the field
+         * @return a query that `[Long] != value`
+         */
+        public static PairQueryCondition<List<Long>> ne(String fieldName, List<Long> val) {
+            return new LongArrayQueryCondition(fieldName, Banyandb.PairQuery.BinaryOp.BINARY_OP_NE, val);
+        }
+
+        /**
+         * Build a query condition for {@link List} of {@link Long} as the type
+         * and {@link Banyandb.PairQuery.BinaryOp#BINARY_OP_HAVING} as the relation
+         *
+         * @param fieldName name of the field
+         * @param val       value of the field
+         * @return a query that `[Long] having values`
+         */
+        public static PairQueryCondition<List<Long>> having(String fieldName, List<Long> val) {
+            return new LongArrayQueryCondition(fieldName, Banyandb.PairQuery.BinaryOp.BINARY_OP_HAVING, val);
+        }
+
+        /**
+         * Build a query condition for {@link List} of {@link Long} as the type
+         * and {@link Banyandb.PairQuery.BinaryOp#BINARY_OP_NOT_HAVING} as the relation
+         *
+         * @param fieldName name of the field
+         * @param val       value of the field
+         * @return a query that `[Long] not having values`
+         */
+        public static PairQueryCondition<List<Long>> notHaving(String fieldName, List<Long> val) {
+            return new LongArrayQueryCondition(fieldName, Banyandb.PairQuery.BinaryOp.BINARY_OP_NOT_HAVING, val);
+        }
+    }
+}
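
Conditions are built through the static factories on the nested classes and then appended to a query via TraceQuery#appendCondition; a short, illustrative sketch follows (field names are placeholders that must exist as indexed fields in the schema).

import java.util.Arrays;

import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;

public class ConditionSketch {
    public static void main(String[] args) {
        // duration >= 100, service == "order-service", tags having ["db.type"]
        PairQueryCondition<?> byDuration =
                PairQueryCondition.LongQueryCondition.ge("duration", 100L);
        PairQueryCondition<?> byService =
                PairQueryCondition.StringQueryCondition.eq("service", "order-service");
        PairQueryCondition<?> byTags =
                PairQueryCondition.StringArrayQueryCondition.having("tags", Arrays.asList("db.type"));

        System.out.println(byDuration.getFieldName() + ", "
                + byService.getFieldName() + ", "
                + byTags.getFieldName());
    }
}
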
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/RowEntity.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/RowEntity.java
new file mode 100644
index 0000000..6b9f7ad
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/RowEntity.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client;
+
+import java.util.ArrayList;
+import java.util.List;
+import lombok.Getter;
+import org.apache.skywalking.banyandb.v1.trace.BanyandbTrace;
+
+/**
+ * RowEntity represents a single entity in the BanyanDB query response.
+ */
+@Getter
+public class RowEntity {
+    /**
+     * identity of the entity.
+     * For a trace entity, it is the spanId of a span or the segmentId of a segment in SkyWalking.
+     */
+    private final String id;
+
+    /**
+     * timestamp of the entity, in milliseconds.
+     */
+    private final long timestamp;
+
+    /**
+     * binary part of the entity
+     */
+    private final byte[] binary;
+
+    /**
+     * fields are indexed fields that are searchable in BanyanDB
+     */
+    private final List<FieldAndValue<?>> fields;
+
+    RowEntity(BanyandbTrace.Entity entity) {
+        id = entity.getEntityId();
+        timestamp = entity.getTimestamp().getSeconds() * 1000 + entity.getTimestamp().getNanos() / 1_000_000;
+        binary = entity.getDataBinary().toByteArray();
+        fields = new ArrayList<>(entity.getFieldsCount());
+        entity.getFieldsList().forEach(field -> fields.add(FieldAndValue.build(field)));
+    }
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/SerializableField.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/SerializableField.java
new file mode 100644
index 0000000..2deef2d
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/SerializableField.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client;
+
+import org.apache.skywalking.banyandb.v1.Banyandb;
+
+/**
+ * An interface that represents an object which can be converted to the protobuf representation
+ * of BanyanDB.Field. BanyanDB.Field is used for writing entities to the database.
+ */
+public interface SerializableField {
+    Banyandb.Field toField();
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/TimestampRange.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/TimestampRange.java
new file mode 100644
index 0000000..e1f1f14
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/TimestampRange.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client;
+
+import com.google.protobuf.Timestamp;
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.skywalking.banyandb.v1.Banyandb;
+
+@RequiredArgsConstructor
+@Getter(AccessLevel.PROTECTED)
+public class TimestampRange {
+    /**
+     * start timestamp in milliseconds. Inclusive.
+     */
+    private final long begin;
+
+    /**
+     * end timestamp in milliseconds. Inclusive.
+     */
+    private final long end;
+
+    /**
+     * @return the TimeRange built from this millisecond range.
+     */
+    Banyandb.TimeRange build() {
+        final Banyandb.TimeRange.Builder builder = Banyandb.TimeRange.newBuilder();
+        builder.setBegin(Timestamp.newBuilder()
+                                  .setSeconds(begin / 1000)
+                                  .setNanos((int) (begin % 1000 * 1_000_000)));
+        builder.setEnd(Timestamp.newBuilder()
+                                  .setSeconds(end / 1000)
+                                  .setNanos((int) (end % 1000 * 1_000_000)));
+        return builder.build();
+    }
+}
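
Both bounds are plain epoch milliseconds; a caller would typically build a range and hand it to a query as in the sketch below. The 15-minute window and schema names are arbitrary examples, and TraceQuery is defined later in this commit.

import java.util.Arrays;

import org.apache.skywalking.banyandb.v1.client.TimestampRange;
import org.apache.skywalking.banyandb.v1.client.TraceQuery;

public class TimeRangeSketch {
    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        // A window covering the last 15 minutes; both bounds are epoch milliseconds.
        TimestampRange range = new TimestampRange(now - 15 * 60 * 1000, now);
        // build() later splits each bound into seconds plus nanoseconds for the protobuf Timestamp.
        TraceQuery query = new TraceQuery("sw", range, Arrays.asList("trace_id"));
        System.out.println(query);
    }
}
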
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceBulkWriteProcessor.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceBulkWriteProcessor.java
new file mode 100644
index 0000000..5bab418
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceBulkWriteProcessor.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client;
+
+import io.grpc.stub.StreamObserver;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.banyandb.v1.trace.BanyandbTrace;
+import org.apache.skywalking.banyandb.v1.trace.TraceServiceGrpc;
+
+/**
+ * TraceBulkWriteProcessor flushes buffered traces to the server in bulk.
+ */
+@Slf4j
+public class TraceBulkWriteProcessor extends BulkWriteProcessor {
+    /**
+     * The BanyanDB instance name.
+     */
+    private final String group;
+    private TraceServiceGrpc.TraceServiceStub traceServiceStub;
+
+    /**
+     * Create the processor.
+     *
+     * @param traceServiceStub stub for gRPC call.
+     * @param maxBulkSize      the max bulk size for the flush operation
+     * @param flushInterval    if the given maxBulkSize is not reached in this period, the flush would be triggered
+     *                         automatically. Unit is second.
+     * @param concurrency      the max number of concurrent flush operations.
+     */
+    protected TraceBulkWriteProcessor(final String group,
+                                      final TraceServiceGrpc.TraceServiceStub traceServiceStub,
+                                      final int maxBulkSize,
+                                      final int flushInterval,
+                                      final int concurrency) {
+        super("TraceBulkWriteProcessor", maxBulkSize, flushInterval, concurrency);
+        this.group = group;
+        this.traceServiceStub = traceServiceStub;
+    }
+
+    /**
+     * Add the trace to the bulk processor.
+     *
+     * @param traceWrite to add.
+     */
+    public void add(TraceWrite traceWrite) {
+        this.buffer.produce(traceWrite);
+    }
+
+    @Override
+    protected void flush(final List data) {
+        final StreamObserver<BanyandbTrace.WriteRequest> writeRequestStreamObserver
+            = traceServiceStub.withDeadlineAfter(
+                                  flushInterval, TimeUnit.SECONDS)
+                              .write(
+                                  new StreamObserver<BanyandbTrace.WriteResponse>() {
+                                      @Override
+                                      public void onNext(
+                                          BanyandbTrace.WriteResponse writeResponse) {
+                                      }
+
+                                      @Override
+                                      public void onError(
+                                          Throwable throwable) {
+                                          log.error(
+                                              "Error occurs in flushing traces.",
+                                              throwable
+                                          );
+                                      }
+
+                                      @Override
+                                      public void onCompleted() {
+                                      }
+                                  });
+        try {
+            data.forEach(write -> {
+                final TraceWrite traceWrite = (TraceWrite) write;
+                BanyandbTrace.WriteRequest request = traceWrite.build(group);
+                writeRequestStreamObserver.onNext(request);
+            });
+        } finally {
+            writeRequestStreamObserver.onCompleted();
+        }
+    }
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceQuery.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceQuery.java
new file mode 100644
index 0000000..850ddda
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceQuery.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import lombok.RequiredArgsConstructor;
+import lombok.Setter;
+import org.apache.skywalking.banyandb.v1.Banyandb;
+import org.apache.skywalking.banyandb.v1.trace.BanyandbTrace;
+
+/**
+ * TraceQuery is the high-level query API for the trace model.
+ */
+@Setter
+public class TraceQuery {
+    /**
+     * Owner name of the current entity.
+     */
+    private final String name;
+    /**
+     * The time range for query.
+     */
+    private final TimestampRange timestampRange;
+    /**
+     * The projections of the query result. These should have been defined in the schema.
+     */
+    private final List<String> projections;
+    /**
+     * Query conditions.
+     */
+    private final List<PairQueryCondition<?>> conditions;
+    /**
+     * The starting row id of the query. Default value is 0.
+     */
+    private int offset;
+    /**
+     * The limit size of the query. Default value is 20.
+     */
+    private int limit;
+    /**
+     * One order condition is supported and optional.
+     */
+    private OrderBy orderBy;
+    /**
+     * Whether to fetch data_binary for the query.
+     */
+    private boolean dataBinary;
+
+    public TraceQuery(final String name, final TimestampRange timestampRange, final List<String> projections) {
+        this.name = name;
+        this.timestampRange = timestampRange;
+        this.projections = projections;
+        this.conditions = new ArrayList<>(10);
+        this.offset = 0;
+        this.limit = 20;
+        this.dataBinary = false;
+    }
+
+    public TraceQuery(final String name, final List<String> projections) {
+        this(name, null, projections);
+    }
+
+    /**
+     * Fluent API for appending a query condition.
+     *
+     * @param condition the query condition to be appended
+     */
+    public TraceQuery appendCondition(PairQueryCondition<?> condition) {
+        this.conditions.add(condition);
+        return this;
+    }
+
+    /**
+     * @param group the group name of the connected BanyanDB client.
+     * @return QueryRequest for gRPC level query.
+     */
+    BanyandbTrace.QueryRequest build(String group) {
+        final BanyandbTrace.QueryRequest.Builder builder = BanyandbTrace.QueryRequest.newBuilder();
+        builder.setMetadata(Banyandb.Metadata.newBuilder()
+                .setGroup(group)
+                .setName(name)
+                .build());
+        if (timestampRange != null) {
+            builder.setTimeRange(timestampRange.build());
+        }
+        builder.setProjection(Banyandb.Projection.newBuilder().setDataBinary(this.dataBinary).addAllKeyNames(projections).build());
+        conditions.forEach(pairQueryCondition -> builder.addFields(pairQueryCondition.build()));
+        builder.setOffset(offset);
+        builder.setLimit(limit);
+        if (orderBy != null) {
+            builder.setOrderBy(orderBy.build());
+        }
+        return builder.build();
+    }
+
+    @RequiredArgsConstructor
+    public static class OrderBy {
+        /**
+         * The field name for ordering.
+         */
+        private final String fieldName;
+        /**
+         * The type of ordering.
+         */
+        private final Type type;
+
+        private Banyandb.QueryOrder build() {
+            final Banyandb.QueryOrder.Builder builder = Banyandb.QueryOrder.newBuilder();
+            builder.setKeyName(fieldName);
+            builder.setSort(
+                    Type.DESC.equals(type) ? Banyandb.QueryOrder.Sort.SORT_DESC : Banyandb.QueryOrder.Sort.SORT_ASC);
+            return builder.build();
+        }
+
+        public enum Type {
+            ASC, DESC
+        }
+    }
+}
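
For orientation, an illustrative sketch of assembling a TraceQuery (not part of the commit). The schema name, projections, and conditions mirror the query tests later in this commit; setLimit and setDataBinary are the Lombok-generated setters produced by the @Setter annotation on the class.

    import java.time.Instant;
    import java.time.temporal.ChronoUnit;
    import java.util.Arrays;

    import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
    import org.apache.skywalking.banyandb.v1.client.TimestampRange;
    import org.apache.skywalking.banyandb.v1.client.TraceQuery;

    public class TraceQuerySketch {
        static TraceQuery lastFifteenMinutes() {
            Instant end = Instant.now();
            Instant begin = end.minus(15, ChronoUnit.MINUTES);
            // the projections must be field names defined in the schema
            TraceQuery query = new TraceQuery("sw",
                    new TimestampRange(begin.toEpochMilli(), end.toEpochMilli()),
                    Arrays.asList("state", "start_time", "duration", "trace_id"));
            query.appendCondition(PairQueryCondition.LongQueryCondition.eq("state", 1L))
                 .appendCondition(PairQueryCondition.LongQueryCondition.ge("duration", 10L));
            query.setOrderBy(new TraceQuery.OrderBy("duration", TraceQuery.OrderBy.Type.DESC));
            query.setLimit(50);          // default is 20
            query.setDataBinary(true);   // also fetch data_binary in the projection
            return query;
        }
    }
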
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceQueryResponse.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceQueryResponse.java
new file mode 100644
index 0000000..49d0881
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceQueryResponse.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client;
+
+import java.util.ArrayList;
+import java.util.List;
+import lombok.Getter;
+import org.apache.skywalking.banyandb.v1.trace.BanyandbTrace;
+
+/**
+ * TraceQueryResponse represents the trace query result.
+ */
+public class TraceQueryResponse {
+    @Getter
+    private final List<RowEntity> entities;
+
+    TraceQueryResponse(BanyandbTrace.QueryResponse response) {
+        final List<BanyandbTrace.Entity> entitiesList = response.getEntitiesList();
+        entities = new ArrayList<>(entitiesList.size());
+        entitiesList.forEach(entity -> entities.add(new RowEntity(entity)));
+    }
+
+    /**
+     * @return size of the response set.
+     */
+    public int size() {
+        return entities.size();
+    }
+}
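
A short illustrative sketch of consuming a query result (not part of the commit). It assumes BanyanDBClient#queryTraces(TraceQuery) returns a TraceQueryResponse, which is how this package-private constructor is meant to be consumed, and that each RowEntity exposes the projected fields via getFields(), as the query test later in this commit asserts.

    import org.apache.skywalking.banyandb.v1.client.BanyanDBClient;
    import org.apache.skywalking.banyandb.v1.client.TraceQuery;
    import org.apache.skywalking.banyandb.v1.client.TraceQueryResponse;

    public class TraceReadSketch {
        // assumes queryTraces(TraceQuery) returns a TraceQueryResponse
        static void printProjectedFields(BanyanDBClient client, TraceQuery query) {
            TraceQueryResponse response = client.queryTraces(query);
            System.out.println("hit " + response.size() + " entities");
            // each RowEntity carries the projected fields of one span/segment
            response.getEntities().forEach(entity -> System.out.println(entity.getFields()));
        }
    }
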
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceWrite.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceWrite.java
new file mode 100644
index 0000000..0e4008a
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceWrite.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Timestamp;
+import java.util.List;
+import lombok.AccessLevel;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.Singular;
+import org.apache.skywalking.banyandb.v1.Banyandb;
+import org.apache.skywalking.banyandb.v1.trace.BanyandbTrace;
+
+/**
+ * TraceWrite represents a write operation, including necessary fields, for {@link
+ * BanyanDBClient#buildTraceWriteProcessor}.
+ */
+@Builder
+@Getter(AccessLevel.PROTECTED)
+public class TraceWrite {
+    /**
+     * Owner name of the current entity.
+     */
+    private final String name;
+    /**
+     * ID of the current entity.
+     */
+    private final String entityId;
+    /**
+     * Timestamp represents the time of the current trace or trace segment,
+     * in milliseconds.
+     */
+    private final long timestamp;
+    /**
+     * The raw binary data representing the whole object of the current trace or trace segment. It could be organized
+     * in different serialization formats. Natively, SkyWalking uses protobuf, but that is not required: the BanyanDB
+     * server never deserializes this data, so there is no format requirement.
+     */
+    private final byte[] binary;
+    /**
+     * The values of fields, which are defined by the schema. In the bulk write process, the BanyanDB client doesn't
+     * require field names; values are matched by the index rules and index rule bindings of the group.
+     */
+    @Singular
+    private final List<SerializableField> fields;
+
+    /**
+     * @param group the group of the connected BanyanDB client.
+     * @return {@link BanyandbTrace.WriteRequest} for the bulk process.
+     */
+    BanyandbTrace.WriteRequest build(String group) {
+        final BanyandbTrace.WriteRequest.Builder builder = BanyandbTrace.WriteRequest.newBuilder();
+        builder.setMetadata(Banyandb.Metadata.newBuilder().setGroup(group).setName(name).build());
+        final BanyandbTrace.EntityValue.Builder entityBuilder = BanyandbTrace.EntityValue.newBuilder();
+        entityBuilder.setEntityId(entityId);
+        entityBuilder.setTimestamp(Timestamp.newBuilder()
+                                            .setSeconds(timestamp / 1000)
+                                            .setNanos((int) (timestamp % 1000 * 1_000_000)));
+        entityBuilder.setDataBinary(ByteString.copyFrom(binary));
+        fields.forEach(writeField -> entityBuilder.addFields(writeField.toField()));
+        builder.setEntity(entityBuilder.build());
+        return builder.build();
+    }
+}
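
A small illustrative sketch of assembling a TraceWrite (not part of the commit; names and values are made up). Since field values carry no names, their order should line up with the schema, as the index comments in the write test below suggest, and build() splits the millisecond timestamp into protobuf seconds/nanos.

    import org.apache.skywalking.banyandb.v1.client.Field;
    import org.apache.skywalking.banyandb.v1.client.TraceWrite;

    public class TraceWriteSketch {
        static TraceWrite sample(byte[] serializedSegment) {
            return TraceWrite.builder()
                    .name("sw")                                // schema name under the client's group
                    .entityId("segment-id-0001")               // hypothetical segment id
                    .timestamp(System.currentTimeMillis())     // milliseconds; split into seconds/nanos by build()
                    .binary(serializedSegment)                 // opaque to the server, any serialization format
                    .field(Field.stringField("trace-id-0001")) // values only, in schema order
                    .field(Field.longField(200L))              // e.g. duration
                    .field(Field.nullField())                  // an absent optional field
                    .build();
        }
    }
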
diff --git a/src/main/proto/banyandb/v1/banyandb-trace.proto b/src/main/proto/banyandb/v1/banyandb-trace.proto
new file mode 100644
index 0000000..44d98c1
--- /dev/null
+++ b/src/main/proto/banyandb/v1/banyandb-trace.proto
@@ -0,0 +1,107 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+syntax = "proto3";
+
+option java_package = "org.apache.skywalking.banyandb.v1.trace";
+
+package banyandb.trace.v1;
+
+import "google/protobuf/timestamp.proto";
+import "google/protobuf/struct.proto";
+import "banyandb/v1/banyandb.proto";
+
+service TraceService {
+  rpc Query(banyandb.trace.v1.QueryRequest) returns (banyandb.trace.v1.QueryResponse);
+  rpc Write(stream banyandb.trace.v1.WriteRequest) returns (stream banyandb.trace.v1.WriteResponse);
+}
+
+// QueryRequest is the request contract for query.
+message QueryRequest {
+  // metadata is required
+  banyandb.v1.Metadata metadata = 1;
+  // time_range is a range query with begin/end time of entities, in the time unit of nanoseconds.
+  // In the context of Trace, it represents the range of the `startTime` for spans/segments,
+  // while in the context of Log, it means the range of the timestamp(s) for logs.
+  // It is always recommended to specify a time range for performance reasons.
+  banyandb.v1.TimeRange time_range = 2;
+  // offset is used to support pagination, together with the following limit
+  uint32 offset = 3;
+  // limit is used to impose a boundary on the number of records being returned
+  uint32 limit = 4;
+  // order_by is given to specify the sort for a field. So far, only fields of the Integer type are supported
+  banyandb.v1.QueryOrder order_by = 5;
+  // fields are indexed. Some typical fields are listed below,
+  // - trace_id: if given, it takes precedence over other fields and will be used to retrieve entities before other conditions are imposed
+  // - duration: typical for trace context
+  repeated banyandb.v1.PairQuery fields = 6;
+  // projection can be used to select the key names of the entities in the response
+  banyandb.v1.Projection projection = 7;
+}
+
+// QueryResponse is the response for a query to the Query module.
+message QueryResponse {
+  // entities are the actual data returned
+  repeated Entity entities = 1;
+}
+
+// Entity represents
+// (Trace context) a Span defined in the Google Dapper paper, or equivalently a Segment in SkyWalking.
+// (Log context) a log
+message Entity {
+  // entity_id could be span_id of a Span or segment_id of a Segment in the context of Trace
+  string entity_id = 1;
+  // timestamp represents
+  // 1) either the start time of a Span/Segment,
+  // 2) or the timestamp of a log
+  google.protobuf.Timestamp timestamp = 2;
+  // data_binary contains all un-indexed Tags and other key-value pairs
+  bytes data_binary = 3;
+  // fields contains all indexed Field. Some typical names,
+  // - trace_id
+  // - duration
+  // - service_name
+  // - service_instance_id
+  // - end_time_nanoseconds
+  repeated banyandb.v1.TypedPair fields = 4;
+}
+
+
+message WriteRequest {
+  // the metadata is only required in the first write.
+  banyandb.v1.Metadata metadata = 1;
+  // the entity is required.
+  EntityValue entity = 2;
+}
+
+message WriteResponse {}
+
+message EntityValue {
+  // entity_id could be span_id of a Span or segment_id of a Segment in the context of Trace
+  string entity_id = 1;
+  // timestamp represents
+  // 1) either the start time of a Span/Segment,
+  // 2) or the timestamp of a log
+  google.protobuf.Timestamp timestamp = 2;
+  // binary representation of segments, including tags, spans...
+  bytes data_binary = 3;
+  // fields support all of the indexed fields.
+  // A Field only carries a value; the value is matched with its key
+  // by the index rules and index rule bindings of the Metadata group.
+  // Indexed fields of multiple entities are compressed in the fields.
+  repeated banyandb.v1.Field fields = 4;
+}
\ No newline at end of file
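
For reference, a hedged sketch of driving the streaming Write contract above from Java with the generated async stub (not part of the commit). It mirrors TraceBulkWriteProcessor#flush earlier in this commit; the stub and the batch parameter are illustrative. Per the comment above, metadata is only required in the first WriteRequest of a stream, although the high-level client in this commit sets it on every request.

    import io.grpc.stub.StreamObserver;

    import org.apache.skywalking.banyandb.v1.trace.BanyandbTrace;
    import org.apache.skywalking.banyandb.v1.trace.TraceServiceGrpc;

    public class RawWriteSketch {
        static void streamWrites(TraceServiceGrpc.TraceServiceStub stub,
                                 Iterable<BanyandbTrace.WriteRequest> batch) {
            StreamObserver<BanyandbTrace.WriteRequest> requests =
                    stub.write(new StreamObserver<BanyandbTrace.WriteResponse>() {
                        @Override
                        public void onNext(BanyandbTrace.WriteResponse response) {
                            // WriteResponse is currently an empty message
                        }

                        @Override
                        public void onError(Throwable t) {
                            t.printStackTrace();
                        }

                        @Override
                        public void onCompleted() {
                        }
                    });
            batch.forEach(requests::onNext);   // one WriteRequest per EntityValue
            requests.onCompleted();            // close the client side of the stream
        }
    }
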
diff --git a/src/main/proto/banyandb/v1/banyandb.proto b/src/main/proto/banyandb/v1/banyandb.proto
new file mode 100644
index 0000000..e86186e
--- /dev/null
+++ b/src/main/proto/banyandb/v1/banyandb.proto
@@ -0,0 +1,136 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+syntax = "proto3";
+
+option java_package = "org.apache.skywalking.banyandb.v1";
+
+package banyandb.v1;
+
+import "google/protobuf/timestamp.proto";
+import "google/protobuf/struct.proto";
+
+// Metadata is for multi-tenant, multi-model use
+message Metadata {
+  // group contains a set of options, like retention policy, max
+  string group = 1;
+  // name of the entity
+  string name = 2;
+}
+
+enum FieldType {
+  FIELD_TYPE_UNSPECIFIED = 0;
+  FIELD_TYPE_STRING = 1;
+  FIELD_TYPE_INT = 2;
+  FIELD_TYPE_STRING_ARRAY = 3;
+  FIELD_TYPE_INT_ARRAY = 4;
+}
+
+// Pair is the building block of a record which is equivalent to a key-value pair.
+// In the context of Trace, it could be metadata of a trace such as service_name, service_instance, etc.
+// Besides, other fields/tags are organized in key-value pairs in the underlying storage layer.
+// Note that a value can be multi-valued.
+message TypedPair {
+  string key = 1;
+  oneof typed {
+    NullWithType null_pair = 2;
+    Int int_pair = 3;
+    Str str_pair = 4;
+    IntArray int_array_pair = 5;
+    StrArray str_array_pair = 6;
+  }
+
+  message NullWithType {
+      FieldType type = 1;
+  }
+}
+
+// PairQuery consists of the query condition with a single binary operator to be imposed
+// For 1:1 BinaryOp, values in condition must be an array with length = 1,
+// while for 1:N BinaryOp, values can be an array with length >= 1.
+message PairQuery {
+  // BinaryOp specifies the operation imposed to the given query condition
+  // For EQ, NE, LT, GT, LE and GE, only one operand should be given, i.e. one-to-one relationship.
+  // HAVING and NOT_HAVING allow a multi-valued operand, such as an array/vector, i.e. one-to-many relationship.
+  // For example, "keyA" contains "valueA" **and** "valueB"
+  enum BinaryOp {
+    BINARY_OP_UNSPECIFIED = 0;
+    BINARY_OP_EQ = 1;
+    BINARY_OP_NE = 2;
+    BINARY_OP_LT = 3;
+    BINARY_OP_GT = 4;
+    BINARY_OP_LE = 5;
+    BINARY_OP_GE = 6;
+    BINARY_OP_HAVING = 7;
+    BINARY_OP_NOT_HAVING = 8;
+  }
+  BinaryOp op = 1;
+  TypedPair condition = 2;
+}
+
+// QueryOrder means a Sort operation to be done for a given field.
+// The key_name refers to the key of a Pair.
+message QueryOrder {
+  string key_name = 1;
+  enum Sort {
+    SORT_UNSPECIFIED = 0;
+    SORT_DESC = 1;
+    SORT_ASC = 2;
+  }
+  Sort sort = 2;
+}
+
+// Projection is used to select the names of keys to be returned.
+message Projection {
+  // whether the binary part is needed
+  bool data_binary = 1;
+  // The key_name refers to the key(s) of Pair(s).
+  repeated string key_names = 2;
+}
+
+// TimeRange is a range query on timestamps;
+// the range here follows the left-inclusive and right-exclusive rule, i.e. [begin, end) if both edges exist
+message TimeRange {
+  google.protobuf.Timestamp begin = 1;
+  google.protobuf.Timestamp end = 2;
+}
+
+message Str {
+  string value = 1;
+}
+
+message Int {
+  int64 value = 1;
+}
+
+message StrArray {
+  repeated string value = 1;
+}
+
+message IntArray {
+  repeated int64 value = 1;
+}
+
+message Field {
+  oneof value_type {
+    google.protobuf.NullValue null = 1;
+    Str str = 2;
+    StrArray str_array = 3;
+    Int int = 4;
+    IntArray int_array = 5;
+  }
+}
\ No newline at end of file
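
To make the one-to-one vs. one-to-many operators above concrete, here is an illustrative sketch (not part of the commit) built with the generated classes under the Banyandb outer class, as the tests in this commit do; the keys and values are made up.

    import org.apache.skywalking.banyandb.v1.Banyandb;

    public class PairQuerySketch {
        // 1:1 operator: exactly one operand, e.g. duration >= 10
        static Banyandb.PairQuery durationAtLeastTen() {
            return Banyandb.PairQuery.newBuilder()
                    .setOp(Banyandb.PairQuery.BinaryOp.BINARY_OP_GE)
                    .setCondition(Banyandb.TypedPair.newBuilder()
                            .setKey("duration")
                            .setIntPair(Banyandb.Int.newBuilder().setValue(10)))
                    .build();
        }

        // 1:N operator: "keyA" having both "valueA" and "valueB"
        static Banyandb.PairQuery keyAHavingBothValues() {
            return Banyandb.PairQuery.newBuilder()
                    .setOp(Banyandb.PairQuery.BinaryOp.BINARY_OP_HAVING)
                    .setCondition(Banyandb.TypedPair.newBuilder()
                            .setKey("keyA")
                            .setStrArrayPair(Banyandb.StrArray.newBuilder()
                                    .addValue("valueA")
                                    .addValue("valueB")))
                    .build();
        }
    }
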
diff --git a/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientQueryTest.java b/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientQueryTest.java
new file mode 100644
index 0000000..1667254
--- /dev/null
+++ b/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientQueryTest.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Timestamp;
+import io.grpc.ManagedChannel;
+import io.grpc.Server;
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.inprocess.InProcessServerBuilder;
+import io.grpc.stub.StreamObserver;
+import io.grpc.testing.GrpcCleanupRule;
+import org.apache.skywalking.banyandb.v1.Banyandb;
+import org.apache.skywalking.banyandb.v1.trace.BanyandbTrace;
+import org.apache.skywalking.banyandb.v1.trace.TraceServiceGrpc;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.ArgumentMatchers;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.AdditionalAnswers.delegatesTo;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.mock;
+
+@RunWith(PowerMockRunner.class)
+@PowerMockIgnore("javax.management.*")
+public class BanyanDBClientQueryTest {
+    @Rule
+    public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
+
+    private final TraceServiceGrpc.TraceServiceImplBase serviceImpl =
+            mock(TraceServiceGrpc.TraceServiceImplBase.class, delegatesTo(
+                    new TraceServiceGrpc.TraceServiceImplBase() {
+                        @Override
+                        public void query(BanyandbTrace.QueryRequest request, StreamObserver<BanyandbTrace.QueryResponse> responseObserver) {
+                            responseObserver.onNext(BanyandbTrace.QueryResponse.newBuilder().build());
+                            responseObserver.onCompleted();
+                        }
+                    }));
+
+    private BanyanDBClient client;
+
+    @Before
+    public void setUp() throws IOException {
+        // Generate a unique in-process server name.
+        String serverName = InProcessServerBuilder.generateName();
+
+        // Create a server, add service, start, and register for automatic graceful shutdown.
+        Server server = InProcessServerBuilder
+                .forName(serverName).directExecutor().addService(serviceImpl).build();
+        grpcCleanup.register(server.start());
+
+        // Create a client channel and register for automatic graceful shutdown.
+        ManagedChannel channel = grpcCleanup.register(
+                InProcessChannelBuilder.forName(serverName).directExecutor().build());
+        client = new BanyanDBClient("127.0.0.1", server.getPort(), "default");
+
+        client.connect(channel);
+    }
+
+    @Test
+    public void testNonNull() {
+        Assert.assertNotNull(this.client);
+    }
+
+    @Test
+    public void testQuery_tableScan() {
+        ArgumentCaptor<BanyandbTrace.QueryRequest> requestCaptor = ArgumentCaptor.forClass(BanyandbTrace.QueryRequest.class);
+
+        Instant end = Instant.now();
+        Instant begin = end.minus(15, ChronoUnit.MINUTES);
+        TraceQuery query = new TraceQuery("sw",
+                new TimestampRange(begin.toEpochMilli(), end.toEpochMilli()),
+                Arrays.asList("state", "start_time", "duration", "trace_id"));
+        // search for all states
+        query.appendCondition(PairQueryCondition.LongQueryCondition.eq("state", 0L));
+        query.setOrderBy(new TraceQuery.OrderBy("duration", TraceQuery.OrderBy.Type.DESC));
+        client.queryTraces(query);
+
+        verify(serviceImpl).query(requestCaptor.capture(), ArgumentMatchers.any());
+
+        final BanyandbTrace.QueryRequest request = requestCaptor.getValue();
+        // assert metadata
+        Assert.assertEquals("sw", request.getMetadata().getName());
+        Assert.assertEquals("default", request.getMetadata().getGroup());
+        // assert timeRange, both seconds and the nanos
+        Assert.assertEquals(begin.toEpochMilli() / 1000, request.getTimeRange().getBegin().getSeconds());
+        Assert.assertEquals(TimeUnit.MILLISECONDS.toNanos(begin.toEpochMilli() % 1000), request.getTimeRange().getBegin().getNanos());
+        Assert.assertEquals(end.toEpochMilli() / 1000, request.getTimeRange().getEnd().getSeconds());
+        Assert.assertEquals(TimeUnit.MILLISECONDS.toNanos(end.toEpochMilli() % 1000), request.getTimeRange().getEnd().getNanos());
+        // assert fields, we only have one condition, i.e. state
+        Assert.assertEquals(1, request.getFieldsCount());
+        // assert orderBy, which is set to DESC on duration
+        Assert.assertEquals(Banyandb.QueryOrder.Sort.SORT_DESC, request.getOrderBy().getSort());
+        Assert.assertEquals("duration", request.getOrderBy().getKeyName());
+        // assert state
+        Assert.assertEquals(Banyandb.PairQuery.BinaryOp.BINARY_OP_EQ, request.getFields(0).getOp());
+        Assert.assertEquals(0, request.getFields(0).getCondition().getIntPair().getValue());
+        // assert projections
+        assertCollectionEqual(Lists.newArrayList("duration", "state", "start_time", "trace_id"), request.getProjection().getKeyNamesList());
+    }
+
+    @Test
+    public void testQuery_indexScan() {
+        ArgumentCaptor<BanyandbTrace.QueryRequest> requestCaptor = ArgumentCaptor.forClass(BanyandbTrace.QueryRequest.class);
+        Instant begin = Instant.now().minus(5, ChronoUnit.MINUTES);
+        Instant end = Instant.now();
+        String serviceId = "service_id_b";
+        String serviceInstanceId = "service_id_b_1";
+        String endpointId = "/check_0";
+        long minDuration = 10;
+        long maxDuration = 100;
+
+        TraceQuery query = new TraceQuery("sw",
+                new TimestampRange(begin.toEpochMilli(), end.toEpochMilli()),
+                Arrays.asList("state", "start_time", "duration", "trace_id"));
+        // search for the successful states
+        query.appendCondition(PairQueryCondition.LongQueryCondition.eq("state", 1L))
+                .appendCondition(PairQueryCondition.StringQueryCondition.eq("service_id", serviceId))
+                .appendCondition(PairQueryCondition.StringQueryCondition.eq("service_instance_id", serviceInstanceId))
+                .appendCondition(PairQueryCondition.StringQueryCondition.eq("endpoint_id", endpointId))
+                .appendCondition(PairQueryCondition.LongQueryCondition.ge("duration", minDuration))
+                .appendCondition(PairQueryCondition.LongQueryCondition.le("duration", maxDuration))
+                .setOrderBy(new TraceQuery.OrderBy("start_time", TraceQuery.OrderBy.Type.ASC));
+
+        client.queryTraces(query);
+
+        verify(serviceImpl).query(requestCaptor.capture(), ArgumentMatchers.any());
+        final BanyandbTrace.QueryRequest request = requestCaptor.getValue();
+        // assert metadata
+        Assert.assertEquals("sw", request.getMetadata().getName());
+        Assert.assertEquals("default", request.getMetadata().getGroup());
+        // assert timeRange
+        Assert.assertEquals(begin.getEpochSecond(), request.getTimeRange().getBegin().getSeconds());
+        Assert.assertEquals(end.getEpochSecond(), request.getTimeRange().getEnd().getSeconds());
+        // assert fields, we have six conditions in total
+        Assert.assertEquals(6, request.getFieldsCount());
+        // assert orderBy, which is set to ASC on start_time
+        Assert.assertEquals(Banyandb.QueryOrder.Sort.SORT_ASC, request.getOrderBy().getSort());
+        Assert.assertEquals("start_time", request.getOrderBy().getKeyName());
+        // assert projections
+        assertCollectionEqual(Lists.newArrayList("duration", "state", "start_time", "trace_id"), request.getProjection().getKeyNamesList());
+        // assert fields
+        assertCollectionEqual(request.getFieldsList(), ImmutableList.of(
+                PairQueryCondition.LongQueryCondition.ge("duration", minDuration).build(), // 1 -> duration >= minDuration
+                PairQueryCondition.LongQueryCondition.le("duration", maxDuration).build(), // 2 -> duration <= maxDuration
+                PairQueryCondition.StringQueryCondition.eq("service_id", serviceId).build(), // 3 -> service_id
+                PairQueryCondition.StringQueryCondition.eq("service_instance_id", serviceInstanceId).build(), // 4 -> service_instance_id
+                PairQueryCondition.StringQueryCondition.eq("endpoint_id", endpointId).build(), // 5 -> endpoint_id
+                PairQueryCondition.LongQueryCondition.eq("state", 1L).build() // 7 -> state
+        ));
+    }
+
+    @Test
+    public void testQuery_traceIDFetch() {
+        ArgumentCaptor<BanyandbTrace.QueryRequest> requestCaptor = ArgumentCaptor.forClass(BanyandbTrace.QueryRequest.class);
+        String traceId = "1111.222.333";
+
+        TraceQuery query = new TraceQuery("sw", Arrays.asList("state", "start_time", "duration", "trace_id"));
+        query.appendCondition(PairQueryCondition.StringQueryCondition.eq("trace_id", traceId));
+
+        client.queryTraces(query);
+
+        verify(serviceImpl).query(requestCaptor.capture(), ArgumentMatchers.any());
+        final BanyandbTrace.QueryRequest request = requestCaptor.getValue();
+        // assert metadata
+        Assert.assertEquals("sw", request.getMetadata().getName());
+        Assert.assertEquals("default", request.getMetadata().getGroup());
+        Assert.assertEquals(1, request.getFieldsCount());
+        // assert fields
+        assertCollectionEqual(request.getFieldsList(), ImmutableList.of(
+                PairQueryCondition.StringQueryCondition.eq("trace_id", traceId).build()
+        ));
+    }
+
+    @Test
+    public void testQuery_responseConversion() {
+        final byte[] binaryData = new byte[]{13};
+        final String segmentId = "1231.dfd.123123ssf";
+        final String traceId = "trace_id-xxfff.111323";
+        final long duration = 200L;
+        final Instant now = Instant.now();
+        final BanyandbTrace.QueryResponse responseObj = BanyandbTrace.QueryResponse.newBuilder()
+                .addEntities(BanyandbTrace.Entity.newBuilder()
+                        .setDataBinary(ByteString.copyFrom(binaryData))
+                        .setEntityId(segmentId)
+                        .setTimestamp(Timestamp.newBuilder()
+                                .setSeconds(now.toEpochMilli() / 1000)
+                                .setNanos((int) TimeUnit.MILLISECONDS.toNanos(now.toEpochMilli() % 1000))
+                                .build())
+                        .addFields(Banyandb.TypedPair.newBuilder()
+                                .setKey("trace_id")
+                                .setStrPair(Banyandb.Str.newBuilder().setValue(traceId).build()).build())
+                        .addFields(Banyandb.TypedPair.newBuilder()
+                                .setKey("duration")
+                                .setIntPair(Banyandb.Int.newBuilder().setValue(duration).build()).build())
+                        .addFields(Banyandb.TypedPair.newBuilder()
+                                .setKey("mq.broker")
+                                .setNullPair(Banyandb.TypedPair.NullWithType.newBuilder().setType(Banyandb.FieldType.FIELD_TYPE_STRING).build()).build())
+                        .build())
+                .build();
+        TraceQueryResponse resp = new TraceQueryResponse(responseObj);
+        Assert.assertNotNull(resp);
+        Assert.assertEquals(1, resp.getEntities().size());
+        Assert.assertEquals(3, resp.getEntities().get(0).getFields().size());
+        Assert.assertEquals(3, resp.getEntities().get(0).getFields().size());
+        Assert.assertEquals(new FieldAndValue.StringFieldPair("trace_id", traceId), resp.getEntities().get(0).getFields().get(0));
+        Assert.assertEquals(new FieldAndValue.LongFieldPair("duration", duration), resp.getEntities().get(0).getFields().get(1));
+        Assert.assertEquals(new FieldAndValue.StringFieldPair("mq.broker", null), resp.getEntities().get(0).getFields().get(2));
+    }
+
+    static <T> void assertCollectionEqual(Collection<T> c1, Collection<T> c2) {
+        Assert.assertTrue(c1.size() == c2.size() && c1.containsAll(c2) && c2.containsAll(c1));
+    }
+}
diff --git a/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientWriteTest.java b/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientWriteTest.java
new file mode 100644
index 0000000..731f560
--- /dev/null
+++ b/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientWriteTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client;
+
+import io.grpc.ManagedChannel;
+import io.grpc.Server;
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.inprocess.InProcessServerBuilder;
+import io.grpc.stub.StreamObserver;
+import io.grpc.testing.GrpcCleanupRule;
+import io.grpc.util.MutableHandlerRegistry;
+import org.apache.skywalking.banyandb.v1.Banyandb;
+import org.apache.skywalking.banyandb.v1.trace.BanyandbTrace;
+import org.apache.skywalking.banyandb.v1.trace.TraceServiceGrpc;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+public class BanyanDBClientWriteTest {
+    @Rule
+    public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
+
+    private final MutableHandlerRegistry serviceRegistry = new MutableHandlerRegistry();
+
+    private BanyanDBClient client;
+    private TraceBulkWriteProcessor traceBulkWriteProcessor;
+
+    @Before
+    public void setUp() throws IOException {
+        String serverName = InProcessServerBuilder.generateName();
+
+        Server server = InProcessServerBuilder
+                .forName(serverName).fallbackHandlerRegistry(serviceRegistry).directExecutor().build();
+        grpcCleanup.register(server.start());
+
+        ManagedChannel channel = grpcCleanup.register(
+                InProcessChannelBuilder.forName(serverName).directExecutor().build());
+
+        client = new BanyanDBClient("127.0.0.1", server.getPort(), "default");
+        client.connect(channel);
+        traceBulkWriteProcessor = client.buildTraceWriteProcessor(1000, 1, 1);
+    }
+
+    @After
+    public void shutdown() throws IOException {
+        traceBulkWriteProcessor.close();
+    }
+
+    @Test
+    public void testWrite() throws Exception {
+        final CountDownLatch allRequestsDelivered = new CountDownLatch(1);
+        final List<BanyandbTrace.WriteRequest> writeRequestDelivered = new ArrayList<>();
+
+        // implement the fake service
+        final TraceServiceGrpc.TraceServiceImplBase serviceImpl =
+                new TraceServiceGrpc.TraceServiceImplBase() {
+                    @Override
+                    public StreamObserver<BanyandbTrace.WriteRequest> write(StreamObserver<BanyandbTrace.WriteResponse> responseObserver) {
+                        return new StreamObserver<BanyandbTrace.WriteRequest>() {
+                            @Override
+                            public void onNext(BanyandbTrace.WriteRequest value) {
+                                writeRequestDelivered.add(value);
+                                responseObserver.onNext(BanyandbTrace.WriteResponse.newBuilder().build());
+                            }
+
+                            @Override
+                            public void onError(Throwable t) {
+                            }
+
+                            @Override
+                            public void onCompleted() {
+                                responseObserver.onCompleted();
+                                allRequestsDelivered.countDown();
+                            }
+                        };
+                    }
+                };
+        serviceRegistry.addService(serviceImpl);
+
+        String segmentId = "1231.dfd.123123ssf";
+        String traceId = "trace_id-xxfff.111323";
+        String serviceId = "webapp_id";
+        String serviceInstanceId = "10.0.0.1_id";
+        String endpointId = "home_id";
+        int latency = 200;
+        int state = 1;
+        Instant now = Instant.now();
+        byte[] byteData = new byte[]{14};
+        String broker = "172.16.10.129:9092";
+        String topic = "topic_1";
+        String queue = "queue_2";
+        String httpStatusCode = "200";
+        String dbType = "SQL";
+        String dbInstance = "127.0.0.1:3306";
+
+        TraceWrite traceWrite = TraceWrite.builder()
+                .entityId(segmentId)
+                .binary(byteData)
+                .timestamp(now.toEpochMilli())
+                .name("sw")
+                .field(Field.stringField(traceId)) // 0
+                .field(Field.stringField(serviceId))
+                .field(Field.stringField(serviceInstanceId))
+                .field(Field.stringField(endpointId))
+                .field(Field.longField(latency)) // 4
+                .field(Field.longField(state))
+                .field(Field.stringField(httpStatusCode))
+                .field(Field.nullField()) // 7
+                .field(Field.stringField(dbType))
+                .field(Field.stringField(dbInstance))
+                .field(Field.stringField(broker))
+                .field(Field.stringField(topic))
+                .field(Field.stringField(queue)) // 12
+                .build();
+
+        traceBulkWriteProcessor.add(traceWrite);
+
+        if (allRequestsDelivered.await(5, TimeUnit.SECONDS)) {
+            Assert.assertEquals(1, writeRequestDelivered.size());
+            final BanyandbTrace.WriteRequest request = writeRequestDelivered.get(0);
+            Assert.assertEquals(13, request.getEntity().getFieldsCount());
+            Assert.assertEquals(traceId, request.getEntity().getFields(0).getStr().getValue());
+            Assert.assertEquals(latency, request.getEntity().getFields(4).getInt().getValue());
+            Assert.assertEquals(request.getEntity().getFields(7).getValueTypeCase(), Banyandb.Field.ValueTypeCase.NULL);
+            Assert.assertEquals(queue, request.getEntity().getFields(12).getStr().getValue());
+        } else {
+            Assert.fail();
+        }
+    }
+}

[skywalking-banyandb-java-client] 01/02: first commit

Posted by wu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb-java-client.git

commit 11fe8186b7472b12c454289cf7ff0ee181d246b2
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Tue Aug 31 11:55:22 2021 +0800

    first commit
---
 README.md | 0
 1 file changed, 0 insertions(+), 0 deletions(-)

diff --git a/README.md b/README.md
new file mode 100644
index 0000000..e69de29