You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2021/11/29 13:24:03 UTC

[skywalking-banyandb-java-client] branch main updated: Add single write API and refactor projection (#5)

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb-java-client.git


The following commit(s) were added to refs/heads/main by this push:
     new 8dfbaf3  Add single write API and refactor projection (#5)
8dfbaf3 is described below

commit 8dfbaf36ad27705fb0b9d6de3a04dd4b88572037
Author: Jiajing LU <lu...@gmail.com>
AuthorDate: Mon Nov 29 21:23:56 2021 +0800

    Add single write API and refactor projection (#5)
    
    * polish the api
    
    * add relative docs
    
    * add single write api
---
 README.md                                          |   8 +-
 pom.xml                                            |   6 ++
 .../banyandb/v1/client/BanyanDBClient.java         |  31 ++++++
 .../skywalking/banyandb/v1/client/StreamQuery.java |  12 ++-
 .../skywalking/banyandb/v1/client/StreamWrite.java |  35 +++----
 .../apache/skywalking/banyandb/v1/client/Tag.java  |  25 +++++
 .../skywalking/banyandb/v1/client/TagAndValue.java |   9 ++
 .../v1/client/BanyanDBClientQueryTest.java         |  14 ++-
 .../v1/client/BanyanDBClientWriteTest.java         | 111 ++++++++++++++++++---
 9 files changed, 212 insertions(+), 39 deletions(-)

diff --git a/README.md b/README.md
index d874eec..adeb039 100644
--- a/README.md
+++ b/README.md
@@ -112,12 +112,14 @@ Instant begin = end.minus(15, ChronoUnit.MINUTES);
 // with stream schema, group=default, name=sw
 StreamQuery query = new StreamQuery("sw",
         new TimestampRange(begin.toEpochMilli(), end.toEpochMilli()),
-        // projection tags
+        // projection tags which are indexed
         Arrays.asList("state", "start_time", "duration", "trace_id"));
 // search for all states
 query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", "state" , 0L));
 // set order by condition
 query.setOrderBy(new StreamQuery.OrderBy("duration", StreamQuery.OrderBy.Type.DESC));
+// set projection for un-indexed tags
+query.setDataProjections(ImmutableList.of("data_binary"));
 // send the query request
 client.queryStreams(query);
 ```
@@ -155,9 +157,11 @@ And the non-existing tags must be fulfilled (with NullValue) instead of compacti
 ```java
 StreamWrite streamWrite = StreamWrite.builder()
     .elementId(segmentId)
-    .binary(byteData)
+    // write binary data to "data" tag family
+    .dataTag(Tag.binaryField(byteData))
     .timestamp(now.toEpochMilli())
     .name("sw")
+    // write indexed tags to "searchable" tag family
     .tag(Tag.stringField(traceId))
     .tag(Tag.stringField(serviceId))
     .tag(Tag.stringField(serviceInstanceId))
diff --git a/pom.xml b/pom.xml
index 0a5dd6a..2cc641c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -194,6 +194,12 @@
             <version>${javax.annotation-api.version}</version>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <version>3.21.0</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
     <build>
         <plugins>
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java
index 95e8db4..efd8ef0 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java
@@ -35,6 +35,7 @@ import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantLock;
 
+import io.grpc.stub.StreamObserver;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.banyandb.v1.client.metadata.IndexRule;
 import org.apache.skywalking.banyandb.v1.client.metadata.IndexRuleBinding;
@@ -167,6 +168,36 @@ public class BanyanDBClient implements Closeable {
     }
 
     /**
+     * Perform a single write with given entity.
+     *
+     * @param streamWrite the entity to be written
+     */
+    public void write(StreamWrite streamWrite) {
+        final StreamObserver<BanyandbStream.WriteRequest> writeRequestStreamObserver
+                = streamServiceStub
+                .write(
+                        new StreamObserver<BanyandbStream.WriteResponse>() {
+                            @Override
+                            public void onNext(BanyandbStream.WriteResponse writeResponse) {
+                            }
+
+                            @Override
+                            public void onError(Throwable throwable) {
+                                log.error("Error occurs in flushing streams.", throwable);
+                            }
+
+                            @Override
+                            public void onCompleted() {
+                            }
+                        });
+        try {
+            writeRequestStreamObserver.onNext(streamWrite.build(group));
+        } finally {
+            writeRequestStreamObserver.onCompleted();
+        }
+    }
+
+    /**
      * Create a build process for stream write.
      *
      * @param maxBulkSize   the max bulk size for the flush operation
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamQuery.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamQuery.java
index 19343a4..f4d260e 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamQuery.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamQuery.java
@@ -19,6 +19,7 @@
 package org.apache.skywalking.banyandb.v1.client;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -63,9 +64,9 @@ public class StreamQuery {
      */
     private OrderBy orderBy;
     /**
-     * Whether to fetch data_binary for the query
+     * Unindexed tags to project from the "data" tag family; when empty (the default), that family is not requested
      */
-    private boolean dataBinary;
+    private List<String> dataProjections;
 
     public StreamQuery(final String name, final TimestampRange timestampRange, final List<String> projections) {
         this.name = name;
@@ -74,7 +75,8 @@ public class StreamQuery {
         this.conditions = new ArrayList<>(10);
         this.offset = 0;
         this.limit = 20;
-        this.dataBinary = false;
+        // by default, we don't need projection for data tag family
+        this.dataProjections = Collections.emptyList();
     }
 
     public StreamQuery(final String name, final List<String> projections) {
@@ -110,10 +112,10 @@ public class StreamQuery {
                         .setName("searchable")
                         .addAllTags(this.projections)
                         .build());
-        if (this.dataBinary) {
+        if (!this.dataProjections.isEmpty()) {
             projectionBuilder.addTagFamilies(Banyandb.Projection.TagFamily.newBuilder()
                     .setName("data")
-                    .addTags("data_binary")
+                    .addAllTags(this.dataProjections)
                     .build());
         }
         builder.setProjection(projectionBuilder);
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamWrite.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamWrite.java
index 8586984..92eb7a0 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamWrite.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamWrite.java
@@ -18,7 +18,6 @@
 
 package org.apache.skywalking.banyandb.v1.client;
 
-import com.google.protobuf.ByteString;
 import com.google.protobuf.Timestamp;
 
 import java.util.List;
@@ -51,17 +50,19 @@ public class StreamWrite {
      */
     private final long timestamp;
     /**
-     * The binary raw data represents the whole object of current stream. It could be organized by
-     * different serialization formats. Natively, SkyWalking uses protobuf, but it is not required. The BanyanDB server
-     * wouldn't deserialize this. So, no format requirement.
+     * The tags written to the "data" tag family of the current stream element; they are not indexed.
+     * Binary values among them could be organized by different serialization formats.
+     * For instance, SkyWalking may use protobuf, but it is not required.
+     * The BanyanDB server wouldn't deserialize binary data. Thus, there is no specific format requirement.
      */
-    private final byte[] binary;
+    @Singular
+    private final List<SerializableTag<Banyandb.TagValue>> dataTags;
     /**
-     * The values of fields, which are defined by the schema. In the bulk write process, BanyanDB client doesn't require
-     * field names anymore.
+     * The values of the "searchable" tag family, which are defined by the schema.
+     * In both the bulk and the single write process, BanyanDB client doesn't require tag names.
      */
     @Singular
-    private final List<SerializableTag<Banyandb.TagValue>> tags;
+    private final List<SerializableTag<Banyandb.TagValue>> searchableTags;
 
     /**
      * @param group of the BanyanDB client connected.
@@ -76,17 +77,17 @@ public class StreamWrite {
                 .setSeconds(timestamp / 1000)
                 .setNanos((int) (timestamp % 1000 * 1_000_000)));
         // 1 - add "data" tags
-        elemValBuilder.addTagFamilies(Banyandb.TagFamilyForWrite.newBuilder().addTags(
-                Banyandb.TagValue.newBuilder()
-                        .setBinaryData(ByteString.copyFrom(this.binary))
-                        .build()
-        ).build());
+        Banyandb.TagFamilyForWrite.Builder dataBuilder = Banyandb.TagFamilyForWrite.newBuilder();
+        for (final SerializableTag<Banyandb.TagValue> dataTag : this.dataTags) {
+            dataBuilder.addTags(dataTag.toTag());
+        }
+        elemValBuilder.addTagFamilies(dataBuilder.build());
         // 2 - add "searchable" tags
-        Banyandb.TagFamilyForWrite.Builder b = Banyandb.TagFamilyForWrite.newBuilder();
-        for (final SerializableTag<Banyandb.TagValue> tag : tags) {
-            b.addTags(tag.toTag());
+        Banyandb.TagFamilyForWrite.Builder searchableBuilder = Banyandb.TagFamilyForWrite.newBuilder();
+        for (final SerializableTag<Banyandb.TagValue> searchableTag : this.searchableTags) {
+            searchableBuilder.addTags(searchableTag.toTag());
         }
-        elemValBuilder.addTagFamilies(b);
+        elemValBuilder.addTagFamilies(searchableBuilder);
         builder.setElement(elemValBuilder);
         return builder.build();
     }
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/Tag.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/Tag.java
index 394713b..8db3f4e 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/Tag.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/Tag.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.banyandb.v1.client;
 
 import java.util.List;
 
+import com.google.protobuf.ByteString;
 import lombok.EqualsAndHashCode;
 import lombok.Getter;
 import org.apache.skywalking.banyandb.v1.Banyandb;
@@ -112,6 +113,20 @@ public abstract class Tag<T> {
     }
 
     /**
+     * The value of a binary field, backed by a protobuf {@link ByteString}.
+     */
+    public static class BinaryField extends Tag<ByteString> implements SerializableTag<Banyandb.TagValue> {
+        public BinaryField(final ByteString binaryData) {
+            super(binaryData);
+        }
+
+        @Override
+        public Banyandb.TagValue toTag() {
+            return Banyandb.TagValue.newBuilder().setBinaryData(this.value).build();
+        }
+    }
+
+    /**
      * Construct a string field
      *
      * @param val payload
@@ -142,6 +157,16 @@ public abstract class Tag<T> {
     }
 
     /**
+     * Construct an anonymous binary field holding a copy of the given bytes.
+     *
+     * @param bytes the raw binary payload; it is copied, so later changes to the array are not observed
+     * @return an anonymous {@link BinaryField} wrapping the copied payload
+     */
+    public static SerializableTag<Banyandb.TagValue> binaryField(final byte[] bytes) {
+        return new BinaryField(ByteString.copyFrom(bytes, 0, bytes.length));
+    }
+
+    /**
      * Construct a long array field
      *
      * @param val payload
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/TagAndValue.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/TagAndValue.java
index c79863c..db7a012 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/TagAndValue.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/TagAndValue.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.banyandb.v1.client;
 
 import java.util.List;
 
+import com.google.protobuf.ByteString;
 import lombok.EqualsAndHashCode;
 import org.apache.skywalking.banyandb.v1.Banyandb;
 
@@ -68,6 +69,8 @@ public abstract class TagAndValue<T> extends Tag<T> {
                 return new LongArrayTagPair(tagFamilyName, tag.getKey(), tag.getValue().getIntArray().getValueList());
             case STR_ARRAY:
                 return new StringArrayTagPair(tagFamilyName, tag.getKey(), tag.getValue().getStrArray().getValueList());
+            case BINARY_DATA:
+                return new BinaryTagPair(tagFamilyName, tag.getKey(), tag.getValue().getBinaryData());
             case NULL:
                 return new NullTagPair(tagFamilyName, tag.getKey());
             default:
@@ -99,6 +102,12 @@ public abstract class TagAndValue<T> extends Tag<T> {
         }
     }
 
+    public static class BinaryTagPair extends TagAndValue<ByteString> {
+        public BinaryTagPair(final String tagFamilyName, final String tagName, final ByteString binaryData) {
+            super(tagFamilyName, tagName, binaryData);
+        }
+    }
+
     public static class NullTagPair extends TagAndValue<Void> {
         NullTagPair(final String tagFamilyName, final String tagName) {
             super(tagFamilyName, tagName, null);
diff --git a/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientQueryTest.java b/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientQueryTest.java
index 6989982..f558ea6 100644
--- a/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientQueryTest.java
+++ b/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientQueryTest.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.banyandb.v1.client;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
 import com.google.protobuf.NullValue;
 import com.google.protobuf.Timestamp;
 import io.grpc.ManagedChannel;
@@ -191,6 +192,7 @@ public class BanyanDBClientQueryTest {
 
         StreamQuery query = new StreamQuery("sw", Arrays.asList("state", "start_time", "duration", "trace_id"));
         query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", "trace_id", traceId));
+        query.setDataProjections(ImmutableList.of("data_binary"));
 
         client.queryStreams(query);
 
@@ -237,12 +239,20 @@ public class BanyanDBClientQueryTest {
                                         .setValue(Banyandb.TagValue.newBuilder().setNull(NullValue.NULL_VALUE).build())
                                         .build())
                                 .build())
+                        .addTagFamilies(Banyandb.TagFamily.newBuilder()
+                                .setName("data")
+                                .addTags(Banyandb.Tag.newBuilder()
+                                        .setKey("data_binary")
+                                        .setValue(Banyandb.TagValue.newBuilder()
+                                                .setBinaryData(ByteString.copyFrom(binaryData)).build())
+                                        .build())
+                                .build())
                         .build())
                 .build();
         StreamQueryResponse resp = new StreamQueryResponse(responseObj);
         Assert.assertNotNull(resp);
         Assert.assertEquals(1, resp.getElements().size());
-        Assert.assertEquals(1, resp.getElements().get(0).getTagFamilies().size());
+        Assert.assertEquals(2, resp.getElements().get(0).getTagFamilies().size());
         Assert.assertEquals(3, resp.getElements().get(0).getTagFamilies().get(0).size());
         Assert.assertEquals(new TagAndValue.StringTagPair("searchable", "trace_id", traceId),
                 resp.getElements().get(0).getTagFamilies().get(0).get(0));
@@ -250,6 +260,8 @@ public class BanyanDBClientQueryTest {
                 resp.getElements().get(0).getTagFamilies().get(0).get(1));
         Assert.assertEquals(new TagAndValue.StringTagPair("searchable", "mq.broker", null),
                 resp.getElements().get(0).getTagFamilies().get(0).get(2));
+        Assert.assertEquals(new TagAndValue.BinaryTagPair("data", "data_binary", ByteString.copyFrom(binaryData)),
+                resp.getElements().get(0).getTagFamilies().get(1).get(0));
     }
 
     static <T> void assertCollectionEqual(Collection<T> c1, Collection<T> c2) {
diff --git a/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientWriteTest.java b/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientWriteTest.java
index dfaafcf..4689a3e 100644
--- a/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientWriteTest.java
+++ b/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientWriteTest.java
@@ -120,22 +120,22 @@ public class BanyanDBClientWriteTest {
 
         StreamWrite streamWrite = StreamWrite.builder()
                 .elementId(segmentId)
-                .binary(byteData)
+                .dataTag(Tag.binaryField(byteData))
                 .timestamp(now.toEpochMilli())
                 .name("sw")
-                .tag(Tag.stringField(traceId)) // 0
-                .tag(Tag.stringField(serviceId))
-                .tag(Tag.stringField(serviceInstanceId))
-                .tag(Tag.stringField(endpointId))
-                .tag(Tag.longField(latency)) // 4
-                .tag(Tag.longField(state))
-                .tag(Tag.stringField(httpStatusCode))
-                .tag(Tag.nullField()) // 7
-                .tag(Tag.stringField(dbType))
-                .tag(Tag.stringField(dbInstance))
-                .tag(Tag.stringField(broker))
-                .tag(Tag.stringField(topic))
-                .tag(Tag.stringField(queue)) // 12
+                .searchableTag(Tag.stringField(traceId)) // 0
+                .searchableTag(Tag.stringField(serviceId))
+                .searchableTag(Tag.stringField(serviceInstanceId))
+                .searchableTag(Tag.stringField(endpointId))
+                .searchableTag(Tag.longField(latency)) // 4
+                .searchableTag(Tag.longField(state))
+                .searchableTag(Tag.stringField(httpStatusCode))
+                .searchableTag(Tag.nullField()) // 7
+                .searchableTag(Tag.stringField(dbType))
+                .searchableTag(Tag.stringField(dbInstance))
+                .searchableTag(Tag.stringField(broker))
+                .searchableTag(Tag.stringField(topic))
+                .searchableTag(Tag.stringField(queue)) // 12
                 .build();
 
         streamBulkWriteProcessor.add(streamWrite);
@@ -153,4 +153,87 @@ public class BanyanDBClientWriteTest {
             Assert.fail();
         }
     }
+
+    @Test
+    public void performSingleWrite() throws Exception {
+        final CountDownLatch allRequestsDelivered = new CountDownLatch(1);
+        final List<BanyandbStream.WriteRequest> writeRequestDelivered = new ArrayList<>();
+
+        // fake service: records every request and releases the latch once the client half-closes the stream
+        final StreamServiceGrpc.StreamServiceImplBase serviceImpl =
+                new StreamServiceGrpc.StreamServiceImplBase() {
+                    @Override
+                    public StreamObserver<BanyandbStream.WriteRequest> write(StreamObserver<BanyandbStream.WriteResponse> responseObserver) {
+                        return new StreamObserver<BanyandbStream.WriteRequest>() {
+                            @Override
+                            public void onNext(BanyandbStream.WriteRequest value) {
+                                writeRequestDelivered.add(value);
+                                responseObserver.onNext(BanyandbStream.WriteResponse.newBuilder().build());
+                            }
+
+                            @Override
+                            public void onError(Throwable t) {
+                            }
+
+                            @Override
+                            public void onCompleted() {
+                                responseObserver.onCompleted();
+                                allRequestsDelivered.countDown();
+                            }
+                        };
+                    }
+                };
+        serviceRegistry.addService(serviceImpl);
+
+        String segmentId = "1231.dfd.123123ssf";
+        String traceId = "trace_id-xxfff.111323";
+        String serviceId = "webapp_id";
+        String serviceInstanceId = "10.0.0.1_id";
+        String endpointId = "home_id";
+        int latency = 200;
+        int state = 1;
+        Instant now = Instant.now();
+        byte[] byteData = new byte[]{14};
+        String broker = "172.16.10.129:9092";
+        String topic = "topic_1";
+        String queue = "queue_2";
+        String httpStatusCode = "200";
+        String dbType = "SQL";
+        String dbInstance = "127.0.0.1:3306";
+
+        StreamWrite streamWrite = StreamWrite.builder()
+                .elementId(segmentId)
+                .dataTag(Tag.binaryField(byteData))
+                .timestamp(now.toEpochMilli())
+                .name("sw")
+                .searchableTag(Tag.stringField(traceId)) // 0
+                .searchableTag(Tag.stringField(serviceId))
+                .searchableTag(Tag.stringField(serviceInstanceId))
+                .searchableTag(Tag.stringField(endpointId))
+                .searchableTag(Tag.longField(latency)) // 4
+                .searchableTag(Tag.longField(state))
+                .searchableTag(Tag.stringField(httpStatusCode))
+                .searchableTag(Tag.nullField()) // 7
+                .searchableTag(Tag.stringField(dbType))
+                .searchableTag(Tag.stringField(dbInstance))
+                .searchableTag(Tag.stringField(broker))
+                .searchableTag(Tag.stringField(topic))
+                .searchableTag(Tag.stringField(queue)) // 12
+                .build();
+
+        client.write(streamWrite);
+
+        if (allRequestsDelivered.await(5, TimeUnit.SECONDS)) {
+            Assert.assertEquals(1, writeRequestDelivered.size());
+            final BanyandbStream.WriteRequest request = writeRequestDelivered.get(0);
+            Assert.assertArrayEquals(byteData, request.getElement().getTagFamilies(0).getTags(0).getBinaryData().toByteArray());
+            Assert.assertEquals(13, request.getElement().getTagFamilies(1).getTagsCount());
+            Assert.assertEquals(traceId, request.getElement().getTagFamilies(1).getTags(0).getStr().getValue());
+            Assert.assertEquals(latency, request.getElement().getTagFamilies(1).getTags(4).getInt().getValue());
+            Assert.assertEquals(NullValue.NULL_VALUE, request.getElement().getTagFamilies(1).getTags(7).getNull());
+            Assert.assertEquals(queue, request.getElement().getTagFamilies(1).getTags(12).getStr().getValue());
+        } else {
+            Assert.fail();
+        }
+    }
 }