You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2021/08/13 12:29:31 UTC

[skywalking] branch banyandb-client-api updated: Finish the prototype of banyandb client api.

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch banyandb-client-api
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/banyandb-client-api by this push:
     new ee51a1d  Finish the prototype of banyandb client api.
ee51a1d is described below

commit ee51a1d3b5682a8d85871988c299fcceb0b6887f
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Fri Aug 13 20:29:15 2021 +0800

    Finish the prototype of banyandb client api.
---
 oap-server/banyandb-java-client/pom.xml            |   1 +
 .../banyandb/v1/client/BanyanDBClient.java         |  24 +++-
 .../banyandb/v1/client/BulkWriteProcessor.java     |  13 +-
 .../v1/client/{WriteField.java => Field.java}      |  40 ++++--
 .../banyandb/v1/client/FieldAndValue.java          | 143 +++++++++++++++++++++
 .../skywalking/banyandb/v1/client/Options.java     |  10 +-
 .../banyandb/v1/client/PairQueryCondition.java     |  59 +++++++++
 .../v1/client/{Options.java => RowEntity.java}     |  24 ++--
 .../client/{Options.java => TimestampRange.java}   |  33 +++--
 .../v1/client/TraceBulkWriteProcessor.java         |  11 +-
 .../skywalking/banyandb/v1/client/TraceQuery.java  | 115 +++++++++++++++++
 .../{Options.java => TraceQueryResponse.java}      |  26 ++--
 .../skywalking/banyandb/v1/client/TraceWrite.java  |  14 +-
 .../src/main/proto/banyandb/v1/banyandb.proto      |  26 +++-
 14 files changed, 482 insertions(+), 57 deletions(-)

diff --git a/oap-server/banyandb-java-client/pom.xml b/oap-server/banyandb-java-client/pom.xml
index 3898053..17d85d6 100644
--- a/oap-server/banyandb-java-client/pom.xml
+++ b/oap-server/banyandb-java-client/pom.xml
@@ -47,6 +47,7 @@
         <dependency>
             <groupId>org.apache.skywalking</groupId>
             <artifactId>apm-datacarrier</artifactId>
+            <version>${project.version}</version>
         </dependency>
     </dependencies>
 
diff --git a/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java
index 485c4c7..389ded8 100644
--- a/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java
+++ b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java
@@ -23,9 +23,10 @@ import io.grpc.ManagedChannelBuilder;
 import io.grpc.NameResolverRegistry;
 import io.grpc.internal.DnsNameResolverProvider;
 import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantLock;
-import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.banyandb.v1.trace.BanyandbTrace;
 import org.apache.skywalking.banyandb.v1.trace.TraceServiceGrpc;
 
 /**
@@ -49,8 +50,7 @@ public class BanyanDBClient {
     /**
      * Options for server connection.
      */
-    @Setter
-    private Options options = new Options();
+    private Options options;
     /**
      * Managed gRPC connection.
      */
@@ -60,6 +60,10 @@ public class BanyanDBClient {
      */
     private volatile TraceServiceGrpc.TraceServiceStub traceServiceStub;
     /**
+     * gRPC blocking stub.
+     */
+    private volatile TraceServiceGrpc.TraceServiceBlockingStub traceServiceBlockingStub;
+    /**
      * The connection status.
      */
     private volatile boolean isConnected = false;
@@ -114,6 +118,8 @@ public class BanyanDBClient {
 
                 managedChannel = nettyChannelBuilder.build();
                 traceServiceStub = TraceServiceGrpc.newStub(managedChannel);
+                traceServiceBlockingStub = TraceServiceGrpc.newBlockingStub(
+                    managedChannel);
                 isConnected = true;
             }
         } finally {
@@ -134,4 +140,16 @@ public class BanyanDBClient {
         return new TraceBulkWriteProcessor(group, traceServiceStub, maxBulkSize, flushInterval, concurrency);
     }
 
+    /**
+     * Query trace according to given conditions
+     *
+     * @param traceQuery condition for query
+     * @return hint traces.
+     */
+    public TraceQueryResponse queryTraces(TraceQuery traceQuery) {
+        final BanyandbTrace.QueryResponse response = traceServiceBlockingStub
+            .withDeadlineAfter(options.getDeadline(), TimeUnit.SECONDS)
+            .query(traceQuery.build(group));
+        return new TraceQueryResponse(response);
+    }
 }
