You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2021/08/13 03:49:15 UTC

[skywalking] branch banyandb-client-api updated: Submit bulk prototype

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch banyandb-client-api
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/banyandb-client-api by this push:
     new 364f1c7  Submit bulk prototype
364f1c7 is described below

commit 364f1c7c0f80209843f616346cbe37f67a9917ba
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Fri Aug 13 11:49:04 2021 +0800

    Submit bulk prototype
---
 .../banyandb/v1/client/BanyanDBClient.java         | 13 ++-
 .../banyandb/v1/client/BulkWriteProcessor.java     | 57 ++++++++++---
 .../v1/client/TraceBulkWriteProcessor.java         | 93 ++++++++++++++++++++++
 .../skywalking/banyandb/v1/client/TraceWrite.java  | 21 ++++-
 .../skywalking/banyandb/v1/client/WriteField.java  | 25 +++++-
 5 files changed, 193 insertions(+), 16 deletions(-)

diff --git a/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java
index dcf3a80..485c4c7 100644
--- a/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java
+++ b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java
@@ -121,8 +121,17 @@ public class BanyanDBClient {
         }
     }
 
-    public void writeTrace(TraceWrite write) {
-
+    /**
+     * Create a build process for trace write.
+     *
+     * @param maxBulkSize   the max bulk size for the flush operation
+     * @param flushInterval if given maxBulkSize is not reached in this period, the flush would be trigger
+     *                      automatically. Unit is second
+     * @param concurrency   the number of concurrency would run for the flush max
+     * @return trace bulk write processor
+     */
+    public TraceBulkWriteProcessor buildTraceWriteProcessor(int maxBulkSize, int flushInterval, int concurrency) {
+        return new TraceBulkWriteProcessor(group, traceServiceStub, maxBulkSize, flushInterval, concurrency);
     }
 
 }
diff --git a/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/BulkWriteProcessor.java b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/BulkWriteProcessor.java
index f689e28..a522b76 100644
--- a/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/BulkWriteProcessor.java
+++ b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/BulkWriteProcessor.java
@@ -18,7 +18,9 @@
 
 package org.apache.skywalking.banyandb.v1.client;
 
+import java.util.ArrayList;
 import java.util.List;
+import java.util.Properties;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
 import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
 
@@ -26,9 +28,10 @@ import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
  * BulkWriteProcessor is a timeline and size dual driven processor.
  *
  * It includes an internal queue and timer, and accept the data sequentially. With the given thresholds of time and
