You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by lu...@apache.org on 2022/04/30 12:56:36 UTC
[skywalking] 10/25: add installer
This is an automated email from the ASF dual-hosted git repository.
lujiajing pushed a commit to branch banyandb-integration-stream
in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit 41615bd9b5ecf83b731a324dc65c46fc17ba4746
Author: Megrez Lu <lu...@gmail.com>
AuthorDate: Thu Dec 9 18:21:57 2021 +0800
add installer
---
.../src/main/resources/application.yml | 8 +
.../plugin/banyandb/BanyanDBIndexInstaller.java | 44 +++++
.../plugin/banyandb/BanyanDBStorageClient.java | 33 ++++
.../plugin/banyandb/BanyanDBStorageProvider.java | 3 +
.../storage/plugin/banyandb/StreamMetaInfo.java | 191 +++++++++++++++++++++
.../metadata/index_rules/segment/db.instance.json | 13 ++
.../src/main/resources/metadata/segment.json | 93 ++++++++++
7 files changed, 385 insertions(+)
diff --git a/oap-server/server-starter/src/main/resources/application.yml b/oap-server/server-starter/src/main/resources/application.yml
index d8430650b7..12db39b01b 100755
--- a/oap-server/server-starter/src/main/resources/application.yml
+++ b/oap-server/server-starter/src/main/resources/application.yml
@@ -260,6 +260,14 @@ storage:
storageGroup: ${SW_STORAGE_IOTDB_STORAGE_GROUP:root.skywalking}
sessionPoolSize: ${SW_STORAGE_IOTDB_SESSIONPOOL_SIZE:8} # If it's zero, the SessionPool size will be 2*CPU_Cores
fetchTaskLogMaxSize: ${SW_STORAGE_IOTDB_FETCH_TASK_LOG_MAX_SIZE:1000} # the max number of fetch task log in a request
+ banyandb:
+ host: ${SW_STORAGE_BANYANDB_HOST:127.0.0.1}
+ port: ${SW_STORAGE_BANYANDB_PORT:17912}
+ group: ${SW_STORAGE_BANYANDB_GROUP:default}
+ maxBulkSize: ${SW_STORAGE_BANYANDB_MAX_BULK_SIZE:5000}
+ flushInterval: ${SW_STORAGE_BANYANDB_FLUSH_INTERVAL:15}
+ concurrentWriteThreads: ${SW_STORAGE_BANYANDB_CONCURRENT_WRITE_THREADS:15}
+ fetchTaskLogMaxSize: ${SW_STORAGE_BANYANDB_FETCH_TASK_LOG_MAX_SIZE:1000} # the max number of fetch task log in a request
agent-analyzer:
selector: ${SW_AGENT_ANALYZER:default}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java
new file mode 100644
index 0000000000..ffc5aaf586
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.banyandb;
+
+import org.apache.skywalking.oap.server.core.storage.StorageException;
+import org.apache.skywalking.oap.server.core.storage.model.Model;
+import org.apache.skywalking.oap.server.core.storage.model.ModelInstaller;
+import org.apache.skywalking.oap.server.library.client.Client;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+
+/**
+ * {@link ModelInstaller} for the BanyanDB storage plugin. For each model it is asked to create,
+ * it resolves the bundled stream schema through {@link StreamMetaInfo#addModel(Model)} and
+ * hands the result to {@link BanyanDBStorageClient#createStream(StreamMetaInfo)}.
+ */
+public class BanyanDBIndexInstaller extends ModelInstaller {
+    public BanyanDBIndexInstaller(Client client, ModuleManager moduleManager) {
+        super(client, moduleManager);
+    }
+
+    /**
+     * Always reports the model as absent; no lookup is performed against the server.
+     * NOTE(review): presumably this makes the base installer call {@link #createTable(Model)}
+     * for every model on every start - confirm against ModelInstaller.
+     */
+    @Override
+    protected boolean isExists(Model model) throws StorageException {
+        return false;
+    }
+
+    /**
+     * Creates the stream backing the given model. Models without a resolvable stream schema
+     * are skipped silently (the resolver itself logs a warning).
+     *
+     * @param model the model to install
+     */
+    @Override
+    protected void createTable(Model model) throws StorageException {
+        final StreamMetaInfo streamMetaInfo = StreamMetaInfo.addModel(model);
+        if (streamMetaInfo == null) {
+            // no schema available for this model, nothing to create
+            return;
+        }
+        final BanyanDBStorageClient storageClient = (BanyanDBStorageClient) client;
+        storageClient.createStream(streamMetaInfo);
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
index 3ad4daf918..2ee45b12f0 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
package org.apache.skywalking.oap.server.storage.plugin.banyandb;
import org.apache.skywalking.banyandb.v1.client.BanyanDBClient;
@@ -5,12 +23,17 @@ import org.apache.skywalking.banyandb.v1.client.StreamBulkWriteProcessor;
import org.apache.skywalking.banyandb.v1.client.StreamQuery;
import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
import org.apache.skywalking.banyandb.v1.client.StreamWrite;
+import org.apache.skywalking.banyandb.v1.client.metadata.IndexRule;
+import org.apache.skywalking.banyandb.v1.client.metadata.Stream;
import org.apache.skywalking.oap.server.library.client.Client;
import org.apache.skywalking.oap.server.library.client.healthcheck.DelegatedHealthChecker;
import org.apache.skywalking.oap.server.library.client.healthcheck.HealthCheckable;
import org.apache.skywalking.oap.server.library.util.HealthChecker;
import java.io.IOException;
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
/**
* BanyanDBStorageClient is a simple wrapper for the underlying {@link BanyanDBClient},
@@ -45,6 +68,16 @@ public class BanyanDBStorageClient implements Client, HealthCheckable {
}
}
+    /**
+     * Defines the stream carried by the given meta info and, when the definition yields a
+     * stream, registers the index rules that belong to it.
+     *
+     * @param streamMetaInfo holder of the stream schema and its index rules
+     */
+    public void createStream(StreamMetaInfo streamMetaInfo) {
+        Stream stm = this.client.define(streamMetaInfo.getStream());
+        if (stm != null) {
+            // ZonedDateTime.from(Instant) always throws DateTimeException because an Instant
+            // carries no zone information; attach UTC explicitly instead.
+            final ZonedDateTime now = Instant.now().atZone(ZoneOffset.UTC);
+            // TODO: should be fixed in SDK
+            final ZonedDateTime farFuture = ZonedDateTime.of(2099, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC);
+            this.client.defineIndexRules(stm, now, farFuture,
+                    streamMetaInfo.getIndexRules().toArray(new IndexRule[0]));
+        }
+    }
+
public void write(StreamWrite streamWrite) {
this.client.write(streamWrite);
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java
index aade57b62b..64f8e96cbf 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java
@@ -26,6 +26,7 @@ import org.apache.skywalking.oap.server.core.storage.StorageDAO;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressAliasDAO;
import org.apache.skywalking.oap.server.core.storage.management.UITemplateManagementDAO;
+import org.apache.skywalking.oap.server.core.storage.model.ModelCreator;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskLogQueryDAO;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileThreadSnapshotQueryDAO;
@@ -122,6 +123,8 @@ public class BanyanDBStorageProvider extends ModuleProvider {
this.client.registerChecker(healthChecker);
try {
this.client.connect();
+ BanyanDBIndexInstaller installer = new BanyanDBIndexInstaller(client, getManager());
+ getManager().find(CoreModule.NAME).provider().getService(ModelCreator.class).addModelListener(installer);
} catch (Exception e) {
throw new ModuleStartException(e.getMessage(), e);
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/StreamMetaInfo.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/StreamMetaInfo.java
new file mode 100644
index 0000000000..1fbf2eb636
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/StreamMetaInfo.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.banyandb;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.CharStreams;
+import com.google.protobuf.util.JsonFormat;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.banyandb.database.v1.metadata.BanyandbMetadata;
+import org.apache.skywalking.banyandb.v1.client.metadata.Duration;
+import org.apache.skywalking.banyandb.v1.client.metadata.IndexRule;
+import org.apache.skywalking.banyandb.v1.client.metadata.Stream;
+import org.apache.skywalking.banyandb.v1.client.metadata.TagFamilySpec;
+import org.apache.skywalking.oap.server.core.storage.model.Model;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Holds the BanyanDB stream schema and index rules that belong to a single OAP {@link Model}.
+ * Instances are built by {@link #addModel(Model)} from JSON descriptors bundled on the classpath
+ * under {@code metadata/}.
+ */
+@Getter
+@Builder
+@Slf4j
+public class StreamMetaInfo {
+    public static final String TAG_FAMILY_SEARCHABLE = "searchable";
+    public static final String TAG_FAMILY_DATA = "data";
+
+    // NOTE(review): nothing in this class reads or writes this map - confirm whether
+    // addModel is expected to register its result here.
+    private static final Map<String, StreamMetaInfo> STREAMS = new HashMap<>();
+
+    private final Model model;
+
+    /**
+     * stream is the metadata to be used for schema creation,
+     * 1. Read json from resources/metadata/{model.name}.json and deserialize to protobuf,
+     * 2. Iterate over tag families,
+     * 3. Iterate over tags in each tag family,
+     * 4. For every tag in the "searchable" family, read the index rule from
+     *    resources/metadata/index_rules/{model.name}/{tag.name}.json.
+     */
+    private final Stream stream;
+
+    private final List<IndexRule> indexRules;
+
+    /**
+     * Builds the stream meta info for the given model from the bundled JSON descriptors.
+     *
+     * @param model the model whose name selects the descriptors
+     * @return the meta info, or {@code null} when no stream descriptor can be loaded for the model
+     * @throws IllegalArgumentException if a descriptor uses an unrecognized tag type, duration
+     *                                  unit, index type or index location
+     */
+    public static StreamMetaInfo addModel(Model model) {
+        BanyandbMetadata.Stream pbStream = parseStreamFromJSON(model.getName());
+        if (pbStream == null) {
+            log.warn("fail to find the stream schema {}", model.getName());
+            return null;
+        }
+        BanyandbMetadata.Duration duration = pbStream.getOpts().getTtl();
+        Duration ttl = fromProtobuf(duration);
+        final Stream stream = new Stream(pbStream.getMetadata().getName(), pbStream.getOpts().getShardNum(), ttl);
+
+        List<IndexRule> indexRules = new ArrayList<>();
+
+        stream.setEntityTagNames(pbStream.getEntity().getTagNamesList());
+        for (BanyandbMetadata.TagFamilySpec pbTagFamilySpec : pbStream.getTagFamiliesList()) {
+            final TagFamilySpec tagFamilySpec = new TagFamilySpec(pbTagFamilySpec.getName());
+            final boolean needIndexParse = pbTagFamilySpec.getName().equals(TAG_FAMILY_SEARCHABLE);
+            for (final BanyandbMetadata.TagSpec pbTagSpec : pbTagFamilySpec.getTagsList()) {
+                tagFamilySpec.addTagSpec(parseTagSpec(pbTagSpec));
+
+                // if the tag family equals to "searchable", build index rules
+                if (needIndexParse) {
+                    BanyandbMetadata.IndexRule pbIndexRule = parseIndexRulesFromJSON(model.getName(), pbTagSpec.getName());
+                    if (pbIndexRule == null) {
+                        log.warn("fail to find the index rule for {}", pbTagSpec.getName());
+                        continue;
+                    }
+                    IndexRule.IndexType indexType = fromProtobuf(pbIndexRule.getType());
+                    IndexRule.IndexLocation indexLocation = fromProtobuf(pbIndexRule.getLocation());
+                    IndexRule indexRule = new IndexRule(pbIndexRule.getMetadata().getName(), indexType, indexLocation);
+                    indexRule.setTags(new ArrayList<>(pbIndexRule.getTagsList()));
+                    indexRules.add(indexRule);
+                }
+            }
+            // Attach the populated family to the stream; without this the family built above
+            // is discarded and the stream is defined without any tags.
+            stream.addTagFamilySpec(tagFamilySpec);
+        }
+
+        return StreamMetaInfo.builder().model(model).stream(stream).indexRules(indexRules).build();
+    }
+
+    /**
+     * Converts a protobuf tag spec into the client-side tag spec of the matching type.
+     *
+     * @throws IllegalArgumentException if the tag type is not one of the handled types
+     */
+    private static TagFamilySpec.TagSpec parseTagSpec(BanyandbMetadata.TagSpec pbTagSpec) {
+        switch (pbTagSpec.getType()) {
+            case TAG_TYPE_INT:
+                return TagFamilySpec.TagSpec.newIntTag(pbTagSpec.getName());
+            case TAG_TYPE_INT_ARRAY:
+                return TagFamilySpec.TagSpec.newIntArrayTag(pbTagSpec.getName());
+            case TAG_TYPE_STRING:
+                return TagFamilySpec.TagSpec.newStringTag(pbTagSpec.getName());
+            case TAG_TYPE_STRING_ARRAY:
+                return TagFamilySpec.TagSpec.newStringArrayTag(pbTagSpec.getName());
+            case TAG_TYPE_DATA_BINARY:
+                return TagFamilySpec.TagSpec.newBinaryTag(pbTagSpec.getName());
+            default:
+                throw new IllegalArgumentException("unrecognized tag type");
+        }
+    }
+
+    /**
+     * Reads a classpath resource fully as UTF-8 text and closes the underlying stream.
+     *
+     * @param path resource path relative to the classpath root
+     * @return the resource content, or {@code null} if the resource does not exist
+     * @throws IOException if reading the resource fails
+     */
+    private static String readResource(String path) throws IOException {
+        // a null resource is legal in try-with-resources and is simply not closed
+        try (InputStream is = StreamMetaInfo.class.getClassLoader().getResourceAsStream(path)) {
+            if (is == null) {
+                return null;
+            }
+            return CharStreams.toString(new InputStreamReader(is, Charsets.UTF_8));
+        }
+    }
+
+    /**
+     * Loads {@code metadata/{name}.json} and parses it into a protobuf stream definition.
+     *
+     * @return the parsed stream, or {@code null} if the resource is missing or cannot be read/parsed
+     */
+    private static BanyandbMetadata.Stream parseStreamFromJSON(String name) {
+        try {
+            String result = readResource("metadata/" + name + ".json");
+            if (result == null) {
+                return null;
+            }
+            BanyandbMetadata.Stream.Builder b = BanyandbMetadata.Stream.newBuilder();
+            JsonFormat.parser().merge(result, b);
+            return b.build();
+        } catch (IOException ioEx) {
+            log.error("fail to read json", ioEx);
+            return null;
+        }
+    }
+
+    /**
+     * Loads {@code metadata/index_rules/{streamName}/{name}.json} and parses it into a protobuf
+     * index rule.
+     *
+     * @return the parsed index rule, or {@code null} if the resource is missing or cannot be read/parsed
+     */
+    private static BanyandbMetadata.IndexRule parseIndexRulesFromJSON(String streamName, String name) {
+        try {
+            String result = readResource(String.join("/",
+                    new String[]{"metadata", "index_rules", streamName, name + ".json"}));
+            if (result == null) {
+                return null;
+            }
+            BanyandbMetadata.IndexRule.Builder b = BanyandbMetadata.IndexRule.newBuilder();
+            JsonFormat.parser().merge(result, b);
+            return b.build();
+        } catch (IOException ioEx) {
+            log.error("fail to read json", ioEx);
+            return null;
+        }
+    }
+
+    /**
+     * Converts a protobuf duration into the client-side {@link Duration}.
+     *
+     * @throws IllegalArgumentException if the unit is not day, hour, month or week
+     */
+    // TODO: change modifier to public in SDK
+    static Duration fromProtobuf(BanyandbMetadata.Duration duration) {
+        switch (duration.getUnit()) {
+            case DURATION_UNIT_DAY:
+                return Duration.ofDays(duration.getVal());
+            case DURATION_UNIT_HOUR:
+                return Duration.ofHours(duration.getVal());
+            case DURATION_UNIT_MONTH:
+                return Duration.ofMonths(duration.getVal());
+            case DURATION_UNIT_WEEK:
+                return Duration.ofWeeks(duration.getVal());
+            default:
+                throw new IllegalArgumentException("unrecognized DurationUnit");
+        }
+    }
+
+    /**
+     * Converts a protobuf index type into the client-side enum.
+     *
+     * @throws IllegalArgumentException if the type is neither tree nor inverted
+     */
+    // TODO: change modifier to public in SDK
+    private static IndexRule.IndexType fromProtobuf(BanyandbMetadata.IndexRule.Type type) {
+        switch (type) {
+            case TYPE_TREE:
+                return IndexRule.IndexType.TREE;
+            case TYPE_INVERTED:
+                return IndexRule.IndexType.INVERTED;
+            default:
+                throw new IllegalArgumentException("unrecognized index type");
+        }
+    }
+
+    /**
+     * Converts a protobuf index location into the client-side enum.
+     *
+     * @throws IllegalArgumentException if the location is neither global nor series
+     */
+    // TODO: change modifier to public in SDK
+    private static IndexRule.IndexLocation fromProtobuf(BanyandbMetadata.IndexRule.Location loc) {
+        switch (loc) {
+            case LOCATION_GLOBAL:
+                return IndexRule.IndexLocation.GLOBAL;
+            case LOCATION_SERIES:
+                return IndexRule.IndexLocation.SERIES;
+            default:
+                throw new IllegalArgumentException("unrecognized index location");
+        }
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/resources/metadata/index_rules/segment/db.instance.json b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/resources/metadata/index_rules/segment/db.instance.json
new file mode 100644
index 0000000000..662cff2eac
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/resources/metadata/index_rules/segment/db.instance.json
@@ -0,0 +1,13 @@
+{
+ "metadata": {
+ "id": 1,
+ "name": "db.instance",
+ "group": "default"
+ },
+ "tags": [
+ "db.instance"
+ ],
+ "type": "TYPE_INVERTED",
+ "location": "LOCATION_SERIES",
+ "updated_at": "2021-04-15T01:30:15.01Z"
+}
\ No newline at end of file
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/resources/metadata/segment.json b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/resources/metadata/segment.json
new file mode 100644
index 0000000000..1317d9ecf0
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/resources/metadata/segment.json
@@ -0,0 +1,93 @@
+{
+ "metadata": {
+ "name": "segment",
+ "group": "default"
+ },
+ "tag_families": [
+ {
+ "name": "data",
+ "tags": [
+ {
+ "name": "data_binary",
+ "type": "TAG_TYPE_DATA_BINARY"
+ }
+ ]
+ },
+ {
+ "name": "searchable",
+ "tags": [
+ {
+ "name": "trace_id",
+ "type": "TAG_TYPE_STRING"
+ },
+ {
+ "name": "state",
+ "type": "TAG_TYPE_INT"
+ },
+ {
+ "name": "service_id",
+ "type": "TAG_TYPE_STRING"
+ },
+ {
+ "name": "service_instance_id",
+ "type": "TAG_TYPE_STRING"
+ },
+ {
+ "name": "endpoint_id",
+ "type": "TAG_TYPE_STRING"
+ },
+ {
+ "name": "duration",
+ "type": "TAG_TYPE_INT"
+ },
+ {
+ "name": "start_time",
+ "type": "TAG_TYPE_INT"
+ },
+ {
+ "name": "http.method",
+ "type": "TAG_TYPE_STRING"
+ },
+ {
+ "name": "status_code",
+ "type": "TAG_TYPE_STRING"
+ },
+ {
+ "name": "db.type",
+ "type": "TAG_TYPE_STRING"
+ },
+ {
+ "name": "db.instance",
+ "type": "TAG_TYPE_STRING"
+ },
+ {
+ "name": "mq.queue",
+ "type": "TAG_TYPE_STRING"
+ },
+ {
+ "name": "mq.topic",
+ "type": "TAG_TYPE_STRING"
+ },
+ {
+ "name": "mq.broker",
+ "type": "TAG_TYPE_STRING"
+ }
+ ]
+ }
+ ],
+ "entity": {
+ "tag_names": [
+ "service_id",
+ "service_instance_id",
+ "state"
+ ]
+ },
+ "opts": {
+ "shard_num": 2,
+ "ttl": {
+ "val": 7,
+ "unit": "DURATION_UNIT_DAY"
+ }
+ },
+ "updated_at_nanoseconds": "2021-04-15T01:30:15.01Z"
+}
\ No newline at end of file