You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by lu...@apache.org on 2022/04/30 09:03:29 UTC

[skywalking] 10/24: add installer

This is an automated email from the ASF dual-hosted git repository.

lujiajing pushed a commit to branch banyandb-integration-stream
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit a8e161f033cdbf1e3c32913265c2852ff849e29b
Author: Megrez Lu <lu...@gmail.com>
AuthorDate: Thu Dec 9 18:21:57 2021 +0800

    add installer
---
 .../src/main/resources/application.yml             |   8 +
 .../plugin/banyandb/BanyanDBIndexInstaller.java    |  44 +++++
 .../plugin/banyandb/BanyanDBStorageClient.java     |  33 ++++
 .../plugin/banyandb/BanyanDBStorageProvider.java   |   3 +
 .../storage/plugin/banyandb/StreamMetaInfo.java    | 191 +++++++++++++++++++++
 .../metadata/index_rules/segment/db.instance.json  |  13 ++
 .../src/main/resources/metadata/segment.json       |  93 ++++++++++
 7 files changed, 385 insertions(+)

diff --git a/oap-server/server-starter/src/main/resources/application.yml b/oap-server/server-starter/src/main/resources/application.yml
index d8430650b7..12db39b01b 100755
--- a/oap-server/server-starter/src/main/resources/application.yml
+++ b/oap-server/server-starter/src/main/resources/application.yml
@@ -260,6 +260,14 @@ storage:
     storageGroup: ${SW_STORAGE_IOTDB_STORAGE_GROUP:root.skywalking}
     sessionPoolSize: ${SW_STORAGE_IOTDB_SESSIONPOOL_SIZE:8} # If it's zero, the SessionPool size will be 2*CPU_Cores
     fetchTaskLogMaxSize: ${SW_STORAGE_IOTDB_FETCH_TASK_LOG_MAX_SIZE:1000} # the max number of fetch task log in a request
+  banyandb:
+    host: ${SW_STORAGE_BANYANDB_HOST:127.0.0.1}
+    port: ${SW_STORAGE_BANYANDB_PORT:17912}
+    group: ${SW_STORAGE_BANYANDB_GROUP:default}
+    maxBulkSize: ${SW_STORAGE_BANYANDB_MAX_BULK_SIZE:5000}
+    flushInterval: ${SW_STORAGE_BANYANDB_FLUSH_INTERVAL:15}
+    concurrentWriteThreads: ${SW_STORAGE_BANYANDB_CONCURRENT_WRITE_THREADS:15}
+    fetchTaskLogMaxSize: ${SW_STORAGE_BANYANDB_FETCH_TASK_LOG_MAX_SIZE:1000} # the max number of fetch task log in a request
 
 agent-analyzer:
   selector: ${SW_AGENT_ANALYZER:default}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java
