You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2021/08/13 12:29:31 UTC
[skywalking] branch banyandb-client-api updated: Finish the
prototype of banyandb client api.
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch banyandb-client-api
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/banyandb-client-api by this push:
new ee51a1d Finish the prototype of banyandb client api.
ee51a1d is described below
commit ee51a1d3b5682a8d85871988c299fcceb0b6887f
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Fri Aug 13 20:29:15 2021 +0800
Finish the prototype of banyandb client api.
---
oap-server/banyandb-java-client/pom.xml | 1 +
.../banyandb/v1/client/BanyanDBClient.java | 24 +++-
.../banyandb/v1/client/BulkWriteProcessor.java | 13 +-
.../v1/client/{WriteField.java => Field.java} | 40 ++++--
.../banyandb/v1/client/FieldAndValue.java | 143 +++++++++++++++++++++
.../skywalking/banyandb/v1/client/Options.java | 10 +-
.../banyandb/v1/client/PairQueryCondition.java | 59 +++++++++
.../v1/client/{Options.java => RowEntity.java} | 24 ++--
.../client/{Options.java => TimestampRange.java} | 33 +++--
.../v1/client/TraceBulkWriteProcessor.java | 11 +-
.../skywalking/banyandb/v1/client/TraceQuery.java | 115 +++++++++++++++++
.../{Options.java => TraceQueryResponse.java} | 26 ++--
.../skywalking/banyandb/v1/client/TraceWrite.java | 14 +-
.../src/main/proto/banyandb/v1/banyandb.proto | 26 +++-
14 files changed, 482 insertions(+), 57 deletions(-)
diff --git a/oap-server/banyandb-java-client/pom.xml b/oap-server/banyandb-java-client/pom.xml
index 3898053..17d85d6 100644
--- a/oap-server/banyandb-java-client/pom.xml
+++ b/oap-server/banyandb-java-client/pom.xml
@@ -47,6 +47,7 @@
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-datacarrier</artifactId>
+ <version>${project.version}</version>
</dependency>
</dependencies>
diff --git a/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java
index 485c4c7..389ded8 100644
--- a/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java
+++ b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java
@@ -23,9 +23,10 @@ import io.grpc.ManagedChannelBuilder;
import io.grpc.NameResolverRegistry;
import io.grpc.internal.DnsNameResolverProvider;
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
-import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.banyandb.v1.trace.BanyandbTrace;
import org.apache.skywalking.banyandb.v1.trace.TraceServiceGrpc;
/**
@@ -49,8 +50,7 @@ public class BanyanDBClient {
/**
* Options for server connection.
*/
- @Setter
- private Options options = new Options();
+ private Options options;
/**
* Managed gRPC connection.
*/
@@ -60,6 +60,10 @@ public class BanyanDBClient {
*/
private volatile TraceServiceGrpc.TraceServiceStub traceServiceStub;
/**
+ * gRPC blocking stub.
+ */
+ private volatile TraceServiceGrpc.TraceServiceBlockingStub traceServiceBlockingStub;
+ /**
* The connection status.
*/
private volatile boolean isConnected = false;
@@ -114,6 +118,8 @@ public class BanyanDBClient {
managedChannel = nettyChannelBuilder.build();
traceServiceStub = TraceServiceGrpc.newStub(managedChannel);
+ traceServiceBlockingStub = TraceServiceGrpc.newBlockingStub(
+ managedChannel);
isConnected = true;
}
} finally {
@@ -134,4 +140,16 @@ public class BanyanDBClient {
return new TraceBulkWriteProcessor(group, traceServiceStub, maxBulkSize, flushInterval, concurrency);
}
+ /**
+ * Query traces through the blocking gRPC stub. The call is bounded by the
+ * deadline, in seconds, read from the client options.
+ * @param traceQuery condition for query
+ * @return the entities of the server response, wrapped as a {@link TraceQueryResponse}.
+ */
+ public TraceQueryResponse queryTraces(TraceQuery traceQuery) {
+ final BanyandbTrace.QueryResponse response = traceServiceBlockingStub
+ .withDeadlineAfter(options.getDeadline(), TimeUnit.SECONDS)
+ .query(traceQuery.build(group));
+ return new TraceQueryResponse(response);
+ }
}
diff --git a/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/BulkWriteProcessor.java b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/BulkWriteProcessor.java
index a522b76..fc5bce5 100644
--- a/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/BulkWriteProcessor.java
+++ b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/BulkWriteProcessor.java
@@ -32,7 +32,7 @@ import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
*/
public abstract class BulkWriteProcessor {
protected final int flushInterval;
- private DataCarrier queue;
+ protected DataCarrier buffer;
/**
* Create the processor.
@@ -44,12 +44,12 @@ public abstract class BulkWriteProcessor {
*/
protected BulkWriteProcessor(String processorName, int maxBulkSize, int flushInterval, int concurrency) {
this.flushInterval = flushInterval;
- this.queue = new DataCarrier(processorName, maxBulkSize, concurrency);
+ this.buffer = new DataCarrier(processorName, maxBulkSize, concurrency);
Properties properties = new Properties();
properties.put("maxBulkSize", maxBulkSize);
properties.put("flushInterval", flushInterval);
properties.put("BulkWriteProcessor", this);
- queue.consume(QueueWatcher.class, concurrency);
+ buffer.consume(QueueWatcher.class, concurrency, 20, properties);
}
/**
@@ -63,11 +63,12 @@ public abstract class BulkWriteProcessor {
private BulkWriteProcessor bulkWriteProcessor;
@Override
- public void init() {
+ public void init(Properties properties) {
lastFlushTimestamp = System.currentTimeMillis();
- //TODO: initialize maxBulkSize and flushInterval
- flushInterval = flushInterval * 1000;
+ maxBulkSize = (Integer) properties.get("maxBulkSize");
+ flushInterval = (Integer) properties.get("flushInterval") * 1000;
cachedData = new ArrayList(maxBulkSize);
+ bulkWriteProcessor = (BulkWriteProcessor) properties.get("BulkWriteProcessor");
}
@Override
diff --git a/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/WriteField.java b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/Field.java
similarity index 63%
rename from oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/WriteField.java
rename to oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/Field.java
index 5918533..38e6e42 100644
--- a/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/WriteField.java
+++ b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/Field.java
@@ -18,18 +18,20 @@
package org.apache.skywalking.banyandb.v1.client;
+import java.util.List;
+import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.apache.skywalking.banyandb.v1.Banyandb;
import static com.google.protobuf.NullValue.NULL_VALUE;
/**
- * WriteField represents a value of column/field for a write-op value.
+ * Field represents a value of column/field in the write-op or response.
*/
-public interface WriteField {
+public interface Field {
Banyandb.Field toField();
- class NullField implements WriteField {
+ class NullField implements Field {
@Override
public Banyandb.Field toField() {
@@ -41,8 +43,9 @@ public interface WriteField {
* The value of a String type field.
*/
@RequiredArgsConstructor
- class StringField implements WriteField {
- private final String value;
+ @Getter
+ class StringField implements Field {
+ protected final String value;
@Override
public Banyandb.Field toField() {
@@ -54,12 +57,13 @@ public interface WriteField {
* The value of a String array type field.
*/
@RequiredArgsConstructor
- class StringArrayField implements WriteField {
- private final String[] value;
+ @Getter
+ class StringArrayField implements Field {
+ protected final List<String> value;
@Override
public Banyandb.Field toField() {
- return null;
+ return Banyandb.Field.newBuilder().setStrArray(Banyandb.StrArray.newBuilder().addAllValue(value)).build();
}
}
@@ -67,15 +71,27 @@ public interface WriteField {
* The value of an int64(Long) type field.
*/
@RequiredArgsConstructor
- class LongField {
- private final long value;
+ @Getter
+ class LongField implements Field {
+ protected final Long value;
+
+ @Override
+ public Banyandb.Field toField() {
+ return Banyandb.Field.newBuilder().setInt(Banyandb.Int.newBuilder().setValue(value)).build();
+ }
}
/**
* The value of an int64(Long) array type field.
*/
@RequiredArgsConstructor
- class LongArrayField {
- private final long[] value;
+ @Getter
+ class LongArrayField implements Field {
+ protected final List<Long> value;
+
+ @Override
+ public Banyandb.Field toField() {
+ return Banyandb.Field.newBuilder().setIntArray(Banyandb.IntArray.newBuilder().addAllValue(value)).build();
+ }
}
}
diff --git a/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/FieldAndValue.java b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/FieldAndValue.java
new file mode 100644
index 0000000..0c1e13e
--- /dev/null
+++ b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/FieldAndValue.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client;
+
+import java.util.List;
+import org.apache.skywalking.banyandb.v1.Banyandb;
+
+public interface FieldAndValue extends Field {
+ /**
+ * @return field name
+ */
+ String getFieldName();
+
+ /**
+ * @return true if value is null;
+ */
+ boolean isNull();
+
+ static FieldAndValue build(Banyandb.TypedPair typedPair) {
+ if (typedPair.hasIntPair()) {
+ final Banyandb.IntPair intPair = typedPair.getIntPair();
+ if (intPair.getIsNull()) {
+ return new LongFieldPair(intPair.getKey(), null);
+ } else {
+ return new LongFieldPair(intPair.getKey(), intPair.getValue());
+ }
+ } else if (typedPair.hasStrPair()) {
+ final Banyandb.StrPair strPair = typedPair.getStrPair();
+ if (strPair.getIsNull()) {
+ return new StringFieldPair(strPair.getKey(), null);
+ } else {
+ return new StringFieldPair(strPair.getKey(), strPair.getValue());
+ }
+ } else if (typedPair.hasIntArrayPair()) {
+ final Banyandb.IntArrayPair intArrayPair = typedPair.getIntArrayPair();
+ if (intArrayPair.getIsNull()) {
+ return new LongArrayFieldPair(intArrayPair.getKey(), null);
+ } else {
+ return new LongArrayFieldPair(intArrayPair.getKey(), intArrayPair.getValueList());
+ }
+ } else if (typedPair.hasStrArrayPair()) {
+ final Banyandb.StrArrayPair strArrayPair = typedPair.getStrArrayPair();
+ if (strArrayPair.getIsNull()) {
+ return new StringArrayFieldPair(strArrayPair.getKey(), null);
+ } else {
+ return new StringArrayFieldPair(strArrayPair.getKey(), strArrayPair.getValueList());
+ }
+ }
+ throw new IllegalArgumentException("Unrecognized TypedPair, " + typedPair);
+ }
+
+ class StringFieldPair extends StringField implements FieldAndValue {
+ private final String fieldName;
+
+ StringFieldPair(final String fieldName, final String value) {
+ super(value);
+ this.fieldName = fieldName;
+ }
+
+ @Override
+ public String getFieldName() {
+ return fieldName;
+ }
+
+ @Override
+ public boolean isNull() {
+ return value == null;
+ }
+ }
+
+ class StringArrayFieldPair extends StringArrayField implements FieldAndValue {
+ private final String fieldName;
+
+ StringArrayFieldPair(final String fieldName, final List<String> value) {
+ super(value);
+ this.fieldName = fieldName;
+ }
+
+ @Override
+ public String getFieldName() {
+ return fieldName;
+ }
+
+ @Override
+ public boolean isNull() {
+ return value == null;
+ }
+ }
+
+ class LongFieldPair extends LongField implements FieldAndValue {
+ private final String fieldName; // name (key) this long value is paired with
+
+ LongFieldPair(final String fieldName, final Long value) {
+ super(value);
+ this.fieldName = fieldName;
+ }
+
+ @Override
+ public String getFieldName() {
+ return fieldName; // expose the stored name, consistent with the other *FieldPair classes
+ }
+
+ @Override
+ public boolean isNull() {
+ return value == null; // a null value marks a pair the server flagged as isNull
+ }
+ }
+
+ class LongArrayFieldPair extends LongArrayField implements FieldAndValue {
+ private final String fieldName;
+
+ LongArrayFieldPair(final String fieldName, final List<Long> value) {
+ super(value);
+ this.fieldName = fieldName;
+ }
+
+ @Override
+ public String getFieldName() {
+ return fieldName;
+ }
+
+ @Override
+ public boolean isNull() {
+ return value == null;
+ }
+ }
+}
diff --git a/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/Options.java b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/Options.java
index 5e9f48c..89b8038 100644
--- a/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/Options.java
+++ b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/Options.java
@@ -18,6 +18,7 @@
package org.apache.skywalking.banyandb.v1.client;
+import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
@@ -25,9 +26,16 @@ import lombok.Setter;
* Client connection options.
*/
@Setter
-@Getter
+@Getter(AccessLevel.PACKAGE)
public class Options {
+ /**
+ * Max inbound message size
+ */
private int maxInboundMessageSize = 1024 * 1024 * 50;
+ /**
+ * Threshold of gRPC blocking query, unit is second
+ */
+ private int deadline = 30;
Options() {
}
diff --git a/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/PairQueryCondition.java b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/PairQueryCondition.java
new file mode 100644
index 0000000..9276e0e
--- /dev/null
+++ b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/PairQueryCondition.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client;
+
+import org.apache.skywalking.banyandb.v1.Banyandb;
+
+import static org.apache.skywalking.banyandb.v1.Banyandb.PairQuery.BinaryOp.BINARY_OP_EQ;
+
+/**
+ * PairQueryCondition represents a query condition, including field name, operator, and value(s).
+ */
+public abstract class PairQueryCondition {
+ protected String fieldName;
+
+ protected abstract Banyandb.PairQuery build();
+
+ /**
+ * LongEqual represents `Field == value` condition; static, so no enclosing instance is needed to create it.
+ */
+ public static class LongEqual extends PairQueryCondition {
+ private final long value; // right-hand operand of the equality
+
+ public LongEqual(String fieldName, long value) {
+ this.fieldName = fieldName;
+ this.value = value;
+ }
+
+ @Override
+ protected Banyandb.PairQuery build() {
+ return Banyandb.PairQuery.newBuilder()
+ .setOp(BINARY_OP_EQ)
+ .setCondition(
+ Banyandb.TypedPair.newBuilder()
+ .setIntPair(
+ Banyandb.IntPair.newBuilder()
+ .setKey(fieldName)
+ .setValue(value)))
+ .build();
+ }
+ }
+
+ //TODO, Add all conditions.
+}
diff --git a/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/Options.java b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/RowEntity.java
similarity index 55%
copy from oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/Options.java
copy to oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/RowEntity.java
index 5e9f48c..4f95faa 100644
--- a/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/Options.java
+++ b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/RowEntity.java
@@ -18,18 +18,26 @@
package org.apache.skywalking.banyandb.v1.client;
+import java.util.ArrayList;
+import java.util.List;
import lombok.Getter;
-import lombok.Setter;
+import org.apache.skywalking.banyandb.v1.trace.BanyandbTrace;
/**
- * Client connection options.
+ * RowEntity represents a single entity (row) returned by BanyanDB.
*/
-@Setter
@Getter
-public class Options {
- private int maxInboundMessageSize = 1024 * 1024 * 50;
+public class RowEntity {
+ private final String id;
+ private final long timestamp;
+ private final byte[] binary;
+ private final List<FieldAndValue> fields;
- Options() {
+ RowEntity(BanyandbTrace.Entity entity) {
+ id = entity.getEntityId();
+ timestamp = entity.getTimestamp().getSeconds() * 1000 + entity.getTimestamp().getNanos() / 1_000_000; // millis: 1 ms = 1,000,000 ns
+ binary = entity.getDataBinary().toByteArray();
+ fields = new ArrayList<>(entity.getFieldsCount());
+ entity.getFieldsList().forEach(field -> fields.add(FieldAndValue.build(field)));
+ }
-
-}
\ No newline at end of file
+}
diff --git a/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/Options.java b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/TimestampRange.java
similarity index 50%
copy from oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/Options.java
copy to oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/TimestampRange.java
index 5e9f48c..2259068 100644
--- a/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/Options.java
+++ b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/TimestampRange.java
@@ -18,18 +18,29 @@
package org.apache.skywalking.banyandb.v1.client;
+import com.google.protobuf.Timestamp;
+import lombok.AccessLevel;
import lombok.Getter;
-import lombok.Setter;
+import lombok.RequiredArgsConstructor;
+import org.apache.skywalking.banyandb.v1.Banyandb;
-/**
- * Client connection options.
- */
-@Setter
-@Getter
-public class Options {
- private int maxInboundMessageSize = 1024 * 1024 * 50;
+@RequiredArgsConstructor
+@Getter(AccessLevel.PROTECTED)
+public class TimestampRange {
+ private final long begin;
+ private final long end;
- Options() {
+ /**
+ * @return TimeRange whose begin and end are the millisecond bounds converted to seconds plus nanoseconds.
+ */
+ Banyandb.TimeRange build() {
+ final Banyandb.TimeRange.Builder builder = Banyandb.TimeRange.newBuilder();
+ builder.setBegin(Timestamp.newBuilder()
+ .setSeconds(begin / 1000)
+ .setNanos((int) (begin % 1000 * 1_000_000)));
+ builder.setEnd(Timestamp.newBuilder()
+ .setSeconds(end / 1000)
+ .setNanos((int) (end % 1000 * 1_000_000)));
+ return builder.build();
+ }
-
-}
\ No newline at end of file
+}
diff --git a/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceBulkWriteProcessor.java b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceBulkWriteProcessor.java
index 409ac97..5bab418 100644
--- a/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceBulkWriteProcessor.java
+++ b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceBulkWriteProcessor.java
@@ -31,7 +31,7 @@ import org.apache.skywalking.banyandb.v1.trace.TraceServiceGrpc;
@Slf4j
public class TraceBulkWriteProcessor extends BulkWriteProcessor {
/**
- * The instance name.
+ * The BanyanDB instance name.
*/
private final String group;
private TraceServiceGrpc.TraceServiceStub traceServiceStub;
@@ -55,6 +55,15 @@ public class TraceBulkWriteProcessor extends BulkWriteProcessor {
this.traceServiceStub = traceServiceStub;
}
+ /**
+ * Add the trace to the bulk processor.
+ *
+ * @param traceWrite to add.
+ */
+ public void add(TraceWrite traceWrite) {
+ this.buffer.produce(traceWrite);
+ }
+
@Override
protected void flush(final List data) {
final StreamObserver<BanyandbTrace.WriteRequest> writeRequestStreamObserver
diff --git a/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceQuery.java b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceQuery.java
new file mode 100644
index 0000000..5f84a99
--- /dev/null
+++ b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceQuery.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client;
+
+import java.util.ArrayList;
+import java.util.List;
+import lombok.RequiredArgsConstructor;
+import lombok.Setter;
+import org.apache.skywalking.banyandb.v1.Banyandb;
+import org.apache.skywalking.banyandb.v1.trace.BanyandbTrace;
+
+/**
+ * TraceQuery is the high-level query API for the trace model.
+ */
+@Setter
+public class TraceQuery {
+ /**
+ * Owner name of the current entity
+ */
+ private final String name;
+ /**
+ * The time range for query.
+ */
+ private final TimestampRange timestampRange;
+ /**
+ * The projections of query result. These should have been defined in the schema.
+ */
+ private final List<String> projections;
+ /**
+ * Query conditions.
+ */
+ private final List<PairQueryCondition> conditions;
+ /**
+ * The starting row id of the query. Default value is 0.
+ */
+ private int offset;
+ /**
+ * The limit size of the query. Default value is 20.
+ */
+ private int limit;
+ /**
+ * One order condition is supported and optional.
+ */
+ private OrderBy orderBy;
+
+ public TraceQuery(final String name, final TimestampRange timestampRange, final List<String> projections) {
+ this.name = name;
+ this.timestampRange = timestampRange;
+ this.projections = projections;
+ this.conditions = new ArrayList<>(10);
+ this.offset = 0;
+ this.limit = 20;
+ }
+
+ /**
+ * @param group The instance name.
+ * @return QueryRequest for gRPC level query.
+ */
+ BanyandbTrace.QueryRequest build(String group) {
+ final BanyandbTrace.QueryRequest.Builder builder = BanyandbTrace.QueryRequest.newBuilder();
+ builder.setMetadata(Banyandb.Metadata.newBuilder()
+ .setGroup(group)
+ .setName(name)
+ .build());
+ builder.setTimeRange(timestampRange.build());
+ builder.setProjection(Banyandb.Projection.newBuilder().addAllKeyNames(projections).build());
+ conditions.forEach(pairQueryCondition -> builder.addFields(pairQueryCondition.build()));
+ builder.setOffset(offset);
+ builder.setLimit(limit);
+ if (orderBy != null) {
+ builder.setOrderBy(orderBy.build());
+ }
+ return builder.build();
+ }
+
+ @RequiredArgsConstructor
+ public static class OrderBy {
+ /**
+ * The field name for ordering.
+ */
+ private final String fieldName;
+ /**
+ * The type of ordering.
+ */
+ private final Type type;
+
+ private Banyandb.QueryOrder build() {
+ final Banyandb.QueryOrder.Builder builder = Banyandb.QueryOrder.newBuilder();
+ builder.setKeyName(fieldName);
+ builder.setSort(
+ Type.DESC.equals(type) ? Banyandb.QueryOrder.Sort.SORT_DESC : Banyandb.QueryOrder.Sort.SORT_ASC);
+ return builder.build();
+ }
+
+ public enum Type {
+ ASC, DESC
+ }
+ }
+}
diff --git a/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/Options.java b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceQueryResponse.java
similarity index 57%
copy from oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/Options.java
copy to oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceQueryResponse.java
index 5e9f48c..26592fa 100644
--- a/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/Options.java
+++ b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceQueryResponse.java
@@ -18,18 +18,28 @@
package org.apache.skywalking.banyandb.v1.client;
+import java.util.ArrayList;
+import java.util.List;
import lombok.Getter;
-import lombok.Setter;
+import org.apache.skywalking.banyandb.v1.trace.BanyandbTrace;
/**
- * Client connection options.
+ * TraceQueryResponse represents the trace query result.
*/
-@Setter
-@Getter
-public class Options {
- private int maxInboundMessageSize = 1024 * 1024 * 50;
+public class TraceQueryResponse {
+ @Getter
+ private List<RowEntity> entities;
- Options() {
+ TraceQueryResponse(BanyandbTrace.QueryResponse response) {
+ final List<BanyandbTrace.Entity> entitiesList = response.getEntitiesList();
+ entities = new ArrayList<>(entitiesList.size());
+ entitiesList.forEach(entity -> entities.add(new RowEntity(entity)));
}
-}
\ No newline at end of file
+ /**
+ * @return size of the response set.
+ */
+ public int size() {
+ return entities.size();
+ }
+}
diff --git a/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceWrite.java b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceWrite.java
index f40c48b..d1023f2 100644
--- a/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceWrite.java
+++ b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceWrite.java
@@ -52,12 +52,20 @@ public class TraceWrite {
* wouldn't deserialize this. So, no format requirement.
*/
private final byte[] binary;
- private final List<WriteField> fields;
+ /**
+ * The values of fields, which are defined by the schema. In the bulk write process, BanyanDB client doesn't require
+ * field names anymore.
+ */
+ private final List<Field> fields;
+ /**
+ * @param group of the BanyanDB client connected.
+ * @return {@link BanyandbTrace.WriteRequest} for the bulk process.
+ */
BanyandbTrace.WriteRequest build(String group) {
final BanyandbTrace.WriteRequest.Builder builder = BanyandbTrace.WriteRequest.newBuilder();
builder.setMetadata(Banyandb.Metadata.newBuilder().setGroup(group).setName(name).build());
- final Banyandb.EntityValue.Builder entityBuilder = Banyandb.EntityValue.newBuilder();
+ final BanyandbTrace.EntityValue.Builder entityBuilder = BanyandbTrace.EntityValue.newBuilder();
entityBuilder.setEntityId(entityId);
entityBuilder.setTimestamp(Timestamp.newBuilder()
.setSeconds(timestamp / 1000)
@@ -65,6 +73,6 @@ public class TraceWrite {
entityBuilder.setDataBinary(ByteString.copyFrom(binary));
fields.forEach(writeField -> entityBuilder.addFields(writeField.toField()));
builder.setEntity(entityBuilder.build());
- return null;
+ return builder.build();
}
}
diff --git a/oap-server/banyandb-java-client/src/main/proto/banyandb/v1/banyandb.proto b/oap-server/banyandb-java-client/src/main/proto/banyandb/v1/banyandb.proto
index bcf30fa..636aa3c 100644
--- a/oap-server/banyandb-java-client/src/main/proto/banyandb/v1/banyandb.proto
+++ b/oap-server/banyandb-java-client/src/main/proto/banyandb/v1/banyandb.proto
@@ -32,16 +32,32 @@ message Metadata {
string name = 2;
}
-// IntPair in a typed pair with an array of int64 as values
+// IntPair is a typed pair with a single int64 as its value
message IntPair {
string key = 1;
- repeated int64 values = 2;
+ int64 value = 2;
+ bool isNull = 3;
}
+// StrPair is a typed pair with a single string as its value
+// StrPair in a typed pair with a string as values
message StrPair {
string key = 1;
- repeated string values = 2;
+ string value = 2;
+ bool isNull = 3;
+}
+
+// IntArrayPair is a typed pair with an array of int64 as values
+message IntArrayPair {
+ string key = 1;
+ repeated int64 value = 2;
+ bool isNull = 3;
+}
+
+// StrArrayPair is a typed pair with an array of string as values
+message StrArrayPair {
+ string key = 1;
+ repeated string value = 2;
+ bool isNull = 3;
}
// Pair is the building block of a record which is equivalent to a key-value pair.
@@ -52,6 +68,8 @@ message TypedPair {
oneof typed {
IntPair int_pair = 1;
StrPair str_pair = 2;
+ IntArrayPair int_array_pair = 3;
+ StrArrayPair str_array_pair = 4;
}
}