You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2021/11/29 13:24:03 UTC
[skywalking-banyandb-java-client] branch main updated: Add single write API and refactor projection (#5)
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb-java-client.git
The following commit(s) were added to refs/heads/main by this push:
new 8dfbaf3 Add single write API and refactor projection (#5)
8dfbaf3 is described below
commit 8dfbaf36ad27705fb0b9d6de3a04dd4b88572037
Author: Jiajing LU <lu...@gmail.com>
AuthorDate: Mon Nov 29 21:23:56 2021 +0800
Add single write API and refactor projection (#5)
* polish the api
* add relative docs
* add single write api
---
README.md | 8 +-
pom.xml | 6 ++
.../banyandb/v1/client/BanyanDBClient.java | 31 ++++++
.../skywalking/banyandb/v1/client/StreamQuery.java | 12 ++-
.../skywalking/banyandb/v1/client/StreamWrite.java | 35 +++----
.../apache/skywalking/banyandb/v1/client/Tag.java | 25 +++++
.../skywalking/banyandb/v1/client/TagAndValue.java | 9 ++
.../v1/client/BanyanDBClientQueryTest.java | 14 ++-
.../v1/client/BanyanDBClientWriteTest.java | 111 ++++++++++++++++++---
9 files changed, 212 insertions(+), 39 deletions(-)
diff --git a/README.md b/README.md
index d874eec..adeb039 100644
--- a/README.md
+++ b/README.md
@@ -112,12 +112,14 @@ Instant begin = end.minus(15, ChronoUnit.MINUTES);
// with stream schema, group=default, name=sw
StreamQuery query = new StreamQuery("sw",
new TimestampRange(begin.toEpochMilli(), end.toEpochMilli()),
- // projection tags
+ // projection tags which are indexed
Arrays.asList("state", "start_time", "duration", "trace_id"));
// search for all states
query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", "state" , 0L));
// set order by condition
query.setOrderBy(new StreamQuery.OrderBy("duration", StreamQuery.OrderBy.Type.DESC));
+// set projection for un-indexed tags
+query.setDataProjections(ImmutableList.of("data_binary"));
// send the query request
client.queryStreams(query);
```
@@ -155,9 +157,11 @@ And the non-existing tags must be fulfilled (with NullValue) instead of compacti
```java
StreamWrite streamWrite = StreamWrite.builder()
.elementId(segmentId)
- .binary(byteData)
+ // write binary data to "data" tag family
+ .dataTag(Tag.binaryField(byteData))
.timestamp(now.toEpochMilli())
.name("sw")
+ // write indexed tags to "searchable" tag family
.tag(Tag.stringField(traceId))
.tag(Tag.stringField(serviceId))
.tag(Tag.stringField(serviceInstanceId))
diff --git a/pom.xml b/pom.xml
index 0a5dd6a..2cc641c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -194,6 +194,12 @@
<version>${javax.annotation-api.version}</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ <version>3.21.0</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java
index 95e8db4..efd8ef0 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java
@@ -35,6 +35,7 @@ import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
+import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.banyandb.v1.client.metadata.IndexRule;
import org.apache.skywalking.banyandb.v1.client.metadata.IndexRuleBinding;
@@ -167,6 +168,36 @@ public class BanyanDBClient implements Closeable {
}
/**
+ * Perform a single write with given entity.
+ *
+ * @param streamWrite the entity to be written
+ */
+ public void write(StreamWrite streamWrite) {
+ final StreamObserver<BanyandbStream.WriteRequest> writeRequestStreamObserver
+ = streamServiceStub
+ .write(
+ new StreamObserver<BanyandbStream.WriteResponse>() {
+ @Override
+ public void onNext(BanyandbStream.WriteResponse writeResponse) {
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ log.error("Error occurs in flushing streams.", throwable);
+ }
+
+ @Override
+ public void onCompleted() {
+ }
+ });
+ try {
+ writeRequestStreamObserver.onNext(streamWrite.build(group));
+ } finally {
+ writeRequestStreamObserver.onCompleted();
+ }
+ }
+
+ /**
* Create a build process for stream write.
*
* @param maxBulkSize the max bulk size for the flush operation
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamQuery.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamQuery.java
index 19343a4..f4d260e 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamQuery.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamQuery.java
@@ -19,6 +19,7 @@
package org.apache.skywalking.banyandb.v1.client;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -63,9 +64,9 @@ public class StreamQuery {
*/
private OrderBy orderBy;
/**
- * Whether to fetch data_binary for the query
+ * Whether to fetch unindexed fields from the "data" tag family for the query
*/
- private boolean dataBinary;
+ private List<String> dataProjections;
public StreamQuery(final String name, final TimestampRange timestampRange, final List<String> projections) {
this.name = name;
@@ -74,7 +75,8 @@ public class StreamQuery {
this.conditions = new ArrayList<>(10);
this.offset = 0;
this.limit = 20;
- this.dataBinary = false;
+ // by default, we don't need projection for data tag family
+ this.dataProjections = Collections.emptyList();
}
public StreamQuery(final String name, final List<String> projections) {
@@ -110,10 +112,10 @@ public class StreamQuery {
.setName("searchable")
.addAllTags(this.projections)
.build());
- if (this.dataBinary) {
+ if (!this.dataProjections.isEmpty()) {
projectionBuilder.addTagFamilies(Banyandb.Projection.TagFamily.newBuilder()
.setName("data")
- .addTags("data_binary")
+ .addAllTags(this.dataProjections)
.build());
}
builder.setProjection(projectionBuilder);
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamWrite.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamWrite.java
index 8586984..92eb7a0 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamWrite.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamWrite.java
@@ -18,7 +18,6 @@
package org.apache.skywalking.banyandb.v1.client;
-import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import java.util.List;
@@ -51,17 +50,19 @@ public class StreamWrite {
*/
private final long timestamp;
/**
- * The binary raw data represents the whole object of current stream. It could be organized by
- * different serialization formats. Natively, SkyWalking uses protobuf, but it is not required. The BanyanDB server
- * wouldn't deserialize this. So, no format requirement.
+ * The fields represent objects of current stream, and they are not indexed.
+ * It could be organized by different serialization formats.
+ * For instance, regarding the binary format, SkyWalking may use protobuf, but it is not required.
+ * The BanyanDB server wouldn't deserialize binary data. Thus, no specific format requirement.
*/
- private final byte[] binary;
+ @Singular
+ private final List<SerializableTag<Banyandb.TagValue>> dataTags;
/**
- * The values of fields, which are defined by the schema. In the bulk write process, BanyanDB client doesn't require
- * field names anymore.
+ * The values of "searchable" fields, which are defined by the schema.
+ * In the bulk write process, BanyanDB client doesn't require field names anymore.
*/
@Singular
- private final List<SerializableTag<Banyandb.TagValue>> tags;
+ private final List<SerializableTag<Banyandb.TagValue>> searchableTags;
/**
* @param group of the BanyanDB client connected.
@@ -76,17 +77,17 @@ public class StreamWrite {
.setSeconds(timestamp / 1000)
.setNanos((int) (timestamp % 1000 * 1_000_000)));
// 1 - add "data" tags
- elemValBuilder.addTagFamilies(Banyandb.TagFamilyForWrite.newBuilder().addTags(
- Banyandb.TagValue.newBuilder()
- .setBinaryData(ByteString.copyFrom(this.binary))
- .build()
- ).build());
+ Banyandb.TagFamilyForWrite.Builder dataBuilder = Banyandb.TagFamilyForWrite.newBuilder();
+ for (final SerializableTag<Banyandb.TagValue> dataTag : this.dataTags) {
+ dataBuilder.addTags(dataTag.toTag());
+ }
+ elemValBuilder.addTagFamilies(dataBuilder.build());
// 2 - add "searchable" tags
- Banyandb.TagFamilyForWrite.Builder b = Banyandb.TagFamilyForWrite.newBuilder();
- for (final SerializableTag<Banyandb.TagValue> tag : tags) {
- b.addTags(tag.toTag());
+ Banyandb.TagFamilyForWrite.Builder searchableBuilder = Banyandb.TagFamilyForWrite.newBuilder();
+ for (final SerializableTag<Banyandb.TagValue> searchableTag : this.searchableTags) {
+ searchableBuilder.addTags(searchableTag.toTag());
}
- elemValBuilder.addTagFamilies(b);
+ elemValBuilder.addTagFamilies(searchableBuilder);
builder.setElement(elemValBuilder);
return builder.build();
}
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/Tag.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/Tag.java
index 394713b..8db3f4e 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/Tag.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/Tag.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.banyandb.v1.client;
import java.util.List;
+import com.google.protobuf.ByteString;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import org.apache.skywalking.banyandb.v1.Banyandb;
@@ -112,6 +113,20 @@ public abstract class Tag<T> {
}
/**
+ * The value of a byte array(ByteString) type field.
+ */
+ public static class BinaryField extends Tag<ByteString> implements SerializableTag<Banyandb.TagValue> {
+ public BinaryField(ByteString byteString) {
+ super(byteString);
+ }
+
+ @Override
+ public Banyandb.TagValue toTag() {
+ return Banyandb.TagValue.newBuilder().setBinaryData(value).build();
+ }
+ }
+
+ /**
* Construct a string field
*
* @param val payload
@@ -142,6 +157,16 @@ public abstract class Tag<T> {
}
/**
+ * Construct a byte array field.
+ *
+ * @param bytes binary data
+ * @return Anonymous field with binary payload
+ */
+ public static SerializableTag<Banyandb.TagValue> binaryField(byte[] bytes) {
+ return new BinaryField(ByteString.copyFrom(bytes));
+ }
+
+ /**
* Construct a long array field
*
* @param val payload
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/TagAndValue.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/TagAndValue.java
index c79863c..db7a012 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/TagAndValue.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/TagAndValue.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.banyandb.v1.client;
import java.util.List;
+import com.google.protobuf.ByteString;
import lombok.EqualsAndHashCode;
import org.apache.skywalking.banyandb.v1.Banyandb;
@@ -68,6 +69,8 @@ public abstract class TagAndValue<T> extends Tag<T> {
return new LongArrayTagPair(tagFamilyName, tag.getKey(), tag.getValue().getIntArray().getValueList());
case STR_ARRAY:
return new StringArrayTagPair(tagFamilyName, tag.getKey(), tag.getValue().getStrArray().getValueList());
+ case BINARY_DATA:
+ return new BinaryTagPair(tagFamilyName, tag.getKey(), tag.getValue().getBinaryData());
case NULL:
return new NullTagPair(tagFamilyName, tag.getKey());
default:
@@ -99,6 +102,12 @@ public abstract class TagAndValue<T> extends Tag<T> {
}
}
+ public static class BinaryTagPair extends TagAndValue<ByteString> {
+ public BinaryTagPair(String tagFamilyName, String fieldName, ByteString byteString) {
+ super(tagFamilyName, fieldName, byteString);
+ }
+ }
+
public static class NullTagPair extends TagAndValue<Void> {
NullTagPair(final String tagFamilyName, final String tagName) {
super(tagFamilyName, tagName, null);
diff --git a/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientQueryTest.java b/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientQueryTest.java
index 6989982..f558ea6 100644
--- a/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientQueryTest.java
+++ b/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientQueryTest.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.banyandb.v1.client;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
import com.google.protobuf.NullValue;
import com.google.protobuf.Timestamp;
import io.grpc.ManagedChannel;
@@ -191,6 +192,7 @@ public class BanyanDBClientQueryTest {
StreamQuery query = new StreamQuery("sw", Arrays.asList("state", "start_time", "duration", "trace_id"));
query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", "trace_id", traceId));
+ query.setDataProjections(ImmutableList.of("data_binary"));
client.queryStreams(query);
@@ -237,12 +239,20 @@ public class BanyanDBClientQueryTest {
.setValue(Banyandb.TagValue.newBuilder().setNull(NullValue.NULL_VALUE).build())
.build())
.build())
+ .addTagFamilies(Banyandb.TagFamily.newBuilder()
+ .setName("data")
+ .addTags(Banyandb.Tag.newBuilder()
+ .setKey("data_binary")
+ .setValue(Banyandb.TagValue.newBuilder()
+ .setBinaryData(ByteString.copyFrom(binaryData)).build())
+ .build())
+ .build())
.build())
.build();
StreamQueryResponse resp = new StreamQueryResponse(responseObj);
Assert.assertNotNull(resp);
Assert.assertEquals(1, resp.getElements().size());
- Assert.assertEquals(1, resp.getElements().get(0).getTagFamilies().size());
+ Assert.assertEquals(2, resp.getElements().get(0).getTagFamilies().size());
Assert.assertEquals(3, resp.getElements().get(0).getTagFamilies().get(0).size());
Assert.assertEquals(new TagAndValue.StringTagPair("searchable", "trace_id", traceId),
resp.getElements().get(0).getTagFamilies().get(0).get(0));
@@ -250,6 +260,8 @@ public class BanyanDBClientQueryTest {
resp.getElements().get(0).getTagFamilies().get(0).get(1));
Assert.assertEquals(new TagAndValue.StringTagPair("searchable", "mq.broker", null),
resp.getElements().get(0).getTagFamilies().get(0).get(2));
+ Assert.assertEquals(new TagAndValue.BinaryTagPair("data", "data_binary", ByteString.copyFrom(binaryData)),
+ resp.getElements().get(0).getTagFamilies().get(1).get(0));
}
static <T> void assertCollectionEqual(Collection<T> c1, Collection<T> c2) {
diff --git a/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientWriteTest.java b/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientWriteTest.java
index dfaafcf..4689a3e 100644
--- a/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientWriteTest.java
+++ b/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientWriteTest.java
@@ -120,22 +120,22 @@ public class BanyanDBClientWriteTest {
StreamWrite streamWrite = StreamWrite.builder()
.elementId(segmentId)
- .binary(byteData)
+ .dataTag(Tag.binaryField(byteData))
.timestamp(now.toEpochMilli())
.name("sw")
- .tag(Tag.stringField(traceId)) // 0
- .tag(Tag.stringField(serviceId))
- .tag(Tag.stringField(serviceInstanceId))
- .tag(Tag.stringField(endpointId))
- .tag(Tag.longField(latency)) // 4
- .tag(Tag.longField(state))
- .tag(Tag.stringField(httpStatusCode))
- .tag(Tag.nullField()) // 7
- .tag(Tag.stringField(dbType))
- .tag(Tag.stringField(dbInstance))
- .tag(Tag.stringField(broker))
- .tag(Tag.stringField(topic))
- .tag(Tag.stringField(queue)) // 12
+ .searchableTag(Tag.stringField(traceId)) // 0
+ .searchableTag(Tag.stringField(serviceId))
+ .searchableTag(Tag.stringField(serviceInstanceId))
+ .searchableTag(Tag.stringField(endpointId))
+ .searchableTag(Tag.longField(latency)) // 4
+ .searchableTag(Tag.longField(state))
+ .searchableTag(Tag.stringField(httpStatusCode))
+ .searchableTag(Tag.nullField()) // 7
+ .searchableTag(Tag.stringField(dbType))
+ .searchableTag(Tag.stringField(dbInstance))
+ .searchableTag(Tag.stringField(broker))
+ .searchableTag(Tag.stringField(topic))
+ .searchableTag(Tag.stringField(queue)) // 12
.build();
streamBulkWriteProcessor.add(streamWrite);
@@ -153,4 +153,87 @@ public class BanyanDBClientWriteTest {
Assert.fail();
}
}
+
+ @Test
+ public void performSingleWrite() throws Exception {
+ final CountDownLatch allRequestsDelivered = new CountDownLatch(1);
+ final List<BanyandbStream.WriteRequest> writeRequestDelivered = new ArrayList<>();
+
+ // implement the fake service
+ final StreamServiceGrpc.StreamServiceImplBase serviceImpl =
+ new StreamServiceGrpc.StreamServiceImplBase() {
+ @Override
+ public StreamObserver<BanyandbStream.WriteRequest> write(StreamObserver<BanyandbStream.WriteResponse> responseObserver) {
+ return new StreamObserver<BanyandbStream.WriteRequest>() {
+ @Override
+ public void onNext(BanyandbStream.WriteRequest value) {
+ writeRequestDelivered.add(value);
+ responseObserver.onNext(BanyandbStream.WriteResponse.newBuilder().build());
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ }
+
+ @Override
+ public void onCompleted() {
+ responseObserver.onCompleted();
+ allRequestsDelivered.countDown();
+ }
+ };
+ }
+ };
+ serviceRegistry.addService(serviceImpl);
+
+ String segmentId = "1231.dfd.123123ssf";
+ String traceId = "trace_id-xxfff.111323";
+ String serviceId = "webapp_id";
+ String serviceInstanceId = "10.0.0.1_id";
+ String endpointId = "home_id";
+ int latency = 200;
+ int state = 1;
+ Instant now = Instant.now();
+ byte[] byteData = new byte[]{14};
+ String broker = "172.16.10.129:9092";
+ String topic = "topic_1";
+ String queue = "queue_2";
+ String httpStatusCode = "200";
+ String dbType = "SQL";
+ String dbInstance = "127.0.0.1:3306";
+
+ StreamWrite streamWrite = StreamWrite.builder()
+ .elementId(segmentId)
+ .dataTag(Tag.binaryField(byteData))
+ .timestamp(now.toEpochMilli())
+ .name("sw")
+ .searchableTag(Tag.stringField(traceId)) // 0
+ .searchableTag(Tag.stringField(serviceId))
+ .searchableTag(Tag.stringField(serviceInstanceId))
+ .searchableTag(Tag.stringField(endpointId))
+ .searchableTag(Tag.longField(latency)) // 4
+ .searchableTag(Tag.longField(state))
+ .searchableTag(Tag.stringField(httpStatusCode))
+ .searchableTag(Tag.nullField()) // 7
+ .searchableTag(Tag.stringField(dbType))
+ .searchableTag(Tag.stringField(dbInstance))
+ .searchableTag(Tag.stringField(broker))
+ .searchableTag(Tag.stringField(topic))
+ .searchableTag(Tag.stringField(queue)) // 12
+ .build();
+
+ client.write(streamWrite);
+
+ if (allRequestsDelivered.await(5, TimeUnit.SECONDS)) {
+ Assert.assertEquals(1, writeRequestDelivered.size());
+ final BanyandbStream.WriteRequest request = writeRequestDelivered.get(0);
+ Assert.assertArrayEquals(byteData, request.getElement().getTagFamilies(0).getTags(0).getBinaryData().toByteArray());
+ Assert.assertEquals(13, request.getElement().getTagFamilies(1).getTagsCount());
+ Assert.assertEquals(traceId, request.getElement().getTagFamilies(1).getTags(0).getStr().getValue());
+ Assert.assertEquals(latency, request.getElement().getTagFamilies(1).getTags(4).getInt().getValue());
+ Assert.assertEquals(request.getElement().getTagFamilies(1).getTags(7).getNull(), NullValue.NULL_VALUE);
+ Assert.assertEquals(queue, request.getElement().getTagFamilies(1).getTags(12).getStr().getValue());
+ } else {
+ Assert.fail();
+ }
+ }
}