new file mode 100644
index 0000000000..ffc5aaf586
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.banyandb;
+
+import org.apache.skywalking.oap.server.core.storage.StorageException;
+import org.apache.skywalking.oap.server.core.storage.model.Model;
+import org.apache.skywalking.oap.server.core.storage.model.ModelInstaller;
+import org.apache.skywalking.oap.server.library.client.Client;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+
+public class BanyanDBIndexInstaller extends ModelInstaller {
+    public BanyanDBIndexInstaller(Client client, ModuleManager moduleManager) {
+        super(client, moduleManager);
+    }
+
+    @Override
+    protected boolean isExists(Model model) throws StorageException {
+        return false;
+    }
+
+    @Override
+    protected void createTable(Model model) throws StorageException {
+        StreamMetaInfo metaInfo = StreamMetaInfo.addModel(model);
+        if (metaInfo != null) {
+            ((BanyanDBStorageClient) client).createStream(metaInfo);
+        }
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
index 3ad4daf918..2ee45b12f0 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
 package org.apache.skywalking.oap.server.storage.plugin.banyandb;
 
 import org.apache.skywalking.banyandb.v1.client.BanyanDBClient;
@@ -5,12 +23,17 @@ import org.apache.skywalking.banyandb.v1.client.StreamBulkWriteProcessor;
 import org.apache.skywalking.banyandb.v1.client.StreamQuery;
 import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
 import org.apache.skywalking.banyandb.v1.client.StreamWrite;
+import org.apache.skywalking.banyandb.v1.client.metadata.IndexRule;
+import org.apache.skywalking.banyandb.v1.client.metadata.Stream;
 import org.apache.skywalking.oap.server.library.client.Client;
 import org.apache.skywalking.oap.server.library.client.healthcheck.DelegatedHealthChecker;
 import org.apache.skywalking.oap.server.library.client.healthcheck.HealthCheckable;
 import org.apache.skywalking.oap.server.library.util.HealthChecker;
 
 import java.io.IOException;
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
 
 /**
  * BanyanDBStorageClient is a simple wrapper for the underlying {@link BanyanDBClient},
@@ -45,6 +68,16 @@ public class BanyanDBStorageClient implements Client, HealthCheckable {
         }
     }
 
+    public void createStream(StreamMetaInfo streamMetaInfo) {
+        Stream stm = this.client.define(streamMetaInfo.getStream());
+        if (stm != null) {
+            // TODO: should be fixed in SDK
+            this.client.defineIndexRules(stm, ZonedDateTime.from(Instant.now()),
+                    ZonedDateTime.of(2099, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC),
+                    streamMetaInfo.getIndexRules().toArray(new IndexRule[]{}));
+        }
+    }
+
     public void write(StreamWrite streamWrite) {
         this.client.write(streamWrite);
     }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java
index aade57b62b..64f8e96cbf 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java
@@ -26,6 +26,7 @@ import org.apache.skywalking.oap.server.core.storage.StorageDAO;
 import org.apache.skywalking.oap.server.core.storage.StorageModule;
 import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressAliasDAO;
 import org.apache.skywalking.oap.server.core.storage.management.UITemplateManagementDAO;
+import org.apache.skywalking.oap.server.core.storage.model.ModelCreator;
 import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskLogQueryDAO;
 import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO;
 import org.apache.skywalking.oap.server.core.storage.profile.IProfileThreadSnapshotQueryDAO;
@@ -122,6 +123,8 @@ public class BanyanDBStorageProvider extends ModuleProvider {
         this.client.registerChecker(healthChecker);
         try {
             this.client.connect();
+            BanyanDBIndexInstaller installer = new BanyanDBIndexInstaller(client, getManager());
+            getManager().find(CoreModule.NAME).provider().getService(ModelCreator.class).addModelListener(installer);
         } catch (Exception e) {
             throw new ModuleStartException(e.getMessage(), e);
         }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/StreamMetaInfo.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/StreamMetaInfo.java
new file mode 100644
index 0000000000..1fbf2eb636
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/StreamMetaInfo.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.banyandb;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.CharStreams;
+import com.google.protobuf.util.JsonFormat;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.banyandb.database.v1.metadata.BanyandbMetadata;
+import org.apache.skywalking.banyandb.v1.client.metadata.Duration;
+import org.apache.skywalking.banyandb.v1.client.metadata.IndexRule;
+import org.apache.skywalking.banyandb.v1.client.metadata.Stream;
+import org.apache.skywalking.banyandb.v1.client.metadata.TagFamilySpec;
+import org.apache.skywalking.oap.server.core.storage.model.Model;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@Getter
+@Builder
+@Slf4j
+public class StreamMetaInfo {
+    public static final String TAG_FAMILY_SEARCHABLE = "searchable";
+    public static final String TAG_FAMILY_DATA = "data";
+
+    private static final Map<String, StreamMetaInfo> STREAMS = new HashMap<>();
+
+    private final Model model;
+
+    /**
+     * stream is the metadata to be used for schema creation,
+     * 1. Read json from resources/metadata/{model.name}.json and deserialize to protobuf,
+     * 2. Iterate over tag families,
+     * 3. Iterate over tags in each tag family
+     * 4.
+     */
+    private final Stream stream;
+
+    private final List<IndexRule> indexRules;
+
+    public static StreamMetaInfo addModel(Model model) {
+        BanyandbMetadata.Stream pbStream = parseStreamFromJSON(model.getName());
+        if (pbStream == null) {
+            log.warn("fail to find the stream schema {}", model.getName());
+            return null;
+        }
+        BanyandbMetadata.Duration duration = pbStream.getOpts().getTtl();
+        Duration ttl = fromProtobuf(duration);
+        final Stream stream = new Stream(pbStream.getMetadata().getName(), pbStream.getOpts().getShardNum(), ttl);
+
+        List<IndexRule> indexRules = new ArrayList<>();
+
+        stream.setEntityTagNames(pbStream.getEntity().getTagNamesList());
+        for (BanyandbMetadata.TagFamilySpec pbTagFamilySpec : pbStream.getTagFamiliesList()) {
+            final TagFamilySpec tagFamilySpec = new TagFamilySpec(pbTagFamilySpec.getName());
+            final boolean needIndexParse = pbTagFamilySpec.getName().equals(TAG_FAMILY_SEARCHABLE);
+            for (final BanyandbMetadata.TagSpec pbTagSpec : pbTagFamilySpec.getTagsList()) {
+                tagFamilySpec.addTagSpec(parseTagSpec(pbTagSpec));
+
+                // if the tag family equals to "searchable", build index rules
+                if (needIndexParse) {
+                    BanyandbMetadata.IndexRule pbIndexRule = parseIndexRulesFromJSON(model.getName(), pbTagSpec.getName());
+                    if (pbIndexRule == null) {
+                        log.warn("fail to find the index rule for {}", pbTagSpec.getName());
+                        continue;
+                    }
+                    IndexRule.IndexType indexType = fromProtobuf(pbIndexRule.getType());
+                    IndexRule.IndexLocation indexLocation = fromProtobuf(pbIndexRule.getLocation());
+                    IndexRule indexRule = new IndexRule(pbIndexRule.getMetadata().getName(), indexType, indexLocation);
+                    indexRule.setTags(new ArrayList<>(pbIndexRule.getTagsList()));
+                    indexRules.add(indexRule);
+                }
+            }
+        }
+
+        return StreamMetaInfo.builder().model(model).stream(stream).indexRules(indexRules).build();
+    }
+
+    private static TagFamilySpec.TagSpec parseTagSpec(BanyandbMetadata.TagSpec pbTagSpec) {
+        switch (pbTagSpec.getType()) {
+            case TAG_TYPE_INT:
+                return TagFamilySpec.TagSpec.newIntTag(pbTagSpec.getName());
+            case TAG_TYPE_INT_ARRAY:
+                return TagFamilySpec.TagSpec.newIntArrayTag(pbTagSpec.getName());
+            case TAG_TYPE_STRING:
+                return TagFamilySpec.TagSpec.newStringTag(pbTagSpec.getName());
+            case TAG_TYPE_STRING_ARRAY:
+                return TagFamilySpec.TagSpec.newStringArrayTag(pbTagSpec.getName());
+            case TAG_TYPE_DATA_BINARY:
+                return TagFamilySpec.TagSpec.newBinaryTag(pbTagSpec.getName());
+            default:
+                throw new IllegalArgumentException("unrecognized tag type");
+        }
+    }
+
+    private static BanyandbMetadata.Stream parseStreamFromJSON(String name) {
+        try {
+            InputStream is = StreamMetaInfo.class.getClassLoader().getResourceAsStream("metadata/" + name + ".json");
+            if (is == null) {
+                return null;
+            }
+            String result = CharStreams.toString(new InputStreamReader(is, Charsets.UTF_8));
+            BanyandbMetadata.Stream.Builder b = BanyandbMetadata.Stream.newBuilder();
+            JsonFormat.parser().merge(result, b);
+            return b.build();
+        } catch (IOException ioEx) {
+            log.error("fail to read json", ioEx);
+            return null;
+        }
+    }
+
+    private static BanyandbMetadata.IndexRule parseIndexRulesFromJSON(String streamName, String name) {
+        try {
+            InputStream is = StreamMetaInfo.class.getClassLoader().getResourceAsStream(String.join("/",
+                    new String[]{"metadata", "index_rules", streamName, name + ".json"}));
+            if (is == null) {
+                return null;
+            }
+            String result = CharStreams.toString(new InputStreamReader(is, Charsets.UTF_8));
+            BanyandbMetadata.IndexRule.Builder b = BanyandbMetadata.IndexRule.newBuilder();
+            JsonFormat.parser().merge(result, b);
+            return b.build();
+        } catch (IOException ioEx) {
+            log.error("fail to read json", ioEx);
+            return null;
+        }
+    }
+
+    // TODO: change modifier to public in SDK
+    static Duration fromProtobuf(BanyandbMetadata.Duration duration) {
+        switch (duration.getUnit()) {
+            case DURATION_UNIT_DAY:
+                return Duration.ofDays(duration.getVal());
+            case DURATION_UNIT_HOUR:
+                return Duration.ofHours(duration.getVal());
+            case DURATION_UNIT_MONTH:
+                return Duration.ofMonths(duration.getVal());
+            case DURATION_UNIT_WEEK:
+                return Duration.ofWeeks(duration.getVal());
+            default:
+                throw new IllegalArgumentException("unrecognized DurationUnit");
+        }
+    }
+
+    // TODO: change modifier to public in SDK
+    private static IndexRule.IndexType fromProtobuf(BanyandbMetadata.IndexRule.Type type) {
+        switch (type) {
+            case TYPE_TREE:
+                return IndexRule.IndexType.TREE;
+            case TYPE_INVERTED:
+                return IndexRule.IndexType.INVERTED;
+            default:
+                throw new IllegalArgumentException("unrecognized index type");
+        }
+    }
+
+    // TODO: change modifier to public in SDK
+    private static IndexRule.IndexLocation fromProtobuf(BanyandbMetadata.IndexRule.Location loc) {
+        switch (loc) {
+            case LOCATION_GLOBAL:
+                return IndexRule.IndexLocation.GLOBAL;
+            case LOCATION_SERIES:
+                return IndexRule.IndexLocation.SERIES;
+            default:
+                throw new IllegalArgumentException("unrecognized index location");
+        }
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/resources/metadata/index_rules/segment/db.instance.json b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/resources/metadata/index_rules/segment/db.instance.json
new file mode 100644
index 0000000000..662cff2eac
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/resources/metadata/index_rules/segment/db.instance.json
@@ -0,0 +1,13 @@
+{
+  "metadata": {
+    "id": 1,
+    "name": "db.instance",
+    "group": "default"
+  },
+  "tags": [
+    "db.instance"
+  ],
+  "type": "TYPE_INVERTED",
+  "location": "LOCATION_SERIES",
+  "updated_at": "2021-04-15T01:30:15.01Z"
+}
\ No newline at end of file
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/resources/metadata/segment.json b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/resources/metadata/segment.json
new file mode 100644
index 0000000000..1317d9ecf0
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/resources/metadata/segment.json
@@ -0,0 +1,93 @@
+{
+  "metadata": {
+    "name": "segment",
+    "group": "default"
+  },
+  "tag_families": [
+    {
+      "name": "data",
+      "tags": [
+        {
+          "name": "data_binary",
+          "type": "TAG_TYPE_DATA_BINARY"
+        }
+      ]
+    },
+    {
+      "name": "searchable",
+      "tags": [
+        {
+          "name": "trace_id",
+          "type": "TAG_TYPE_STRING"
+        },
+        {
+          "name": "state",
+          "type": "TAG_TYPE_INT"
+        },
+        {
+          "name": "service_id",
+          "type": "TAG_TYPE_STRING"
+        },
+        {
+          "name": "service_instance_id",
+          "type": "TAG_TYPE_STRING"
+        },
+        {
+          "name": "endpoint_id",
+          "type": "TAG_TYPE_STRING"
+        },
+        {
+          "name": "duration",
+          "type": "TAG_TYPE_INT"
+        },
+        {
+          "name": "start_time",
+          "type": "TAG_TYPE_INT"
+        },
+        {
+          "name": "http.method",
+          "type": "TAG_TYPE_STRING"
+        },
+        {
+          "name": "status_code",
+          "type": "TAG_TYPE_STRING"
+        },
+        {
+          "name": "db.type",
+          "type": "TAG_TYPE_STRING"
+        },
+        {
+          "name": "db.instance",
+          "type": "TAG_TYPE_STRING"
+        },
+        {
+          "name": "mq.queue",
+          "type": "TAG_TYPE_STRING"
+        },
+        {
+          "name": "mq.topic",
+          "type": "TAG_TYPE_STRING"
+        },
+        {
+          "name": "mq.broker",
+          "type": "TAG_TYPE_STRING"
+        }
+      ]
+    }
+  ],
+  "entity": {
+    "tag_names": [
+      "service_id",
+      "service_instance_id",
+      "state"
+    ]
+  },
+  "opts": {
+    "shard_num": 2,
+    "ttl": {
+      "val": 7,
+      "unit": "DURATION_UNIT_DAY"
+    }
+  },
+  "updated_at_nanoseconds": "2021-04-15T01:30:15.01Z"
+}
\ No newline at end of file