You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2021/08/31 07:16:20 UTC
[skywalking-banyandb-java-client] 02/02: Initial project codebase.
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb-java-client.git
commit 4575ecf012c130fe17197f3bbed7b7a1836a38a9
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Tue Aug 31 15:16:13 2021 +0800
Initial project codebase.
---
.gitignore | 23 ++
.mvn/wrapper/MavenWrapperDownloader.java | 117 +++++++
.mvn/wrapper/maven-wrapper.properties | 3 +
HEADER | 14 +
LICENSE | 202 ++++++++++++
NOTICE | 5 +
apm-checkstyle/CHECKSTYLE_HEAD | 18 ++
apm-checkstyle/checkStyle.xml | 133 ++++++++
apm-checkstyle/importControl.xml | 28 ++
mvnw | 310 ++++++++++++++++++
mvnw.cmd | 182 +++++++++++
pom.xml | 357 +++++++++++++++++++++
.../banyandb/commons/datacarrier/DataCarrier.java | 166 ++++++++++
.../banyandb/commons/datacarrier/EnvUtil.java | 50 +++
.../buffer/ArrayBlockingQueueBuffer.java | 67 ++++
.../commons/datacarrier/buffer/Buffer.java | 76 +++++
.../commons/datacarrier/buffer/BufferStrategy.java | 23 ++
.../commons/datacarrier/buffer/Channels.java | 93 ++++++
.../commons/datacarrier/buffer/QueueBuffer.java | 46 +++
.../datacarrier/common/AtomicRangeInteger.java | 76 +++++
.../datacarrier/consumer/BulkConsumePool.java | 118 +++++++
.../datacarrier/consumer/ConsumeDriver.java | 137 ++++++++
.../consumer/ConsumerCannotBeCreatedException.java | 25 ++
.../commons/datacarrier/consumer/ConsumerPool.java | 30 ++
.../datacarrier/consumer/ConsumerPoolFactory.java | 50 +++
.../datacarrier/consumer/ConsumerThread.java | 105 ++++++
.../commons/datacarrier/consumer/IConsumer.java | 40 +++
.../commons/datacarrier/consumer/IDriver.java | 32 ++
.../consumer/MultipleChannelsConsumer.java | 124 +++++++
.../datacarrier/partition/IDataPartitioner.java | 32 ++
.../partition/ProducerThreadPartitioner.java | 37 +++
.../partition/SimpleRollingPartitioner.java | 37 +++
.../banyandb/v1/client/BanyanDBClient.java | 197 ++++++++++++
.../banyandb/v1/client/BulkWriteProcessor.java | 129 ++++++++
.../skywalking/banyandb/v1/client/Field.java | 157 +++++++++
.../banyandb/v1/client/FieldAndValue.java | 101 ++++++
.../skywalking/banyandb/v1/client/Options.java | 43 +++
.../banyandb/v1/client/PairQueryCondition.java | 307 ++++++++++++++++++
.../skywalking/banyandb/v1/client/RowEntity.java | 59 ++++
.../banyandb/v1/client/SerializableField.java | 29 ++
.../banyandb/v1/client/TimestampRange.java | 53 +++
.../v1/client/TraceBulkWriteProcessor.java | 102 ++++++
.../skywalking/banyandb/v1/client/TraceQuery.java | 137 ++++++++
.../banyandb/v1/client/TraceQueryResponse.java | 45 +++
.../skywalking/banyandb/v1/client/TraceWrite.java | 81 +++++
src/main/proto/banyandb/v1/banyandb-trace.proto | 107 ++++++
src/main/proto/banyandb/v1/banyandb.proto | 136 ++++++++
.../v1/client/BanyanDBClientQueryTest.java | 245 ++++++++++++++
.../v1/client/BanyanDBClientWriteTest.java | 155 +++++++++
49 files changed, 4839 insertions(+)
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..562e568
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,23 @@
+/build/
+target/
+.idea/
+*.iml
+.classpath
+.project
+.settings/
+.DS_Store
+*~
+packages/
+**/dependency-reduced-pom.xml
+**/dist/
+/docker/snapshot/*.gz
+.mvn/wrapper/*.jar
+.factorypath
+.vscode
+.checkstyle
+.externalToolBuilders
+/test/plugin/dist
+/test/plugin/workspace
+/test/jacoco/classes
+/test/jacoco/*.exec
+test/jacoco
\ No newline at end of file
diff --git a/.mvn/wrapper/MavenWrapperDownloader.java b/.mvn/wrapper/MavenWrapperDownloader.java
new file mode 100644
index 0000000..187216f
--- /dev/null
+++ b/.mvn/wrapper/MavenWrapperDownloader.java
@@ -0,0 +1,117 @@
+/*
+ * Copyright 2007-present the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.net.*;
+import java.io.*;
+import java.nio.channels.*;
+import java.util.Properties;
+
+public class MavenWrapperDownloader {
+
+ private static final String WRAPPER_VERSION = "0.5.5";
+ /**
+ * Default URL to download the maven-wrapper.jar from, if no 'downloadUrl' is provided.
+ */
+ private static final String DEFAULT_DOWNLOAD_URL = "https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/"
+ + WRAPPER_VERSION + "/maven-wrapper-" + WRAPPER_VERSION + ".jar";
+
+ /**
+ * Path to the maven-wrapper.properties file, which might contain a downloadUrl property to
+ * use instead of the default one.
+ */
+ private static final String MAVEN_WRAPPER_PROPERTIES_PATH =
+ ".mvn/wrapper/maven-wrapper.properties";
+
+ /**
+ * Path where the maven-wrapper.jar will be saved to.
+ */
+ private static final String MAVEN_WRAPPER_JAR_PATH =
+ ".mvn/wrapper/maven-wrapper.jar";
+
+ /**
+ * Name of the property which should be used to override the default download url for the wrapper.
+ */
+ private static final String PROPERTY_NAME_WRAPPER_URL = "wrapperUrl";
+
+ public static void main(String args[]) {
+ System.out.println("- Downloader started");
+ File baseDirectory = new File(args[0]);
+ System.out.println("- Using base directory: " + baseDirectory.getAbsolutePath());
+
+ // If the maven-wrapper.properties exists, read it and check if it contains a custom
+ // wrapperUrl parameter.
+ File mavenWrapperPropertyFile = new File(baseDirectory, MAVEN_WRAPPER_PROPERTIES_PATH);
+ String url = DEFAULT_DOWNLOAD_URL;
+ if(mavenWrapperPropertyFile.exists()) {
+ FileInputStream mavenWrapperPropertyFileInputStream = null;
+ try {
+ mavenWrapperPropertyFileInputStream = new FileInputStream(mavenWrapperPropertyFile);
+ Properties mavenWrapperProperties = new Properties();
+ mavenWrapperProperties.load(mavenWrapperPropertyFileInputStream);
+ url = mavenWrapperProperties.getProperty(PROPERTY_NAME_WRAPPER_URL, url);
+ } catch (IOException e) {
+ System.out.println("- ERROR loading '" + MAVEN_WRAPPER_PROPERTIES_PATH + "'");
+ } finally {
+ try {
+ if(mavenWrapperPropertyFileInputStream != null) {
+ mavenWrapperPropertyFileInputStream.close();
+ }
+ } catch (IOException e) {
+ // Ignore ...
+ }
+ }
+ }
+ System.out.println("- Downloading from: " + url);
+
+ File outputFile = new File(baseDirectory.getAbsolutePath(), MAVEN_WRAPPER_JAR_PATH);
+ if(!outputFile.getParentFile().exists()) {
+ if(!outputFile.getParentFile().mkdirs()) {
+ System.out.println(
+ "- ERROR creating output directory '" + outputFile.getParentFile().getAbsolutePath() + "'");
+ }
+ }
+ System.out.println("- Downloading to: " + outputFile.getAbsolutePath());
+ try {
+ downloadFileFromURL(url, outputFile);
+ System.out.println("Done");
+ System.exit(0);
+ } catch (Throwable e) {
+ System.out.println("- Error downloading");
+ e.printStackTrace();
+ System.exit(1);
+ }
+ }
+
+ private static void downloadFileFromURL(String urlString, File destination) throws Exception {
+ if (System.getenv("MVNW_USERNAME") != null && System.getenv("MVNW_PASSWORD") != null) {
+ String username = System.getenv("MVNW_USERNAME");
+ char[] password = System.getenv("MVNW_PASSWORD").toCharArray();
+ Authenticator.setDefault(new Authenticator() {
+ @Override
+ protected PasswordAuthentication getPasswordAuthentication() {
+ return new PasswordAuthentication(username, password);
+ }
+ });
+ }
+ URL website = new URL(urlString);
+ ReadableByteChannel rbc;
+ rbc = Channels.newChannel(website.openStream());
+ FileOutputStream fos = new FileOutputStream(destination);
+ fos.getChannel().transferFrom(rbc, 0, Long.MAX_VALUE);
+ fos.close();
+ rbc.close();
+ }
+
+}
diff --git a/.mvn/wrapper/maven-wrapper.properties b/.mvn/wrapper/maven-wrapper.properties
new file mode 100644
index 0000000..54ab0bc
--- /dev/null
+++ b/.mvn/wrapper/maven-wrapper.properties
@@ -0,0 +1,3 @@
+distributionUrl=https://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.6.1/apache-maven-3.6.1-bin.zip
+wrapperUrl=https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.5.5/maven-wrapper-0.5.5.jar
+
diff --git a/HEADER b/HEADER
new file mode 100644
index 0000000..1745cfe
--- /dev/null
+++ b/HEADER
@@ -0,0 +1,14 @@
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..8f71f43
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,202 @@
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "{}"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright {yyyy} {name of copyright owner}
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
diff --git a/NOTICE b/NOTICE
new file mode 100644
index 0000000..2ca1b04
--- /dev/null
+++ b/NOTICE
@@ -0,0 +1,5 @@
+Apache SkyWalking
+Copyright 2017-2021 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
diff --git a/apm-checkstyle/CHECKSTYLE_HEAD b/apm-checkstyle/CHECKSTYLE_HEAD
new file mode 100755
index 0000000..00e3b09
--- /dev/null
+++ b/apm-checkstyle/CHECKSTYLE_HEAD
@@ -0,0 +1,18 @@
+^<\?xml version="1\.0" encoding="UTF-8"\?>$
+^<!--$
+^ *(/\*|#|~)$
+^ *(\*|#|~) +Licensed to the Apache Software Foundation \(ASF\) under one or more$
+^ *(\*|#|~) +contributor license agreements\. See the NOTICE file distributed with$
+^ *(\*|#|~) +this work for additional information regarding copyright ownership\.$
+^ *(\*|#|~) +The ASF licenses this file to You under the Apache License, Version 2\.0$
+^ *(\*|#|~) +\(the "License"\); you may not use this file except in compliance with$
+^ *(\*|#|~) +the License\. You may obtain a copy of the License at$
+^ *(\*|#|~)$
+^ *(\*|#|~) +http://www\.apache\.org/licenses/LICENSE-2\.0$
+^ *(\*|#|~)$
+^ *(\*|#|~) +Unless required by applicable law or agreed to in writing, software$
+^ *(\*|#|~) +distributed under the License is distributed on an "AS IS" BASIS,$
+^ *(\*|#|~) +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied\.$
+^ *(\*|#|~) +See the License for the specific language governing permissions and$
+^ *(\*|#|~) +limitations under the License\.$
+^ *(\*|#|~)$
diff --git a/apm-checkstyle/checkStyle.xml b/apm-checkstyle/checkStyle.xml
new file mode 100755
index 0000000..d51e0c7
--- /dev/null
+++ b/apm-checkstyle/checkStyle.xml
@@ -0,0 +1,133 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ ~
+ -->
+
+<!DOCTYPE module PUBLIC
+ "-//Puppy Crawl//DTD Check Configuration 1.3//EN"
+ "http://checkstyle.org/dtds/configuration_1_3.dtd">
+<module name="Checker">
+
+ <property name="localeLanguage" value="en"/>
+
+ <module name="FileTabCharacter">
+ <property name="eachLine" value="true"/>
+ </module>
+
+ <module name="RegexpHeader">
+ <property name="headerFile" value="${checkstyle.header.file}"/>
+ <property name="multiLines" value="1, 2, 3, 18"/>
+ </module>
+
+ <module name="RegexpSingleline">
+ <property name="format" value="System\.out\.println"/>
+ <property name="message" value="Prohibit invoking System.out.println in source code !"/>
+ </module>
+
+ <module name="RegexpSingleline">
+ <property name="format" value="^\s*\*\s*@author"/>
+ <property name="minimum" value="0"/>
+ <property name="maximum" value="0"/>
+ <property name="message" value="ASF project doesn't allow @author copyright."/>
+ </module>
+
+ <module name="RegexpSingleline">
+ <property name="format"
+ value=".*[\u3400-\u4DB5\u4E00-\u9FA5\u9FA6-\u9FBB\uF900-\uFA2D\uFA30-\uFA6A\uFA70-\uFAD9\uFF00-\uFFEF\u2E80-\u2EFF\u3000-\u303F\u31C0-\u31EF]+.*"/>
+ <property name="message" value="Not allow chinese character !"/>
+ </module>
+
+ <module name="FileLength">
+ <property name="max" value="3000"/>
+ </module>
+
+ <module name="TreeWalker">
+
+ <module name="UnusedImports"/>
+ <module name="RedundantImport"/>
+ <module name="AvoidStarImport"/>
+
+ <module name="NonEmptyAtclauseDescription"/>
+
+ <!--Checks that classes that override equals() also override hashCode()-->
+ <module name="EqualsHashCode"/>
+ <!--Checks for over-complicated boolean expressions. Currently finds code like if (topic == true), topic || true, !false, etc.-->
+ <module name="SimplifyBooleanExpression"/>
+ <module name="OneStatementPerLine"/>
+ <module name="UnnecessaryParentheses"/>
+ <!--Checks for over-complicated boolean return statements. For example the following code-->
+ <module name="SimplifyBooleanReturn"/>
+
+ <!--Check that the default is after all the cases in producerGroup switch statement-->
+ <module name="DefaultComesLast"/>
+ <!--Detects empty statements (standalone ";" semicolon)-->
+ <module name="EmptyStatement"/>
+ <!--Checks that long constants are defined with an upper ell-->
+ <module name="UpperEll"/>
+ <module name="ConstantName">
+ <property name="format" value="(^[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$)"/>
+ </module>
+ <!--Checks that local, non-final variable names conform to producerGroup format specified by the format property-->
+ <module name="LocalVariableName"/>
+ <!--Validates identifiers for local, final variables, including catch parameters-->
+ <module name="LocalFinalVariableName"/>
+ <!--Validates identifiers for non-static fields-->
+ <module name="MemberName"/>
+ <!--Validates identifiers for class type parameters-->
+ <module name="ClassTypeParameterName">
+ <property name="format" value="(^[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$)"/>
+ </module>
+ <!--Validates identifiers for method type parameters-->
+ <module name="MethodTypeParameterName">
+ <property name="format" value="(^[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$)"/>
+ </module>
+ <module name="PackageName">
+ <property name="format" value="^(org|test)\.apache\.skywalking(\.[a-zA-Z][a-zA-Z0-9]*)+$"/>
+ </module>
+ <module name="ParameterName"/>
+ <module name="StaticVariableName">
+ <property name="format" value="(^[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$)"/>
+ </module>
+ <module name="TypeName">
+ <property name="format" value="(^[A-Z][a-zA-Z0-9]*$)|(^[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$)"/>
+ </module>
+ <module name="MissingOverride"/>
+
+ <!--whitespace-->
+ <module name="GenericWhitespace"/>
+ <module name="WhitespaceAfter"/>
+ <module name="WhitespaceAround"/>
+ <module name="MethodParamPad"/>
+ <module name="ParenPad"/>
+ <module name="TypecastParenPad"/>
+ <module name="EmptyLineSeparator">
+ <property name="allowNoEmptyLineBetweenFields" value="true"/>
+ <property name="allowMultipleEmptyLines" value="false"/>
+ <property name="allowMultipleEmptyLinesInsideClassMembers" value="false"/>
+ </module>
+
+ <module name="ImportControl">
+ <property name="file" value="${import.control}"/>
+ <property name="path" value="apm-sniffer/(apm-sdk-plugin|bootstrap-plugins|optional-plugins)/.+/src/main/.+Instrumentation.java$"/>
+ </module>
+
+ <module name="ImportControl">
+ <property name="file" value="${import.control}"/>
+ <property name="path" value="apm-sniffer/apm-toolkit-activation/.+/src/main/.+Activation.java$"/>
+ </module>
+ </module>
+</module>
diff --git a/apm-checkstyle/importControl.xml b/apm-checkstyle/importControl.xml
new file mode 100755
index 0000000..ed6c0a4
--- /dev/null
+++ b/apm-checkstyle/importControl.xml
@@ -0,0 +1,28 @@
+<?xml version="1.0"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ ~
+ -->
+
+<!DOCTYPE import-control PUBLIC
+ "-//Puppy Crawl//DTD Import Control 1.4//EN"
+ "https://checkstyle.org/dtds/import_control_1_4.dtd">
+
+<import-control pkg="org.apache.skywalking.apm.(toolkit|plugin)" regex="true">
+ <allow pkg="java"/>
+ <allow pkg="org.apache.skywalking"/>
+ <allow pkg="net.bytebuddy"/>
+</import-control>
diff --git a/mvnw b/mvnw
new file mode 100755
index 0000000..a3925bb
--- /dev/null
+++ b/mvnw
@@ -0,0 +1,310 @@
+#!/bin/sh
+# ----------------------------------------------------------------------------
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# ----------------------------------------------------------------------------
+
+# ----------------------------------------------------------------------------
+# Maven Start Up Batch script
+#
+# Required ENV vars:
+# ------------------
+# JAVA_HOME - location of a JDK home dir
+#
+# Optional ENV vars
+# -----------------
+# M2_HOME - location of maven2's installed home dir
+# MAVEN_OPTS - parameters passed to the Java VM when running Maven
+# e.g. to debug Maven itself, use
+# set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000
+# MAVEN_SKIP_RC - flag to disable loading of mavenrc files
+# ----------------------------------------------------------------------------
+
+if [ -z "$MAVEN_SKIP_RC" ] ; then
+
+ if [ -f /etc/mavenrc ] ; then
+ . /etc/mavenrc
+ fi
+
+ if [ -f "$HOME/.mavenrc" ] ; then
+ . "$HOME/.mavenrc"
+ fi
+
+fi
+
+# OS specific support. $var _must_ be set to either true or false.
+cygwin=false;
+darwin=false;
+mingw=false
+case "`uname`" in
+ CYGWIN*) cygwin=true ;;
+ MINGW*) mingw=true;;
+ Darwin*) darwin=true
+ # Use /usr/libexec/java_home if available, otherwise fall back to /Library/Java/Home
+ # See https://developer.apple.com/library/mac/qa/qa1170/_index.html
+ if [ -z "$JAVA_HOME" ]; then
+ if [ -x "/usr/libexec/java_home" ]; then
+ export JAVA_HOME="`/usr/libexec/java_home`"
+ else
+ export JAVA_HOME="/Library/Java/Home"
+ fi
+ fi
+ ;;
+esac
+
+if [ -z "$JAVA_HOME" ] ; then
+ if [ -r /etc/gentoo-release ] ; then
+ JAVA_HOME=`java-config --jre-home`
+ fi
+fi
+
+if [ -z "$M2_HOME" ] ; then
+ ## resolve links - $0 may be a link to maven's home
+ PRG="$0"
+
+ # need this for relative symlinks
+ while [ -h "$PRG" ] ; do
+ ls=`ls -ld "$PRG"`
+ link=`expr "$ls" : '.*-> \(.*\)$'`
+ if expr "$link" : '/.*' > /dev/null; then
+ PRG="$link"
+ else
+ PRG="`dirname "$PRG"`/$link"
+ fi
+ done
+
+ saveddir=`pwd`
+
+ M2_HOME=`dirname "$PRG"`/..
+
+ # make it fully qualified
+ M2_HOME=`cd "$M2_HOME" && pwd`
+
+ cd "$saveddir"
+ # echo Using m2 at $M2_HOME
+fi
+
+# For Cygwin, ensure paths are in UNIX format before anything is touched
+if $cygwin ; then
+ [ -n "$M2_HOME" ] &&
+ M2_HOME=`cygpath --unix "$M2_HOME"`
+ [ -n "$JAVA_HOME" ] &&
+ JAVA_HOME=`cygpath --unix "$JAVA_HOME"`
+ [ -n "$CLASSPATH" ] &&
+ CLASSPATH=`cygpath --path --unix "$CLASSPATH"`
+fi
+
+# For Mingw, ensure paths are in UNIX format before anything is touched
+if $mingw ; then
+ [ -n "$M2_HOME" ] &&
+ M2_HOME="`(cd "$M2_HOME"; pwd)`"
+ [ -n "$JAVA_HOME" ] &&
+ JAVA_HOME="`(cd "$JAVA_HOME"; pwd)`"
+fi
+
+if [ -z "$JAVA_HOME" ]; then
+ javaExecutable="`which javac`"
+ if [ -n "$javaExecutable" ] && ! [ "`expr \"$javaExecutable\" : '\([^ ]*\)'`" = "no" ]; then
+ # readlink(1) is not available as standard on Solaris 10.
+ readLink=`which readlink`
+ if [ ! `expr "$readLink" : '\([^ ]*\)'` = "no" ]; then
+ if $darwin ; then
+ javaHome="`dirname \"$javaExecutable\"`"
+ javaExecutable="`cd \"$javaHome\" && pwd -P`/javac"
+ else
+ javaExecutable="`readlink -f \"$javaExecutable\"`"
+ fi
+ javaHome="`dirname \"$javaExecutable\"`"
+ javaHome=`expr "$javaHome" : '\(.*\)/bin'`
+ JAVA_HOME="$javaHome"
+ export JAVA_HOME
+ fi
+ fi
+fi
+
+if [ -z "$JAVACMD" ] ; then
+ if [ -n "$JAVA_HOME" ] ; then
+ if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
+ # IBM's JDK on AIX uses strange locations for the executables
+ JAVACMD="$JAVA_HOME/jre/sh/java"
+ else
+ JAVACMD="$JAVA_HOME/bin/java"
+ fi
+ else
+ JAVACMD="`which java`"
+ fi
+fi
+
+if [ ! -x "$JAVACMD" ] ; then
+ echo "Error: JAVA_HOME is not defined correctly." >&2
+ echo " We cannot execute $JAVACMD" >&2
+ exit 1
+fi
+
+if [ -z "$JAVA_HOME" ] ; then
+ echo "Warning: JAVA_HOME environment variable is not set."
+fi
+
+CLASSWORLDS_LAUNCHER=org.codehaus.plexus.classworlds.launcher.Launcher
+
+# traverses directory structure from process work directory to filesystem root
+# first directory with .mvn subdirectory is considered project base directory
+find_maven_basedir() {
+
+ if [ -z "$1" ]
+ then
+ echo "Path not specified to find_maven_basedir"
+ return 1
+ fi
+
+ basedir="$1"
+ wdir="$1"
+ while [ "$wdir" != '/' ] ; do
+ if [ -d "$wdir"/.mvn ] ; then
+ basedir=$wdir
+ break
+ fi
+ # workaround for JBEAP-8937 (on Solaris 10/Sparc)
+ if [ -d "${wdir}" ]; then
+ wdir=`cd "$wdir/.."; pwd`
+ fi
+ # end of workaround
+ done
+ echo "${basedir}"
+}
+
+# concatenates all lines of a file
+concat_lines() {
+ if [ -f "$1" ]; then
+ echo "$(tr -s '\n' ' ' < "$1")"
+ fi
+}
+
+BASE_DIR=`find_maven_basedir "$(pwd)"`
+if [ -z "$BASE_DIR" ]; then
+ exit 1;
+fi
+
+##########################################################################################
+# Extension to allow automatically downloading the maven-wrapper.jar from Maven-central
+# This allows using the maven wrapper in projects that prohibit checking in binary data.
+##########################################################################################
+if [ -r "$BASE_DIR/.mvn/wrapper/maven-wrapper.jar" ]; then
+ if [ "$MVNW_VERBOSE" = true ]; then
+ echo "Found .mvn/wrapper/maven-wrapper.jar"
+ fi
+else
+ if [ "$MVNW_VERBOSE" = true ]; then
+ echo "Couldn't find .mvn/wrapper/maven-wrapper.jar, downloading it ..."
+ fi
+ if [ -n "$MVNW_REPOURL" ]; then
+ jarUrl="$MVNW_REPOURL/io/takari/maven-wrapper/0.5.5/maven-wrapper-0.5.5.jar"
+ else
+ jarUrl="https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.5.5/maven-wrapper-0.5.5.jar"
+ fi
+ while IFS="=" read key value; do
+ case "$key" in (wrapperUrl) jarUrl="$value"; break ;;
+ esac
+ done < "$BASE_DIR/.mvn/wrapper/maven-wrapper.properties"
+ if [ "$MVNW_VERBOSE" = true ]; then
+ echo "Downloading from: $jarUrl"
+ fi
+ wrapperJarPath="$BASE_DIR/.mvn/wrapper/maven-wrapper.jar"
+ if $cygwin; then
+ wrapperJarPath=`cygpath --path --windows "$wrapperJarPath"`
+ fi
+
+ if command -v wget > /dev/null; then
+ if [ "$MVNW_VERBOSE" = true ]; then
+ echo "Found wget ... using wget"
+ fi
+ if [ -z "$MVNW_USERNAME" ] || [ -z "$MVNW_PASSWORD" ]; then
+ wget "$jarUrl" -O "$wrapperJarPath"
+ else
+ wget --http-user=$MVNW_USERNAME --http-password=$MVNW_PASSWORD "$jarUrl" -O "$wrapperJarPath"
+ fi
+ elif command -v curl > /dev/null; then
+ if [ "$MVNW_VERBOSE" = true ]; then
+ echo "Found curl ... using curl"
+ fi
+ if [ -z "$MVNW_USERNAME" ] || [ -z "$MVNW_PASSWORD" ]; then
+ curl -o "$wrapperJarPath" "$jarUrl" -f
+ else
+ curl --user $MVNW_USERNAME:$MVNW_PASSWORD -o "$wrapperJarPath" "$jarUrl" -f
+ fi
+
+ else
+ if [ "$MVNW_VERBOSE" = true ]; then
+ echo "Falling back to using Java to download"
+ fi
+ javaClass="$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.java"
+ # For Cygwin, switch paths to Windows format before running javac
+ if $cygwin; then
+ javaClass=`cygpath --path --windows "$javaClass"`
+ fi
+ if [ -e "$javaClass" ]; then
+ if [ ! -e "$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.class" ]; then
+ if [ "$MVNW_VERBOSE" = true ]; then
+ echo " - Compiling MavenWrapperDownloader.java ..."
+ fi
+ # Compiling the Java class
+ ("$JAVA_HOME/bin/javac" "$javaClass")
+ fi
+ if [ -e "$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.class" ]; then
+ # Running the downloader
+ if [ "$MVNW_VERBOSE" = true ]; then
+ echo " - Running MavenWrapperDownloader.java ..."
+ fi
+ ("$JAVA_HOME/bin/java" -cp .mvn/wrapper MavenWrapperDownloader "$MAVEN_PROJECTBASEDIR")
+ fi
+ fi
+ fi
+fi
+##########################################################################################
+# End of extension
+##########################################################################################
+
+export MAVEN_PROJECTBASEDIR=${MAVEN_BASEDIR:-"$BASE_DIR"}
+if [ "$MVNW_VERBOSE" = true ]; then
+ echo $MAVEN_PROJECTBASEDIR
+fi
+MAVEN_OPTS="$(concat_lines "$MAVEN_PROJECTBASEDIR/.mvn/jvm.config") $MAVEN_OPTS"
+
+# For Cygwin, switch paths to Windows format before running java
+if $cygwin; then
+ [ -n "$M2_HOME" ] &&
+ M2_HOME=`cygpath --path --windows "$M2_HOME"`
+ [ -n "$JAVA_HOME" ] &&
+ JAVA_HOME=`cygpath --path --windows "$JAVA_HOME"`
+ [ -n "$CLASSPATH" ] &&
+ CLASSPATH=`cygpath --path --windows "$CLASSPATH"`
+ [ -n "$MAVEN_PROJECTBASEDIR" ] &&
+ MAVEN_PROJECTBASEDIR=`cygpath --path --windows "$MAVEN_PROJECTBASEDIR"`
+fi
+
+# Provide a "standardized" way to retrieve the CLI args that will
+# work with both Windows and non-Windows executions.
+MAVEN_CMD_LINE_ARGS="$MAVEN_CONFIG $@"
+export MAVEN_CMD_LINE_ARGS
+
+WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
+
+exec "$JAVACMD" \
+ $MAVEN_OPTS \
+ -classpath "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.jar" \
+ "-Dmaven.home=${M2_HOME}" "-Dmaven.multiModuleProjectDirectory=${MAVEN_PROJECTBASEDIR}" \
+ ${WRAPPER_LAUNCHER} $MAVEN_CONFIG "$@"
diff --git a/mvnw.cmd b/mvnw.cmd
new file mode 100755
index 0000000..15123ea
--- /dev/null
+++ b/mvnw.cmd
@@ -0,0 +1,182 @@
+@REM ----------------------------------------------------------------------------
+@REM Licensed to the Apache Software Foundation (ASF) under one
+@REM or more contributor license agreements. See the NOTICE file
+@REM distributed with this work for additional information
+@REM regarding copyright ownership. The ASF licenses this file
+@REM to you under the Apache License, Version 2.0 (the
+@REM "License"); you may not use this file except in compliance
+@REM with the License. You may obtain a copy of the License at
+@REM
+@REM http://www.apache.org/licenses/LICENSE-2.0
+@REM
+@REM Unless required by applicable law or agreed to in writing,
+@REM software distributed under the License is distributed on an
+@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+@REM KIND, either express or implied. See the License for the
+@REM specific language governing permissions and limitations
+@REM under the License.
+@REM ----------------------------------------------------------------------------
+
+@REM ----------------------------------------------------------------------------
+@REM Maven Start Up Batch script
+@REM
+@REM Required ENV vars:
+@REM JAVA_HOME - location of a JDK home dir
+@REM
+@REM Optional ENV vars
+@REM M2_HOME - location of maven2's installed home dir
+@REM MAVEN_BATCH_ECHO - set to 'on' to enable the echoing of the batch commands
+@REM MAVEN_BATCH_PAUSE - set to 'on' to wait for a keystroke before ending
+@REM MAVEN_OPTS - parameters passed to the Java VM when running Maven
+@REM e.g. to debug Maven itself, use
+@REM set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000
+@REM MAVEN_SKIP_RC - flag to disable loading of mavenrc files
+@REM ----------------------------------------------------------------------------
+
+@REM Begin all REM lines with '@' in case MAVEN_BATCH_ECHO is 'on'
+@echo off
+@REM set title of command window
+title %0
+@REM enable echoing by setting MAVEN_BATCH_ECHO to 'on'
+@if "%MAVEN_BATCH_ECHO%" == "on" echo %MAVEN_BATCH_ECHO%
+
+@REM set %HOME% to equivalent of $HOME
+if "%HOME%" == "" (set "HOME=%HOMEDRIVE%%HOMEPATH%")
+
+@REM Execute a user defined script before this one
+if not "%MAVEN_SKIP_RC%" == "" goto skipRcPre
+@REM check for pre script, once with legacy .bat ending and once with .cmd ending
+if exist "%HOME%\mavenrc_pre.bat" call "%HOME%\mavenrc_pre.bat"
+if exist "%HOME%\mavenrc_pre.cmd" call "%HOME%\mavenrc_pre.cmd"
+:skipRcPre
+
+@setlocal
+
+set ERROR_CODE=0
+
+@REM To isolate internal variables from possible post scripts, we use another setlocal
+@setlocal
+
+@REM ==== START VALIDATION ====
+if not "%JAVA_HOME%" == "" goto OkJHome
+
+echo.
+echo Error: JAVA_HOME not found in your environment. >&2
+echo Please set the JAVA_HOME variable in your environment to match the >&2
+echo location of your Java installation. >&2
+echo.
+goto error
+
+:OkJHome
+if exist "%JAVA_HOME%\bin\java.exe" goto init
+
+echo.
+echo Error: JAVA_HOME is set to an invalid directory. >&2
+echo JAVA_HOME = "%JAVA_HOME%" >&2
+echo Please set the JAVA_HOME variable in your environment to match the >&2
+echo location of your Java installation. >&2
+echo.
+goto error
+
+@REM ==== END VALIDATION ====
+
+:init
+
+@REM Find the project base dir, i.e. the directory that contains the folder ".mvn".
+@REM Fallback to current working directory if not found.
+
+set MAVEN_PROJECTBASEDIR=%MAVEN_BASEDIR%
+IF NOT "%MAVEN_PROJECTBASEDIR%"=="" goto endDetectBaseDir
+
+set EXEC_DIR=%CD%
+set WDIR=%EXEC_DIR%
+:findBaseDir
+IF EXIST "%WDIR%"\.mvn goto baseDirFound
+cd ..
+IF "%WDIR%"=="%CD%" goto baseDirNotFound
+set WDIR=%CD%
+goto findBaseDir
+
+:baseDirFound
+set MAVEN_PROJECTBASEDIR=%WDIR%
+cd "%EXEC_DIR%"
+goto endDetectBaseDir
+
+:baseDirNotFound
+set MAVEN_PROJECTBASEDIR=%EXEC_DIR%
+cd "%EXEC_DIR%"
+
+:endDetectBaseDir
+
+IF NOT EXIST "%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config" goto endReadAdditionalConfig
+
+@setlocal EnableExtensions EnableDelayedExpansion
+for /F "usebackq delims=" %%a in ("%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config") do set JVM_CONFIG_MAVEN_PROPS=!JVM_CONFIG_MAVEN_PROPS! %%a
+@endlocal & set JVM_CONFIG_MAVEN_PROPS=%JVM_CONFIG_MAVEN_PROPS%
+
+:endReadAdditionalConfig
+
+SET MAVEN_JAVA_EXE="%JAVA_HOME%\bin\java.exe"
+set WRAPPER_JAR="%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.jar"
+set WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
+
+set DOWNLOAD_URL="https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.5.5/maven-wrapper-0.5.5.jar"
+
+FOR /F "tokens=1,2 delims==" %%A IN ("%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.properties") DO (
+ IF "%%A"=="wrapperUrl" SET DOWNLOAD_URL=%%B
+)
+
+@REM Extension to allow automatically downloading the maven-wrapper.jar from Maven-central
+@REM This allows using the maven wrapper in projects that prohibit checking in binary data.
+if exist %WRAPPER_JAR% (
+ if "%MVNW_VERBOSE%" == "true" (
+ echo Found %WRAPPER_JAR%
+ )
+) else (
+ if not "%MVNW_REPOURL%" == "" (
+ SET DOWNLOAD_URL="%MVNW_REPOURL%/io/takari/maven-wrapper/0.5.5/maven-wrapper-0.5.5.jar"
+ )
+ if "%MVNW_VERBOSE%" == "true" (
+ echo Couldn't find %WRAPPER_JAR%, downloading it ...
+ echo Downloading from: %DOWNLOAD_URL%
+ )
+
+ powershell -Command "&{"^
+ "$webclient = new-object System.Net.WebClient;"^
+ "if (-not ([string]::IsNullOrEmpty('%MVNW_USERNAME%') -and [string]::IsNullOrEmpty('%MVNW_PASSWORD%'))) {"^
+ "$webclient.Credentials = new-object System.Net.NetworkCredential('%MVNW_USERNAME%', '%MVNW_PASSWORD%');"^
+ "}"^
+ "[Net.ServicePointManager]::SecurityProtocol = [Net.SecurityProtocolType]::Tls12; $webclient.DownloadFile('%DOWNLOAD_URL%', '%WRAPPER_JAR%')"^
+ "}"
+ if "%MVNW_VERBOSE%" == "true" (
+ echo Finished downloading %WRAPPER_JAR%
+ )
+)
+@REM End of extension
+
+@REM Provide a "standardized" way to retrieve the CLI args that will
+@REM work with both Windows and non-Windows executions.
+set MAVEN_CMD_LINE_ARGS=%*
+
+%MAVEN_JAVA_EXE% %JVM_CONFIG_MAVEN_PROPS% %MAVEN_OPTS% %MAVEN_DEBUG_OPTS% -classpath %WRAPPER_JAR% "-Dmaven.multiModuleProjectDirectory=%MAVEN_PROJECTBASEDIR%" %WRAPPER_LAUNCHER% %MAVEN_CONFIG% %*
+if ERRORLEVEL 1 goto error
+goto end
+
+:error
+set ERROR_CODE=1
+
+:end
+@endlocal & set ERROR_CODE=%ERROR_CODE%
+
+if not "%MAVEN_SKIP_RC%" == "" goto skipRcPost
+@REM check for post script, once with legacy .bat ending and once with .cmd ending
+if exist "%HOME%\mavenrc_post.bat" call "%HOME%\mavenrc_post.bat"
+if exist "%HOME%\mavenrc_post.cmd" call "%HOME%\mavenrc_post.cmd"
+:skipRcPost
+
+@REM pause the script if MAVEN_BATCH_PAUSE is set to 'on'
+if "%MAVEN_BATCH_PAUSE%" == "on" pause
+
+if "%MAVEN_TERMINATE_CMD%" == "on" exit %ERROR_CODE%
+
+exit /B %ERROR_CODE%
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..fb36c46
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,357 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ ~
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>org.apache.skywalking</groupId>
+ <artifactId>banyandb-java-client</artifactId>
+ <version>0.1.0-SNAPSHOT</version>
+
+ <parent>
+ <groupId>org.apache</groupId>
+ <artifactId>apache</artifactId>
+ <version>21</version>
+ <relativePath/>
+ </parent>
+
+ <packaging>jar</packaging>
+
+ <name>banyandb-java-client</name>
+ <url>https://github.com/apache/skywalking-banyandb-java-client</url>
+
+ <scm>
+ <url>https://github.com/apache/skywalking-banyandb-java-client</url>
+ <connection>scm:git:https://github.com/apache/skywalking-banyandb-java-client.git</connection>
+ <developerConnection>scm:git:https://github.com/apache/skywalking-banyandb-java-client.git</developerConnection>
+ <tag>HEAD</tag>
+ </scm>
+
+ <issueManagement>
+ <system>GitHub</system>
+ <url>https://github.com/apache/skywalking/issues</url>
+ </issueManagement>
+
+ <licenses>
+ <license>
+ <name>The Apache Software License, Version 2.0</name>
+ <url>https://www.apache.org/licenses/LICENSE-2.0.txt</url>
+ </license>
+ </licenses>
+
+ <mailingLists>
+ <mailingList>
+ <name>SkyWalking Developer List</name>
+ <post>dev@skywalking.apache.org</post>
+ <subscribe>dev-subscribe@skywalking.apache.org</subscribe>
+ <unsubscribe>dev-unsubscribe@skywalking.apache.org</unsubscribe>
+ </mailingList>
+ <mailingList>
+ <name>SkyWalking Commits</name>
+ <post>commits@skywalking.apache.org</post>
+ <subscribe>commits-subscribe@skywalking.apache.org</subscribe>
+ <unsubscribe>commits-unsubscribe@skywalking.apache.org</unsubscribe>
+ </mailingList>
+ </mailingLists>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <compiler.version>1.8</compiler.version>
+ <powermock.version>2.0.7</powermock.version>
+ <checkstyle.version>6.18</checkstyle.version>
+ <junit.version>4.12</junit.version>
+ <mockito-core.version>3.5.13</mockito-core.version>
+ <lombok.version>1.18.20</lombok.version>
+
+ <!-- core lib dependency -->
+ <bytebuddy.version>1.10.19</bytebuddy.version>
+ <grpc.version>1.32.1</grpc.version>
+ <gson.version>2.8.6</gson.version>
+ <os-maven-plugin.version>1.6.2</os-maven-plugin.version>
+ <protobuf-maven-plugin.version>0.6.1</protobuf-maven-plugin.version>
+ <com.google.protobuf.protoc.version>3.12.0</com.google.protobuf.protoc.version>
+ <protoc-gen-grpc-java.plugin.version>1.32.1</protoc-gen-grpc-java.plugin.version>
+ <netty-tcnative-boringssl-static.version>2.0.39.Final</netty-tcnative-boringssl-static.version>
+ <javax.annotation-api.version>1.3.2</javax.annotation-api.version>
+ <!-- necessary for Java 9+ -->
+ <org.apache.tomcat.annotations-api.version>6.0.53</org.apache.tomcat.annotations-api.version>
+ <slf4j.version>1.7.30</slf4j.version>
+
+ <!-- Plugin versions -->
+ <docker.plugin.version>0.4.13</docker.plugin.version>
+ <takari-maven-plugin.version>0.6.1</takari-maven-plugin.version>
+ <exec-maven-plugin.version>1.6.0</exec-maven-plugin.version>
+ <maven-antrun-plugin.version>1.8</maven-antrun-plugin.version>
+ <maven-dependency-plugin.version>2.10</maven-dependency-plugin.version>
+ <maven-deploy-plugin.version>2.8.2</maven-deploy-plugin.version>
+ <maven-assembly-plugin.version>3.1.0</maven-assembly-plugin.version>
+ <maven-failsafe-plugin.version>2.22.0</maven-failsafe-plugin.version>
+ <build-helper-maven-plugin.version>3.2.0</build-helper-maven-plugin.version>
+ <maven-jar-plugin.version>3.1.0</maven-jar-plugin.version>
+ <maven-enforcer-plugin.version>3.0.0-M2</maven-enforcer-plugin.version>
+ <maven-compiler-plugin.version>3.8.0</maven-compiler-plugin.version>
+ <maven-resource-plugin.version>3.1.0</maven-resource-plugin.version>
+ <maven-source-plugin.version>3.0.1</maven-source-plugin.version>
+ <versions-maven-plugin.version>2.5</versions-maven-plugin.version>
+ <coveralls-maven-plugin.version>4.3.0</coveralls-maven-plugin.version>
+ <maven-checkstyle-plugin.version>3.1.0</maven-checkstyle-plugin.version>
+ <jmh.version>1.21</jmh.version>
+ <checkstyle.fails.on.error>true</checkstyle.fails.on.error>
+
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-netty-shaded</artifactId>
+ <version>${grpc.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-protobuf</artifactId>
+ <version>${grpc.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-stub</artifactId>
+ <version>${grpc.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-tcnative-boringssl-static</artifactId>
+ <version>${netty-tcnative-boringssl-static.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>${slf4j.version}</version>
+ </dependency>
+ <dependency> <!-- necessary for Java 9+ -->
+ <groupId>org.apache.tomcat</groupId>
+ <artifactId>annotations-api</artifactId>
+ <version>${org.apache.tomcat.annotations-api.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-testing</artifactId>
+ <version>${grpc.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>${junit.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <version>${mockito-core.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-module-junit4</artifactId>
+ <version>${powermock.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-api-mockito2</artifactId>
+ <version>${powermock.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.projectlombok</groupId>
+ <artifactId>lombok</artifactId>
+ <version>${lombok.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>javax.annotation</groupId>
+ <artifactId>javax.annotation-api</artifactId>
+ <version>${javax.annotation-api.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>kr.motd.maven</groupId>
+ <artifactId>os-maven-plugin</artifactId>
+ <version>${os-maven-plugin.version}</version>
+ <executions>
+ <execution>
+ <phase>initialize</phase>
+ <goals>
+ <goal>detect</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.xolstice.maven.plugins</groupId>
+ <artifactId>protobuf-maven-plugin</artifactId>
+ <version>${protobuf-maven-plugin.version}</version>
+ <configuration>
+ <!--
+ The version of protoc must match protobuf-java. If you don't depend on
+ protobuf-java directly, you will be transitively depending on the
+ protobuf-java version that grpc depends on.
+ -->
+ <protocArtifact>
+ com.google.protobuf:protoc:${com.google.protobuf.protoc.version}:exe:${os.detected.classifier}
+ </protocArtifact>
+ <pluginId>grpc-java</pluginId>
+ <pluginArtifact>
+ io.grpc:protoc-gen-grpc-java:${protoc-gen-grpc-java.plugin.version}:exe:${os.detected.classifier}
+ </pluginArtifact>
+ </configuration>
+ <executions>
+ <execution>
+ <id>grpc-build</id>
+ <goals>
+ <goal>compile</goal>
+ <goal>compile-custom</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-enforcer-plugin</artifactId>
+ <version>${maven-enforcer-plugin.version}</version>
+ <executions>
+ <execution>
+ <id>enforce-java</id>
+ <goals>
+ <goal>enforce</goal>
+ </goals>
+ <phase>validate</phase>
+ <configuration>
+ <rules>
+ <requireJavaVersion>
+ <!-- Build has not yet been updated for Java 9+ -->
+ <version>1.8</version>
+ </requireJavaVersion>
+ <requireMavenVersion>
+ <version>3.6</version>
+ </requireMavenVersion>
+ </rules>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>${maven-compiler-plugin.version}</version>
+ <configuration>
+ <source>${compiler.version}</source>
+ <target>${compiler.version}</target>
+ <encoding>${project.build.sourceEncoding}</encoding>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-resources-plugin</artifactId>
+ <version>${maven-resource-plugin.version}</version>
+ <configuration>
+ <encoding>${project.build.sourceEncoding}</encoding>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>com.spotify</groupId>
+ <artifactId>docker-maven-plugin</artifactId>
+ <version>${docker.plugin.version}</version>
+ <configuration>
+ <skipDocker>true</skipDocker>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-source-plugin</artifactId>
+ <version>${maven-source-plugin.version}</version>
+ <executions>
+ <execution>
+ <id>attach-sources</id>
+ <phase>none</phase>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <version>${maven-checkstyle-plugin.version}</version>
+ <configuration>
+ <configLocation>${maven.multiModuleProjectDirectory}/apm-checkstyle/checkStyle.xml</configLocation>
+ <headerLocation>${maven.multiModuleProjectDirectory}/apm-checkstyle/CHECKSTYLE_HEAD</headerLocation>
+ <encoding>UTF-8</encoding>
+ <consoleOutput>true</consoleOutput>
+ <includeTestSourceDirectory>true</includeTestSourceDirectory>
+ <failOnViolation>${checkstyle.fails.on.error}</failOnViolation>
+ <sourceDirectories>
+ <sourceDirectory>${project.build.sourceDirectory}</sourceDirectory>
+ <sourceDirectory>${project.build.testSourceDirectory}</sourceDirectory>
+ </sourceDirectories>
+ <resourceIncludes>
+ **/*.properties,
+ **/*.sh,
+ **/*.bat,
+ **/*.yml,
+ **/*.yaml,
+ **/*.xml
+ </resourceIncludes>
+ <resourceExcludes>
+ **/.asf.yaml,
+ **/.github/**
+ </resourceExcludes>
+ <excludes>
+ **/target/generated-test-sources/**,
+ org/apache/skywalking/apm/network/register/v2/**/*.java,
+ org/apache/skywalking/apm/network/common/**/*.java,
+ org/apache/skywalking/apm/network/servicemesh/**/*.java,
+ org/apache/skywalking/apm/network/language/**/*.java,
+ org/apache/skywalking/oap/server/core/remote/grpc/proto/*.java,
+ org/apache/skywalking/oal/rt/grammar/*.java,
+ org/apache/skywalking/oap/server/exporter/grpc/*.java,
+ org/apache/skywalking/oap/server/configuration/service/*.java,
+ **/generated/*_jmhType*.java,
+ **/generated/*_jmhTest.java
+ </excludes>
+ <propertyExpansion>
+ import.control=${maven.multiModuleProjectDirectory}/apm-checkstyle/importControl.xml
+ </propertyExpansion>
+ </configuration>
+ <executions>
+ <execution>
+ <id>validate</id>
+ <phase>process-sources</phase>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/DataCarrier.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/DataCarrier.java
new file mode 100644
index 0000000..db2ed2d
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/DataCarrier.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.commons.datacarrier;
+
+import java.util.Properties;
+import org.apache.skywalking.banyandb.commons.datacarrier.buffer.BufferStrategy;
+import org.apache.skywalking.banyandb.commons.datacarrier.buffer.Channels;
+import org.apache.skywalking.banyandb.commons.datacarrier.consumer.ConsumeDriver;
+import org.apache.skywalking.banyandb.commons.datacarrier.consumer.ConsumerPool;
+import org.apache.skywalking.banyandb.commons.datacarrier.consumer.IConsumer;
+import org.apache.skywalking.banyandb.commons.datacarrier.consumer.IDriver;
+import org.apache.skywalking.banyandb.commons.datacarrier.partition.IDataPartitioner;
+import org.apache.skywalking.banyandb.commons.datacarrier.partition.SimpleRollingPartitioner;
+
+/**
+ * DataCarrier main class. use this instance to set Producer/Consumer Model.
+ */
+public class DataCarrier<T> {
+ private Channels<T> channels;
+ private IDriver driver;
+ private String name;
+
+ public DataCarrier(int channelSize, int bufferSize) {
+ this("DEFAULT", channelSize, bufferSize);
+ }
+
+ public DataCarrier(String name, int channelSize, int bufferSize) {
+ this(name, name, channelSize, bufferSize);
+ }
+
+ public DataCarrier(String name, String envPrefix, int channelSize, int bufferSize) {
+ this(name, envPrefix, channelSize, bufferSize, BufferStrategy.BLOCKING);
+ }
+
+ public DataCarrier(String name, String envPrefix, int channelSize, int bufferSize, BufferStrategy strategy) {
+ this.name = name;
+ bufferSize = EnvUtil.getInt(envPrefix + "_BUFFER_SIZE", bufferSize);
+ channelSize = EnvUtil.getInt(envPrefix + "_CHANNEL_SIZE", channelSize);
+ channels = new Channels<>(channelSize, bufferSize, new SimpleRollingPartitioner<T>(), strategy);
+ }
+
+ public DataCarrier(int channelSize, int bufferSize, BufferStrategy strategy) {
+ this("DEFAULT", "DEFAULT", channelSize, bufferSize, strategy);
+ }
+
+ /**
+ * set a new IDataPartitioner. It will cover the current one or default one.(Default is {@link
+ * SimpleRollingPartitioner}
+ *
+ * @param dataPartitioner to partition data into different channel by some rules.
+ * @return DataCarrier instance for chain
+ */
+ public DataCarrier setPartitioner(IDataPartitioner<T> dataPartitioner) {
+ this.channels.setPartitioner(dataPartitioner);
+ return this;
+ }
+
+ /**
+ * produce data to buffer, using the given {@link BufferStrategy}.
+ *
+ * @return false means produce data failure. The data will not be consumed.
+ */
+ public boolean produce(T data) {
+ if (driver != null) {
+ if (!driver.isRunning(channels)) {
+ return false;
+ }
+ }
+
+ return this.channels.save(data);
+ }
+
+ /**
+ * set consumeDriver to this Carrier. consumer begin to run when {@link DataCarrier#produce} begin to work.
+ *
+ * @param consumerClass class of consumer
+ * @param num number of consumer threads
+ * @param properties for initializing consumer.
+ */
+ public DataCarrier consume(Class<? extends IConsumer<T>> consumerClass,
+ int num,
+ long consumeCycle,
+ Properties properties) {
+ if (driver != null) {
+ driver.close(channels);
+ }
+ driver = new ConsumeDriver<T>(this.name, this.channels, consumerClass, num, consumeCycle, properties);
+ driver.begin(channels);
+ return this;
+ }
+
+ /**
+ * set consumeDriver to this Carrier. consumer begin to run when {@link DataCarrier#produce} begin to work with 20
+ * millis consume cycle.
+ *
+ * @param consumerClass class of consumer
+ * @param num number of consumer threads
+ */
+ public DataCarrier consume(Class<? extends IConsumer<T>> consumerClass, int num) {
+ return this.consume(consumerClass, num, 20, new Properties());
+ }
+
+ /**
+ * set consumeDriver to this Carrier. consumer begin to run when {@link DataCarrier#produce} begin to work.
+ *
+ * @param consumer single instance of consumer, all consumer threads will all use this instance.
+ * @param num number of consumer threads
+ */
+ public DataCarrier consume(IConsumer<T> consumer, int num, long consumeCycle) {
+ if (driver != null) {
+ driver.close(channels);
+ }
+ driver = new ConsumeDriver<T>(this.name, this.channels, consumer, num, consumeCycle);
+ driver.begin(channels);
+ return this;
+ }
+
+ /**
+ * set consumeDriver to this Carrier. consumer begin to run when {@link DataCarrier#produce} begin to work with 20
+ * millis consume cycle.
+ *
+ * @param consumer single instance of consumer, all consumer threads will all use this instance.
+ * @param num number of consumer threads
+ */
+ public DataCarrier consume(IConsumer<T> consumer, int num) {
+ return this.consume(consumer, num, 20);
+ }
+
+ /**
+ * Set a consumer pool to manage the channels of this DataCarrier. Then consumerPool could use its own consuming
+ * model to adjust the consumer thread and throughput.
+ */
+ public DataCarrier consume(ConsumerPool consumerPool, IConsumer<T> consumer) {
+ driver = consumerPool;
+ consumerPool.add(this.name, channels, consumer);
+ driver.begin(channels);
+ return this;
+ }
+
+ /**
+ * shutdown all consumer threads, if consumer threads are running. Notice {@link BufferStrategy}: if {@link
+ * BufferStrategy} == {@link BufferStrategy#BLOCKING}, shutdown consumeDriver maybe cause blocking when producing.
+ * Better way to change consumeDriver are use {@link DataCarrier#consume}
+ */
+ public void shutdownConsumers() {
+ if (driver != null) {
+ driver.close(channels);
+ }
+ }
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/EnvUtil.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/EnvUtil.java
new file mode 100644
index 0000000..36f0cb6
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/EnvUtil.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.commons.datacarrier;
+
+/**
+ * Read value from system env.
+ */
+public class EnvUtil {
+ public static int getInt(String envName, int defaultValue) {
+ int value = defaultValue;
+ String envValue = System.getenv(envName);
+ if (envValue != null) {
+ try {
+ value = Integer.parseInt(envValue);
+ } catch (NumberFormatException e) {
+
+ }
+ }
+ return value;
+ }
+
+ public static long getLong(String envName, long defaultValue) {
+ long value = defaultValue;
+ String envValue = System.getenv(envName);
+ if (envValue != null) {
+ try {
+ value = Long.parseLong(envValue);
+ } catch (NumberFormatException e) {
+
+ }
+ }
+ return value;
+ }
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/ArrayBlockingQueueBuffer.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/ArrayBlockingQueueBuffer.java
new file mode 100644
index 0000000..feb2a99
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/ArrayBlockingQueueBuffer.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.commons.datacarrier.buffer;
+
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+
+/**
+ * The buffer implementation based on JDK ArrayBlockingQueue.
+ * <p>
+ * This implementation has better performance in server side. We are still trying to research whether this is suitable
+ * for agent side, which is more sensitive about blocks.
+ */
+public class ArrayBlockingQueueBuffer<T> implements QueueBuffer<T> {
+ private BufferStrategy strategy;
+ private ArrayBlockingQueue<T> queue;
+ private int bufferSize;
+
+ ArrayBlockingQueueBuffer(int bufferSize, BufferStrategy strategy) {
+ this.strategy = strategy;
+ this.queue = new ArrayBlockingQueue<T>(bufferSize);
+ this.bufferSize = bufferSize;
+ }
+
+ @Override
+ public boolean save(T data) {
+ //only BufferStrategy.BLOCKING
+ try {
+ queue.put(data);
+ } catch (InterruptedException e) {
+ // Ignore the error
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public void setStrategy(BufferStrategy strategy) {
+ this.strategy = strategy;
+ }
+
+ @Override
+ public void obtain(List<T> consumeList) {
+ queue.drainTo(consumeList);
+ }
+
+ @Override
+ public int getBufferSize() {
+ return bufferSize;
+ }
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/Buffer.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/Buffer.java
new file mode 100644
index 0000000..38e92af
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/Buffer.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.commons.datacarrier.buffer;
+
+import java.util.List;
+import org.apache.skywalking.banyandb.commons.datacarrier.common.AtomicRangeInteger;
+
+/**
+ * Self implementation ring queue.
+ */
+public class Buffer<T> implements QueueBuffer<T> {
+ private final Object[] buffer;
+ private BufferStrategy strategy;
+ private AtomicRangeInteger index;
+
+ Buffer(int bufferSize, BufferStrategy strategy) {
+ buffer = new Object[bufferSize];
+ this.strategy = strategy;
+ index = new AtomicRangeInteger(0, bufferSize);
+ }
+
+ @Override
+ public void setStrategy(BufferStrategy strategy) {
+ this.strategy = strategy;
+ }
+
+ @Override
+ public boolean save(T data) {
+ int i = index.getAndIncrement();
+ if (buffer[i] != null) {
+ switch (strategy) {
+ case IF_POSSIBLE:
+ return false;
+ default:
+ }
+ }
+ buffer[i] = data;
+ return true;
+ }
+
+ @Override
+ public int getBufferSize() {
+ return buffer.length;
+ }
+
+ @Override
+ public void obtain(List<T> consumeList) {
+ this.obtain(consumeList, 0, buffer.length);
+ }
+
+ void obtain(List<T> consumeList, int start, int end) {
+ for (int i = start; i < end; i++) {
+ if (buffer[i] != null) {
+ consumeList.add((T) buffer[i]);
+ buffer[i] = null;
+ }
+ }
+ }
+
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/BufferStrategy.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/BufferStrategy.java
new file mode 100644
index 0000000..9dbc556
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/BufferStrategy.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.commons.datacarrier.buffer;
+
+public enum BufferStrategy {
+ BLOCKING, IF_POSSIBLE
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/Channels.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/Channels.java
new file mode 100644
index 0000000..46ce98f
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/Channels.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.commons.datacarrier.buffer;
+
+import org.apache.skywalking.banyandb.commons.datacarrier.partition.IDataPartitioner;
+
+/**
+ * Channels of Buffer It contains all buffer data which belongs to this channel. It supports several strategy when
+ * buffer is full. The Default is BLOCKING <p> Created by wusheng on 2016/10/25.
+ */
+public class Channels<T> {
+ private final QueueBuffer<T>[] bufferChannels;
+ private IDataPartitioner<T> dataPartitioner;
+ private final BufferStrategy strategy;
+ private final long size;
+
+ public Channels(int channelSize, int bufferSize, IDataPartitioner<T> partitioner, BufferStrategy strategy) {
+ this.dataPartitioner = partitioner;
+ this.strategy = strategy;
+ bufferChannels = new QueueBuffer[channelSize];
+ for (int i = 0; i < channelSize; i++) {
+ if (BufferStrategy.BLOCKING.equals(strategy)) {
+ bufferChannels[i] = new ArrayBlockingQueueBuffer<>(bufferSize, strategy);
+ } else {
+ bufferChannels[i] = new Buffer<>(bufferSize, strategy);
+ }
+ }
+ // noinspection PointlessArithmeticExpression
+ size = 1L * channelSize * bufferSize; // it's not pointless, it prevents numeric overflow before assigning an integer to a long
+ }
+
+ public boolean save(T data) {
+ int index = dataPartitioner.partition(bufferChannels.length, data);
+ int retryCountDown = 1;
+ if (BufferStrategy.IF_POSSIBLE.equals(strategy)) {
+ int maxRetryCount = dataPartitioner.maxRetryCount();
+ if (maxRetryCount > 1) {
+ retryCountDown = maxRetryCount;
+ }
+ }
+ for (; retryCountDown > 0; retryCountDown--) {
+ if (bufferChannels[index].save(data)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public void setPartitioner(IDataPartitioner<T> dataPartitioner) {
+ this.dataPartitioner = dataPartitioner;
+ }
+
+ /**
+ * override the strategy at runtime. Notice, this will override several channels one by one. So, when running
+ * setStrategy, each channel may use different BufferStrategy
+ */
+ public void setStrategy(BufferStrategy strategy) {
+ for (QueueBuffer<T> buffer : bufferChannels) {
+ buffer.setStrategy(strategy);
+ }
+ }
+
+ /**
+ * get channelSize
+ */
+ public int getChannelSize() {
+ return this.bufferChannels.length;
+ }
+
+ public long size() {
+ return size;
+ }
+
+ public QueueBuffer<T> getBuffer(int index) {
+ return this.bufferChannels[index];
+ }
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/QueueBuffer.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/QueueBuffer.java
new file mode 100644
index 0000000..4baf83c
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/buffer/QueueBuffer.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.commons.datacarrier.buffer;
+
+import java.util.List;
+
+/**
+ * Queue buffer interface.
+ */
+public interface QueueBuffer<T> {
+ /**
+ * Save data into the queue;
+ *
+ * @param data to add.
+ * @return true if saved
+ */
+ boolean save(T data);
+
+ /**
+ * Set different strategy when queue is full.
+ */
+ void setStrategy(BufferStrategy strategy);
+
+ /**
+ * Obtain the existing data from the queue
+ */
+ void obtain(List<T> consumeList);
+
+ int getBufferSize();
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/common/AtomicRangeInteger.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/common/AtomicRangeInteger.java
new file mode 100644
index 0000000..cb2b4be
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/common/AtomicRangeInteger.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.commons.datacarrier.common;
+
+import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicIntegerArray;
+
+public class AtomicRangeInteger extends Number implements Serializable {
+ private static final long serialVersionUID = -4099792402691141643L;
+ private AtomicIntegerArray values;
+
+ private static final int VALUE_OFFSET = 15;
+
+ private int startValue;
+ private int endValue;
+
+ public AtomicRangeInteger(int startValue, int maxValue) {
+ this.values = new AtomicIntegerArray(31);
+ this.values.set(VALUE_OFFSET, startValue);
+ this.startValue = startValue;
+ this.endValue = maxValue - 1;
+ }
+
+ public final int getAndIncrement() {
+ int next;
+ do {
+ next = this.values.incrementAndGet(VALUE_OFFSET);
+ if (next > endValue && this.values.compareAndSet(VALUE_OFFSET, next, startValue)) {
+ return endValue;
+ }
+ }
+ while (next > endValue);
+
+ return next - 1;
+ }
+
+ public final int get() {
+ return this.values.get(VALUE_OFFSET);
+ }
+
+ @Override
+ public int intValue() {
+ return this.values.get(VALUE_OFFSET);
+ }
+
+ @Override
+ public long longValue() {
+ return this.values.get(VALUE_OFFSET);
+ }
+
+ @Override
+ public float floatValue() {
+ return this.values.get(VALUE_OFFSET);
+ }
+
+ @Override
+ public double doubleValue() {
+ return this.values.get(VALUE_OFFSET);
+ }
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/BulkConsumePool.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/BulkConsumePool.java
new file mode 100644
index 0000000..fd45499
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/BulkConsumePool.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.commons.datacarrier.consumer;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import org.apache.skywalking.banyandb.commons.datacarrier.EnvUtil;
+import org.apache.skywalking.banyandb.commons.datacarrier.buffer.Channels;
+
+/**
+ * BulkConsumePool works for consuming data from multiple channels(DataCarrier instances), with multiple {@link
+ * MultipleChannelsConsumer}s.
+ * <p>
+ * In typical case, the number of {@link MultipleChannelsConsumer} should be less than the number of channels.
+ */
+public class BulkConsumePool implements ConsumerPool {
+ private List<MultipleChannelsConsumer> allConsumers;
+ private volatile boolean isStarted = false;
+
+ public BulkConsumePool(String name, int size, long consumeCycle) {
+ size = EnvUtil.getInt(name + "_THREAD", size);
+ allConsumers = new ArrayList<MultipleChannelsConsumer>(size);
+ for (int i = 0; i < size; i++) {
+ MultipleChannelsConsumer multipleChannelsConsumer = new MultipleChannelsConsumer("DataCarrier." + name + ".BulkConsumePool." + i + ".Thread", consumeCycle);
+ multipleChannelsConsumer.setDaemon(true);
+ allConsumers.add(multipleChannelsConsumer);
+ }
+ }
+
+ @Override
+ synchronized public void add(String name, Channels channels, IConsumer consumer) {
+ MultipleChannelsConsumer multipleChannelsConsumer = getLowestPayload();
+ multipleChannelsConsumer.addNewTarget(channels, consumer);
+ }
+
+ /**
+ * Get the lowest payload consumer thread based on current allocate status.
+ *
+ * @return the lowest consumer.
+ */
+ private MultipleChannelsConsumer getLowestPayload() {
+ MultipleChannelsConsumer winner = allConsumers.get(0);
+ for (int i = 1; i < allConsumers.size(); i++) {
+ MultipleChannelsConsumer option = allConsumers.get(i);
+ if (option.size() < winner.size()) {
+ winner = option;
+ }
+ }
+ return winner;
+ }
+
+ /**
+ *
+ */
+ @Override
+ public boolean isRunning(Channels channels) {
+ return isStarted;
+ }
+
+ @Override
+ public void close(Channels channels) {
+ for (MultipleChannelsConsumer consumer : allConsumers) {
+ consumer.shutdown();
+ }
+ }
+
+ @Override
+ public void begin(Channels channels) {
+ if (isStarted) {
+ return;
+ }
+ for (MultipleChannelsConsumer consumer : allConsumers) {
+ consumer.start();
+ }
+ isStarted = true;
+ }
+
+ /**
+ * The creator for {@link BulkConsumePool}.
+ */
+ public static class Creator implements Callable<ConsumerPool> {
+ private String name;
+ private int size;
+ private long consumeCycle;
+
+ public Creator(String name, int poolSize, long consumeCycle) {
+ this.name = name;
+ this.size = poolSize;
+ this.consumeCycle = consumeCycle;
+ }
+
+ @Override
+ public ConsumerPool call() {
+ return new BulkConsumePool(name, size, consumeCycle);
+ }
+
+ public static int recommendMaxSize() {
+ return Runtime.getRuntime().availableProcessors() * 2;
+ }
+ }
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumeDriver.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumeDriver.java
new file mode 100644
index 0000000..4535a95
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumeDriver.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.commons.datacarrier.consumer;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.Properties;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.skywalking.banyandb.commons.datacarrier.buffer.Channels;
+
+/**
+ * Pool of consumers <p> Created by wusheng on 2016/10/25.
+ */
+public class ConsumeDriver<T> implements IDriver {
+ private boolean running;
+ private ConsumerThread[] consumerThreads;
+ private Channels<T> channels;
+ private ReentrantLock lock;
+
+ public ConsumeDriver(String name,
+ Channels<T> channels, Class<? extends IConsumer<T>> consumerClass,
+ int num,
+ long consumeCycle,
+ Properties properties) {
+ this(channels, num);
+ for (int i = 0; i < num; i++) {
+ consumerThreads[i] = new ConsumerThread(
+ "DataCarrier." + name + ".Consumer." + i + ".Thread", getNewConsumerInstance(consumerClass, properties),
+ consumeCycle
+ );
+ consumerThreads[i].setDaemon(true);
+ }
+ }
+
+ public ConsumeDriver(String name, Channels<T> channels, IConsumer<T> prototype, int num, long consumeCycle) {
+ this(channels, num);
+ prototype.init(new Properties());
+ for (int i = 0; i < num; i++) {
+ consumerThreads[i] = new ConsumerThread(
+ "DataCarrier." + name + ".Consumer." + i + ".Thread", prototype, consumeCycle);
+ consumerThreads[i].setDaemon(true);
+ }
+
+ }
+
+ private ConsumeDriver(Channels<T> channels, int num) {
+ running = false;
+ this.channels = channels;
+ consumerThreads = new ConsumerThread[num];
+ lock = new ReentrantLock();
+ }
+
+ private IConsumer<T> getNewConsumerInstance(Class<? extends IConsumer<T>> consumerClass, Properties properties) {
+ try {
+ IConsumer<T> inst = consumerClass.getDeclaredConstructor().newInstance();
+ inst.init(properties);
+ return inst;
+ } catch (InstantiationException e) {
+ throw new ConsumerCannotBeCreatedException(e);
+ } catch (IllegalAccessException e) {
+ throw new ConsumerCannotBeCreatedException(e);
+ } catch (NoSuchMethodException e) {
+ throw new ConsumerCannotBeCreatedException(e);
+ } catch (InvocationTargetException e) {
+ throw new ConsumerCannotBeCreatedException(e);
+ }
+ }
+
+ @Override
+ public void begin(Channels channels) {
+ if (running) {
+ return;
+ }
+ lock.lock();
+ try {
+ this.allocateBuffer2Thread();
+ for (ConsumerThread consumerThread : consumerThreads) {
+ consumerThread.start();
+ }
+ running = true;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public boolean isRunning(Channels channels) {
+ return running;
+ }
+
+ private void allocateBuffer2Thread() {
+ int channelSize = this.channels.getChannelSize();
+ /**
+ * if consumerThreads.length < channelSize
+ * each consumer will process several channels.
+ *
+ * if consumerThreads.length == channelSize
+ * each consumer will process one channel.
+ *
+ * if consumerThreads.length > channelSize
+ * there will be some threads do nothing.
+ */
+ for (int channelIndex = 0; channelIndex < channelSize; channelIndex++) {
+ int consumerIndex = channelIndex % consumerThreads.length;
+ consumerThreads[consumerIndex].addDataSource(channels.getBuffer(channelIndex));
+ }
+
+ }
+
+ @Override
+ public void close(Channels channels) {
+ lock.lock();
+ try {
+ this.running = false;
+ for (ConsumerThread consumerThread : consumerThreads) {
+ consumerThread.shutdown();
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumerCannotBeCreatedException.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumerCannotBeCreatedException.java
new file mode 100644
index 0000000..0d0c8de
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumerCannotBeCreatedException.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.commons.datacarrier.consumer;
+
+public class ConsumerCannotBeCreatedException extends RuntimeException {
+ ConsumerCannotBeCreatedException(Throwable t) {
+ super(t);
+ }
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumerPool.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumerPool.java
new file mode 100644
index 0000000..fd93dfb
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumerPool.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.commons.datacarrier.consumer;
+
+import org.apache.skywalking.banyandb.commons.datacarrier.DataCarrier;
+import org.apache.skywalking.banyandb.commons.datacarrier.buffer.Channels;
+
+/**
+ * The Consumer pool could support data consumer from multiple {@link DataCarrier}s, by using different consume thread
+ * management models.
+ */
+public interface ConsumerPool extends IDriver {
+ void add(String name, Channels channels, IConsumer consumer);
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumerPoolFactory.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumerPoolFactory.java
new file mode 100644
index 0000000..8eb1581
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumerPoolFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.commons.datacarrier.consumer;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+/**
+ * Consumer Pool Factory provides global management for all Consumer Pool.
+ */
+public enum ConsumerPoolFactory {
+ INSTANCE;
+
+ private final Map<String, ConsumerPool> pools;
+
+ ConsumerPoolFactory() {
+ pools = new HashMap<>();
+ }
+
+ public synchronized boolean createIfAbsent(String poolName, Callable<ConsumerPool> creator) throws Exception {
+ if (pools.containsKey(poolName)) {
+ return false;
+ } else {
+ pools.put(poolName, creator.call());
+ return true;
+ }
+ }
+
+ public ConsumerPool get(String poolName) {
+ return pools.get(poolName);
+ }
+
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumerThread.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumerThread.java
new file mode 100644
index 0000000..b5f5478
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/ConsumerThread.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.commons.datacarrier.consumer;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.skywalking.banyandb.commons.datacarrier.buffer.Buffer;
+import org.apache.skywalking.banyandb.commons.datacarrier.buffer.QueueBuffer;
+
+public class ConsumerThread<T> extends Thread {
+ private volatile boolean running;
+ private IConsumer<T> consumer;
+ private List<DataSource> dataSources;
+ private long consumeCycle;
+
+ ConsumerThread(String threadName, IConsumer<T> consumer, long consumeCycle) {
+ super(threadName);
+ this.consumer = consumer;
+ running = false;
+ dataSources = new ArrayList<DataSource>(1);
+ this.consumeCycle = consumeCycle;
+ }
+
+ /**
+ * add whole buffer to consume
+ */
+ void addDataSource(QueueBuffer<T> sourceBuffer) {
+ this.dataSources.add(new DataSource(sourceBuffer));
+ }
+
+ @Override
+ public void run() {
+ running = true;
+
+ final List<T> consumeList = new ArrayList<T>(1500);
+ while (running) {
+ if (!consume(consumeList)) {
+ try {
+ Thread.sleep(consumeCycle);
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+
+ // consumer thread is going to stop
+ // consume the last time
+ consume(consumeList);
+
+ consumer.onExit();
+ }
+
+ private boolean consume(List<T> consumeList) {
+ for (DataSource dataSource : dataSources) {
+ dataSource.obtain(consumeList);
+ }
+
+ if (!consumeList.isEmpty()) {
+ try {
+ consumer.consume(consumeList);
+ } catch (Throwable t) {
+ consumer.onError(consumeList, t);
+ } finally {
+ consumeList.clear();
+ }
+ return true;
+ }
+ consumer.nothingToConsume();
+ return false;
+ }
+
+ void shutdown() {
+ running = false;
+ }
+
+ /**
+ * DataSource is a refer to {@link Buffer}.
+ */
+ class DataSource {
+ private QueueBuffer<T> sourceBuffer;
+
+ DataSource(QueueBuffer<T> sourceBuffer) {
+ this.sourceBuffer = sourceBuffer;
+ }
+
+ void obtain(List<T> consumeList) {
+ sourceBuffer.obtain(consumeList);
+ }
+ }
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/IConsumer.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/IConsumer.java
new file mode 100644
index 0000000..9d965e1
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/IConsumer.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.commons.datacarrier.consumer;
+
+import java.util.List;
+import java.util.Properties;
+
+public interface IConsumer<T> {
+ void init(final Properties properties);
+
+ void consume(List<T> data);
+
+ void onError(List<T> data, Throwable t);
+
+ void onExit();
+
+ /**
+ * Notify the implementation, if there is nothing fetched from the queue. This could be used as a timer to trigger
+ * reaction if the queue has no element.
+ */
+ default void nothingToConsume() {
+ return;
+ }
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/IDriver.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/IDriver.java
new file mode 100644
index 0000000..7011c4e
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/IDriver.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.commons.datacarrier.consumer;
+
+import org.apache.skywalking.banyandb.commons.datacarrier.buffer.Channels;
+
+/**
+ * The driver of consumer.
+ */
+public interface IDriver {
+ boolean isRunning(Channels channels);
+
+ void close(Channels channels);
+
+ void begin(Channels channels);
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/MultipleChannelsConsumer.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/MultipleChannelsConsumer.java
new file mode 100644
index 0000000..19969fa
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/consumer/MultipleChannelsConsumer.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.commons.datacarrier.consumer;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.skywalking.banyandb.commons.datacarrier.buffer.Channels;
+import org.apache.skywalking.banyandb.commons.datacarrier.buffer.QueueBuffer;
+
+/**
+ * MultipleChannelsConsumer represent a single consumer thread, but support multiple channels with their {@link
+ * IConsumer}s
+ */
+public class MultipleChannelsConsumer extends Thread {
+ private volatile boolean running;
+ private volatile ArrayList<Group> consumeTargets;
+ @SuppressWarnings("NonAtomicVolatileUpdate")
+ private volatile long size;
+ private final long consumeCycle;
+
+ public MultipleChannelsConsumer(String threadName, long consumeCycle) {
+ super(threadName);
+ this.consumeTargets = new ArrayList<Group>();
+ this.consumeCycle = consumeCycle;
+ }
+
+ @Override
+ public void run() {
+ running = true;
+
+ final List consumeList = new ArrayList(2000);
+ while (running) {
+ boolean hasData = false;
+ for (Group target : consumeTargets) {
+ boolean consume = consume(target, consumeList);
+ hasData = hasData || consume;
+ }
+
+ if (!hasData) {
+ try {
+ Thread.sleep(consumeCycle);
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+
+ // consumer thread is going to stop
+ // consume the last time
+ for (Group target : consumeTargets) {
+ consume(target, consumeList);
+
+ target.consumer.onExit();
+ }
+ }
+
+ private boolean consume(Group target, List consumeList) {
+ for (int i = 0; i < target.channels.getChannelSize(); i++) {
+ QueueBuffer buffer = target.channels.getBuffer(i);
+ buffer.obtain(consumeList);
+ }
+
+ if (!consumeList.isEmpty()) {
+ try {
+ target.consumer.consume(consumeList);
+ } catch (Throwable t) {
+ target.consumer.onError(consumeList, t);
+ } finally {
+ consumeList.clear();
+ }
+ return true;
+ }
+ target.consumer.nothingToConsume();
+ return false;
+ }
+
+ /**
+ * Add a new target channels.
+ */
+ public void addNewTarget(Channels channels, IConsumer consumer) {
+ Group group = new Group(channels, consumer);
+ // Recreate the new list to avoid change list while the list is used in consuming.
+ ArrayList<Group> newList = new ArrayList<Group>();
+ for (Group target : consumeTargets) {
+ newList.add(target);
+ }
+ newList.add(group);
+ consumeTargets = newList;
+ size += channels.size();
+ }
+
+ public long size() {
+ return size;
+ }
+
+ void shutdown() {
+ running = false;
+ }
+
+ private static class Group {
+ private Channels channels;
+ private IConsumer consumer;
+
+ public Group(Channels channels, IConsumer consumer) {
+ this.channels = channels;
+ this.consumer = consumer;
+ }
+ }
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/partition/IDataPartitioner.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/partition/IDataPartitioner.java
new file mode 100644
index 0000000..4b6c576
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/partition/IDataPartitioner.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.commons.datacarrier.partition;
+
+import org.apache.skywalking.banyandb.commons.datacarrier.buffer.BufferStrategy;
+
+public interface IDataPartitioner<T> {
+ int partition(int total, T data);
+
+ /**
+ * @return an integer represents how many times should retry when {@link BufferStrategy#IF_POSSIBLE}.
+ * <p>
+ * Less or equal 1, means not support retry.
+ */
+ int maxRetryCount();
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/partition/ProducerThreadPartitioner.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/partition/ProducerThreadPartitioner.java
new file mode 100644
index 0000000..b83fd1f
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/partition/ProducerThreadPartitioner.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.commons.datacarrier.partition;
+
+/**
+ * use threadid % total to partition
+ */
+public class ProducerThreadPartitioner<T> implements IDataPartitioner<T> {
+ public ProducerThreadPartitioner() {
+ }
+
+ @Override
+ public int partition(int total, T data) {
+ return (int) Thread.currentThread().getId() % total;
+ }
+
+ @Override
+ public int maxRetryCount() {
+ return 1;
+ }
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/partition/SimpleRollingPartitioner.java b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/partition/SimpleRollingPartitioner.java
new file mode 100644
index 0000000..e22f92f
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/commons/datacarrier/partition/SimpleRollingPartitioner.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.commons.datacarrier.partition;
+
+/**
+ * use normal int to rolling.
+ */
+public class SimpleRollingPartitioner<T> implements IDataPartitioner<T> {
+ @SuppressWarnings("NonAtomicVolatileUpdate")
+ private volatile int i = 0;
+
+ @Override
+ public int partition(int total, T data) {
+ return Math.abs(i++ % total);
+ }
+
+ @Override
+ public int maxRetryCount() {
+ return 3;
+ }
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java
new file mode 100644
index 0000000..ea12319
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client;
+
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.NameResolverRegistry;
+import io.grpc.internal.DnsNameResolverProvider;
+import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.banyandb.v1.trace.BanyandbTrace;
+import org.apache.skywalking.banyandb.v1.trace.TraceServiceGrpc;
+
+/**
+ * BanyanDBClient represents a client instance interacting with BanyanDB server. This is built on the top of BanyanDB v1
+ * gRPC APIs.
+ */
+@Slf4j
+public class BanyanDBClient implements Closeable {
+ /**
+ * The hostname of BanyanDB server.
+ */
+ private final String host;
+ /**
+ * The port of BanyanDB server.
+ */
+ private final int port;
+ /**
+ * The instance name.
+ */
+ private final String group;
+ /**
+ * Options for server connection.
+ */
+ private Options options;
+ /**
+ * Managed gRPC connection.
+ */
+ private volatile ManagedChannel managedChannel;
+ /**
+ * gRPC client stub
+ */
+ private volatile TraceServiceGrpc.TraceServiceStub traceServiceStub;
+ /**
+ * gRPC blocking stub.
+ */
+ private volatile TraceServiceGrpc.TraceServiceBlockingStub traceServiceBlockingStub;
+ /**
+ * The connection status.
+ */
+ private volatile boolean isConnected = false;
+ /**
+ * A lock to control the race condition in establishing and disconnecting network connection.
+ */
+ private volatile ReentrantLock connectionEstablishLock;
+
+ /**
+ * Create a BanyanDB client instance
+ *
+ * @param host IP or domain name
+ * @param port Server port
+ * @param group Database instance name
+ */
+ public BanyanDBClient(final String host, final int port, final String group) {
+ this(host, port, group, new Options());
+ }
+
+ /**
+ * Create a BanyanDB client instance with custom options
+ *
+ * @param host IP or domain name
+ * @param port Server port
+ * @param group Database instance name
+ * @param options for database connection
+ */
+ public BanyanDBClient(final String host,
+ final int port,
+ final String group,
+ final Options options) {
+ this.host = host;
+ this.port = port;
+ this.group = group;
+ this.options = options;
+ this.connectionEstablishLock = new ReentrantLock();
+
+ NameResolverRegistry.getDefaultRegistry().register(new DnsNameResolverProvider());
+ }
+
+ /**
+ * Connect to the server.
+ *
+ * @throws RuntimeException if server is not reachable.
+ */
+ public void connect() {
+ connectionEstablishLock.lock();
+ try {
+ if (!isConnected) {
+ final ManagedChannelBuilder<?> nettyChannelBuilder = NettyChannelBuilder.forAddress(host, port).usePlaintext();
+ nettyChannelBuilder.maxInboundMessageSize(options.getMaxInboundMessageSize());
+
+ managedChannel = nettyChannelBuilder.build();
+ traceServiceStub = TraceServiceGrpc.newStub(managedChannel);
+ traceServiceBlockingStub = TraceServiceGrpc.newBlockingStub(
+ managedChannel);
+ isConnected = true;
+ }
+ } finally {
+ connectionEstablishLock.unlock();
+ }
+ }
+
+ /**
+ * Connect to the mock server.
+ * Created for testing purpose.
+ *
+ * @param channel the channel used for communication.
+ * For tests, it is normally an in-process channel.
+ */
+ void connect(ManagedChannel channel) {
+ connectionEstablishLock.lock();
+ try {
+ if (!isConnected) {
+ traceServiceStub = TraceServiceGrpc.newStub(channel);
+ traceServiceBlockingStub = TraceServiceGrpc.newBlockingStub(
+ channel);
+ isConnected = true;
+ }
+ } finally {
+ connectionEstablishLock.unlock();
+ }
+ }
+
+ /**
+ * Create a build process for trace write.
+ *
+ * @param maxBulkSize the max bulk size for the flush operation
+ * @param flushInterval if given maxBulkSize is not reached in this period, the flush would be trigger
+ * automatically. Unit is second
+ * @param concurrency the number of concurrency would run for the flush max
+ * @return trace bulk write processor
+ */
+ public TraceBulkWriteProcessor buildTraceWriteProcessor(int maxBulkSize, int flushInterval, int concurrency) {
+ return new TraceBulkWriteProcessor(group, traceServiceStub, maxBulkSize, flushInterval, concurrency);
+ }
+
+ /**
+ * Query trace according to given conditions
+ *
+ * @param traceQuery condition for query
+ * @return hint traces.
+ */
+ public TraceQueryResponse queryTraces(TraceQuery traceQuery) {
+ final BanyandbTrace.QueryResponse response = traceServiceBlockingStub
+ .withDeadlineAfter(options.getDeadline(), TimeUnit.SECONDS)
+ .query(traceQuery.build(group));
+ return new TraceQueryResponse(response);
+ }
+
+ @Override
+ public void close() throws IOException {
+ connectionEstablishLock.lock();
+ try {
+ if (isConnected) {
+ this.managedChannel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
+ isConnected = false;
+ }
+ } catch (InterruptedException interruptedException) {
+ log.warn("fail to wait for channel termination, shutdown now!", interruptedException);
+ this.managedChannel.shutdownNow();
+ isConnected = false;
+ } finally {
+ connectionEstablishLock.unlock();
+ }
+ }
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/BulkWriteProcessor.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/BulkWriteProcessor.java
new file mode 100644
index 0000000..2fda151
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/BulkWriteProcessor.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.skywalking.banyandb.commons.datacarrier.DataCarrier;
+import org.apache.skywalking.banyandb.commons.datacarrier.consumer.IConsumer;
+
+/**
+ * BulkWriteProcessor is a timeline and size dual driven processor.
+ * <p>
+ * It includes an internal queue and timer, and accept the data sequentially. With the given thresholds of time and
+ * size, it could activate flush to continue the process to the next step.
+ */
+public abstract class BulkWriteProcessor implements Closeable {
+ protected final int flushInterval;
+ protected DataCarrier buffer;
+
+ /**
+ * Create the processor.
+ *
+ * @param maxBulkSize the max bulk size for the flush operation
+ * @param flushInterval if given maxBulkSize is not reached in this period, the flush would be trigger
+ * automatically. Unit is second.
+ * @param concurrency the number of concurrency would run for the flush max.
+ */
+ protected BulkWriteProcessor(String processorName, int maxBulkSize, int flushInterval, int concurrency) {
+ this.flushInterval = flushInterval;
+ this.buffer = new DataCarrier(processorName, maxBulkSize, concurrency);
+ Properties properties = new Properties();
+ properties.put("maxBulkSize", maxBulkSize);
+ properties.put("flushInterval", flushInterval);
+ properties.put("BulkWriteProcessor", this);
+ buffer.consume(QueueWatcher.class, concurrency, 20, properties);
+ }
+
+ @Override
+ public void close() throws IOException {
+ this.buffer.shutdownConsumers();
+ }
+
+ /**
+ * The internal queue consumer for build process.
+ */
+ public static class QueueWatcher implements IConsumer {
+ private long lastFlushTimestamp;
+ private int maxBulkSize;
+ private int flushInterval;
+ private List cachedData;
+ private BulkWriteProcessor bulkWriteProcessor;
+
+ public QueueWatcher() {
+ }
+
+ @Override
+ public void init(Properties properties) {
+ lastFlushTimestamp = System.currentTimeMillis();
+ maxBulkSize = (Integer) properties.get("maxBulkSize");
+ flushInterval = (Integer) properties.get("flushInterval") * 1000;
+ cachedData = new ArrayList(maxBulkSize);
+ bulkWriteProcessor = (BulkWriteProcessor) properties.get("BulkWriteProcessor");
+ }
+
+ @Override
+ public void consume(final List data) {
+ if (data.size() >= maxBulkSize) {
+ // The data#size actually wouldn't over the maxBulkSize due to the DataCarrier channel's max size.
+ // This is just to preventing unexpected case and avoid confusion about dropping into else section.
+ bulkWriteProcessor.flush(data);
+ lastFlushTimestamp = System.currentTimeMillis();
+ } else {
+ data.forEach(element -> {
+ cachedData.add(element);
+ if (cachedData.size() >= maxBulkSize) {
+ // Flush and re-init.
+ bulkWriteProcessor.flush(cachedData);
+ cachedData = new ArrayList(maxBulkSize);
+ lastFlushTimestamp = System.currentTimeMillis();
+ }
+ });
+ }
+ }
+
+ @Override
+ public void onError(final List data, final Throwable t) {
+
+ }
+
+ @Override
+ public void onExit() {
+
+ }
+
+ @Override
+ public void nothingToConsume() {
+ if (System.currentTimeMillis() - lastFlushTimestamp > flushInterval) {
+ bulkWriteProcessor.flush(cachedData);
+ cachedData = new ArrayList(maxBulkSize);
+ lastFlushTimestamp = System.currentTimeMillis();
+ }
+ }
+ }
+
+ /**
+ * @param data to be flush.
+ */
+ protected abstract void flush(List data);
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/Field.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/Field.java
new file mode 100644
index 0000000..cf2c04b
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/Field.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client;
+
+import java.util.List;
+
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import org.apache.skywalking.banyandb.v1.Banyandb;
+
+import static com.google.protobuf.NullValue.NULL_VALUE;
+
+/**
+ * Field represents a value of column/field in the write-op or response.
+ */
+@EqualsAndHashCode
+public abstract class Field<T> {
+ @Getter
+ protected final T value;
+
+ protected Field(T value) {
+ this.value = value;
+ }
+
+ /**
+ * NullField is a value which can be converted to {@link com.google.protobuf.NullValue}.
+ * Users should use the singleton instead of create a new instance everytime.
+ */
+ public static class NullField extends Field<Object> implements SerializableField {
+ private static final NullField INSTANCE = new NullField();
+
+ private NullField() {
+ super(null);
+ }
+
+ @Override
+ public Banyandb.Field toField() {
+ return Banyandb.Field.newBuilder().setNull(NULL_VALUE).build();
+ }
+ }
+
+ /**
+ * The value of a String type field.
+ */
+ public static class StringField extends Field<String> implements SerializableField {
+ private StringField(String value) {
+ super(value);
+ }
+
+ @Override
+ public Banyandb.Field toField() {
+ return Banyandb.Field.newBuilder().setStr(Banyandb.Str.newBuilder().setValue(value)).build();
+ }
+ }
+
+ /**
+ * The value of a String array type field.
+ */
+ public static class StringArrayField extends Field<List<String>> implements SerializableField {
+ private StringArrayField(List<String> value) {
+ super(value);
+ }
+
+ @Override
+ public Banyandb.Field toField() {
+ return Banyandb.Field.newBuilder().setStrArray(Banyandb.StrArray.newBuilder().addAllValue(value)).build();
+ }
+ }
+
+ /**
+ * The value of an int64(Long) type field.
+ */
+ public static class LongField extends Field<Long> implements SerializableField {
+ private LongField(Long value) {
+ super(value);
+ }
+
+ @Override
+ public Banyandb.Field toField() {
+ return Banyandb.Field.newBuilder().setInt(Banyandb.Int.newBuilder().setValue(value)).build();
+ }
+ }
+
+ /**
+ * The value of an int64(Long) array type field.
+ */
+ public static class LongArrayField extends Field<List<Long>> implements SerializableField {
+ private LongArrayField(List<Long> value) {
+ super(value);
+ }
+
+ @Override
+ public Banyandb.Field toField() {
+ return Banyandb.Field.newBuilder().setIntArray(Banyandb.IntArray.newBuilder().addAllValue(value)).build();
+ }
+ }
+
+ /**
+ * Construct a string field
+ *
+ * @param val payload
+ * @return Anonymous field with String payload
+ */
+ public static SerializableField stringField(String val) {
+ return new StringField(val);
+ }
+
+ /**
+ * Construct a numeric field
+ *
+ * @param val payload
+ * @return Anonymous field with numeric payload
+ */
+ public static SerializableField longField(long val) {
+ return new LongField(val);
+ }
+
+ /**
+ * Construct a string array field
+ *
+ * @param val payload
+ * @return Anonymous field with string array payload
+ */
+ public static SerializableField stringArrayField(List<String> val) {
+ return new StringArrayField(val);
+ }
+
+ /**
+ * Construct a long array field
+ *
+ * @param val payload
+ * @return Anonymous field with numeric array payload
+ */
+ public static SerializableField longArrayField(List<Long> val) {
+ return new LongArrayField(val);
+ }
+
+ public static SerializableField nullField() {
+ return NullField.INSTANCE;
+ }
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/FieldAndValue.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/FieldAndValue.java
new file mode 100644
index 0000000..5182544
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/FieldAndValue.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client;
+
+import java.util.List;
+
+import lombok.EqualsAndHashCode;
+import org.apache.skywalking.banyandb.v1.Banyandb;
+
+/**
+ * FieldAndValue represents a value of column in the response
+ */
+@EqualsAndHashCode(callSuper = true)
+public abstract class FieldAndValue<T> extends Field<T> {
+ protected final String fieldName;
+
+ protected FieldAndValue(String fieldName, T value) {
+ super(value);
+ this.fieldName = fieldName;
+ }
+
+ /**
+ * @return field name
+ */
+ public String getFieldName() {
+ return this.fieldName;
+ }
+
+ /**
+ * @return true if value is null;
+ */
+ public boolean isNull() {
+ return this.value == null;
+ }
+
+ static FieldAndValue<?> build(Banyandb.TypedPair typedPair) {
+ if (typedPair.hasNullPair()) {
+ switch (typedPair.getNullPair().getType()) {
+ case FIELD_TYPE_INT:
+ return new LongFieldPair(typedPair.getKey(), null);
+ case FIELD_TYPE_INT_ARRAY:
+ return new LongArrayFieldPair(typedPair.getKey(), null);
+ case FIELD_TYPE_STRING:
+ return new StringFieldPair(typedPair.getKey(), null);
+ case FIELD_TYPE_STRING_ARRAY:
+ return new StringArrayFieldPair(typedPair.getKey(), null);
+ default:
+ throw new IllegalArgumentException("Unrecognized NullType, " + typedPair.getNullPair().getType());
+ }
+ } else if (typedPair.hasIntPair()) {
+ return new LongFieldPair(typedPair.getKey(), typedPair.getIntPair().getValue());
+ } else if (typedPair.hasStrPair()) {
+ return new StringFieldPair(typedPair.getKey(), typedPair.getStrPair().getValue());
+ } else if (typedPair.hasIntArrayPair()) {
+ return new LongArrayFieldPair(typedPair.getKey(), typedPair.getIntArrayPair().getValueList());
+ } else if (typedPair.hasStrArrayPair()) {
+ return new StringArrayFieldPair(typedPair.getKey(), typedPair.getStrArrayPair().getValueList());
+ }
+ throw new IllegalArgumentException("Unrecognized TypedPair, " + typedPair);
+ }
+
+ public static class StringFieldPair extends FieldAndValue<String> {
+ StringFieldPair(final String fieldName, final String value) {
+ super(fieldName, value);
+ }
+ }
+
+ public static class StringArrayFieldPair extends FieldAndValue<List<String>> {
+ StringArrayFieldPair(final String fieldName, final List<String> value) {
+ super(fieldName, value);
+ }
+ }
+
+ public static class LongFieldPair extends FieldAndValue<Long> {
+ LongFieldPair(final String fieldName, final Long value) {
+ super(fieldName, value);
+ }
+ }
+
+ public static class LongArrayFieldPair extends FieldAndValue<List<Long>> {
+ LongArrayFieldPair(final String fieldName, final List<Long> value) {
+ super(fieldName, value);
+ }
+ }
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/Options.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/Options.java
new file mode 100644
index 0000000..89b8038
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/Options.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client;
+
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.Setter;
+
+/**
+ * Client connection options.
+ */
+@Setter
+@Getter(AccessLevel.PACKAGE)
+public class Options {
+ /**
+ * Max inbound message size
+ */
+ private int maxInboundMessageSize = 1024 * 1024 * 50;
+ /**
+ * Threshold of gRPC blocking query, unit is second
+ */
+ private int deadline = 30;
+
+ Options() {
+ }
+
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/PairQueryCondition.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/PairQueryCondition.java
new file mode 100644
index 0000000..78f714f
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/PairQueryCondition.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client;
+
+import org.apache.skywalking.banyandb.v1.Banyandb;
+
+import java.util.List;
+
+/**
+ * PairQuery represents a query condition, including field name, operator, and value(s);
+ */
+public abstract class PairQueryCondition<T> extends FieldAndValue<T> {
+ protected final Banyandb.PairQuery.BinaryOp op;
+
+ private PairQueryCondition(String fieldName, Banyandb.PairQuery.BinaryOp op, T value) {
+ super(fieldName, value);
+ this.op = op;
+ }
+
+ Banyandb.PairQuery build() {
+ return Banyandb.PairQuery.newBuilder()
+ .setOp(this.op)
+ .setCondition(buildTypedPair()).build();
+ }
+
+ /**
+ * The various implementations should build different TypedPair
+ *
+ * @return typedPair to be included
+ */
+ protected abstract Banyandb.TypedPair buildTypedPair();
+
+ /**
+ * LongQueryCondition represents `Field(Long) $op value` condition.
+ */
+ public static class LongQueryCondition extends PairQueryCondition<Long> {
+ private LongQueryCondition(String fieldName, Banyandb.PairQuery.BinaryOp op, Long value) {
+ super(fieldName, op, value);
+ }
+
+ @Override
+ protected Banyandb.TypedPair buildTypedPair() {
+ return Banyandb.TypedPair.newBuilder()
+ .setKey(fieldName)
+ .setIntPair(Banyandb.Int.newBuilder()
+ .setValue(value)).build();
+ }
+
+ /**
+ * Build a query condition for {@link Long} type
+ * and {@link Banyandb.PairQuery.BinaryOp#BINARY_OP_EQ} as the relation
+ *
+ * @param fieldName name of the field
+ * @param val value of the field
+ * @return a query that `Long == value`
+ */
+ public static PairQueryCondition<Long> eq(String fieldName, Long val) {
+ return new LongQueryCondition(fieldName, Banyandb.PairQuery.BinaryOp.BINARY_OP_EQ, val);
+ }
+
+ /**
+ * Build a query condition for {@link Long} type
+ * and {@link Banyandb.PairQuery.BinaryOp#BINARY_OP_NE} as the relation
+ *
+ * @param fieldName name of the field
+ * @param val value of the field
+ * @return a query that `Long != value`
+ */
+ public static PairQueryCondition<Long> ne(String fieldName, Long val) {
+ return new LongQueryCondition(fieldName, Banyandb.PairQuery.BinaryOp.BINARY_OP_NE, val);
+ }
+
+ /**
+ * Build a query condition for {@link Long} type
+ * and {@link Banyandb.PairQuery.BinaryOp#BINARY_OP_GT} as the relation
+ *
+ * @param fieldName name of the field
+ * @param val value of the field
+ * @return a query that `Long > value`
+ */
+ public static PairQueryCondition<Long> gt(String fieldName, Long val) {
+ return new LongQueryCondition(fieldName, Banyandb.PairQuery.BinaryOp.BINARY_OP_GT, val);
+ }
+
+ /**
+ * Build a query condition for {@link Long} type
+ * and {@link Banyandb.PairQuery.BinaryOp#BINARY_OP_GE} as the relation
+ *
+ * @param fieldName name of the field
+ * @param val value of the field
+ * @return a query that `Long ≥ value`
+ */
+ public static PairQueryCondition<Long> ge(String fieldName, Long val) {
+ return new LongQueryCondition(fieldName, Banyandb.PairQuery.BinaryOp.BINARY_OP_GE, val);
+ }
+
+ /**
+ * Build a query condition for {@link Long} type
+ * and {@link Banyandb.PairQuery.BinaryOp#BINARY_OP_LT} as the relation
+ *
+ * @param fieldName name of the field
+ * @param val value of the field
+ * @return a query that `Long < value`
+ */
+ public static PairQueryCondition<Long> lt(String fieldName, Long val) {
+ return new LongQueryCondition(fieldName, Banyandb.PairQuery.BinaryOp.BINARY_OP_LT, val);
+ }
+
+ /**
+ * Build a query condition for {@link Long} type
+ * and {@link Banyandb.PairQuery.BinaryOp#BINARY_OP_LE} as the relation
+ *
+ * @param fieldName name of the field
+ * @param val value of the field
+ * @return a query that `Long ≤ value`
+ */
+ public static PairQueryCondition<Long> le(String fieldName, Long val) {
+ return new LongQueryCondition(fieldName, Banyandb.PairQuery.BinaryOp.BINARY_OP_LE, val);
+ }
+ }
+
+ /**
+ * StringQueryCondition represents `Field(String) $op value` condition.
+ */
+ public static class StringQueryCondition extends PairQueryCondition<String> {
+ private StringQueryCondition(String fieldName, Banyandb.PairQuery.BinaryOp op, String value) {
+ super(fieldName, op, value);
+ }
+
+ @Override
+ protected Banyandb.TypedPair buildTypedPair() {
+ return Banyandb.TypedPair.newBuilder()
+ .setKey(fieldName)
+ .setStrPair(Banyandb.Str.newBuilder().setValue(value)).build();
+ }
+
+ /**
+ * Build a query condition for {@link String} type
+ * and {@link Banyandb.PairQuery.BinaryOp#BINARY_OP_EQ} as the relation
+ *
+ * @param fieldName name of the field
+ * @param val value of the field
+ * @return a query that `String == value`
+ */
+ public static PairQueryCondition<String> eq(String fieldName, String val) {
+ return new StringQueryCondition(fieldName, Banyandb.PairQuery.BinaryOp.BINARY_OP_EQ, val);
+ }
+
+ /**
+ * Build a query condition for {@link String} type
+ * and {@link Banyandb.PairQuery.BinaryOp#BINARY_OP_NE} as the relation
+ *
+ * @param fieldName name of the field
+ * @param val value of the field
+ * @return a query that `String != value`
+ */
+ public static PairQueryCondition<String> ne(String fieldName, String val) {
+ return new StringQueryCondition(fieldName, Banyandb.PairQuery.BinaryOp.BINARY_OP_NE, val);
+ }
+ }
+
+ /**
+ * StringArrayQueryCondition represents `Field(List of String) $op value` condition.
+ */
+ public static class StringArrayQueryCondition extends PairQueryCondition<List<String>> {
+ private StringArrayQueryCondition(String fieldName, Banyandb.PairQuery.BinaryOp op, List<String> value) {
+ super(fieldName, op, value);
+ }
+
+ @Override
+ protected Banyandb.TypedPair buildTypedPair() {
+ return Banyandb.TypedPair.newBuilder()
+ .setKey(fieldName)
+ .setStrArrayPair(Banyandb.StrArray.newBuilder()
+ .addAllValue(value)).build();
+ }
+
+ /**
+ * Build a query condition for {@link List} of {@link String} as the type
+ * and {@link Banyandb.PairQuery.BinaryOp#BINARY_OP_EQ} as the relation
+ *
+ * @param fieldName name of the field
+ * @param val value of the field
+ * @return a query that `[String] == values`
+ */
+ public static PairQueryCondition<List<String>> eq(String fieldName, List<String> val) {
+ return new StringArrayQueryCondition(fieldName, Banyandb.PairQuery.BinaryOp.BINARY_OP_EQ, val);
+ }
+
+ /**
+ * Build a query condition for {@link List} of {@link String} as the type
+ * and {@link Banyandb.PairQuery.BinaryOp#BINARY_OP_NE} as the relation
+ *
+ * @param fieldName name of the field
+ * @param val value of the field
+ * @return a query that `[String] != values`
+ */
+ public static PairQueryCondition<List<String>> ne(String fieldName, List<String> val) {
+ return new StringArrayQueryCondition(fieldName, Banyandb.PairQuery.BinaryOp.BINARY_OP_NE, val);
+ }
+
+ /**
+ * Build a query condition for {@link List} of {@link String} as the type
+ * and {@link Banyandb.PairQuery.BinaryOp#BINARY_OP_HAVING} as the relation
+ *
+ * @param fieldName name of the field
+ * @param val value of the field
+ * @return a query that `[String] having values`
+ */
+ public static PairQueryCondition<List<String>> having(String fieldName, List<String> val) {
+ return new StringArrayQueryCondition(fieldName, Banyandb.PairQuery.BinaryOp.BINARY_OP_HAVING, val);
+ }
+
+ /**
+ * Build a query condition for {@link List} of {@link String} as the type
+ * and {@link Banyandb.PairQuery.BinaryOp#BINARY_OP_NOT_HAVING} as the relation
+ *
+ * @param fieldName name of the field
+ * @param val value of the field
+ * @return a query that `[String] not having values`
+ */
+ public static PairQueryCondition<List<String>> notHaving(String fieldName, List<String> val) {
+ return new StringArrayQueryCondition(fieldName, Banyandb.PairQuery.BinaryOp.BINARY_OP_NOT_HAVING, val);
+ }
+ }
+
+ /**
+ * LongArrayQueryCondition represents `Field(List of Long) $op value` condition.
+ */
+ public static class LongArrayQueryCondition extends PairQueryCondition<List<Long>> {
+ private LongArrayQueryCondition(String fieldName, Banyandb.PairQuery.BinaryOp op, List<Long> value) {
+ super(fieldName, op, value);
+ }
+
+ @Override
+ protected Banyandb.TypedPair buildTypedPair() {
+ return Banyandb.TypedPair.newBuilder()
+ .setKey(fieldName)
+ .setIntArrayPair(Banyandb.IntArray.newBuilder()
+ .addAllValue(value)).build();
+ }
+
+ /**
+ * Build a query condition for {@link List} of {@link Long} as the type
+ * and {@link Banyandb.PairQuery.BinaryOp#BINARY_OP_EQ} as the relation
+ *
+ * @param fieldName name of the field
+ * @param val value of the field
+ * @return a query that `[Long] == value`
+ */
+ public static PairQueryCondition<List<Long>> eq(String fieldName, List<Long> val) {
+ return new LongArrayQueryCondition(fieldName, Banyandb.PairQuery.BinaryOp.BINARY_OP_EQ, val);
+ }
+
+ /**
+ * Build a query condition for {@link List} of {@link Long} as the type
+ * and {@link Banyandb.PairQuery.BinaryOp#BINARY_OP_NE} as the relation
+ *
+ * @param fieldName name of the field
+ * @param val value of the field
+ * @return a query that `[Long] != value`
+ */
+ public static PairQueryCondition<List<Long>> ne(String fieldName, List<Long> val) {
+ return new LongArrayQueryCondition(fieldName, Banyandb.PairQuery.BinaryOp.BINARY_OP_NE, val);
+ }
+
+ /**
+ * Build a query condition for {@link List} of {@link Long} as the type
+ * and {@link Banyandb.PairQuery.BinaryOp#BINARY_OP_HAVING} as the relation
+ *
+ * @param fieldName name of the field
+ * @param val value of the field
+ * @return a query that `[Long] having values`
+ */
+ public static PairQueryCondition<List<Long>> having(String fieldName, List<Long> val) {
+ return new LongArrayQueryCondition(fieldName, Banyandb.PairQuery.BinaryOp.BINARY_OP_HAVING, val);
+ }
+
+ /**
+ * Build a query condition for {@link List} of {@link Long} as the type
+ * and {@link Banyandb.PairQuery.BinaryOp#BINARY_OP_NOT_HAVING} as the relation
+ *
+ * @param fieldName name of the field
+ * @param val value of the field
+ * @return a query that `[Long] not having values`
+ */
+ public static PairQueryCondition<List<Long>> notHaving(String fieldName, List<Long> val) {
+ return new LongArrayQueryCondition(fieldName, Banyandb.PairQuery.BinaryOp.BINARY_OP_NOT_HAVING, val);
+ }
+ }
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/RowEntity.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/RowEntity.java
new file mode 100644
index 0000000..6b9f7ad
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/RowEntity.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client;
+
+import java.util.ArrayList;
+import java.util.List;
+import lombok.Getter;
+import org.apache.skywalking.banyandb.v1.trace.BanyandbTrace;
+
+/**
+ * RowEntity represents an entity of BanyanDB entity.
+ */
+@Getter
+public class RowEntity {
+ /**
+ * identity of the entity.
+ * For a trace entity, it is the spanID of a Span or the segmentId of a segment in Skywalking.
+ */
+ private final String id;
+
+ /**
+ * timestamp of the entity in the timeunit of milliseconds.
+ */
+ private final long timestamp;
+
+ /**
+ * binary part of the entity
+ */
+ private final byte[] binary;
+
+ /**
+ * fields are indexed-fields that are searchable in BanyanBD
+ */
+ private final List<FieldAndValue<?>> fields;
+
+ RowEntity(BanyandbTrace.Entity entity) {
+ id = entity.getEntityId();
+ timestamp = entity.getTimestamp().getSeconds() * 1000 + entity.getTimestamp().getNanos() / 1_000_000;
+ binary = entity.getDataBinary().toByteArray();
+ fields = new ArrayList<>(entity.getFieldsCount());
+ entity.getFieldsList().forEach(field -> fields.add(FieldAndValue.build(field)));
+ }
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/SerializableField.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/SerializableField.java
new file mode 100644
index 0000000..2deef2d
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/SerializableField.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client;
+
+import org.apache.skywalking.banyandb.v1.Banyandb;
+
+/**
+ * An interface that represents an object which can be converted to the protobuf representation
+ * of BanyanDB.Field. BanyanDB.Field is used for writing entities to the database.
+ */
+public interface SerializableField {
+ Banyandb.Field toField();
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/TimestampRange.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/TimestampRange.java
new file mode 100644
index 0000000..e1f1f14
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/TimestampRange.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client;
+
+import com.google.protobuf.Timestamp;
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.skywalking.banyandb.v1.Banyandb;
+
+@RequiredArgsConstructor
+@Getter(AccessLevel.PROTECTED)
+public class TimestampRange {
+ /**
+ * start timestamp in timeunit of milliseconds. inclusive.
+ */
+ private final long begin;
+
+ /**
+ * end timestamp in timeunit of milliseconds. inclusive.
+ */
+ private final long end;
+
+ /**
+ * @return TimeRange accordingly.
+ */
+ Banyandb.TimeRange build() {
+ final Banyandb.TimeRange.Builder builder = Banyandb.TimeRange.newBuilder();
+ builder.setBegin(Timestamp.newBuilder()
+ .setSeconds(begin / 1000)
+ .setNanos((int) (begin % 1000 * 1_000_000)));
+ builder.setEnd(Timestamp.newBuilder()
+ .setSeconds(end / 1000)
+ .setNanos((int) (end % 1000 * 1_000_000)));
+ return builder.build();
+ }
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceBulkWriteProcessor.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceBulkWriteProcessor.java
new file mode 100644
index 0000000..5bab418
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceBulkWriteProcessor.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client;
+
+import io.grpc.stub.StreamObserver;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.banyandb.v1.trace.BanyandbTrace;
+import org.apache.skywalking.banyandb.v1.trace.TraceServiceGrpc;
+
+/**
+ * TraceWriteProcessor works for trace flush.
+ */
+@Slf4j
+public class TraceBulkWriteProcessor extends BulkWriteProcessor {
+ /**
+ * The BanyanDB instance name.
+ */
+ private final String group;
+ private TraceServiceGrpc.TraceServiceStub traceServiceStub;
+
+ /**
+ * Create the processor.
+ *
+ * @param traceServiceStub stub for gRPC call.
+ * @param maxBulkSize the max bulk size for the flush operation
+ * @param flushInterval if given maxBulkSize is not reached in this period, the flush would be trigger
+ * automatically. Unit is second.
+ * @param concurrency the number of concurrency would run for the flush max.
+ */
+ protected TraceBulkWriteProcessor(final String group,
+ final TraceServiceGrpc.TraceServiceStub traceServiceStub,
+ final int maxBulkSize,
+ final int flushInterval,
+ final int concurrency) {
+ super("TraceBulkWriteProcessor", maxBulkSize, flushInterval, concurrency);
+ this.group = group;
+ this.traceServiceStub = traceServiceStub;
+ }
+
+ /**
+ * Add the trace to the bulk processor.
+ *
+ * @param traceWrite to add.
+ */
+ public void add(TraceWrite traceWrite) {
+ this.buffer.produce(traceWrite);
+ }
+
+ @Override
+ protected void flush(final List data) {
+ final StreamObserver<BanyandbTrace.WriteRequest> writeRequestStreamObserver
+ = traceServiceStub.withDeadlineAfter(
+ flushInterval, TimeUnit.SECONDS)
+ .write(
+ new StreamObserver<BanyandbTrace.WriteResponse>() {
+ @Override
+ public void onNext(
+ BanyandbTrace.WriteResponse writeResponse) {
+ }
+
+ @Override
+ public void onError(
+ Throwable throwable) {
+ log.error(
+ "Error occurs in flushing traces.",
+ throwable
+ );
+ }
+
+ @Override
+ public void onCompleted() {
+ }
+ });
+ try {
+ data.forEach(write -> {
+ final TraceWrite traceWrite = (TraceWrite) write;
+ BanyandbTrace.WriteRequest request = traceWrite.build(group);
+ writeRequestStreamObserver.onNext(request);
+ });
+ } finally {
+ writeRequestStreamObserver.onCompleted();
+ }
+ }
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceQuery.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceQuery.java
new file mode 100644
index 0000000..850ddda
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceQuery.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import lombok.RequiredArgsConstructor;
+import lombok.Setter;
+import org.apache.skywalking.banyandb.v1.Banyandb;
+import org.apache.skywalking.banyandb.v1.trace.BanyandbTrace;
+
+/**
+ * TraceQuery is the high-level query API for the trace model.
+ */
+@Setter
+public class TraceQuery {
+ /**
+ * Owner name current entity
+ */
+ private final String name;
+ /**
+ * The time range for query.
+ */
+ private final TimestampRange timestampRange;
+ /**
+ * The projections of query result. These should have defined in the schema.
+ */
+ private final List<String> projections;
+ /**
+ * Query conditions.
+ */
+ private final List<PairQueryCondition<?>> conditions;
+ /**
+ * The starting row id of the query. Default value is 0.
+ */
+ private int offset;
+ /**
+ * The limit size of the query. Default value is 20.
+ */
+ private int limit;
+ /**
+ * One order condition is supported and optional.
+ */
+ private OrderBy orderBy;
+ /**
+ * Whether to fetch data_binary for the query
+ */
+ private boolean dataBinary;
+
+ public TraceQuery(final String name, final TimestampRange timestampRange, final List<String> projections) {
+ this.name = name;
+ this.timestampRange = timestampRange;
+ this.projections = projections;
+ this.conditions = new ArrayList<>(10);
+ this.offset = 0;
+ this.limit = 20;
+ this.dataBinary = false;
+ }
+
+ public TraceQuery(final String name, final List<String> projections) {
+ this(name, null, projections);
+ }
+
+ /**
+ * Fluent API for appending query condition
+ *
+ * @param condition the query condition to be appended
+ */
+ public TraceQuery appendCondition(PairQueryCondition<?> condition) {
+ this.conditions.add(condition);
+ return this;
+ }
+
+ /**
+ * @param group The instance name.
+ * @return QueryRequest for gRPC level query.
+ */
+ BanyandbTrace.QueryRequest build(String group) {
+ final BanyandbTrace.QueryRequest.Builder builder = BanyandbTrace.QueryRequest.newBuilder();
+ builder.setMetadata(Banyandb.Metadata.newBuilder()
+ .setGroup(group)
+ .setName(name)
+ .build());
+ if (timestampRange != null) {
+ builder.setTimeRange(timestampRange.build());
+ }
+ builder.setProjection(Banyandb.Projection.newBuilder().setDataBinary(this.dataBinary).addAllKeyNames(projections).build());
+ conditions.forEach(pairQueryCondition -> builder.addFields(pairQueryCondition.build()));
+ builder.setOffset(offset);
+ builder.setLimit(limit);
+ if (orderBy != null) {
+ builder.setOrderBy(orderBy.build());
+ }
+ return builder.build();
+ }
+
+ @RequiredArgsConstructor
+ public static class OrderBy {
+ /**
+ * The field name for ordering.
+ */
+ private final String fieldName;
+ /**
+ * The type of ordering.
+ */
+ private final Type type;
+
+ private Banyandb.QueryOrder build() {
+ final Banyandb.QueryOrder.Builder builder = Banyandb.QueryOrder.newBuilder();
+ builder.setKeyName(fieldName);
+ builder.setSort(
+ Type.DESC.equals(type) ? Banyandb.QueryOrder.Sort.SORT_DESC : Banyandb.QueryOrder.Sort.SORT_ASC);
+ return builder.build();
+ }
+
+ public enum Type {
+ ASC, DESC
+ }
+ }
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceQueryResponse.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceQueryResponse.java
new file mode 100644
index 0000000..49d0881
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceQueryResponse.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client;
+
+import java.util.ArrayList;
+import java.util.List;
+import lombok.Getter;
+import org.apache.skywalking.banyandb.v1.trace.BanyandbTrace;
+
+/**
+ * TraceQueryResponse represents the trace query result.
+ */
+public class TraceQueryResponse {
+ @Getter
+ private final List<RowEntity> entities;
+
+ TraceQueryResponse(BanyandbTrace.QueryResponse response) {
+ final List<BanyandbTrace.Entity> entitiesList = response.getEntitiesList();
+ entities = new ArrayList<>(entitiesList.size());
+ entitiesList.forEach(entity -> entities.add(new RowEntity(entity)));
+ }
+
+ /**
+ * @return size of the response set.
+ */
+ public int size() {
+ return entities.size();
+ }
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceWrite.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceWrite.java
new file mode 100644
index 0000000..0e4008a
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceWrite.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Timestamp;
+import java.util.List;
+import lombok.AccessLevel;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.Singular;
+import org.apache.skywalking.banyandb.v1.Banyandb;
+import org.apache.skywalking.banyandb.v1.trace.BanyandbTrace;
+
+/**
+ * TraceWrite represents a write operation, including necessary fields, for {@link
+ * BanyanDBClient#buildTraceWriteProcessor}.
+ */
+@Builder
+@Getter(AccessLevel.PROTECTED)
+public class TraceWrite {
+ /**
+ * Owner name current entity
+ */
+ private final String name;
+ /**
+ * ID of current entity
+ */
+ private final String entityId;
+ /**
+ * Timestamp represents the time of current trace or trace segment
+ * in the timeunit of milliseconds.
+ */
+ private final long timestamp;
+ /**
+ * The binary raw data represents the whole object of current trace or trace segment. It could be organized by
+ * different serialization formats. Natively, SkyWalking uses protobuf, but it is not required. The BanyanDB server
+ * wouldn't deserialize this. So, no format requirement.
+ */
+ private final byte[] binary;
+ /**
+ * The values of fields, which are defined by the schema. In the bulk write process, BanyanDB client doesn't require
+ * field names anymore.
+ */
+ @Singular
+ private final List<SerializableField> fields;
+
+ /**
+ * @param group of the BanyanDB client connected.
+ * @return {@link BanyandbTrace.WriteRequest} for the bulk process.
+ */
+ BanyandbTrace.WriteRequest build(String group) {
+ final BanyandbTrace.WriteRequest.Builder builder = BanyandbTrace.WriteRequest.newBuilder();
+ builder.setMetadata(Banyandb.Metadata.newBuilder().setGroup(group).setName(name).build());
+ final BanyandbTrace.EntityValue.Builder entityBuilder = BanyandbTrace.EntityValue.newBuilder();
+ entityBuilder.setEntityId(entityId);
+ entityBuilder.setTimestamp(Timestamp.newBuilder()
+ .setSeconds(timestamp / 1000)
+ .setNanos((int) (timestamp % 1000 * 1_000_000)));
+ entityBuilder.setDataBinary(ByteString.copyFrom(binary));
+ fields.forEach(writeField -> entityBuilder.addFields(writeField.toField()));
+ builder.setEntity(entityBuilder.build());
+ return builder.build();
+ }
+}
diff --git a/src/main/proto/banyandb/v1/banyandb-trace.proto b/src/main/proto/banyandb/v1/banyandb-trace.proto
new file mode 100644
index 0000000..44d98c1
--- /dev/null
+++ b/src/main/proto/banyandb/v1/banyandb-trace.proto
@@ -0,0 +1,107 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+syntax = "proto3";
+
+option java_package = "org.apache.skywalking.banyandb.v1.trace";
+
+package banyandb.trace.v1;
+
+import "google/protobuf/timestamp.proto";
+import "google/protobuf/struct.proto";
+import "banyandb/v1/banyandb.proto";
+
+service TraceService {
+ rpc Query(banyandb.trace.v1.QueryRequest) returns (banyandb.trace.v1.QueryResponse);
+ rpc Write(stream banyandb.trace.v1.WriteRequest) returns (stream banyandb.trace.v1.WriteResponse);
+}
+
+// QueryRequest is the request contract for query.
+message QueryRequest {
+ // metadata is required
+ banyandb.v1.Metadata metadata = 1;
+ // time_range is a range query with begin/end time of entities in the timeunit of nanoseconds.
+ // In the context of Trace, it represents the range of the `startTime` for spans/segments,
+ // while in the context of Log, it means the range of the timestamp(s) for logs.
+ // it is always recommended to specify time range for performance reason
+ banyandb.v1.TimeRange time_range = 2;
+ // offset is used to support pagination, together with the following limit
+ uint32 offset = 3;
+ // limit is used to impose a boundary on the number of records being returned
+ uint32 limit = 4;
+ // order_by is given to specify the sort for a field. So far, only fields in the type of Integer are supported
+ banyandb.v1.QueryOrder order_by = 5;
+ // fields are indexed. Some typical fields are listed below,
+ // - trace_id: if given, it takes precedence over other fields and will be used to retrieve entities before other conditions are imposed
+ // - duration: typical for trace context
+ repeated banyandb.v1.PairQuery fields = 6;
+ // projection can be used to select the key names of the entities in the response
+ banyandb.v1.Projection projection = 7;
+}
+
+// QueryResponse is the response for a query to the Query module.
+message QueryResponse {
+ // entities are the actual data returned
+ repeated Entity entities = 1;
+}
+
+// Entity represents
+// (Trace context) a Span defined in Google Dapper paper or equivalently a Segment in Skywalking.
+// (Log context) a log
+message Entity {
+ // entity_id could be span_id of a Span or segment_id of a Segment in the context of Trace
+ string entity_id = 1;
+ // timestamp represents
+ // 1) either the start time of a Span/Segment,
+ // 2) or the timestamp of a log
+ google.protobuf.Timestamp timestamp = 2;
+ // data_binary contains all un-indexed Tags and other key-value pairs
+ bytes data_binary = 3;
+ // fields contains all indexed Field. Some typical names,
+ // - trace_id
+ // - duration
+ // - service_name
+ // - service_instance_id
+ // - end_time_nanoseconds
+ repeated banyandb.v1.TypedPair fields = 4;
+}
+
+
+message WriteRequest {
+ // the metadata is only required in the first write.
+ banyandb.v1.Metadata metadata = 1;
+ // the entity is required.
+ EntityValue entity = 2;
+}
+
+message WriteResponse {}
+
+message EntityValue {
+ // entity_id could be span_id of a Span or segment_id of a Segment in the context of Trace
+ string entity_id = 1;
+ // timestamp_nanoseconds is in the timeunit of nanoseconds. It represents
+ // 1) either the start time of a Span/Segment,
+ // 2) or the timestamp of a log
+ google.protobuf.Timestamp timestamp = 2;
+ // binary representation of segments, including tags, spans...
+ bytes data_binary = 3;
+ // support all of indexed fields in the fields.
+ // Pair only has value, as the value of PairValue match with the key
+ // by the index rules and index rule bindings of Metadata group.
+ // indexed fields of multiple entities are compression in the fields.
+ repeated banyandb.v1.Field fields = 4;
+}
\ No newline at end of file
diff --git a/src/main/proto/banyandb/v1/banyandb.proto b/src/main/proto/banyandb/v1/banyandb.proto
new file mode 100644
index 0000000..e86186e
--- /dev/null
+++ b/src/main/proto/banyandb/v1/banyandb.proto
@@ -0,0 +1,136 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+syntax = "proto3";
+
+option java_package = "org.apache.skywalking.banyandb.v1";
+
+package banyandb.v1;
+
+import "google/protobuf/timestamp.proto";
+import "google/protobuf/struct.proto";
+
+// Metadata is for multi-tenant, multi-model use
+message Metadata {
+ // group contains a set of options, like retention policy, max
+ string group = 1;
+ // name of the entity
+ string name = 2;
+}
+
+enum FieldType {
+ FIELD_TYPE_UNSPECIFIED = 0;
+ FIELD_TYPE_STRING = 1;
+ FIELD_TYPE_INT = 2;
+ FIELD_TYPE_STRING_ARRAY = 3;
+ FIELD_TYPE_INT_ARRAY = 4;
+}
+
+// Pair is the building block of a record which is equivalent to a key-value pair.
+// In the context of Trace, it could be metadata of a trace such as service_name, service_instance, etc.
+// Besides, other fields/tags are organized in key-value pair in the underlying storage layer.
+// One should notice that the values can be a multi-value.
+message TypedPair {
+ string key = 1;
+ oneof typed {
+ NullWithType null_pair = 2;
+ Int int_pair = 3;
+ Str str_pair = 4;
+ IntArray int_array_pair = 5;
+ StrArray str_array_pair = 6;
+ }
+
+ message NullWithType {
+ FieldType type = 1;
+ }
+}
+
+// PairQuery consists of the query condition with a single binary operator to be imposed
+// For 1:1 BinaryOp, values in condition must be an array with length = 1,
+// while for 1:N BinaryOp, values can be an array with length >= 1.
+message PairQuery {
+ // BinaryOp specifies the operation imposed to the given query condition
+ // For EQ, NE, LT, GT, LE and GE, only one operand should be given, i.e. one-to-one relationship.
+ // HAVING and NOT_HAVING allow multi-value to be the operand such as array/vector, i.e. one-to-many relationship.
+ // For example, "keyA" contains "valueA" **and** "valueB"
+ enum BinaryOp {
+ BINARY_OP_UNSPECIFIED = 0;
+ BINARY_OP_EQ = 1;
+ BINARY_OP_NE = 2;
+ BINARY_OP_LT = 3;
+ BINARY_OP_GT = 4;
+ BINARY_OP_LE = 5;
+ BINARY_OP_GE = 6;
+ BINARY_OP_HAVING = 7;
+ BINARY_OP_NOT_HAVING = 8;
+ }
+ BinaryOp op = 1;
+ TypedPair condition = 2;
+}
+
+// QueryOrder means a Sort operation to be done for a given field.
+// The key_name refers to the key of a Pair.
+message QueryOrder {
+ string key_name = 1;
+ enum Sort {
+ SORT_UNSPECIFIED = 0;
+ SORT_DESC = 1;
+ SORT_ASC = 2;
+ }
+ Sort sort = 2;
+}
+
+// Projection is used to select the names of keys to be returned.
+message Projection {
+ // whether binary part is needed
+ bool data_binary = 1;
+ // The key_name refers to the key(s) of Pair(s).
+ repeated string key_names = 2;
+}
+
+// TimeRange is a range query for uint64,
+// the range here follows left-inclusive and right-exclusive rule, i.e. [begin, end) if both edges exist
+message TimeRange {
+ google.protobuf.Timestamp begin = 1;
+ google.protobuf.Timestamp end = 2;
+}
+
+message Str {
+ string value = 1;
+}
+
+message Int {
+ int64 value = 1;
+}
+
+message StrArray {
+ repeated string value = 1;
+}
+
+message IntArray {
+ repeated int64 value = 1;
+}
+
+message Field {
+ oneof value_type {
+ google.protobuf.NullValue null = 1;
+ Str str = 2;
+ StrArray str_array = 3;
+ Int int = 4;
+ IntArray int_array = 5;
+ }
+}
\ No newline at end of file
diff --git a/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientQueryTest.java b/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientQueryTest.java
new file mode 100644
index 0000000..1667254
--- /dev/null
+++ b/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientQueryTest.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Timestamp;
+import io.grpc.ManagedChannel;
+import io.grpc.Server;
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.inprocess.InProcessServerBuilder;
+import io.grpc.stub.StreamObserver;
+import io.grpc.testing.GrpcCleanupRule;
+import org.apache.skywalking.banyandb.v1.Banyandb;
+import org.apache.skywalking.banyandb.v1.trace.BanyandbTrace;
+import org.apache.skywalking.banyandb.v1.trace.TraceServiceGrpc;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.ArgumentMatchers;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.AdditionalAnswers.delegatesTo;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.mock;
+
+@RunWith(PowerMockRunner.class)
+@PowerMockIgnore("javax.management.*")
+public class BanyanDBClientQueryTest {
+ @Rule
+ public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
+
+ private final TraceServiceGrpc.TraceServiceImplBase serviceImpl =
+ mock(TraceServiceGrpc.TraceServiceImplBase.class, delegatesTo(
+ new TraceServiceGrpc.TraceServiceImplBase() {
+ @Override
+ public void query(BanyandbTrace.QueryRequest request, StreamObserver<BanyandbTrace.QueryResponse> responseObserver) {
+ responseObserver.onNext(BanyandbTrace.QueryResponse.newBuilder().build());
+ responseObserver.onCompleted();
+ }
+ }));
+
+ private BanyanDBClient client;
+
+ @Before
+ public void setUp() throws IOException {
+ // Generate a unique in-process server name.
+ String serverName = InProcessServerBuilder.generateName();
+
+ // Create a server, add service, start, and register for automatic graceful shutdown.
+ Server server = InProcessServerBuilder
+ .forName(serverName).directExecutor().addService(serviceImpl).build();
+ grpcCleanup.register(server.start());
+
+ // Create a client channel and register for automatic graceful shutdown.
+ ManagedChannel channel = grpcCleanup.register(
+ InProcessChannelBuilder.forName(serverName).directExecutor().build());
+ client = new BanyanDBClient("127.0.0.1", server.getPort(), "default");
+
+ client.connect(channel);
+ }
+
+ @Test
+ public void testNonNull() {
+ Assert.assertNotNull(this.client);
+ }
+
+ @Test
+ public void testQuery_tableScan() {
+ ArgumentCaptor<BanyandbTrace.QueryRequest> requestCaptor = ArgumentCaptor.forClass(BanyandbTrace.QueryRequest.class);
+
+ Instant end = Instant.now();
+ Instant begin = end.minus(15, ChronoUnit.MINUTES);
+ TraceQuery query = new TraceQuery("sw",
+ new TimestampRange(begin.toEpochMilli(), end.toEpochMilli()),
+ Arrays.asList("state", "start_time", "duration", "trace_id"));
+ // search for all states
+ query.appendCondition(PairQueryCondition.LongQueryCondition.eq("state", 0L));
+ query.setOrderBy(new TraceQuery.OrderBy("duration", TraceQuery.OrderBy.Type.DESC));
+ client.queryTraces(query);
+
+ verify(serviceImpl).query(requestCaptor.capture(), ArgumentMatchers.any());
+
+ final BanyandbTrace.QueryRequest request = requestCaptor.getValue();
+ // assert metadata
+ Assert.assertEquals("sw", request.getMetadata().getName());
+ Assert.assertEquals("default", request.getMetadata().getGroup());
+ // assert timeRange, both seconds and the nanos
+ Assert.assertEquals(begin.toEpochMilli() / 1000, request.getTimeRange().getBegin().getSeconds());
+ Assert.assertEquals(TimeUnit.MILLISECONDS.toNanos(begin.toEpochMilli() % 1000), request.getTimeRange().getBegin().getNanos());
+ Assert.assertEquals(end.toEpochMilli() / 1000, request.getTimeRange().getEnd().getSeconds());
+ Assert.assertEquals(TimeUnit.MILLISECONDS.toNanos(end.toEpochMilli() % 1000), request.getTimeRange().getEnd().getNanos());
+ // assert fields, we only have state as a condition which should be state
+ Assert.assertEquals(1, request.getFieldsCount());
+ // assert orderBy, by default DESC
+ Assert.assertEquals(Banyandb.QueryOrder.Sort.SORT_DESC, request.getOrderBy().getSort());
+ Assert.assertEquals("duration", request.getOrderBy().getKeyName());
+ // assert state
+ Assert.assertEquals(Banyandb.PairQuery.BinaryOp.BINARY_OP_EQ, request.getFields(0).getOp());
+ Assert.assertEquals(0, request.getFields(0).getCondition().getIntPair().getValue());
+ // assert projections
+ assertCollectionEqual(Lists.newArrayList("duration", "state", "start_time", "trace_id"), request.getProjection().getKeyNamesList());
+ }
+
+ @Test
+ public void testQuery_indexScan() {
+ ArgumentCaptor<BanyandbTrace.QueryRequest> requestCaptor = ArgumentCaptor.forClass(BanyandbTrace.QueryRequest.class);
+ Instant begin = Instant.now().minus(5, ChronoUnit.MINUTES);
+ Instant end = Instant.now();
+ String serviceId = "service_id_b";
+ String serviceInstanceId = "service_id_b_1";
+ String endpointId = "/check_0";
+ long minDuration = 10;
+ long maxDuration = 100;
+
+ TraceQuery query = new TraceQuery("sw",
+ new TimestampRange(begin.toEpochMilli(), end.toEpochMilli()),
+ Arrays.asList("state", "start_time", "duration", "trace_id"));
+ // search for the successful states
+ query.appendCondition(PairQueryCondition.LongQueryCondition.eq("state", 1L))
+ .appendCondition(PairQueryCondition.StringQueryCondition.eq("service_id", serviceId))
+ .appendCondition(PairQueryCondition.StringQueryCondition.eq("service_instance_id", serviceInstanceId))
+ .appendCondition(PairQueryCondition.StringQueryCondition.eq("endpoint_id", endpointId))
+ .appendCondition(PairQueryCondition.LongQueryCondition.ge("duration", minDuration))
+ .appendCondition(PairQueryCondition.LongQueryCondition.le("duration", maxDuration))
+ .setOrderBy(new TraceQuery.OrderBy("start_time", TraceQuery.OrderBy.Type.ASC));
+
+ client.queryTraces(query);
+
+ verify(serviceImpl).query(requestCaptor.capture(), ArgumentMatchers.any());
+ final BanyandbTrace.QueryRequest request = requestCaptor.getValue();
+ // assert metadata
+ Assert.assertEquals("sw", request.getMetadata().getName());
+ Assert.assertEquals("default", request.getMetadata().getGroup());
+ // assert timeRange
+ Assert.assertEquals(begin.getEpochSecond(), request.getTimeRange().getBegin().getSeconds());
+ Assert.assertEquals(end.getEpochSecond(), request.getTimeRange().getEnd().getSeconds());
+ // assert fields, we only have state as a condition
+ Assert.assertEquals(6, request.getFieldsCount());
+ // assert orderBy, by default DESC
+ Assert.assertEquals(Banyandb.QueryOrder.Sort.SORT_ASC, request.getOrderBy().getSort());
+ Assert.assertEquals("start_time", request.getOrderBy().getKeyName());
+ // assert projections
+ assertCollectionEqual(Lists.newArrayList("duration", "state", "start_time", "trace_id"), request.getProjection().getKeyNamesList());
+ // assert fields
+ assertCollectionEqual(request.getFieldsList(), ImmutableList.of(
+ PairQueryCondition.LongQueryCondition.ge("duration", minDuration).build(), // 1 -> duration >= minDuration
+ PairQueryCondition.LongQueryCondition.le("duration", maxDuration).build(), // 2 -> duration <= maxDuration
+ PairQueryCondition.StringQueryCondition.eq("service_id", serviceId).build(), // 3 -> service_id
+ PairQueryCondition.StringQueryCondition.eq("service_instance_id", serviceInstanceId).build(), // 4 -> service_instance_id
+ PairQueryCondition.StringQueryCondition.eq("endpoint_id", endpointId).build(), // 5 -> endpoint_id
+ PairQueryCondition.LongQueryCondition.eq("state", 1L).build() // 7 -> state
+ ));
+ }
+
+ @Test
+ public void testQuery_traceIDFetch() {
+ ArgumentCaptor<BanyandbTrace.QueryRequest> requestCaptor = ArgumentCaptor.forClass(BanyandbTrace.QueryRequest.class);
+ String traceId = "1111.222.333";
+
+ TraceQuery query = new TraceQuery("sw", Arrays.asList("state", "start_time", "duration", "trace_id"));
+ query.appendCondition(PairQueryCondition.StringQueryCondition.eq("trace_id", traceId));
+
+ client.queryTraces(query);
+
+ verify(serviceImpl).query(requestCaptor.capture(), ArgumentMatchers.any());
+ final BanyandbTrace.QueryRequest request = requestCaptor.getValue();
+ // assert metadata
+ Assert.assertEquals("sw", request.getMetadata().getName());
+ Assert.assertEquals("default", request.getMetadata().getGroup());
+ Assert.assertEquals(1, request.getFieldsCount());
+ // assert fields
+ assertCollectionEqual(request.getFieldsList(), ImmutableList.of(
+ PairQueryCondition.StringQueryCondition.eq("trace_id", traceId).build()
+ ));
+ }
+
+ @Test
+ public void testQuery_responseConversion() {
+ final byte[] binaryData = new byte[]{13};
+ final String segmentId = "1231.dfd.123123ssf";
+ final String traceId = "trace_id-xxfff.111323";
+ final long duration = 200L;
+ final Instant now = Instant.now();
+ final BanyandbTrace.QueryResponse responseObj = BanyandbTrace.QueryResponse.newBuilder()
+ .addEntities(BanyandbTrace.Entity.newBuilder()
+ .setDataBinary(ByteString.copyFrom(binaryData))
+ .setEntityId(segmentId)
+ .setTimestamp(Timestamp.newBuilder()
+ .setSeconds(now.toEpochMilli() / 1000)
+ .setNanos((int) TimeUnit.MILLISECONDS.toNanos(now.toEpochMilli() % 1000))
+ .build())
+ .addFields(Banyandb.TypedPair.newBuilder()
+ .setKey("trace_id")
+ .setStrPair(Banyandb.Str.newBuilder().setValue(traceId).build()).build())
+ .addFields(Banyandb.TypedPair.newBuilder()
+ .setKey("duration")
+ .setIntPair(Banyandb.Int.newBuilder().setValue(duration).build()).build())
+ .addFields(Banyandb.TypedPair.newBuilder()
+ .setKey("mq.broker")
+ .setNullPair(Banyandb.TypedPair.NullWithType.newBuilder().setType(Banyandb.FieldType.FIELD_TYPE_STRING).build()).build())
+ .build())
+ .build();
+ TraceQueryResponse resp = new TraceQueryResponse(responseObj);
+ Assert.assertNotNull(resp);
+ Assert.assertEquals(1, resp.getEntities().size());
+ Assert.assertEquals(3, resp.getEntities().get(0).getFields().size());
+ Assert.assertEquals(3, resp.getEntities().get(0).getFields().size());
+ Assert.assertEquals(new FieldAndValue.StringFieldPair("trace_id", traceId), resp.getEntities().get(0).getFields().get(0));
+ Assert.assertEquals(new FieldAndValue.LongFieldPair("duration", duration), resp.getEntities().get(0).getFields().get(1));
+ Assert.assertEquals(new FieldAndValue.StringFieldPair("mq.broker", null), resp.getEntities().get(0).getFields().get(2));
+ }
+
+ static <T> void assertCollectionEqual(Collection<T> c1, Collection<T> c2) {
+ Assert.assertTrue(c1.size() == c2.size() && c1.containsAll(c2) && c2.containsAll(c1));
+ }
+}
diff --git a/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientWriteTest.java b/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientWriteTest.java
new file mode 100644
index 0000000..731f560
--- /dev/null
+++ b/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientWriteTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client;
+
+import io.grpc.ManagedChannel;
+import io.grpc.Server;
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.inprocess.InProcessServerBuilder;
+import io.grpc.stub.StreamObserver;
+import io.grpc.testing.GrpcCleanupRule;
+import io.grpc.util.MutableHandlerRegistry;
+import org.apache.skywalking.banyandb.v1.Banyandb;
+import org.apache.skywalking.banyandb.v1.trace.BanyandbTrace;
+import org.apache.skywalking.banyandb.v1.trace.TraceServiceGrpc;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+public class BanyanDBClientWriteTest {
+ @Rule
+ public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
+
+ private final MutableHandlerRegistry serviceRegistry = new MutableHandlerRegistry();
+
+ private BanyanDBClient client;
+ private TraceBulkWriteProcessor traceBulkWriteProcessor;
+
+ @Before
+ public void setUp() throws IOException {
+ String serverName = InProcessServerBuilder.generateName();
+
+ Server server = InProcessServerBuilder
+ .forName(serverName).fallbackHandlerRegistry(serviceRegistry).directExecutor().build();
+ grpcCleanup.register(server.start());
+
+ ManagedChannel channel = grpcCleanup.register(
+ InProcessChannelBuilder.forName(serverName).directExecutor().build());
+
+ client = new BanyanDBClient("127.0.0.1", server.getPort(), "default");
+ client.connect(channel);
+ traceBulkWriteProcessor = client.buildTraceWriteProcessor(1000, 1, 1);
+ }
+
+ @After
+ public void shutdown() throws IOException {
+ traceBulkWriteProcessor.close();
+ }
+
+ @Test
+ public void testWrite() throws Exception {
+ final CountDownLatch allRequestsDelivered = new CountDownLatch(1);
+ final List<BanyandbTrace.WriteRequest> writeRequestDelivered = new ArrayList<>();
+
+ // implement the fake service
+ final TraceServiceGrpc.TraceServiceImplBase serviceImpl =
+ new TraceServiceGrpc.TraceServiceImplBase() {
+ @Override
+ public StreamObserver<BanyandbTrace.WriteRequest> write(StreamObserver<BanyandbTrace.WriteResponse> responseObserver) {
+ return new StreamObserver<BanyandbTrace.WriteRequest>() {
+ @Override
+ public void onNext(BanyandbTrace.WriteRequest value) {
+ writeRequestDelivered.add(value);
+ responseObserver.onNext(BanyandbTrace.WriteResponse.newBuilder().build());
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ }
+
+ @Override
+ public void onCompleted() {
+ responseObserver.onCompleted();
+ allRequestsDelivered.countDown();
+ }
+ };
+ }
+ };
+ serviceRegistry.addService(serviceImpl);
+
+ String segmentId = "1231.dfd.123123ssf";
+ String traceId = "trace_id-xxfff.111323";
+ String serviceId = "webapp_id";
+ String serviceInstanceId = "10.0.0.1_id";
+ String endpointId = "home_id";
+ int latency = 200;
+ int state = 1;
+ Instant now = Instant.now();
+ byte[] byteData = new byte[]{14};
+ String broker = "172.16.10.129:9092";
+ String topic = "topic_1";
+ String queue = "queue_2";
+ String httpStatusCode = "200";
+ String dbType = "SQL";
+ String dbInstance = "127.0.0.1:3306";
+
+ TraceWrite traceWrite = TraceWrite.builder()
+ .entityId(segmentId)
+ .binary(byteData)
+ .timestamp(now.toEpochMilli())
+ .name("sw")
+ .field(Field.stringField(traceId)) // 0
+ .field(Field.stringField(serviceId))
+ .field(Field.stringField(serviceInstanceId))
+ .field(Field.stringField(endpointId))
+ .field(Field.longField(latency)) // 4
+ .field(Field.longField(state))
+ .field(Field.stringField(httpStatusCode))
+ .field(Field.nullField()) // 7
+ .field(Field.stringField(dbType))
+ .field(Field.stringField(dbInstance))
+ .field(Field.stringField(broker))
+ .field(Field.stringField(topic))
+ .field(Field.stringField(queue)) // 12
+ .build();
+
+ traceBulkWriteProcessor.add(traceWrite);
+
+ if (allRequestsDelivered.await(5, TimeUnit.SECONDS)) {
+ Assert.assertEquals(1, writeRequestDelivered.size());
+ final BanyandbTrace.WriteRequest request = writeRequestDelivered.get(0);
+ Assert.assertEquals(13, request.getEntity().getFieldsCount());
+ Assert.assertEquals(traceId, request.getEntity().getFields(0).getStr().getValue());
+ Assert.assertEquals(latency, request.getEntity().getFields(4).getInt().getValue());
+ Assert.assertEquals(request.getEntity().getFields(7).getValueTypeCase(), Banyandb.Field.ValueTypeCase.NULL);
+ Assert.assertEquals(queue, request.getEntity().getFields(12).getStr().getValue());
+ } else {
+ Assert.fail();
+ }
+ }
+}