You are viewing a plain text version of this content. The canonical link for it is here (the link target is not included in this plain-text version).
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2021/08/13 03:49:15 UTC
[skywalking] branch banyandb-client-api updated: Submit bulk
prototype
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch banyandb-client-api
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/banyandb-client-api by this push:
new 364f1c7 Submit bulk prototype
364f1c7 is described below
commit 364f1c7c0f80209843f616346cbe37f67a9917ba
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Fri Aug 13 11:49:04 2021 +0800
Submit bulk prototype
---
.../banyandb/v1/client/BanyanDBClient.java | 13 ++-
.../banyandb/v1/client/BulkWriteProcessor.java | 57 ++++++++++---
.../v1/client/TraceBulkWriteProcessor.java | 93 ++++++++++++++++++++++
.../skywalking/banyandb/v1/client/TraceWrite.java | 21 ++++-
.../skywalking/banyandb/v1/client/WriteField.java | 25 +++++-
5 files changed, 193 insertions(+), 16 deletions(-)
diff --git a/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java
index dcf3a80..485c4c7 100644
--- a/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java
+++ b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java
@@ -121,8 +121,17 @@ public class BanyanDBClient {
}
}
- public void writeTrace(TraceWrite write) {
-
+ /**
+ * Create a new bulk write processor for trace writes, bound to this client's group and trace service stub.
+ *
+ * @param maxBulkSize the max bulk size for the flush operation
+ * @param flushInterval if given maxBulkSize is not reached in this period, the flush would be triggered
+ * automatically. Unit is second.
+ * @param concurrency the number of queue consumers that may run flushes concurrently
+ * @return a new trace bulk write processor (each call creates its own queue and consumers)
+ */
+ public TraceBulkWriteProcessor buildTraceWriteProcessor(int maxBulkSize, int flushInterval, int concurrency) {
+ return new TraceBulkWriteProcessor(group, traceServiceStub, maxBulkSize, flushInterval, concurrency);
}
}
diff --git a/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/BulkWriteProcessor.java b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/BulkWriteProcessor.java
index f689e28..a522b76 100644
--- a/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/BulkWriteProcessor.java
+++ b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/BulkWriteProcessor.java
@@ -18,7 +18,9 @@
package org.apache.skywalking.banyandb.v1.client;
+import java.util.ArrayList;
import java.util.List;
+import java.util.Properties;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
@@ -26,9 +28,10 @@ import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
* BulkWriteProcessor is a timeline and size dual driven processor.
*
* It includes an internal queue and timer, and accept the data sequentially. With the given thresholds of time and
- * size, it could activate {@link #flush()} to continue the process to the next step.
+ * size, it could activate flush to continue the process to the next step.
*/
-public abstract class BulkWriteProcessor<T> {
+public abstract class BulkWriteProcessor {
+ protected final int flushInterval;
private DataCarrier queue;
/**
@@ -36,26 +39,55 @@ public abstract class BulkWriteProcessor<T> {
*
* @param maxBulkSize the max bulk size for the flush operation
* @param flushInterval if given maxBulkSize is not reached in this period, the flush would be trigger
- * automatically
+ * automatically. Unit is second.
* @param concurrency the number of concurrency would run for the flush max.
*/
protected BulkWriteProcessor(String processorName, int maxBulkSize, int flushInterval, int concurrency) {
- this.queue = new DataCarrier(processorName, maxBulkSize * 2, 2);
+ this.flushInterval = flushInterval; // kept for subclasses; TraceBulkWriteProcessor uses it as the gRPC deadline (seconds)
+ this.queue = new DataCarrier(processorName, maxBulkSize, concurrency); // NOTE(review): confirm these two ints map to the intended DataCarrier constructor parameters
+ Properties properties = new Properties(); // consumer settings: bulk size, flush interval (seconds) and the owning processor
+ properties.put("maxBulkSize", maxBulkSize);
+ properties.put("flushInterval", flushInterval);
+ properties.put("BulkWriteProcessor", this); // NOTE(review): `properties` is never passed to queue.consume() below, so QueueWatcher cannot read any of these yet
queue.consume(QueueWatcher.class, concurrency);
}
/**
- * The internal queue consumer for buld process.
+ * The internal queue consumer for build process.
*/
private static class QueueWatcher implements IConsumer {
+ private long lastFlushTimestamp;
+ private int maxBulkSize;
+ private int flushInterval;
+ private List cachedData;
+ private BulkWriteProcessor bulkWriteProcessor;
+
@Override
public void init() {
-
+ lastFlushTimestamp = System.currentTimeMillis(); // the time-driven flush in nothingToConsume() measures from here
+ //TODO: initialize maxBulkSize and flushInterval (bulkWriteProcessor is also never set, yet consume() and nothingToConsume() dereference it)
+ flushInterval = flushInterval * 1000; // seconds -> milliseconds, to match currentTimeMillis() deltas; NOTE(review): still 0 until the TODO is done
+ cachedData = new ArrayList(maxBulkSize); // pending elements awaiting the next size- or time-driven flush
}
@Override
public void consume(final List data) {
-
+ if (data.size() >= maxBulkSize) { // NOTE(review): while maxBulkSize is still 0 (see the TODO in init()), this branch is always taken
+ // The data#size actually wouldn't exceed the maxBulkSize due to the DataCarrier channel's max size.
+ // This branch only guards against unexpected cases and avoids confusion about dropping into the else section.
+ bulkWriteProcessor.flush(data); // NOTE(review): elements already in cachedData are not flushed first, so they are sent after this newer batch
+ lastFlushTimestamp = System.currentTimeMillis();
+ } else {
+ data.forEach(element -> { // buffer element by element, flushing whenever the cache reaches maxBulkSize
+ cachedData.add(element);
+ if (cachedData.size() >= maxBulkSize) {
+ // Flush and re-init.
+ bulkWriteProcessor.flush(cachedData);
+ cachedData = new ArrayList(maxBulkSize); // a fresh list, since the flushed one was handed to flush()
+ lastFlushTimestamp = System.currentTimeMillis();
+ }
+ });
+ }
}
@Override
@@ -70,9 +102,16 @@ public abstract class BulkWriteProcessor<T> {
@Override
public void nothingToConsume() {
-
+ if (!cachedData.isEmpty() && System.currentTimeMillis() - lastFlushTimestamp > flushInterval) { // time-driven flush of a partial bulk; an empty cache is skipped so no empty write stream is opened
+ bulkWriteProcessor.flush(cachedData);
+ cachedData = new ArrayList(maxBulkSize);
+ lastFlushTimestamp = System.currentTimeMillis();
+ }
}
}
- protected abstract void flush();
+ /** Flush one bulk of buffered elements; invoked by the queue consumer when a size or time threshold is hit.
+ * @param data the elements to be flushed.
+ */
+ protected abstract void flush(List data);
}
diff --git a/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceBulkWriteProcessor.java b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceBulkWriteProcessor.java
new file mode 100644
index 0000000..409ac97
--- /dev/null
+++ b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceBulkWriteProcessor.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client;
+
+import io.grpc.stub.StreamObserver;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.banyandb.v1.trace.BanyandbTrace;
+import org.apache.skywalking.banyandb.v1.trace.TraceServiceGrpc;
+
+/**
+ * TraceWriteProcessor works for trace flush.
+ */
+@Slf4j
+public class TraceBulkWriteProcessor extends BulkWriteProcessor {
+ /**
+ * The instance name.
+ */
+ private final String group;
+ private TraceServiceGrpc.TraceServiceStub traceServiceStub;
+
+ /**
+ * Create the processor.
+ *
+ * @param traceServiceStub stub for gRPC call.
+ * @param maxBulkSize the max bulk size for the flush operation
+ * @param flushInterval if given maxBulkSize is not reached in this period, the flush would be trigger
+ * automatically. Unit is second.
+ * @param concurrency the number of concurrency would run for the flush max.
+ */
+ protected TraceBulkWriteProcessor(final String group,
+ final TraceServiceGrpc.TraceServiceStub traceServiceStub,
+ final int maxBulkSize,
+ final int flushInterval,
+ final int concurrency) {
+ super("TraceBulkWriteProcessor", maxBulkSize, flushInterval, concurrency);
+ this.group = group;
+ this.traceServiceStub = traceServiceStub;
+ }
+
+ @Override
+ protected void flush(final List data) {
+ final StreamObserver<BanyandbTrace.WriteRequest> writeRequestStreamObserver
+ = traceServiceStub.withDeadlineAfter(
+ flushInterval, TimeUnit.SECONDS)
+ .write(
+ new StreamObserver<BanyandbTrace.WriteResponse>() {
+ @Override
+ public void onNext(
+ BanyandbTrace.WriteResponse writeResponse) {
+ }
+
+ @Override
+ public void onError(
+ Throwable throwable) {
+ log.error(
+ "Error occurs in flushing traces.",
+ throwable
+ );
+ }
+
+ @Override
+ public void onCompleted() {
+ }
+ });
+ try {
+ data.forEach(write -> {
+ final TraceWrite traceWrite = (TraceWrite) write;
+ BanyandbTrace.WriteRequest request = traceWrite.build(group);
+ writeRequestStreamObserver.onNext(request);
+ });
+ } finally {
+ writeRequestStreamObserver.onCompleted();
+ }
+ }
+}
diff --git a/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceWrite.java b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceWrite.java
index c4d72bf..f40c48b 100644
--- a/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceWrite.java
+++ b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceWrite.java
@@ -18,20 +18,24 @@
package org.apache.skywalking.banyandb.v1.client;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Timestamp;
import java.util.List;
import lombok.AccessLevel;
import lombok.Builder;
import lombok.Getter;
+import org.apache.skywalking.banyandb.v1.Banyandb;
+import org.apache.skywalking.banyandb.v1.trace.BanyandbTrace;
/**
* TraceWrite represents a write operation, including necessary fields, for {@link
- * BanyanDBClient#writeTrace(TraceWrite)}.
+ * BanyanDBClient#buildTraceWriteProcessor}.
*/
@Builder
@Getter(AccessLevel.PROTECTED)
public class TraceWrite {
/**
- * Ower name current entity
+ * Owner name current entity
*/
private final String name;
/**
@@ -50,4 +54,17 @@ public class TraceWrite {
private final byte[] binary;
private final List<WriteField> fields;
+ BanyandbTrace.WriteRequest build(String group) { // converts this write into a trace WriteRequest for the given group
+ final BanyandbTrace.WriteRequest.Builder builder = BanyandbTrace.WriteRequest.newBuilder();
+ builder.setMetadata(Banyandb.Metadata.newBuilder().setGroup(group).setName(name).build());
+ final Banyandb.EntityValue.Builder entityBuilder = Banyandb.EntityValue.newBuilder();
+ entityBuilder.setEntityId(entityId);
+ entityBuilder.setTimestamp(Timestamp.newBuilder() // timestamp is treated as epoch milliseconds (NOTE(review): confirm)
+ .setSeconds(timestamp / 1000)
+ .setNanos((int) (timestamp % 1000 * 1_000_000))); // millisecond remainder -> nanoseconds
+ entityBuilder.setDataBinary(ByteString.copyFrom(binary));
+ fields.forEach(writeField -> entityBuilder.addFields(writeField.toField()));
+ builder.setEntity(entityBuilder.build());
+ return builder.build();
+ }
}
diff --git a/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/WriteField.java b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/WriteField.java
index f4bf858..5918533 100644
--- a/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/WriteField.java
+++ b/oap-server/banyandb-java-client/src/main/java/org/apache/skywalking/banyandb/v1/client/WriteField.java
@@ -19,29 +19,48 @@
package org.apache.skywalking.banyandb.v1.client;
import lombok.RequiredArgsConstructor;
+import org.apache.skywalking.banyandb.v1.Banyandb;
+
+import static com.google.protobuf.NullValue.NULL_VALUE;
/**
* WriteField represents a value of column/field for a write-op value.
*/
public interface WriteField {
- class NullField {
+ Banyandb.Field toField();
+
+ class NullField implements WriteField { // a write value that is explicitly null
+ @Override
+ public Banyandb.Field toField() {
+ return Banyandb.Field.newBuilder().setNull(NULL_VALUE).build(); // sets the Field's null variant via protobuf's NullValue.NULL_VALUE
+ }
}
/**
* The value of a String type field.
*/
@RequiredArgsConstructor
- class StringField {
+ class StringField implements WriteField {
private final String value;
+
+ @Override
+ public Banyandb.Field toField() { // wraps value in the Field's Str variant
+ return Banyandb.Field.newBuilder().setStr(Banyandb.Str.newBuilder().setValue(value)).build(); // NOTE(review): a null value presumably fails in the protobuf setter — confirm whether callers can pass null
+ }
}
/**
* The value of a String array type field.
*/
@RequiredArgsConstructor
- class StringArrayField {
+ class StringArrayField implements WriteField {
private final String[] value;
+
+ @Override
+ public Banyandb.Field toField() { // mirrors StringField, using the StrArray variant; NOTE(review): confirm the proto names setStrArray/StrArray
+ return Banyandb.Field.newBuilder().setStrArray(Banyandb.StrArray.newBuilder().addAllValue(java.util.Arrays.asList(value))).build();
+ }
}
/**