diff --git a/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/BulkWriteProcessor.java b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/BulkWriteProcessor.java
index a522b76..fc5bce5 100644
--- a/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/BulkWriteProcessor.java
+++ b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/BulkWriteProcessor.java
@@ -32,7 +32,7 @@ import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
  */
 public abstract class BulkWriteProcessor {
     protected final int flushInterval;
-    private DataCarrier queue;
+    protected DataCarrier buffer;
 
     /**
      * Create the processor.
@@ -44,12 +44,12 @@ public abstract class BulkWriteProcessor {
      */
     protected BulkWriteProcessor(String processorName, int maxBulkSize, int flushInterval, int concurrency) {
         this.flushInterval = flushInterval;
-        this.queue = new DataCarrier(processorName, maxBulkSize, concurrency);
+        this.buffer = new DataCarrier(processorName, maxBulkSize, concurrency);
         Properties properties = new Properties();
         properties.put("maxBulkSize", maxBulkSize);
         properties.put("flushInterval", flushInterval);
         properties.put("BulkWriteProcessor", this);
-        queue.consume(QueueWatcher.class, concurrency);
+        buffer.consume(QueueWatcher.class, concurrency, 20, properties);
     }
 
     /**
@@ -63,11 +63,12 @@ public abstract class BulkWriteProcessor {
         private BulkWriteProcessor bulkWriteProcessor;
 
         @Override
-        public void init() {
+        public void init(Properties properties) {
             lastFlushTimestamp = System.currentTimeMillis();
-            //TODO: initialize maxBulkSize and flushInterval
-            flushInterval = flushInterval * 1000;
+            maxBulkSize = (Integer) properties.get("maxBulkSize");
+            flushInterval = (Integer) properties.get("flushInterval") * 1000;
             cachedData = new ArrayList(maxBulkSize);
+            bulkWriteProcessor = (BulkWriteProcessor) properties.get("BulkWriteProcessor");
         }
 
         @Override
diff --git a/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/WriteField.java b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/Field.java
similarity index 63%
rename from oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/WriteField.java
rename to oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/Field.java
index 5918533..38e6e42 100644
--- a/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/WriteField.java
+++ b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/Field.java
@@ -18,18 +18,20 @@
 
 package org.apache.skywalking.banyandb.v1.client;
 
+import java.util.List;
+import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import org.apache.skywalking.banyandb.v1.Banyandb;
 
 import static com.google.protobuf.NullValue.NULL_VALUE;
 
 /**
- * WriteField represents a value of column/field for a write-op value.
+ * WriteField represents a value of column/field in the write-op or response.
  */
-public interface WriteField {
+public interface Field {
     Banyandb.Field toField();
 
-    class NullField implements WriteField {
+    class NullField implements Field {
 
         @Override
         public Banyandb.Field toField() {
@@ -41,8 +43,9 @@ public interface WriteField {
      * The value of a String type field.
      */
     @RequiredArgsConstructor
-    class StringField implements WriteField {
-        private final String value;
+    @Getter
+    class StringField implements Field {
+        protected final String value;
 
         @Override
         public Banyandb.Field toField() {
@@ -54,12 +57,13 @@ public interface WriteField {
      * The value of a String array type field.
      */
     @RequiredArgsConstructor
-    class StringArrayField implements WriteField {
-        private final String[] value;
+    @Getter
+    class StringArrayField implements Field {
+        protected final List<String> value;
 
         @Override
         public Banyandb.Field toField() {
-            return null;
+            return Banyandb.Field.newBuilder().setStrArray(Banyandb.StrArray.newBuilder().addAllValue(value)).build();
         }
     }
 
@@ -67,15 +71,27 @@ public interface WriteField {
      * The value of an int64(Long) type field.
      */
     @RequiredArgsConstructor
-    class LongField {
-        private final long value;
+    @Getter
+    class LongField implements Field {
+        protected final Long value;
+
+        @Override
+        public Banyandb.Field toField() {
+            return Banyandb.Field.newBuilder().setInt(Banyandb.Int.newBuilder().setValue(value)).build();
+        }
     }
 
     /**
      * The value of an int64(Long) array type field.
      */
     @RequiredArgsConstructor
-    class LongArrayField {
-        private final long[] value;
+    @Getter
+    class LongArrayField implements Field {
+        protected final List<Long> value;
+
+        @Override
+        public Banyandb.Field toField() {
+            return Banyandb.Field.newBuilder().setIntArray(Banyandb.IntArray.newBuilder().addAllValue(value)).build();
+        }
     }
 }
diff --git a/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/FieldAndValue.java b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/FieldAndValue.java
new file mode 100644
index 0000000..0c1e13e
--- /dev/null
+++ b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/FieldAndValue.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client;
+
+import java.util.List;
+import org.apache.skywalking.banyandb.v1.Banyandb;
+
+public interface FieldAndValue extends Field {
+    /**
+     * @return field name
+     */
+    String getFieldName();
+
+    /**
+     * @return true if value is null;
+     */
+    boolean isNull();
+
+    static FieldAndValue build(Banyandb.TypedPair typedPair) {
+        if (typedPair.hasIntPair()) {
+            final Banyandb.IntPair intPair = typedPair.getIntPair();
+            if (intPair.getIsNull()) {
+                return new LongFieldPair(intPair.getKey(), null);
+            } else {
+                return new LongFieldPair(intPair.getKey(), intPair.getValue());
+            }
+        } else if (typedPair.hasStrPair()) {
+            final Banyandb.StrPair strPair = typedPair.getStrPair();
+            if (strPair.getIsNull()) {
+                return new StringFieldPair(strPair.getKey(), null);
+            } else {
+                return new StringFieldPair(strPair.getKey(), strPair.getValue());
+            }
+        } else if (typedPair.hasIntArrayPair()) {
+            final Banyandb.IntArrayPair intArrayPair = typedPair.getIntArrayPair();
+            if (intArrayPair.getIsNull()) {
+                return new LongArrayFieldPair(intArrayPair.getKey(), null);
+            } else {
+                return new LongArrayFieldPair(intArrayPair.getKey(), intArrayPair.getValueList());
+            }
+        } else if (typedPair.hasStrArrayPair()) {
+            final Banyandb.StrArrayPair strArrayPair = typedPair.getStrArrayPair();
+            if (strArrayPair.getIsNull()) {
+                return new StringArrayFieldPair(strArrayPair.getKey(), null);
+            } else {
+                return new StringArrayFieldPair(strArrayPair.getKey(), strArrayPair.getValueList());
+            }
+        }
+        throw new IllegalArgumentException("Unrecognized TypedPair, " + typedPair);
+    }
+
+    class StringFieldPair extends StringField implements FieldAndValue {
+        private final String fieldName;
+
+        StringFieldPair(final String fieldName, final String value) {
+            super(value);
+            this.fieldName = fieldName;
+        }
+
+        @Override
+        public String getFieldName() {
+            return fieldName;
+        }
+
+        @Override
+        public boolean isNull() {
+            return value == null;
+        }
+    }
+
+    class StringArrayFieldPair extends StringArrayField implements FieldAndValue {
+        private final String fieldName;
+
+        StringArrayFieldPair(final String fieldName, final List<String> value) {
+            super(value);
+            this.fieldName = fieldName;
+        }
+
+        @Override
+        public String getFieldName() {
+            return fieldName;
+        }
+
+        @Override
+        public boolean isNull() {
+            return value == null;
+        }
+    }
+
+    class LongFieldPair extends LongField implements FieldAndValue {
+        private final String fieldName;
+
+        LongFieldPair(final String fieldName, final Long value) {
+            super(value);
+            this.fieldName = fieldName;
+        }
+
+        @Override
+        public String getFieldName() {
+            return null;
+        }
+
+        @Override
+        public boolean isNull() {
+            return value == null;
+        }
+    }
+
+    class LongArrayFieldPair extends LongArrayField implements FieldAndValue {
+        private final String fieldName;
+
+        LongArrayFieldPair(final String fieldName, final List<Long> value) {
+            super(value);
+            this.fieldName = fieldName;
+        }
+
+        @Override
+        public String getFieldName() {
+            return fieldName;
+        }
+
+        @Override
+        public boolean isNull() {
+            return value == null;
+        }
+    }
+}
diff --git a/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/Options.java b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/Options.java
index 5e9f48c..89b8038 100644
--- a/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/Options.java
+++ b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/Options.java
@@ -18,6 +18,7 @@
 
 package org.apache.skywalking.banyandb.v1.client;
 
+import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.Setter;
 
@@ -25,9 +26,16 @@ import lombok.Setter;
  * Client connection options.
  */
 @Setter
-@Getter
+@Getter(AccessLevel.PACKAGE)
 public class Options {
+    /**
+     * Max inbound message size
+     */
     private int maxInboundMessageSize = 1024 * 1024 * 50;
+    /**
+     * Threshold of gRPC blocking query, unit is second
+     */
+    private int deadline = 30;
 
     Options() {
     }
diff --git a/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/PairQueryCondition.java b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/PairQueryCondition.java
new file mode 100644
index 0000000..9276e0e
--- /dev/null
+++ b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/PairQueryCondition.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client;
+
+import org.apache.skywalking.banyandb.v1.Banyandb;
+
+import static org.apache.skywalking.banyandb.v1.Banyandb.PairQuery.BinaryOp.BINARY_OP_EQ;
+
+/**
+ * PairQuery represents a query condition, including field name, operator, and value(s);
+ */
+public abstract class PairQueryCondition {
+    protected String fieldName;
+
+    protected abstract Banyandb.PairQuery build();
+
+    /**
+     * LongEqual represents `Field == value` condition.
+     */
+    public class LongEqual extends PairQueryCondition {
+        private final long value;
+
+        public LongEqual(String fieldName, long value) {
+            this.fieldName = fieldName;
+            this.value = value;
+        }
+
+        @Override
+        protected Banyandb.PairQuery build() {
+            return Banyandb.PairQuery.newBuilder()
+                                     .setOp(BINARY_OP_EQ)
+                                     .setCondition(
+                                         Banyandb.TypedPair.newBuilder()
+                                                           .setIntPair(
+                                                               Banyandb.IntPair.newBuilder()
+                                                                               .setKey(fieldName)
+                                                                               .setValue(value)))
+                                     .build();
+        }
+    }
+
+    //TODO, Add all conditions.
+}
diff --git a/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/Options.java b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/RowEntity.java
similarity index 55%
copy from oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/Options.java
copy to oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/RowEntity.java
index 5e9f48c..4f95faa 100644
--- a/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/Options.java
+++ b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/RowEntity.java
@@ -18,18 +18,26 @@
 
 package org.apache.skywalking.banyandb.v1.client;
 
+import java.util.ArrayList;
+import java.util.List;
 import lombok.Getter;
-import lombok.Setter;
+import org.apache.skywalking.banyandb.v1.trace.BanyandbTrace;
 
 /**
- * Client connection options.
+ * RowEntity represents an entity of BanyanDB entity.
  */
-@Setter
 @Getter
-public class Options {
-    private int maxInboundMessageSize = 1024 * 1024 * 50;
+public class RowEntity {
+    private final String id;
+    private final long timestamp;
+    private final byte[] binary;
+    private final List<FieldAndValue> fields;
 
-    Options() {
+    RowEntity(BanyandbTrace.Entity entity) {
+        id = entity.getEntityId();
+        timestamp = entity.getTimestamp().getSeconds() * 1000 + entity.getTimestamp().getNanos() / 1000;
+        binary = entity.getDataBinary().toByteArray();
+        fields = new ArrayList<>(entity.getFieldsCount());
+        entity.getFieldsList().forEach(field -> fields.add(FieldAndValue.build(field)));
     }
-
-}
\ No newline at end of file
+}
diff --git a/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/Options.java b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/TimestampRange.java
similarity index 50%
copy from oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/Options.java
copy to oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/TimestampRange.java
index 5e9f48c..2259068 100644
--- a/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/Options.java
+++ b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/TimestampRange.java
@@ -18,18 +18,29 @@
 
 package org.apache.skywalking.banyandb.v1.client;
 
+import com.google.protobuf.Timestamp;
+import lombok.AccessLevel;
 import lombok.Getter;
-import lombok.Setter;
+import lombok.RequiredArgsConstructor;
+import org.apache.skywalking.banyandb.v1.Banyandb;
 
-/**
- * Client connection options.
- */
-@Setter
-@Getter
-public class Options {
-    private int maxInboundMessageSize = 1024 * 1024 * 50;
+@RequiredArgsConstructor
+@Getter(AccessLevel.PROTECTED)
+public class TimestampRange {
+    private final long begin;
+    private final long end;
 
-    Options() {
+    /**
+     * @return TimeRange accordingly.
+     */
+    Banyandb.TimeRange build() {
+        final Banyandb.TimeRange.Builder builder = Banyandb.TimeRange.newBuilder();
+        builder.setBegin(Timestamp.newBuilder()
+                                  .setSeconds(begin / 1000)
+                                  .setNanos((int) (begin % 1000 * 1000)));
+        builder.setBegin(Timestamp.newBuilder()
+                                  .setSeconds(end / 1000)
+                                  .setNanos((int) (end % 1000 * 1000)));
+        return builder.build();
     }
-
-}
\ No newline at end of file
+}
diff --git a/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceBulkWriteProcessor.java b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceBulkWriteProcessor.java
index 409ac97..5bab418 100644
--- a/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceBulkWriteProcessor.java
+++ b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceBulkWriteProcessor.java
@@ -31,7 +31,7 @@ import org.apache.skywalking.banyandb.v1.trace.TraceServiceGrpc;
 @Slf4j
 public class TraceBulkWriteProcessor extends BulkWriteProcessor {
     /**
-     * The instance name.
+     * The BanyanDB instance name.
      */
     private final String group;
     private TraceServiceGrpc.TraceServiceStub traceServiceStub;
@@ -55,6 +55,15 @@ public class TraceBulkWriteProcessor extends BulkWriteProcessor {
         this.traceServiceStub = traceServiceStub;
     }
 
+    /**
+     * Add the trace to the bulk processor.
+     *
+     * @param traceWrite to add.
+     */
+    public void add(TraceWrite traceWrite) {
+        this.buffer.produce(traceWrite);
+    }
+
     @Override
     protected void flush(final List data) {
         final StreamObserver<BanyandbTrace.WriteRequest> writeRequestStreamObserver
diff --git a/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceQuery.java b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceQuery.java
new file mode 100644
index 0000000..5f84a99
--- /dev/null
+++ b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceQuery.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client;
+
+import java.util.ArrayList;
+import java.util.List;
+import lombok.RequiredArgsConstructor;
+import lombok.Setter;
+import org.apache.skywalking.banyandb.v1.Banyandb;
+import org.apache.skywalking.banyandb.v1.trace.BanyandbTrace;
+
+/**
+ * TraceQuery is the high-level query API for the trace model.
+ */
+@Setter
+public class TraceQuery {
+    /**
+     * Owner name current entity
+     */
+    private final String name;
+    /**
+     * The time range for query.
+     */
+    private final TimestampRange timestampRange;
+    /**
+     * The projections of query result. These should have defined in the schema.
+     */
+    private final List<String> projections;
+    /**
+     * Query conditions.
+     */
+    private final List<PairQueryCondition> conditions;
+    /**
+     * The starting row id of the query. Default value is 0.
+     */
+    private int offset;
+    /**
+     * The limit size of the query. Default value is 20.
+     */
+    private int limit;
+    /**
+     * One order condition is supported and optional.
+     */
+    private OrderBy orderBy;
+
+    public TraceQuery(final String name, final TimestampRange timestampRange, final List<String> projections) {
+        this.name = name;
+        this.timestampRange = timestampRange;
+        this.projections = projections;
+        this.conditions = new ArrayList<>(10);
+        this.offset = 0;
+        this.limit = 20;
+    }
+
+    /**
+     * @param group The instance name.
+     * @return QueryRequest for gRPC level query.
+     */
+    BanyandbTrace.QueryRequest build(String group) {
+        final BanyandbTrace.QueryRequest.Builder builder = BanyandbTrace.QueryRequest.newBuilder();
+        builder.setMetadata(Banyandb.Metadata.newBuilder()
+                                             .setGroup(group)
+                                             .setName(name)
+                                             .build());
+        builder.setTimeRange(timestampRange.build());
+        builder.setProjection(Banyandb.Projection.newBuilder().addAllKeyNames(projections).build());
+        conditions.forEach(pairQueryCondition -> builder.addFields(pairQueryCondition.build()));
+        builder.setOffset(offset);
+        builder.setLimit(limit);
+        if (orderBy != null) {
+            builder.setOrderBy(orderBy.build());
+        }
+        return builder.build();
+    }
+
+    @RequiredArgsConstructor
+    public static class OrderBy {
+        /**
+         * The field name for ordering.
+         */
+        private final String fieldName;
+        /**
+         * The type of ordering.
+         */
+        private final Type type;
+
+        private Banyandb.QueryOrder build() {
+            final Banyandb.QueryOrder.Builder builder = Banyandb.QueryOrder.newBuilder();
+            builder.setKeyName(fieldName);
+            builder.setSort(
+                Type.DESC.equals(type) ? Banyandb.QueryOrder.Sort.SORT_DESC : Banyandb.QueryOrder.Sort.SORT_ASC);
+            return builder.build();
+        }
+
+        public enum Type {
+            ASC, DESC
+        }
+    }
+}
diff --git a/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/Options.java b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceQueryResponse.java
similarity index 57%
copy from oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/Options.java
copy to oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceQueryResponse.java
index 5e9f48c..26592fa 100644
--- a/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/Options.java
+++ b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceQueryResponse.java
@@ -18,18 +18,28 @@
 
 package org.apache.skywalking.banyandb.v1.client;
 
+import java.util.ArrayList;
+import java.util.List;
 import lombok.Getter;
-import lombok.Setter;
+import org.apache.skywalking.banyandb.v1.trace.BanyandbTrace;
 
 /**
- * Client connection options.
+ * TraceQueryResponse represents the trace query result.
  */
-@Setter
-@Getter
-public class Options {
-    private int maxInboundMessageSize = 1024 * 1024 * 50;
+public class TraceQueryResponse {
+    @Getter
+    private List<RowEntity> entities;
 
-    Options() {
+    TraceQueryResponse(BanyandbTrace.QueryResponse response) {
+        final List<BanyandbTrace.Entity> entitiesList = response.getEntitiesList();
+        entities = new ArrayList<>(entitiesList.size());
+        entitiesList.forEach(entity -> entities.add(new RowEntity(entity)));
     }
 
-}
\ No newline at end of file
+    /**
+     * @return size of the response set.
+     */
+    public int size() {
+        return entities.size();
+    }
+}
diff --git a/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceWrite.java b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceWrite.java
index f40c48b..d1023f2 100644
--- a/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceWrite.java
+++ b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceWrite.java
@@ -52,12 +52,20 @@ public class TraceWrite {
      * wouldn't deserialize this. So, no format requirement.
      */
     private final byte[] binary;
-    private final List<WriteField> fields;
+    /**
+     * The values of fields, which are defined by the schema. In the bulk write process, BanyanDB client doesn't require
+     * field names anymore.
+     */
+    private final List<Field> fields;
 
+    /**
+     * @param group of the BanyanDB client connected.
+     * @return {@link BanyandbTrace.WriteRequest} for the bulk process.
+     */
     BanyandbTrace.WriteRequest build(String group) {
         final BanyandbTrace.WriteRequest.Builder builder = BanyandbTrace.WriteRequest.newBuilder();
         builder.setMetadata(Banyandb.Metadata.newBuilder().setGroup(group).setName(name).build());
-        final Banyandb.EntityValue.Builder entityBuilder = Banyandb.EntityValue.newBuilder();
+        final BanyandbTrace.EntityValue.Builder entityBuilder = BanyandbTrace.EntityValue.newBuilder();
         entityBuilder.setEntityId(entityId);
         entityBuilder.setTimestamp(Timestamp.newBuilder()
                                             .setSeconds(timestamp / 1000)
@@ -65,6 +73,6 @@ public class TraceWrite {
         entityBuilder.setDataBinary(ByteString.copyFrom(binary));
         fields.forEach(writeField -> entityBuilder.addFields(writeField.toField()));
         builder.setEntity(entityBuilder.build());
-        return null;
+        return builder.build();
     }
 }
diff --git a/oap-server/banyandb-java-client/src/main/proto/banyandb/v1/banyandb.proto b/oap-server/banyandb-java-client/src/main/proto/banyandb/v1/banyandb.proto
index bcf30fa..636aa3c 100644
--- a/oap-server/banyandb-java-client/src/main/proto/banyandb/v1/banyandb.proto
+++ b/oap-server/banyandb-java-client/src/main/proto/banyandb/v1/banyandb.proto
@@ -32,16 +32,32 @@ message Metadata {
   string name = 2;
 }
 
-// IntPair in a typed pair with an array of int64 as values
+// IntPair in a typed pair with an int64 as values
 message IntPair {
   string key = 1;
-  repeated int64 values = 2;
+  int64 value = 2;
+  bool isNull = 3;
 }
 
-// StrPair in a typed pair with an array of string as values
+// StrPair in a typed pair with a string as values
 message StrPair {
   string key = 1;
-  repeated string values = 2;
+  string value = 2;
+  bool isNull = 3;
+}
+
+// IntPair in a typed pair with an array of int64 as values
+message IntArrayPair {
+  string key = 1;
+  repeated int64 value = 2;
+  bool isNull = 3;
+}
+
+// StrPair in a typed pair with an array of string as values
+message StrArrayPair {
+  string key = 1;
+  repeated string value = 2;
+  bool isNull = 3;
 }
 
 // Pair is the building block of a record which is equivalent to a key-value pair.
@@ -52,6 +68,8 @@ message TypedPair {
   oneof typed {
     IntPair int_pair = 1;
     StrPair str_pair = 2;
+    IntArrayPair int_array_pair = 3;
+    StrArrayPair str_array_pair = 4;
   }
 }