You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2021/08/13 14:04:40 UTC

[skywalking] branch banyandb-integration updated: Add banyandb client APIs with most implementation. (#7455)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch banyandb-integration
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/banyandb-integration by this push:
     new 207a5df  Add banyandb client APIs with most implementation. (#7455)
207a5df is described below

commit 207a5dfa8deff23cc5b416a7c279c43b5ed8f292
Author: 吴晟 Wu Sheng <wu...@foxmail.com>
AuthorDate: Fri Aug 13 22:04:23 2021 +0800

    Add banyandb client APIs with most implementation. (#7455)
    
    * Init module structure.
    
    * Provide BanyanDBClient
    
    * Add Trace bulk write API
    
    * Add Trace query API
    
    * Finish prototype codebase
---
 oap-server/banyandb-java-client/pom.xml            | 118 ++++++++++++++++
 .../banyandb/v1/client/BanyanDBClient.java         | 155 +++++++++++++++++++++
 .../banyandb/v1/client/BulkWriteProcessor.java     | 118 ++++++++++++++++
 .../skywalking/banyandb/v1/client/Field.java       |  97 +++++++++++++
 .../banyandb/v1/client/FieldAndValue.java          | 143 +++++++++++++++++++
 .../skywalking/banyandb/v1/client/Options.java     |  43 ++++++
 .../banyandb/v1/client/PairQueryCondition.java     |  59 ++++++++
 .../skywalking/banyandb/v1/client/RowEntity.java   |  43 ++++++
 .../banyandb/v1/client/TimestampRange.java         |  46 ++++++
 .../v1/client/TraceBulkWriteProcessor.java         | 102 ++++++++++++++
 .../skywalking/banyandb/v1/client/TraceQuery.java  | 115 +++++++++++++++
 .../banyandb/v1/client/TraceQueryResponse.java     |  45 ++++++
 .../skywalking/banyandb/v1/client/TraceWrite.java  |  78 +++++++++++
 .../main/proto/banyandb/v1/banyandb-trace.proto    | 107 ++++++++++++++
 .../src/main/proto/banyandb/v1/banyandb.proto      | 148 ++++++++++++++++++++
 oap-server/pom.xml                                 |   1 +
 16 files changed, 1418 insertions(+)

diff --git a/oap-server/banyandb-java-client/pom.xml b/oap-server/banyandb-java-client/pom.xml
new file mode 100644
index 0000000..e9f58f1
--- /dev/null
+++ b/oap-server/banyandb-java-client/pom.xml
@@ -0,0 +1,118 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  ~
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <!-- This module is going to be released in a new repo, apache/skywalking-banyandb-java-api -->
+    <parent>
+        <artifactId>oap-server</artifactId>
+        <groupId>org.apache.skywalking</groupId>
+        <version>8.8.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>banyandb-java-client</artifactId>
+    <packaging>jar</packaging>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>io.grpc</groupId>
+            <artifactId>grpc-netty-shaded</artifactId>
+            <version>${grpc.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.grpc</groupId>
+            <artifactId>grpc-protobuf</artifactId>
+            <version>${grpc.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.grpc</groupId>
+            <artifactId>grpc-stub</artifactId>
+            <version>${grpc.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-tcnative-boringssl-static</artifactId>
+            <version>${netty-tcnative-boringssl-static.version}</version>
+        </dependency>
+        <dependency> <!-- necessary for Java 9+ -->
+            <groupId>org.apache.tomcat</groupId>
+            <artifactId>annotations-api</artifactId>
+            <version>${org.apache.tomcat.annotations-api.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.skywalking</groupId>
+            <artifactId>apm-datacarrier</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+
+        <plugins>
+            <plugin>
+                <groupId>kr.motd.maven</groupId>
+                <artifactId>os-maven-plugin</artifactId>
+                <version>${os-maven-plugin.version}</version>
+                <executions>
+                    <execution>
+                        <phase>initialize</phase>
+                        <goals>
+                            <goal>detect</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.xolstice.maven.plugins</groupId>
+                <artifactId>protobuf-maven-plugin</artifactId>
+                <version>${protobuf-maven-plugin.version}</version>
+                <configuration>
+                    <!--
+                      The version of protoc must match protobuf-java. If you don't depend on
+                      protobuf-java directly, you will be transitively depending on the
+                      protobuf-java version that grpc depends on.
+                    -->
+                    <protocArtifact>
+                        com.google.protobuf:protoc:${com.google.protobuf.protoc.version}:exe:${os.detected.classifier}
+                    </protocArtifact>
+                    <pluginId>grpc-java</pluginId>
+                    <pluginArtifact>
+                        io.grpc:protoc-gen-grpc-java:${protoc-gen-grpc-java.plugin.version}:exe:${os.detected.classifier}
+                    </pluginArtifact>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>grpc-build</id>
+                        <goals>
+                            <goal>compile</goal>
+                            <goal>compile-custom</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java
new file mode 100644
index 0000000..389ded8
--- /dev/null
+++ b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client;
+
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.NameResolverRegistry;
+import io.grpc.internal.DnsNameResolverProvider;
+import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.banyandb.v1.trace.BanyandbTrace;
+import org.apache.skywalking.banyandb.v1.trace.TraceServiceGrpc;
+
+/**
+ * BanyanDBClient represents a client instance interacting with a BanyanDB server. It is built on top of BanyanDB v1
+ * gRPC APIs.
+ */
+@Slf4j
+public class BanyanDBClient {
+    /**
+     * The hostname of BanyanDB server.
+     */
+    private final String host;
+    /**
+     * The port of BanyanDB server.
+     */
+    private final int port;
+    /**
+     * The database instance (group) name.
+     */
+    private final String group;
+    /**
+     * Options for server connection.
+     */
+    private final Options options;
+    /**
+     * Managed gRPC connection.
+     */
+    private volatile ManagedChannel managedChannel;
+    /**
+     * gRPC asynchronous client stub, assigned in {@link #connect()}.
+     */
+    private volatile TraceServiceGrpc.TraceServiceStub traceServiceStub;
+    /**
+     * gRPC blocking stub, assigned in {@link #connect()}.
+     */
+    private volatile TraceServiceGrpc.TraceServiceBlockingStub traceServiceBlockingStub;
+    /**
+     * The connection status.
+     */
+    private volatile boolean isConnected = false;
+    /**
+     * A lock to prevent races while establishing the network connection.
+     */
+    private final ReentrantLock connectionEstablishLock;
+
+    /**
+     * Create a BanyanDB client instance with the default options.
+     *
+     * @param host  IP or domain name
+     * @param port  Server port
+     * @param group Database instance name
+     */
+    public BanyanDBClient(final String host, final int port, final String group) {
+        this(host, port, group, new Options());
+    }
+
+    /**
+     * Create a BanyanDB client instance with custom options
+     *
+     * @param host    IP or domain name
+     * @param port    Server port
+     * @param group   Database instance name
+     * @param options for database connection
+     */
+    public BanyanDBClient(final String host,
+                          final int port,
+                          final String group,
+                          final Options options) {
+        this.host = host;
+        this.port = port;
+        this.group = group;
+        this.options = options;
+        this.connectionEstablishLock = new ReentrantLock();
+
+        NameResolverRegistry.getDefaultRegistry().register(new DnsNameResolverProvider());
+    }
+
+    /**
+     * Connect to the server. Calling it again once connected is a no-op.
+     *
+     * @throws RuntimeException if server is not reachable.
+     */
+    public void connect() {
+        connectionEstablishLock.lock();
+        try {
+            if (!isConnected) {
+                final ManagedChannelBuilder<?> nettyChannelBuilder = NettyChannelBuilder.forAddress(host, port);
+                nettyChannelBuilder.maxInboundMessageSize(options.getMaxInboundMessageSize());
+
+                managedChannel = nettyChannelBuilder.build();
+                traceServiceStub = TraceServiceGrpc.newStub(managedChannel);
+                traceServiceBlockingStub = TraceServiceGrpc.newBlockingStub(
+                    managedChannel);
+                isConnected = true;
+            }
+        } finally {
+            connectionEstablishLock.unlock();
+        }
+    }
+
+    /**
+     * Create a bulk processor for trace write. Call {@link #connect()} first, as it assigns the stub handed over here.
+     *
+     * @param maxBulkSize   the max bulk size for the flush operation
+     * @param flushInterval if given maxBulkSize is not reached in this period, the flush would be triggered
+     *                      automatically. Unit is second
+     * @param concurrency   the max number of flush operations running concurrently
+     * @return trace bulk write processor
+     */
+    public TraceBulkWriteProcessor buildTraceWriteProcessor(int maxBulkSize, int flushInterval, int concurrency) {
+        return new TraceBulkWriteProcessor(group, traceServiceStub, maxBulkSize, flushInterval, concurrency);
+    }
+
+    /**
+     * Query traces according to given conditions. Call {@link #connect()} first, as it assigns the blocking stub.
+     *
+     * @param traceQuery condition for query
+     * @return the traces hit by the query.
+     */
+    public TraceQueryResponse queryTraces(TraceQuery traceQuery) {
+        final BanyandbTrace.QueryResponse response = traceServiceBlockingStub
+            .withDeadlineAfter(options.getDeadline(), TimeUnit.SECONDS)
+            .query(traceQuery.build(group));
+        return new TraceQueryResponse(response);
+    }
+}
diff --git a/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/BulkWriteProcessor.java b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/BulkWriteProcessor.java
new file mode 100644
index 0000000..fc5bce5
--- /dev/null
+++ b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/BulkWriteProcessor.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
+import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
+
+/**
+ * BulkWriteProcessor is a processor driven by both time and size.
+ *
+ * It includes an internal queue and timer, and accepts the data sequentially. With the given thresholds of time and
+ * size, it triggers a flush to hand the buffered data over to the next step.
+ */
+public abstract class BulkWriteProcessor {
+    protected final int flushInterval;
+    protected DataCarrier buffer;
+
+    /**
+     * Create the processor.
+     *
+     * @param processorName the name given to the internal DataCarrier buffer
+     * @param maxBulkSize   the max bulk size for the flush operation
+     * @param flushInterval seconds after which a flush is triggered even if maxBulkSize has not been reached
+     * @param concurrency   the number of queue consumers performing the flush
+     */
+    protected BulkWriteProcessor(String processorName, int maxBulkSize, int flushInterval, int concurrency) {
+        this.flushInterval = flushInterval;
+        // NOTE(review): DataCarrier presumably takes (name, channelSize, bufferSize) - confirm the argument order.
+        this.buffer = new DataCarrier(processorName, maxBulkSize, concurrency);
+        Properties properties = new Properties();
+        properties.put("maxBulkSize", maxBulkSize);
+        properties.put("flushInterval", flushInterval);
+        properties.put("BulkWriteProcessor", this);
+        buffer.consume(QueueWatcher.class, concurrency, 20, properties);
+    }
+
+    /**
+     * The internal queue consumer; it caches incoming data and flushes by size or by elapsed time.
+     */
+    private static class QueueWatcher implements IConsumer {
+        private long lastFlushTimestamp;
+        private int maxBulkSize;
+        private int flushInterval;
+        private List cachedData;
+        private BulkWriteProcessor bulkWriteProcessor;
+
+        @Override
+        public void init(Properties properties) {
+            lastFlushTimestamp = System.currentTimeMillis();
+            maxBulkSize = (Integer) properties.get("maxBulkSize");
+            flushInterval = (Integer) properties.get("flushInterval") * 1000;
+            cachedData = new ArrayList(maxBulkSize);
+            bulkWriteProcessor = (BulkWriteProcessor) properties.get("BulkWriteProcessor");
+        }
+
+        @Override
+        public void consume(final List data) {
+            if (data.size() >= maxBulkSize) {
+                // Defensive branch: the DataCarrier channel size should already keep data below maxBulkSize.
+                bulkWriteProcessor.flush(data);
+                lastFlushTimestamp = System.currentTimeMillis();
+            } else {
+                data.forEach(element -> {
+                    cachedData.add(element);
+                    if (cachedData.size() >= maxBulkSize) {
+                        // Flush and re-init.
+                        bulkWriteProcessor.flush(cachedData);
+                        cachedData = new ArrayList(maxBulkSize);
+                        lastFlushTimestamp = System.currentTimeMillis();
+                    }
+                });
+            }
+        }
+
+        @Override
+        public void onError(final List data, final Throwable t) {
+        }
+
+        @Override
+        public void onExit() {
+        }
+
+        @Override
+        public void nothingToConsume() {
+            if (System.currentTimeMillis() - lastFlushTimestamp > flushInterval) {
+                if (!cachedData.isEmpty()) {
+                    bulkWriteProcessor.flush(cachedData);
+                    cachedData = new ArrayList(maxBulkSize);
+                }
+                lastFlushTimestamp = System.currentTimeMillis();
+            }
+        }
+    }
+
+    /**
+     * @param data the batch to be flushed.
+     */
+    protected abstract void flush(List data);
+}
diff --git a/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/Field.java b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/Field.java
new file mode 100644
index 0000000..38e6e42
--- /dev/null
+++ b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/Field.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client;
+
+import java.util.List;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.skywalking.banyandb.v1.Banyandb;
+
+import static com.google.protobuf.NullValue.NULL_VALUE;
+
+/**
+ * Field represents a value of column/field in the write-op or response.
+ */
+public interface Field {
+    Banyandb.Field toField();
+
+    class NullField implements Field {
+        // Serialized with the protobuf NULL_VALUE marker.
+        @Override
+        public Banyandb.Field toField() {
+            return Banyandb.Field.newBuilder().setNull(NULL_VALUE).build();
+        }
+    }
+
+    /**
+     * The value of a String type field.
+     */
+    @RequiredArgsConstructor
+    @Getter
+    class StringField implements Field {
+        protected final String value;
+
+        @Override
+        public Banyandb.Field toField() {
+            return Banyandb.Field.newBuilder().setStr(Banyandb.Str.newBuilder().setValue(value)).build();
+        }
+    }
+
+    /**
+     * The value of a String array type field.
+     */
+    @RequiredArgsConstructor
+    @Getter
+    class StringArrayField implements Field {
+        protected final List<String> value;
+
+        @Override
+        public Banyandb.Field toField() {
+            return Banyandb.Field.newBuilder().setStrArray(Banyandb.StrArray.newBuilder().addAllValue(value)).build();
+        }
+    }
+
+    /**
+     * The value of an int64 (Long) type field.
+     */
+    @RequiredArgsConstructor
+    @Getter
+    class LongField implements Field {
+        protected final Long value;
+
+        @Override
+        public Banyandb.Field toField() {
+            return Banyandb.Field.newBuilder().setInt(Banyandb.Int.newBuilder().setValue(value)).build();
+        }
+    }
+
+    /**
+     * The value of an int64 (Long) array type field.
+     */
+    @RequiredArgsConstructor
+    @Getter
+    class LongArrayField implements Field {
+        protected final List<Long> value;
+
+        @Override
+        public Banyandb.Field toField() {
+            return Banyandb.Field.newBuilder().setIntArray(Banyandb.IntArray.newBuilder().addAllValue(value)).build();
+        }
+    }
+}
diff --git a/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/FieldAndValue.java b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/FieldAndValue.java
new file mode 100644
index 0000000..0c1e13e
--- /dev/null
+++ b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/FieldAndValue.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client;
+
+import java.util.List;
+import org.apache.skywalking.banyandb.v1.Banyandb;
+
+/** A {@link Field} that also carries the name of the field it belongs to. */
+public interface FieldAndValue extends Field {
+    /**
+     * @return field name
+     */
+    String getFieldName();
+
+    /**
+     * @return true if value is null;
+     */
+    boolean isNull();
+
+    /**
+     * @return the pair matching the variant set in typedPair; IllegalArgumentException is thrown if none is set
+     */
+    static FieldAndValue build(Banyandb.TypedPair typedPair) {
+        if (typedPair.hasIntPair()) {
+            final Banyandb.IntPair intPair = typedPair.getIntPair();
+            if (intPair.getIsNull()) {
+                return new LongFieldPair(intPair.getKey(), null);
+            }
+            return new LongFieldPair(intPair.getKey(), intPair.getValue());
+        } else if (typedPair.hasStrPair()) {
+            final Banyandb.StrPair strPair = typedPair.getStrPair();
+            if (strPair.getIsNull()) {
+                return new StringFieldPair(strPair.getKey(), null);
+            }
+            return new StringFieldPair(strPair.getKey(), strPair.getValue());
+        } else if (typedPair.hasIntArrayPair()) {
+            final Banyandb.IntArrayPair intArrayPair = typedPair.getIntArrayPair();
+            if (intArrayPair.getIsNull()) {
+                return new LongArrayFieldPair(intArrayPair.getKey(), null);
+            }
+            return new LongArrayFieldPair(intArrayPair.getKey(), intArrayPair.getValueList());
+        } else if (typedPair.hasStrArrayPair()) {
+            final Banyandb.StrArrayPair strArrayPair = typedPair.getStrArrayPair();
+            if (strArrayPair.getIsNull()) {
+                return new StringArrayFieldPair(strArrayPair.getKey(), null);
+            }
+            return new StringArrayFieldPair(strArrayPair.getKey(), strArrayPair.getValueList());
+        }
+        throw new IllegalArgumentException("Unrecognized TypedPair, " + typedPair);
+    }
+
+    class StringFieldPair extends StringField implements FieldAndValue {
+        private final String fieldName;
+
+        StringFieldPair(final String fieldName, final String value) {
+            super(value);
+            this.fieldName = fieldName;
+        }
+
+        @Override
+        public String getFieldName() {
+            return fieldName;
+        }
+
+        @Override
+        public boolean isNull() {
+            return value == null;
+        }
+    }
+
+    class StringArrayFieldPair extends StringArrayField implements FieldAndValue {
+        private final String fieldName;
+
+        StringArrayFieldPair(final String fieldName, final List<String> value) {
+            super(value);
+            this.fieldName = fieldName;
+        }
+
+        @Override
+        public String getFieldName() {
+            return fieldName;
+        }
+
+        @Override
+        public boolean isNull() {
+            return value == null;
+        }
+    }
+
+    class LongFieldPair extends LongField implements FieldAndValue {
+        private final String fieldName;
+
+        LongFieldPair(final String fieldName, final Long value) {
+            super(value);
+            this.fieldName = fieldName;
+        }
+
+        @Override
+        public String getFieldName() {
+            return fieldName;
+        }
+
+        @Override
+        public boolean isNull() {
+            return value == null;
+        }
+    }
+
+    class LongArrayFieldPair extends LongArrayField implements FieldAndValue {
+        private final String fieldName;
+
+        LongArrayFieldPair(final String fieldName, final List<Long> value) {
+            super(value);
+            this.fieldName = fieldName;
+        }
+
+        @Override
+        public String getFieldName() {
+            return fieldName;
+        }
+
+        @Override
+        public boolean isNull() {
+            return value == null;
+        }
+    }
+}
diff --git a/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/Options.java b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/Options.java
new file mode 100644
index 0000000..89b8038
--- /dev/null
+++ b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/Options.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client;
+
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.Setter;
+
+/**
+ * Client connection options. Setters are public; getters are visible inside this package only.
+ */
+@Setter
+@Getter(AccessLevel.PACKAGE)
+public class Options {
+    /**
+     * Max inbound message size, 50 MB by default
+     */
+    private int maxInboundMessageSize = 1024 * 1024 * 50;
+    /**
+     * Threshold of gRPC blocking query, unit is second
+     */
+    private int deadline = 30;
+
+    // Public so that callers outside this package can pass custom options to BanyanDBClient.
+    public Options() {
+    }
+}
\ No newline at end of file
diff --git a/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/PairQueryCondition.java b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/PairQueryCondition.java
new file mode 100644
index 0000000..9276e0e
--- /dev/null
+++ b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/PairQueryCondition.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client;
+
+import org.apache.skywalking.banyandb.v1.Banyandb;
+
+import static org.apache.skywalking.banyandb.v1.Banyandb.PairQuery.BinaryOp.BINARY_OP_EQ;
+
+/**
+ * PairQueryCondition represents a query condition, including field name, operator, and value(s);
+ */
+public abstract class PairQueryCondition {
+    protected String fieldName;
+
+    protected abstract Banyandb.PairQuery build();
+
+    /**
+     * LongEqual represents `Field == value` condition. Static, so it can be created without an enclosing instance.
+     */
+    public static class LongEqual extends PairQueryCondition {
+        private final long value;
+
+        public LongEqual(String fieldName, long value) {
+            this.fieldName = fieldName;
+            this.value = value;
+        }
+
+        @Override
+        protected Banyandb.PairQuery build() {
+            return Banyandb.PairQuery.newBuilder()
+                                     .setOp(BINARY_OP_EQ)
+                                     .setCondition(
+                                         Banyandb.TypedPair.newBuilder()
+                                                           .setIntPair(
+                                                               Banyandb.IntPair.newBuilder()
+                                                                               .setKey(fieldName)
+                                                                               .setValue(value)))
+                                     .build();
+        }
+    }
+
+    //TODO, Add all conditions.
+}
diff --git a/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/RowEntity.java b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/RowEntity.java
new file mode 100644
index 0000000..4f95faa
--- /dev/null
+++ b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/RowEntity.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client;
+
+import java.util.ArrayList;
+import java.util.List;
+import lombok.Getter;
+import org.apache.skywalking.banyandb.v1.trace.BanyandbTrace;
+
+/**
+ * RowEntity represents one entity (row) of a BanyanDB trace query response.
+ */
+@Getter
+public class RowEntity {
+    /**
+     * ID of the entity, copied from the response entity.
+     */
+    private final String id;
+    /**
+     * Timestamp of the entity, in milliseconds.
+     */
+    private final long timestamp;
+    /**
+     * The binary raw data of the entity.
+     */
+    private final byte[] binary;
+    /**
+     * The fields of the entity, converted through {@link FieldAndValue#build}.
+     */
+    private final List<FieldAndValue> fields;
+
+    /**
+     * @param entity the gRPC level entity to read from.
+     */
+    RowEntity(BanyandbTrace.Entity entity) {
+        id = entity.getEntityId();
+        // A protobuf Timestamp is whole seconds plus a nanosecond fraction. One millisecond is 1,000,000
+        // nanoseconds, so the fraction must be divided by 1_000_000 (dividing by 1000 would yield microseconds).
+        timestamp = entity.getTimestamp().getSeconds() * 1000 + entity.getTimestamp().getNanos() / 1_000_000;
+        binary = entity.getDataBinary().toByteArray();
+        fields = new ArrayList<>(entity.getFieldsCount());
+        entity.getFieldsList().forEach(field -> fields.add(FieldAndValue.build(field)));
+    }
+}
diff --git a/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/TimestampRange.java b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/TimestampRange.java
new file mode 100644
index 0000000..2259068
--- /dev/null
+++ b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/TimestampRange.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client;
+
+import com.google.protobuf.Timestamp;
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.skywalking.banyandb.v1.Banyandb;
+
+/**
+ * TimestampRange holds the begin and end timestamps of a query time range. Both values are in milliseconds.
+ */
+@RequiredArgsConstructor
+@Getter(AccessLevel.PROTECTED)
+public class TimestampRange {
+    /**
+     * Begin of the range, in milliseconds.
+     */
+    private final long begin;
+    /**
+     * End of the range, in milliseconds.
+     */
+    private final long end;
+
+    /**
+     * @return TimeRange accordingly.
+     */
+    Banyandb.TimeRange build() {
+        final Banyandb.TimeRange.Builder builder = Banyandb.TimeRange.newBuilder();
+        // A protobuf Timestamp is whole seconds plus a nanosecond fraction; the millisecond remainder is
+        // converted with 1 ms = 1,000,000 ns.
+        builder.setBegin(Timestamp.newBuilder()
+                                  .setSeconds(begin / 1000)
+                                  .setNanos((int) (begin % 1000 * 1_000_000)));
+        // The end bound goes to the `end` field; writing it through setBegin would overwrite the begin bound
+        // and leave the end unset.
+        builder.setEnd(Timestamp.newBuilder()
+                                .setSeconds(end / 1000)
+                                .setNanos((int) (end % 1000 * 1_000_000)));
+        return builder.build();
+    }
+}
diff --git a/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceBulkWriteProcessor.java b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceBulkWriteProcessor.java
new file mode 100644
index 0000000..5bab418
--- /dev/null
+++ b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceBulkWriteProcessor.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client;
+
+import io.grpc.stub.StreamObserver;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.banyandb.v1.trace.BanyandbTrace;
+import org.apache.skywalking.banyandb.v1.trace.TraceServiceGrpc;
+
+/**
+ * TraceWriteProcessor works for trace flush.
+ */
+@Slf4j
+public class TraceBulkWriteProcessor extends BulkWriteProcessor {
+    /**
+     * The BanyanDB instance name.
+     */
+    private final String group;
+    private TraceServiceGrpc.TraceServiceStub traceServiceStub;
+
+    /**
+     * Create the processor.
+     *
+     * @param traceServiceStub stub for gRPC call.
+     * @param maxBulkSize      the max bulk size for the flush operation
+     * @param flushInterval    if given maxBulkSize is not reached in this period, the flush would be trigger
+     *                         automatically. Unit is second.
+     * @param concurrency      the number of concurrency would run for the flush max.
+     */
+    protected TraceBulkWriteProcessor(final String group,
+                                      final TraceServiceGrpc.TraceServiceStub traceServiceStub,
+                                      final int maxBulkSize,
+                                      final int flushInterval,
+                                      final int concurrency) {
+        super("TraceBulkWriteProcessor", maxBulkSize, flushInterval, concurrency);
+        this.group = group;
+        this.traceServiceStub = traceServiceStub;
+    }
+
+    /**
+     * Add the trace to the bulk processor.
+     *
+     * @param traceWrite to add.
+     */
+    public void add(TraceWrite traceWrite) {
+        this.buffer.produce(traceWrite);
+    }
+
+    @Override
+    protected void flush(final List data) {
+        final StreamObserver<BanyandbTrace.WriteRequest> writeRequestStreamObserver
+            = traceServiceStub.withDeadlineAfter(
+                                  flushInterval, TimeUnit.SECONDS)
+                              .write(
+                                  new StreamObserver<BanyandbTrace.WriteResponse>() {
+                                      @Override
+                                      public void onNext(
+                                          BanyandbTrace.WriteResponse writeResponse) {
+                                      }
+
+                                      @Override
+                                      public void onError(
+                                          Throwable throwable) {
+                                          log.error(
+                                              "Error occurs in flushing traces.",
+                                              throwable
+                                          );
+                                      }
+
+                                      @Override
+                                      public void onCompleted() {
+                                      }
+                                  });
+        try {
+            data.forEach(write -> {
+                final TraceWrite traceWrite = (TraceWrite) write;
+                BanyandbTrace.WriteRequest request = traceWrite.build(group);
+                writeRequestStreamObserver.onNext(request);
+            });
+        } finally {
+            writeRequestStreamObserver.onCompleted();
+        }
+    }
+}
diff --git a/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceQuery.java b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceQuery.java
new file mode 100644
index 0000000..5f84a99
--- /dev/null
+++ b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceQuery.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client;
+
+import java.util.ArrayList;
+import java.util.List;
+import lombok.RequiredArgsConstructor;
+import lombok.Setter;
+import org.apache.skywalking.banyandb.v1.Banyandb;
+import org.apache.skywalking.banyandb.v1.trace.BanyandbTrace;
+
+/**
+ * TraceQuery is the high-level query API for the trace model.
+ */
+@Setter
+public class TraceQuery {
+    /**
+     * Owner name current entity
+     */
+    private final String name;
+    /**
+     * The time range for query.
+     */
+    private final TimestampRange timestampRange;
+    /**
+     * The projections of query result. These should have defined in the schema.
+     */
+    private final List<String> projections;
+    /**
+     * Query conditions. Conditions are added through {@link #appendCondition(PairQueryCondition)}.
+     */
+    private final List<PairQueryCondition> conditions;
+    /**
+     * The starting row id of the query. Default value is 0.
+     */
+    private int offset;
+    /**
+     * The limit size of the query. Default value is 20.
+     */
+    private int limit;
+    /**
+     * One order condition is supported and optional.
+     */
+    private OrderBy orderBy;
+
+    /**
+     * Create a query with no condition, offset 0, limit 20 and no ordering.
+     *
+     * @param name           owner name of the entities to query.
+     * @param timestampRange the time range for query.
+     * @param projections    the projections of query result.
+     */
+    public TraceQuery(final String name, final TimestampRange timestampRange, final List<String> projections) {
+        this.name = name;
+        this.timestampRange = timestampRange;
+        this.projections = projections;
+        this.conditions = new ArrayList<>(10);
+        this.offset = 0;
+        this.limit = 20;
+    }
+
+    /**
+     * Add a query condition. The condition list is a final field, so no setter is generated for it; this method
+     * is the way to attach conditions to the query.
+     *
+     * @param condition the condition to add.
+     * @return this query, to allow chained calls.
+     */
+    public TraceQuery appendCondition(final PairQueryCondition condition) {
+        this.conditions.add(condition);
+        return this;
+    }
+
+    /**
+     * @param group The instance name.
+     * @return QueryRequest for gRPC level query.
+     */
+    BanyandbTrace.QueryRequest build(String group) {
+        final BanyandbTrace.QueryRequest.Builder builder = BanyandbTrace.QueryRequest.newBuilder();
+        builder.setMetadata(Banyandb.Metadata.newBuilder()
+                                             .setGroup(group)
+                                             .setName(name)
+                                             .build());
+        builder.setTimeRange(timestampRange.build());
+        builder.setProjection(Banyandb.Projection.newBuilder().addAllKeyNames(projections).build());
+        conditions.forEach(pairQueryCondition -> builder.addFields(pairQueryCondition.build()));
+        builder.setOffset(offset);
+        builder.setLimit(limit);
+        // The ordering is optional; the request keeps its default when no OrderBy is set.
+        if (orderBy != null) {
+            builder.setOrderBy(orderBy.build());
+        }
+        return builder.build();
+    }
+
+    /**
+     * OrderBy is the ordering condition of a query: one field name and the sort direction.
+     */
+    @RequiredArgsConstructor
+    public static class OrderBy {
+        /**
+         * The field name for ordering.
+         */
+        private final String fieldName;
+        /**
+         * The type of ordering.
+         */
+        private final Type type;
+
+        /**
+         * @return the gRPC level QueryOrder. Any type other than DESC, including null, results in SORT_ASC.
+         */
+        private Banyandb.QueryOrder build() {
+            final Banyandb.QueryOrder.Builder builder = Banyandb.QueryOrder.newBuilder();
+            builder.setKeyName(fieldName);
+            builder.setSort(
+                Type.DESC.equals(type) ? Banyandb.QueryOrder.Sort.SORT_DESC : Banyandb.QueryOrder.Sort.SORT_ASC);
+            return builder.build();
+        }
+
+        public enum Type {
+            ASC, DESC
+        }
+    }
+}
diff --git a/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceQueryResponse.java b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceQueryResponse.java
new file mode 100644
index 0000000..26592fa
--- /dev/null
+++ b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceQueryResponse.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client;
+
+import java.util.ArrayList;
+import java.util.List;
+import lombok.Getter;
+import org.apache.skywalking.banyandb.v1.trace.BanyandbTrace;
+
+/**
+ * TraceQueryResponse wraps the entities returned by a trace query.
+ */
+public class TraceQueryResponse {
+    /**
+     * The returned entities, in the order of the gRPC response.
+     */
+    @Getter
+    private List<RowEntity> entities;
+
+    /**
+     * @param response the gRPC level response whose entities are converted into {@link RowEntity}s.
+     */
+    TraceQueryResponse(BanyandbTrace.QueryResponse response) {
+        final List<BanyandbTrace.Entity> source = response.getEntitiesList();
+        final List<RowEntity> rows = new ArrayList<>(source.size());
+        for (final BanyandbTrace.Entity entity : source) {
+            rows.add(new RowEntity(entity));
+        }
+        entities = rows;
+    }
+
+    /**
+     * @return the number of entities in this response.
+     */
+    public int size() {
+        return entities.size();
+    }
+}
diff --git a/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceWrite.java b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceWrite.java
new file mode 100644
index 0000000..d1023f2
--- /dev/null
+++ b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceWrite.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Timestamp;
+import java.util.List;
+import lombok.AccessLevel;
+import lombok.Builder;
+import lombok.Getter;
+import org.apache.skywalking.banyandb.v1.Banyandb;
+import org.apache.skywalking.banyandb.v1.trace.BanyandbTrace;
+
+/**
+ * TraceWrite represents a write operation, including necessary fields, for {@link
+ * BanyanDBClient#buildTraceWriteProcessor}.
+ */
+@Builder
+@Getter(AccessLevel.PROTECTED)
+public class TraceWrite {
+    /**
+     * Owner name current entity
+     */
+    private final String name;
+    /**
+     * ID of current entity
+     */
+    private final String entityId;
+    /**
+     * Timestamp represents the time of current trace or trace segment. The value is in milliseconds.
+     */
+    private final long timestamp;
+    /**
+     * The binary raw data represents the whole object of current trace or trace segment. It could be organized by
+     * different serialization formats. Natively, SkyWalking uses protobuf, but it is not required. The BanyanDB server
+     * wouldn't deserialize this. So, no format requirement.
+     */
+    private final byte[] binary;
+    /**
+     * The values of fields, which are defined by the schema. In the bulk write process, BanyanDB client doesn't require
+     * field names anymore.
+     */
+    private final List<Field> fields;
+
+    /**
+     * @param group of the BanyanDB client connected.
+     * @return {@link BanyandbTrace.WriteRequest} for the bulk process.
+     */
+    BanyandbTrace.WriteRequest build(String group) {
+        final BanyandbTrace.WriteRequest.Builder builder = BanyandbTrace.WriteRequest.newBuilder();
+        builder.setMetadata(Banyandb.Metadata.newBuilder().setGroup(group).setName(name).build());
+        final BanyandbTrace.EntityValue.Builder entityBuilder = BanyandbTrace.EntityValue.newBuilder();
+        entityBuilder.setEntityId(entityId);
+        // A protobuf Timestamp is whole seconds plus a nanosecond fraction; the millisecond remainder is
+        // converted with 1 ms = 1,000,000 ns (multiplying by 1000 would store microseconds as nanoseconds).
+        entityBuilder.setTimestamp(Timestamp.newBuilder()
+                                            .setSeconds(timestamp / 1000)
+                                            .setNanos((int) (timestamp % 1000 * 1_000_000)));
+        entityBuilder.setDataBinary(ByteString.copyFrom(binary));
+        fields.forEach(writeField -> entityBuilder.addFields(writeField.toField()));
+        builder.setEntity(entityBuilder.build());
+        return builder.build();
+    }
+}
diff --git a/oap-server/banyandb-java-client/src/main/proto/banyandb/v1/banyandb-trace.proto b/oap-server/banyandb-java-client/src/main/proto/banyandb/v1/banyandb-trace.proto
new file mode 100644
index 0000000..d59798e
--- /dev/null
+++ b/oap-server/banyandb-java-client/src/main/proto/banyandb/v1/banyandb-trace.proto
@@ -0,0 +1,107 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+syntax = "proto3";
+
+option java_package = "org.apache.skywalking.banyandb.v1.trace";
+
+package banyandb.v1.trace;
+
+import "google/protobuf/timestamp.proto";
+import "google/protobuf/struct.proto";
+import "banyandb/v1/banyandb.proto";
+
+// TraceService is the gRPC service for trace data.
+// Query is a unary call; Write takes a stream of WriteRequest and returns a stream of WriteResponse.
+service TraceService {
+  rpc Query(banyandb.v1.trace.QueryRequest) returns (banyandb.v1.trace.QueryResponse);
+  rpc Write(stream banyandb.v1.trace.WriteRequest) returns (stream banyandb.v1.trace.WriteResponse);
+}
+
+// QueryRequest is the request contract for query.
+message QueryRequest {
+  // metadata is required.
+  Metadata metadata = 1;
+  // time_range is a range query with begin/end time of entities in the timeunit of nanoseconds.
+  // In the context of Trace, it represents the range of the `startTime` for spans/segments,
+  // while in the context of Log, it means the range of the timestamp(s) for logs.
+  // It is always recommended to specify a time range for performance reasons.
+  TimeRange time_range = 2;
+  // offset is used to support pagination, together with the following limit.
+  uint32 offset = 3;
+  // limit is used to impose a boundary on the number of records being returned.
+  uint32 limit = 4;
+  // order_by is given to specify the sort for a field. So far, only fields in the type of Integer are supported.
+  QueryOrder order_by = 5;
+  // fields are indexed. Some typical fields are listed below,
+  // - trace_id: if given, it takes precedence over other fields and will be used to retrieve entities before
+  //   other conditions are imposed
+  // - duration: typical for trace context
+  repeated PairQuery fields = 6;
+  // projection can be used to select the key names of the entities in the response.
+  Projection projection = 7;
+}
+
+// QueryResponse is the response for a query to the Query module.
+message QueryResponse {
+  // entities are the actual data returned; each element is an Entity message.
+  repeated Entity entities = 1;
+}
+
+// Entity represents
+// (Trace context) a Span defined in the Google Dapper paper, or equivalently a Segment in SkyWalking.
+// (Log context) a log.
+message Entity {
+  // entity_id could be the span_id of a Span or the segment_id of a Segment in the context of Trace.
+  string entity_id = 1;
+  // timestamp represents
+  // 1) either the start time of a Span/Segment,
+  // 2) or the timestamp of a log
+  google.protobuf.Timestamp timestamp = 2;
+  // data_binary contains all un-indexed Tags and other key-value pairs.
+  bytes data_binary = 3;
+  // fields contains all indexed fields. Some typical names,
+  // - trace_id
+  // - duration
+  // - service_name
+  // - service_instance_id
+  // - end_time_nanoseconds
+  repeated TypedPair fields = 4;
+}
+
+
+// WriteRequest is the request message of the Write stream; each message carries one entity.
+message WriteRequest {
+  // the metadata is only required in the first write.
+  Metadata metadata = 1;
+  // the entity is required.
+  EntityValue entity = 2;
+}
+
+// WriteResponse is the response message of the Write stream. It has no fields.
+message WriteResponse {}
+
+// EntityValue is the entity payload of a WriteRequest.
+message EntityValue {
+  // entity_id could be the span_id of a Span or the segment_id of a Segment in the context of Trace.
+  string entity_id = 1;
+  // timestamp represents
+  // 1) either the start time of a Span/Segment,
+  // 2) or the timestamp of a log
+  google.protobuf.Timestamp timestamp = 2;
+  // binary representation of segments, including tags, spans...
+  bytes data_binary = 3;
+  // fields supports all of the indexed fields.
+  // Each Field only has a value, as the value is matched with its key
+  // by the index rules and index rule bindings of the Metadata group.
+  // NOTE(review): the original comment also said "indexed fields of multiple entities are compression in the
+  // fields"; the intended meaning is unclear from this file - confirm.
+  repeated Field fields = 4;
+}
\ No newline at end of file
diff --git a/oap-server/banyandb-java-client/src/main/proto/banyandb/v1/banyandb.proto b/oap-server/banyandb-java-client/src/main/proto/banyandb/v1/banyandb.proto
new file mode 100644
index 0000000..636aa3c
--- /dev/null
+++ b/oap-server/banyandb-java-client/src/main/proto/banyandb/v1/banyandb.proto
@@ -0,0 +1,148 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+syntax = "proto3";
+
+option java_package = "org.apache.skywalking.banyandb.v1";
+
+package banyandb.v1;
+
+import "google/protobuf/timestamp.proto";
+import "google/protobuf/struct.proto";
+
+// Metadata is for multi-tenant, multi-model use.
+message Metadata {
+  // group contains a set of options, like retention policy, max
+  // NOTE(review): the sentence above appears truncated after "max" in the original; complete it.
+  string group = 1;
+  // name of the entity
+  string name = 2;
+}
+
+// IntPair is a typed pair with a string key and an int64 value.
+message IntPair {
+  string key = 1;
+  int64 value = 2;
+  // presumably marks the value as null - TODO confirm against the server semantics.
+  bool isNull = 3;
+}
+
+// StrPair is a typed pair with a string key and a string value.
+message StrPair {
+  string key = 1;
+  string value = 2;
+  // presumably marks the value as null - TODO confirm against the server semantics.
+  bool isNull = 3;
+}
+
+// IntArrayPair is a typed pair with a string key and an array of int64 as values.
+message IntArrayPair {
+  string key = 1;
+  repeated int64 value = 2;
+  // presumably marks the value as null - TODO confirm against the server semantics.
+  bool isNull = 3;
+}
+
+// StrArrayPair is a typed pair with a string key and an array of string as values.
+message StrArrayPair {
+  string key = 1;
+  repeated string value = 2;
+  // presumably marks the value as null - TODO confirm against the server semantics.
+  bool isNull = 3;
+}
+
+// TypedPair is the building block of a record, which is equivalent to a key-value pair.
+// In the context of Trace, it could be metadata of a trace such as service_name, service_instance, etc.
+// Besides, other fields/tags are organized as key-value pairs in the underlying storage layer.
+// One should notice that the values can be multi-valued.
+message TypedPair {
+  // Exactly one of the typed pairs below is set.
+  oneof typed {
+    IntPair int_pair = 1;
+    StrPair str_pair = 2;
+    IntArrayPair int_array_pair = 3;
+    StrArrayPair str_array_pair = 4;
+  }
+}
+
+// PairQuery consists of the query condition with a single binary operator to be imposed.
+// For a 1:1 BinaryOp, the values in the condition must be an array with length = 1,
+// while for a 1:N BinaryOp, the values can be an array with length >= 1.
+message PairQuery {
+  // BinaryOp specifies the operation imposed on the given query condition.
+  // For EQ, NE, LT, GT, LE and GE, only one operand should be given, i.e. one-to-one relationship.
+  // HAVING and NOT_HAVING allow a multi-value operand such as an array/vector, i.e. one-to-many relationship.
+  // For example, "keyA" contains "valueA" **and** "valueB".
+  enum BinaryOp {
+    BINARY_OP_UNSPECIFIED = 0;
+    BINARY_OP_EQ = 1;
+    BINARY_OP_NE = 2;
+    BINARY_OP_LT = 3;
+    BINARY_OP_GT = 4;
+    BINARY_OP_LE = 5;
+    BINARY_OP_GE = 6;
+    BINARY_OP_HAVING = 7;
+    BINARY_OP_NOT_HAVING = 8;
+  }
+  // op is the operator applied to the condition.
+  BinaryOp op = 1;
+  // condition carries the key and the operand value(s).
+  TypedPair condition = 2;
+}
+
+// QueryOrder means a Sort operation to be done for a given field.
+// The key_name refers to the key of a Pair.
+message QueryOrder {
+  string key_name = 1;
+  // Sort is the direction of the ordering.
+  enum Sort {
+    SORT_UNSPECIFIED = 0;
+    SORT_DESC = 1;
+    SORT_ASC = 2;
+  }
+  Sort sort = 2;
+}
+
+// Projection is used to select the names of the keys to be returned.
+message Projection {
+  // key_names refers to the key(s) of Pair(s).
+  repeated string key_names = 1;
+}
+
+// TimeRange is a range query over time, with both edges given as google.protobuf.Timestamp.
+// The range here follows the left-inclusive and right-exclusive rule, i.e. [begin, end) if both edges exist.
+message TimeRange {
+  google.protobuf.Timestamp begin = 1;
+  google.protobuf.Timestamp end = 2;
+}
+
+// Str wraps a single string value; it is one of the value types of Field.
+message Str {
+  string value = 1;
+}
+
+// Int wraps a single int64 value; it is one of the value types of Field.
+message Int {
+  int64 value = 1;
+}
+
+// StrArray wraps an array of string values; it is one of the value types of Field.
+message StrArray {
+  repeated string value = 1;
+}
+
+// IntArray wraps an array of int64 values; it is one of the value types of Field.
+message IntArray {
+  repeated int64 value = 1;
+}
+
+// Field is a value without a key, used in the write path (see EntityValue.fields in the trace proto).
+message Field {
+  // Exactly one of the value types below is set; `null` represents an absent value.
+  oneof value_type {
+    google.protobuf.NullValue null = 1;
+    Str str = 2;
+    StrArray str_array = 3;
+    Int int = 4;
+    IntArray int_array = 5;
+  }
+}
\ No newline at end of file
diff --git a/oap-server/pom.xml b/oap-server/pom.xml
index 6318e02..a8eea42 100755
--- a/oap-server/pom.xml
+++ b/oap-server/pom.xml
@@ -48,6 +48,7 @@
         <module>server-tools</module>
         <module>server-fetcher-plugin</module>
         <module>server-health-checker</module>
+        <module>banyandb-java-client</module>
     </modules>
 
     <properties>