- * size, it could activate {@link #flush()} to continue the process to the next step.
+ * size, it could activate flush to continue the process to the next step.
  */
-public abstract class BulkWriteProcessor<T> {
+public abstract class BulkWriteProcessor {
+    protected final int flushInterval;
     private DataCarrier queue;
 
     /**
@@ -36,26 +39,55 @@ public abstract class BulkWriteProcessor<T> {
      *
      * @param maxBulkSize   the max bulk size for the flush operation
      * @param flushInterval if given maxBulkSize is not reached in this period, the flush would be trigger
-     *                      automatically
+     *                      automatically. Unit is second.
      * @param concurrency   the number of concurrency would run for the flush max.
      */
     protected BulkWriteProcessor(String processorName, int maxBulkSize, int flushInterval, int concurrency) {
-        this.queue = new DataCarrier(processorName, maxBulkSize * 2, 2);
+        this.flushInterval = flushInterval;
+        this.queue = new DataCarrier(processorName, maxBulkSize, concurrency);
+        Properties properties = new Properties();
+        properties.put("maxBulkSize", maxBulkSize);
+        properties.put("flushInterval", flushInterval);
+        properties.put("BulkWriteProcessor", this);
         queue.consume(QueueWatcher.class, concurrency);
     }
 
     /**
-     * The internal queue consumer for buld process.
+     * The internal queue consumer for build process.
      */
     private static class QueueWatcher implements IConsumer {
+        private long lastFlushTimestamp;
+        private int maxBulkSize;
+        private int flushInterval;
+        private List cachedData;
+        private BulkWriteProcessor bulkWriteProcessor;
+
         @Override
         public void init() {
-
+            lastFlushTimestamp = System.currentTimeMillis();
+            //TODO: initialize maxBulkSize and flushInterval
+            flushInterval = flushInterval * 1000;
+            cachedData = new ArrayList(maxBulkSize);
         }
 
         @Override
         public void consume(final List data) {
-
+            if (data.size() >= maxBulkSize) {
+                // The data#size actually wouldn't over the maxBulkSize due to the DataCarrier channel's max size.
+                // This is just to preventing unexpected case and avoid confusion about dropping into else section.
+                bulkWriteProcessor.flush(data);
+                lastFlushTimestamp = System.currentTimeMillis();
+            } else {
+                data.forEach(element -> {
+                    cachedData.add(element);
+                    if (cachedData.size() >= maxBulkSize) {
+                        // Flush and re-init.
+                        bulkWriteProcessor.flush(cachedData);
+                        cachedData = new ArrayList(maxBulkSize);
+                        lastFlushTimestamp = System.currentTimeMillis();
+                    }
+                });
+            }
         }
 
         @Override
@@ -70,9 +102,16 @@ public abstract class BulkWriteProcessor<T> {
 
         @Override
         public void nothingToConsume() {
-
+            if (System.currentTimeMillis() - lastFlushTimestamp > flushInterval) {
+                bulkWriteProcessor.flush(cachedData);
+                cachedData = new ArrayList(maxBulkSize);
+                lastFlushTimestamp = System.currentTimeMillis();
+            }
         }
     }
 
-    protected abstract void flush();
+    /**
+     * @param data to be flush.
+     */
+    protected abstract void flush(List data);
 }
diff --git a/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceBulkWriteProcessor.java b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceBulkWriteProcessor.java
new file mode 100644
index 0000000..409ac97
--- /dev/null
+++ b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceBulkWriteProcessor.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client;
+
+import io.grpc.stub.StreamObserver;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.banyandb.v1.trace.BanyandbTrace;
+import org.apache.skywalking.banyandb.v1.trace.TraceServiceGrpc;
+
+/**
+ * TraceWriteProcessor works for trace flush.
+ */
+@Slf4j
+public class TraceBulkWriteProcessor extends BulkWriteProcessor {
+    /**
+     * The instance name.
+     */
+    private final String group;
+    private TraceServiceGrpc.TraceServiceStub traceServiceStub;
+
+    /**
+     * Create the processor.
+     *
+     * @param traceServiceStub stub for gRPC call.
+     * @param maxBulkSize      the max bulk size for the flush operation
+     * @param flushInterval    if given maxBulkSize is not reached in this period, the flush would be trigger
+     *                         automatically. Unit is second.
+     * @param concurrency      the number of concurrency would run for the flush max.
+     */
+    protected TraceBulkWriteProcessor(final String group,
+                                      final TraceServiceGrpc.TraceServiceStub traceServiceStub,
+                                      final int maxBulkSize,
+                                      final int flushInterval,
+                                      final int concurrency) {
+        super("TraceBulkWriteProcessor", maxBulkSize, flushInterval, concurrency);
+        this.group = group;
+        this.traceServiceStub = traceServiceStub;
+    }
+
+    @Override
+    protected void flush(final List data) {
+        final StreamObserver<BanyandbTrace.WriteRequest> writeRequestStreamObserver
+            = traceServiceStub.withDeadlineAfter(
+                                  flushInterval, TimeUnit.SECONDS)
+                              .write(
+                                  new StreamObserver<BanyandbTrace.WriteResponse>() {
+                                      @Override
+                                      public void onNext(
+                                          BanyandbTrace.WriteResponse writeResponse) {
+                                      }
+
+                                      @Override
+                                      public void onError(
+                                          Throwable throwable) {
+                                          log.error(
+                                              "Error occurs in flushing traces.",
+                                              throwable
+                                          );
+                                      }
+
+                                      @Override
+                                      public void onCompleted() {
+                                      }
+                                  });
+        try {
+            data.forEach(write -> {
+                final TraceWrite traceWrite = (TraceWrite) write;
+                BanyandbTrace.WriteRequest request = traceWrite.build(group);
+                writeRequestStreamObserver.onNext(request);
+            });
+        } finally {
+            writeRequestStreamObserver.onCompleted();
+        }
+    }
+}
diff --git a/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceWrite.java b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceWrite.java
index c4d72bf..f40c48b 100644
--- a/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceWrite.java
+++ b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceWrite.java
@@ -18,20 +18,24 @@
 
 package org.apache.skywalking.banyandb.v1.client;
 
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Timestamp;
 import java.util.List;
 import lombok.AccessLevel;
 import lombok.Builder;
 import lombok.Getter;
+import org.apache.skywalking.banyandb.v1.Banyandb;
+import org.apache.skywalking.banyandb.v1.trace.BanyandbTrace;
 
 /**
  * TraceWrite represents a write operation, including necessary fields, for {@link
- * BanyanDBClient#writeTrace(TraceWrite)}.
+ * BanyanDBClient#buildTraceWriteProcessor}.
  */
 @Builder
 @Getter(AccessLevel.PROTECTED)
 public class TraceWrite {
     /**
-     * Ower name current entity
+     * Owner name current entity
      */
     private final String name;
     /**
@@ -50,4 +54,17 @@ public class TraceWrite {
     private final byte[] binary;
     private final List<WriteField> fields;
 
+    BanyandbTrace.WriteRequest build(String group) {
+        final BanyandbTrace.WriteRequest.Builder builder = BanyandbTrace.WriteRequest.newBuilder();
+        builder.setMetadata(Banyandb.Metadata.newBuilder().setGroup(group).setName(name).build());
+        final Banyandb.EntityValue.Builder entityBuilder = Banyandb.EntityValue.newBuilder();
+        entityBuilder.setEntityId(entityId);
+        entityBuilder.setTimestamp(Timestamp.newBuilder()
+                                            .setSeconds(timestamp / 1000)
+                                            .setNanos((int) (timestamp % 1000 * 1000)));
+        entityBuilder.setDataBinary(ByteString.copyFrom(binary));
+        fields.forEach(writeField -> entityBuilder.addFields(writeField.toField()));
+        builder.setEntity(entityBuilder.build());
+        return null;
+    }
 }
diff --git a/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/WriteField.java b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/WriteField.java
index f4bf858..5918533 100644
--- a/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/WriteField.java
+++ b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/WriteField.java
@@ -19,29 +19,48 @@
 package org.apache.skywalking.banyandb.v1.client;
 
 import lombok.RequiredArgsConstructor;
+import org.apache.skywalking.banyandb.v1.Banyandb;
+
+import static com.google.protobuf.NullValue.NULL_VALUE;
 
 /**
  * WriteField represents a value of column/field for a write-op value.
  */
 public interface WriteField {
-    class NullField {
+    Banyandb.Field toField();
+
+    class NullField implements WriteField {
 
+        @Override
+        public Banyandb.Field toField() {
+            return Banyandb.Field.newBuilder().setNull(NULL_VALUE).build();
+        }
     }
 
     /**
      * The value of a String type field.
      */
     @RequiredArgsConstructor
-    class StringField {
+    class StringField implements WriteField {
         private final String value;
+
+        @Override
+        public Banyandb.Field toField() {
+            return Banyandb.Field.newBuilder().setStr(Banyandb.Str.newBuilder().setValue(value)).build();
+        }
     }
 
     /**
      * The value of a String array type field.
      */
     @RequiredArgsConstructor
-    class StringArrayField {
+    class StringArrayField implements WriteField {
         private final String[] value;
+
+        @Override
+        public Banyandb.Field toField() {
+            return null;
+        }
     }
 
     /**