You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by lu...@apache.org on 2022/04/22 03:24:42 UTC

[skywalking] 21/22: make code compiled

This is an automated email from the ASF dual-hosted git repository.

lujiajing pushed a commit to branch banyandb-integration-stream
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit 6aac11e47cc66974b228b39da5f5739204f0c6a9
Author: Megrez Lu <lu...@gmail.com>
AuthorDate: Thu Apr 21 21:23:55 2022 +0800

    make code compiled
---
 .../storage-banyandb-plugin/pom.xml                |   2 +-
 .../storage/plugin/banyandb/BanyanDBConverter.java |  55 ++--
 .../plugin/banyandb/BanyanDBIndexInstaller.java    |  47 +--
 .../plugin/banyandb/BanyanDBMetricsDAO.java        |   1 -
 .../plugin/banyandb/BanyanDBStorageClient.java     |  30 +-
 .../plugin/banyandb/BanyanDBStorageProvider.java   |  21 +-
 .../storage/plugin/banyandb/MetadataRegistry.java  | 319 +++++++++++++--------
 .../storage/plugin/banyandb/StreamMetadata.java    |  71 -----
 .../banyandb/measure/BanyanDBMetadataQueryDAO.java |   7 +-
 .../measure/BanyanDBProfileTaskQueryDAO.java       |   2 +-
 ...kQueryDAO.java => BanyanDBServiceLabelDAO.java} |  12 +-
 .../banyandb/stream/AbstractBanyanDBDAO.java       |  30 +-
 .../banyandb/stream/BanyanDBAlarmQueryDAO.java     |  13 +-
 .../plugin/banyandb/stream/BanyanDBBatchDAO.java   |   1 -
 .../stream/BanyanDBBrowserLogQueryDAO.java         |  16 +-
 .../BanyanDBEBPFProfilingDataDAO.java}             |  15 +-
 .../BanyanDBEBPFProfilingScheduleQueryDAO.java}    |  15 +-
 .../BanyanDBEBPFProfilingTaskDAO.java}             |  16 +-
 .../banyandb/stream/BanyanDBLogQueryDAO.java       |  32 +--
 .../stream/BanyanDBProfileTaskLogQueryDAO.java     |  28 +-
 .../BanyanDBProfileThreadSnapshotQueryDAO.java     |  70 +++--
 .../plugin/banyandb/stream/BanyanDBRecordDAO.java  |   9 +-
 .../banyandb/stream/BanyanDBTraceQueryDAO.java     |  27 +-
 23 files changed, 396 insertions(+), 443 deletions(-)

diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/pom.xml b/oap-server/server-storage-plugin/storage-banyandb-plugin/pom.xml
index e2c9ae7817..7546099e4b 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/pom.xml
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/pom.xml
@@ -21,7 +21,7 @@
     <parent>
         <artifactId>server-storage-plugin</artifactId>
         <groupId>org.apache.skywalking</groupId>
-        <version>9.0.0-SNAPSHOT</version>
+        <version>9.1.0-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java
index 565245ad55..41237f1f3b 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java
@@ -19,11 +19,13 @@
 package org.apache.skywalking.oap.server.storage.plugin.banyandb;
 
 import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
 import org.apache.skywalking.banyandb.v1.client.RowEntity;
-import org.apache.skywalking.banyandb.v1.client.SerializableTag;
 import org.apache.skywalking.banyandb.v1.client.StreamWrite;
 import org.apache.skywalking.banyandb.v1.client.TagAndValue;
+import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
+import org.apache.skywalking.banyandb.v1.client.metadata.Serializable;
 import org.apache.skywalking.oap.server.core.analysis.record.Record;
 import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity;
 import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
@@ -35,16 +37,11 @@ import java.util.function.Function;
 public class BanyanDBConverter {
     @RequiredArgsConstructor
     public static class StreamToEntity implements Convert2Entity {
-        private final StreamMetadata metadata;
         private final RowEntity rowEntity;
 
         @Override
         public Object get(String fieldName) {
-            final StreamMetadata.TagMetadata metadata = this.metadata.getTagDefinition().get(fieldName);
-            if (metadata == null) {
-                return null;
-            }
-            return rowEntity.getValue(metadata.getTagFamilyName(), metadata.getTagSpec().getTagName());
+            return rowEntity.getTagValue(fieldName);
         }
 
         @Override
@@ -53,9 +50,9 @@ public class BanyanDBConverter {
         }
     }
 
+    @Slf4j
     @RequiredArgsConstructor
     public static class StreamToStorage implements Convert2Storage<StreamWrite> {
-        private final StreamMetadata metadata;
         private final StreamWrite streamWrite;
 
         @Override
@@ -64,41 +61,28 @@ public class BanyanDBConverter {
             if (Record.TIME_BUCKET.equals(fieldName)) {
                 return;
             }
-            final StreamMetadata.TagMetadata metadata = this.metadata.getTagDefinition().get(fieldName);
-            if (metadata == null) {
-                return;
-            }
-            switch (metadata.getTagFamilyName()) {
-                case StreamMetadata.TAG_FAMILY_DATA:
-                    this.streamWrite.dataTag(metadata.getTagIndex(), buildTag(fieldValue));
-                    break;
-                case StreamMetadata.TAG_FAMILY_SEARCHABLE:
-                    this.streamWrite.searchableTag(metadata.getTagIndex(), buildTag(fieldValue));
-                    break;
-                default:
-                    throw new IllegalStateException("tag family is not supported");
+            try {
+                this.streamWrite.tag(fieldName, buildTag(fieldValue));
+            } catch (BanyanDBException ex) {
+                log.error("fail to add tag", ex);
             }
         }
 
-        private SerializableTag<BanyandbModel.TagValue> buildTag(Object value) {
+        private Serializable<BanyandbModel.TagValue> buildTag(Object value) {
             if (Integer.class.equals(value.getClass()) || Long.class.equals(value.getClass())) {
-                return TagAndValue.longField((long) value);
+                return TagAndValue.longTagValue((long) value);
             } else if (String.class.equals(value.getClass())) {
-                return TagAndValue.stringField((String) value);
+                return TagAndValue.stringTagValue((String) value);
             }
             throw new IllegalStateException(value.getClass() + " is not supported");
         }
 
         @Override
         public void accept(String fieldName, byte[] fieldValue) {
-            final StreamMetadata.TagMetadata metadata = this.metadata.getTagDefinition().get(fieldName);
-            if (metadata == null) {
-                return;
-            }
-            if (StreamMetadata.TAG_FAMILY_SEARCHABLE.equals(metadata.getTagFamilyName())) {
-                this.streamWrite.searchableTag(metadata.getTagIndex(), TagAndValue.binaryField((fieldValue)));
-            } else {
-                throw new IllegalStateException("binary tag should not be store in the `data` family");
+            try {
+                this.streamWrite.tag(fieldName, TagAndValue.binaryTagValue(fieldValue));
+            } catch (BanyanDBException ex) {
+                log.error("fail to add tag", ex);
             }
         }
 
@@ -120,19 +104,12 @@ public class BanyanDBConverter {
 
         @Override
         public Object get(String fieldName) {
-            final StreamMetadata.TagMetadata metadata = this.metadata.getTagDefinition().get(fieldName);
-            if (metadata == null) {
-                return null;
-            }
             // TODO: get an unmodifiable view of tag
             return null;
         }
 
         @Override
         public StreamWrite obtain() {
-            if (metadata.isUseIdAsEntity()) {
-                this.accept(StreamMetadata.ID, this.streamWrite.getElementID());
-            }
             return this.streamWrite;
         }
     }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java
index f4483b0a14..d894c9990a 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java
@@ -19,9 +19,10 @@
 package org.apache.skywalking.oap.server.storage.plugin.banyandb;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.skywalking.banyandb.v1.client.metadata.Catalog;
-import org.apache.skywalking.banyandb.v1.client.metadata.Duration;
+import org.apache.skywalking.banyandb.v1.client.BanyanDBClient;
+import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
 import org.apache.skywalking.banyandb.v1.client.metadata.Group;
+import org.apache.skywalking.banyandb.v1.client.metadata.Stream;
 import org.apache.skywalking.oap.server.core.CoreModule;
 import org.apache.skywalking.oap.server.core.config.ConfigService;
 import org.apache.skywalking.oap.server.core.storage.StorageException;
@@ -43,25 +44,39 @@ public class BanyanDBIndexInstaller extends ModelInstaller {
 
     @Override
     protected boolean isExists(Model model) throws StorageException {
-        // TODO: get from BanyanDB and make a diff?
-        return false;
+        final MetadataRegistry.PartialMetadata metadata = MetadataRegistry.INSTANCE.parseMetadata(model);
+        try {
+            final BanyanDBClient c = ((BanyanDBStorageClient) this.client).client;
+            // first check group
+            Group g = metadata.getOrCreateGroup(c);
+            if (g == null) {
+                throw new StorageException("fail to create group " + metadata.getGroup());
+            }
+            log.info("group {} created", g.name());
+            // then check entity schema
+            return metadata.findRemoteSchema(c).isPresent();
+        } catch (BanyanDBException ex) {
+            throw new StorageException("fail to check existence", ex);
+        }
     }
 
     @Override
     protected void createTable(Model model) throws StorageException {
-        if (model.isTimeSeries() && model.isRecord()) { // stream
-            StreamMetadata metaInfo = MetadataRegistry.INSTANCE.registerModel(model, this.configService);
-            if (metaInfo != null) {
-                log.info("install index {}", model.getName());
-                ((BanyanDBStorageClient) client).define(
-                        new Group(metaInfo.getGroup(), Catalog.STREAM, 2, 10, Duration.ofDays(7))
-                );
-                ((BanyanDBStorageClient) client).define(metaInfo);
+        try {
+            if (model.isTimeSeries() && model.isRecord()) { // stream
+                Stream stream = (Stream) MetadataRegistry.INSTANCE.registerModel(model, this.configService);
+                if (stream != null) {
+                    log.info("install stream schema {}", model.getName());
+                    ((BanyanDBStorageClient) client).define(stream);
+                }
+            } else if (model.isTimeSeries() && !model.isRecord()) { // measure
+                // TODO: dynamically register Measure
+                log.info("skip measure index {}", model.getName());
+            } else if (!model.isTimeSeries()) { // UITemplate
+                log.info("skip property index {}", model.getName());
             }
-        } else if (model.isTimeSeries() && !model.isRecord()) { // measure
-            log.info("skip measure index {}", model.getName());
-        } else if (!model.isTimeSeries()) { // UITemplate
-            log.info("skip property index {}", model.getName());
+        } catch (BanyanDBException ex) {
+            throw new StorageException("fail to install schema", ex);
         }
     }
 }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBMetricsDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBMetricsDAO.java
index af54dd350f..4e84979498 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBMetricsDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBMetricsDAO.java
@@ -2,7 +2,6 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb;
 
 import lombok.RequiredArgsConstructor;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
-import org.apache.skywalking.oap.server.core.analysis.record.Record;
 import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
 import org.apache.skywalking.oap.server.core.storage.model.Model;
 import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
index b1ca206110..dd92c6fd08 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
@@ -23,8 +23,7 @@ import org.apache.skywalking.banyandb.v1.client.StreamBulkWriteProcessor;
 import org.apache.skywalking.banyandb.v1.client.StreamQuery;
 import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
 import org.apache.skywalking.banyandb.v1.client.StreamWrite;
-import org.apache.skywalking.banyandb.v1.client.metadata.Group;
-import org.apache.skywalking.banyandb.v1.client.metadata.IndexRule;
+import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
 import org.apache.skywalking.banyandb.v1.client.metadata.Stream;
 import org.apache.skywalking.oap.server.library.client.Client;
 import org.apache.skywalking.oap.server.library.client.healthcheck.DelegatedHealthChecker;
@@ -32,25 +31,17 @@ import org.apache.skywalking.oap.server.library.client.healthcheck.HealthCheckab
 import org.apache.skywalking.oap.server.library.util.HealthChecker;
 
 import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * BanyanDBStorageClient is a simple wrapper for the underlying {@link BanyanDBClient},
  * which implement {@link Client} and {@link HealthCheckable}.
  */
 public class BanyanDBStorageClient implements Client, HealthCheckable {
-    private final BanyanDBClient client;
-    private final Map<String, Group> groupMap;
+    final BanyanDBClient client;
     private final DelegatedHealthChecker healthChecker = new DelegatedHealthChecker();
 
     public BanyanDBStorageClient(String host, int port) {
         this.client = new BanyanDBClient(host, port);
-        this.groupMap = new ConcurrentHashMap<>();
-    }
-
-    public Group define(Group group) {
-        return groupMap.computeIfAbsent(group.getName(), s -> client.define(group));
     }
 
     @Override
@@ -63,22 +54,19 @@ public class BanyanDBStorageClient implements Client, HealthCheckable {
         this.client.close();
     }
 
-    public StreamQueryResponse query(StreamQuery streamQuery) {
+    public StreamQueryResponse query(StreamQuery streamQuery) throws BanyanDBException {
         try {
-            StreamQueryResponse response = this.client.queryStreams(streamQuery);
+            StreamQueryResponse response = this.client.query(streamQuery);
             this.healthChecker.health();
             return response;
-        } catch (Throwable t) {
-            healthChecker.unHealth(t);
-            throw t;
+        } catch (BanyanDBException ex) {
+            healthChecker.unHealth(ex);
+            throw ex;
         }
     }
 
-    public void define(StreamMetadata streamMetadata) {
-        Stream stream = this.client.define(streamMetadata.getStream());
-        if (stream != null) {
-            this.client.defineIndexRules(stream, streamMetadata.getIndexRules().toArray(new IndexRule[]{}));
-        }
+    public void define(Stream stream) throws BanyanDBException {
+        this.client.define(stream);
     }
 
     public void write(StreamWrite streamWrite) {
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java
index 922bf2296a..88a81d3372 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java
@@ -28,9 +28,13 @@ import org.apache.skywalking.oap.server.core.storage.StorageModule;
 import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressAliasDAO;
 import org.apache.skywalking.oap.server.core.storage.management.UITemplateManagementDAO;
 import org.apache.skywalking.oap.server.core.storage.model.ModelCreator;
-import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskLogQueryDAO;
-import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO;
-import org.apache.skywalking.oap.server.core.storage.profile.IProfileThreadSnapshotQueryDAO;
+import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IEBPFProfilingDataDAO;
+import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IEBPFProfilingScheduleDAO;
+import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IEBPFProfilingTaskDAO;
+import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IServiceLabelDAO;
+import org.apache.skywalking.oap.server.core.storage.profiling.trace.IProfileTaskLogQueryDAO;
+import org.apache.skywalking.oap.server.core.storage.profiling.trace.IProfileTaskQueryDAO;
+import org.apache.skywalking.oap.server.core.storage.profiling.trace.IProfileThreadSnapshotQueryDAO;
 import org.apache.skywalking.oap.server.core.storage.query.IAggregationQueryDAO;
 import org.apache.skywalking.oap.server.core.storage.query.IAlarmQueryDAO;
 import org.apache.skywalking.oap.server.core.storage.query.IBrowserLogQueryDAO;
@@ -50,11 +54,15 @@ import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDB
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBMetadataQueryDAO;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBNetworkAddressAliasDAO;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBProfileTaskQueryDAO;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBServiceLabelDAO;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBTopologyQueryDAO;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBUITemplateManagementDAO;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBAlarmQueryDAO;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBBatchDAO;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBBrowserLogQueryDAO;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBEBPFProfilingDataDAO;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBEBPFProfilingScheduleQueryDAO;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBEBPFProfilingTaskDAO;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBHistoryDeleteDAO;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBLogQueryDAO;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBProfileTaskLogQueryDAO;
@@ -106,13 +114,16 @@ public class BanyanDBStorageProvider extends ModuleProvider {
         this.registerServiceImplementation(ILogQueryDAO.class, new BanyanDBLogQueryDAO(client));
         this.registerServiceImplementation(IProfileTaskQueryDAO.class, new BanyanDBProfileTaskQueryDAO());
         this.registerServiceImplementation(IProfileTaskLogQueryDAO.class, new BanyanDBProfileTaskLogQueryDAO(client, this.config.getFetchTaskLogMaxSize()));
-        this.registerServiceImplementation(
-                IProfileThreadSnapshotQueryDAO.class, new BanyanDBProfileThreadSnapshotQueryDAO(client));
+        this.registerServiceImplementation(IProfileThreadSnapshotQueryDAO.class, new BanyanDBProfileThreadSnapshotQueryDAO(client));
         this.registerServiceImplementation(UITemplateManagementDAO.class, new BanyanDBUITemplateManagementDAO());
         this.registerServiceImplementation(IEventQueryDAO.class, new BanyanDBEventQueryDAO());
         this.registerServiceImplementation(ITopologyQueryDAO.class, new BanyanDBTopologyQueryDAO());
+        this.registerServiceImplementation(IEBPFProfilingTaskDAO.class, new BanyanDBEBPFProfilingTaskDAO());
+        this.registerServiceImplementation(IEBPFProfilingDataDAO.class, new BanyanDBEBPFProfilingDataDAO());
+        this.registerServiceImplementation(IEBPFProfilingScheduleDAO.class, new BanyanDBEBPFProfilingScheduleQueryDAO());
 
         // TODO: metrics
+        this.registerServiceImplementation(IServiceLabelDAO.class, new BanyanDBServiceLabelDAO());
         this.registerServiceImplementation(IHistoryDeleteDAO.class, new BanyanDBHistoryDeleteDAO());
         this.registerServiceImplementation(IMetricsQueryDAO.class, new BanyanDBMetricsQueryDAO());
         this.registerServiceImplementation(IAggregationQueryDAO.class, new BanyanDBAggregationQueryDAO());
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
index 83af6da867..b8925c7529 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
@@ -18,10 +18,17 @@
 
 package org.apache.skywalking.oap.server.storage.plugin.banyandb;
 
-import com.google.common.collect.ImmutableSet;
-import org.apache.skywalking.banyandb.common.v1.BanyandbCommon;
-import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase;
+import io.grpc.Status;
+import lombok.Data;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.banyandb.v1.client.BanyanDBClient;
+import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
+import org.apache.skywalking.banyandb.v1.client.metadata.Catalog;
+import org.apache.skywalking.banyandb.v1.client.metadata.Duration;
+import org.apache.skywalking.banyandb.v1.client.metadata.Group;
 import org.apache.skywalking.banyandb.v1.client.metadata.IndexRule;
+import org.apache.skywalking.banyandb.v1.client.metadata.NamedSchema;
 import org.apache.skywalking.banyandb.v1.client.metadata.Stream;
 import org.apache.skywalking.banyandb.v1.client.metadata.TagFamilySpec;
 import org.apache.skywalking.oap.server.core.alarm.AlarmRecord;
@@ -33,6 +40,7 @@ import org.apache.skywalking.oap.server.core.storage.model.Model;
 import org.apache.skywalking.oap.server.core.storage.model.ModelColumn;
 import org.apache.skywalking.oap.server.library.util.StringUtil;
 
+import javax.annotation.Nullable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -40,173 +48,230 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
+@Slf4j
 public enum MetadataRegistry {
     INSTANCE;
 
-    private final Map<String, StreamMetadata> streams = new HashMap<>();
+    private final Map<String, PartialMetadata> registry = new ConcurrentHashMap<>();
 
-    public StreamMetadata registerModel(Model model, ConfigService configService) {
-        BanyandbDatabase.Stream pbStream = parseStreamFromModel(model, configService);
+    public NamedSchema<?> registerModel(Model model, ConfigService configService) {
+        PartialMetadata partialMetadata = parseMetadata(model);
+        final Stream.Builder builder = Stream.create(partialMetadata.getGroup(), partialMetadata.getName());
+        Map<String, ModelColumn> modelColumnMap = model.getColumns().stream()
+                .collect(Collectors.toMap(modelColumn -> modelColumn.getColumnName().getStorageName(), Function.identity()));
+        // parse and set sharding keys
+        builder.setEntityRelativeTags(parseEntityNames(modelColumnMap));
+        // parse and set tag families, which contains tag specs
+        List<TagFamilySpec> specs = parseTagFamilySpecs(model, partialMetadata, configService);
+        builder.addTagFamilies(specs);
+        // parse and add index definition
+        builder.addIndexes(parseIndexRules(specs, partialMetadata.indexFamily(), modelColumnMap));
 
-        final boolean useIdAsEntity = pbStream.getEntity().getTagNamesCount() == 1 &&
-                StreamMetadata.ID.equals(pbStream.getEntity().getTagNames(0));
+        registry.put(model.getName(), partialMetadata);
+        return builder.build();
+    }
 
-        final Stream stream = new Stream(pbStream.getMetadata().getGroup(), pbStream.getMetadata().getName());
+    public PartialMetadata findSchema(final String name) {
+        return this.registry.get(name);
+    }
 
+    List<IndexRule> parseIndexRules(List<TagFamilySpec> specs, String indexTagFamily, Map<String, ModelColumn> modelColumnMap) {
         List<IndexRule> indexRules = new ArrayList<>();
-        Set<String> entityNameSet = ImmutableSet.copyOf(pbStream.getEntity().getTagNamesList());
-        stream.setEntityTagNames(pbStream.getEntity().getTagNamesList());
-
-        Map<String, StreamMetadata.TagMetadata> tagDefinition = new HashMap<>();
-
-        for (BanyandbDatabase.TagFamilySpec pbTagFamilySpec : pbStream.getTagFamiliesList()) {
-            final TagFamilySpec tagFamilySpec = TagFamilySpec.fromProtobuf(pbTagFamilySpec);
-            stream.addTagFamilySpec(tagFamilySpec);
-
-            int tagIndex = 0;
-            for (final TagFamilySpec.TagSpec tagSpec : tagFamilySpec.getTagSpecs()) {
-                // register tag
-                tagDefinition.put(tagSpec.getTagName(), new StreamMetadata.TagMetadata(tagFamilySpec.getTagFamilyName(), tagSpec, tagIndex++));
-
-                // if the tag family equals to "searchable", build index rules
-                if (tagFamilySpec.getTagFamilyName().equals(StreamMetadata.TAG_FAMILY_SEARCHABLE)) {
-                    // check if this spec exists in the entity names
-                    if (entityNameSet.contains(tagSpec.getTagName())) {
-                        continue;
-                    }
-                    BanyandbDatabase.IndexRule pbIndexRule = parseIndexRuleFromTagSpec(pbStream.getMetadata(), tagSpec);
-                    IndexRule indexRule = IndexRule.fromProtobuf(pbIndexRule);
-                    indexRules.add(indexRule);
+        for (final TagFamilySpec spec : specs) {
+            if (!indexTagFamily.equals(spec.tagFamilyName())) {
+                continue;
+            }
+            for (final TagFamilySpec.TagSpec tagSpec : spec.tagSpecs()) {
+                final String tagName = tagSpec.getTagName();
+                // TODO: we need to add support for index types in the OAP core.
+                // Currently, we only register the INVERTED type.
+                final ModelColumn modelColumn = modelColumnMap.get(tagName);
+                // if it is null, it must be a user-defined tag
+                if (modelColumn == null) {
+                    indexRules.add(IndexRule.create(tagName, IndexRule.IndexType.INVERTED, IndexRule.IndexLocation.SERIES));
+                    continue;
+                }
+                if (modelColumn.getBanyanDBExtension().isGlobalIndexing()) {
+                    indexRules.add(IndexRule.create(tagName, IndexRule.IndexType.INVERTED, IndexRule.IndexLocation.GLOBAL));
+                } else {
+                    indexRules.add(IndexRule.create(tagName, IndexRule.IndexType.INVERTED, IndexRule.IndexLocation.SERIES));
                 }
             }
         }
-
-        StreamMetadata streamMetadata = StreamMetadata.builder().model(model).stream(stream)
-                .tagDefinition(tagDefinition)
-                .indexRules(indexRules)
-                .group(pbStream.getMetadata().getGroup())
-                .useIdAsEntity(useIdAsEntity)
-                .build();
-        streams.put(model.getName(), streamMetadata);
-        return streamMetadata;
-    }
-
-    public StreamMetadata findStreamMetadata(final String name) {
-        return this.streams.get(name);
+        return indexRules;
     }
 
-    private BanyandbDatabase.Stream parseStreamFromModel(Model model, ConfigService configService) {
+    /**
+     * Parse sharding keys from the {@link Model}
+     *
+     * @param modelColumnMap the mapping between column storageName and {@link ModelColumn}
+     * @return the names of the sharding-key columns, sorted by ascending sharding key index
+     */
+    List<String> parseEntityNames(Map<String, ModelColumn> modelColumnMap) {
         List<ModelColumn> shardingColumns = new ArrayList<>();
+        for (final ModelColumn col : modelColumnMap.values()) {
+            if (col.getBanyanDBExtension().isShardingKey()) {
+                shardingColumns.add(col);
+            }
+        }
+        return shardingColumns.stream()
+                .sorted(Comparator.comparingInt(col -> col.getBanyanDBExtension().getShardingKeyIdx()))
+                .map(col -> col.getColumnName().getName())
+                .collect(Collectors.toList());
+    }
 
-        List<BanyandbDatabase.TagSpec> searchableTagsSpecs = new ArrayList<>();
-        List<BanyandbDatabase.TagSpec> dataTagsSpecs = new ArrayList<>();
-        for (final ModelColumn modelColumn : model.getColumns()) {
-            if (modelColumn.getShardingKeyIdx() > -1) {
-                shardingColumns.add(modelColumn);
+    List<TagFamilySpec> parseTagFamilySpecs(Model model, PartialMetadata metadata, ConfigService configService) {
+        Map<String, TagFamilySpec.Builder> builderMap = new HashMap<>();
+        for (final ModelColumn col : model.getColumns()) {
+            final TagFamilySpec.TagSpec tagSpec = parseTagSpec(col);
+            if (tagSpec == null) {
+                continue;
             }
-            if (modelColumn.isIndexOnly()) {
-                // skip
-            } else if (modelColumn.isStorageOnly()) {
-                dataTagsSpecs.add(parseTagSpecFromModelColumn(modelColumn));
+            if (col.shouldIndex()) {
+                builderMap.computeIfAbsent(metadata.indexFamily(), TagFamilySpec::create).addTagSpec(tagSpec);
             } else {
-                searchableTagsSpecs.add(parseTagSpecFromModelColumn(modelColumn));
+                builderMap.computeIfAbsent(metadata.nonIndexFamily(), TagFamilySpec::create).addTagSpec(tagSpec);
             }
         }
 
-        Set<String> entities = shardingColumns.stream()
-                .sorted(Comparator.comparingInt(ModelColumn::getShardingKeyIdx))
-                .map(modelColumn -> modelColumn.getColumnName().getStorageName())
-                .collect(Collectors.toSet());
-
-        if (entities.isEmpty()) {
-            // if sharding keys are not defined, we have to use ID
-            entities = Collections.singleton(StreamMetadata.ID);
-            // append ID
-            searchableTagsSpecs.add(BanyandbDatabase.TagSpec.newBuilder()
-                    .setName(StreamMetadata.ID)
-                    .setType(BanyandbDatabase.TagType.TAG_TYPE_STRING).build());
-        }
-
         // add all user-defined indexed tags to the end of the "searchable" family
         if (SegmentRecord.INDEX_NAME.equals(model.getName())) {
-            searchableTagsSpecs.addAll(parseTagSpecsFromConfiguration(configService.getSearchableTracesTags()));
+            builderMap.computeIfAbsent(metadata.indexFamily(), TagFamilySpec::create).addTagSpecs(parseExtraTagSpecs(configService.getSearchableTracesTags()));
         } else if (LogRecord.INDEX_NAME.equals(model.getName())) {
-            searchableTagsSpecs.addAll(parseTagSpecsFromConfiguration(configService.getSearchableLogsTags()));
+            builderMap.computeIfAbsent(metadata.indexFamily(), TagFamilySpec::create).addTagSpecs(parseExtraTagSpecs(configService.getSearchableLogsTags()));
         } else if (AlarmRecord.INDEX_NAME.equals(model.getName())) {
-            searchableTagsSpecs.addAll(parseTagSpecsFromConfiguration(configService.getSearchableAlarmTags()));
+            builderMap.computeIfAbsent(metadata.indexFamily(), TagFamilySpec::create).addTagSpecs(parseExtraTagSpecs(configService.getSearchableAlarmTags()));
         }
 
-        String group = "default-stream";
-        if (model.isSuperDataset()) {
-            // for superDataset, we should use separate group
-            group = model.getName() + "-stream";
-        }
+        return builderMap.values().stream().map(TagFamilySpec.Builder::build).collect(Collectors.toList());
+    }
 
-        return BanyandbDatabase.Stream.newBuilder()
-                .addTagFamilies(BanyandbDatabase.TagFamilySpec.newBuilder()
-                        .setName(StreamMetadata.TAG_FAMILY_DATA)
-                        .addAllTags(dataTagsSpecs)
-                        .build())
-                .addTagFamilies(BanyandbDatabase.TagFamilySpec.newBuilder()
-                        .setName(StreamMetadata.TAG_FAMILY_SEARCHABLE)
-                        .addAllTags(searchableTagsSpecs)
-                        .build())
-                .setEntity(BanyandbDatabase.Entity.newBuilder()
-                        .addAllTagNames(entities)
-                        .build())
-                .setMetadata(BanyandbCommon.Metadata.newBuilder()
-                        .setGroup(group)
-                        .setName(model.getName())
-                        .build())
-                .build();
+    /**
+     * Extract extra tags from Configuration.
+     * They are for tags defined for {@link SegmentRecord}, {@link LogRecord} and {@link AlarmRecord}.
+     *
+     * @param tags a series of tags joined by commas
+     * @return a list of {@link org.apache.skywalking.banyandb.v1.client.metadata.TagFamilySpec.TagSpec} generated from input
+     */
+    private List<TagFamilySpec.TagSpec> parseExtraTagSpecs(String tags) {
+        if (StringUtil.isEmpty(tags)) {
+            return Collections.emptyList();
+        }
+        String[] tagsArray = tags.split(",");
+        if (tagsArray.length == 0) {
+            return Collections.emptyList();
+        }
+        return Arrays.stream(tagsArray)
+                .map(TagFamilySpec.TagSpec::newStringTag)
+                .collect(Collectors.toList());
     }
 
-    private BanyandbDatabase.TagSpec parseTagSpecFromModelColumn(ModelColumn modelColumn) {
+    /**
+     * Parse TagSpec from {@link ModelColumn}
+     *
+     * @param modelColumn the column in the model to be parsed
+     * @return a typed tag spec, or {@code null} if the column is skipped
+     */
+    @Nullable
+    private TagFamilySpec.TagSpec parseTagSpec(ModelColumn modelColumn) {
         final Class<?> clazz = modelColumn.getType();
         if (String.class.equals(clazz)) {
-            return BanyandbDatabase.TagSpec.newBuilder().setName(modelColumn.getColumnName().getStorageName())
-                    .setType(BanyandbDatabase.TagType.TAG_TYPE_STRING).build();
+            return TagFamilySpec.TagSpec.newStringTag(modelColumn.getColumnName().getStorageName());
         } else if (int.class.equals(clazz) || long.class.equals(clazz)) {
-            return BanyandbDatabase.TagSpec.newBuilder().setName(modelColumn.getColumnName().getStorageName())
-                    .setType(BanyandbDatabase.TagType.TAG_TYPE_INT).build();
+            return TagFamilySpec.TagSpec.newIntTag(modelColumn.getColumnName().getStorageName());
         } else if (byte[].class.equals(clazz) || DataTable.class.equals(clazz)) {
-            return BanyandbDatabase.TagSpec.newBuilder().setName(modelColumn.getColumnName().getStorageName())
-                    .setType(BanyandbDatabase.TagType.TAG_TYPE_DATA_BINARY).build();
+            return TagFamilySpec.TagSpec.newBinaryTag(modelColumn.getColumnName().getStorageName());
         } else {
+            // TODO: List<String> tags are not supported yet, so only the "tags" column is skipped
+            if ("tags".equals(modelColumn.getColumnName().getStorageName())) {
+                return null;
+            }
             throw new IllegalStateException("type " + modelColumn.getType().toString() + " is not supported");
         }
     }
 
-    private List<BanyandbDatabase.TagSpec> parseTagSpecsFromConfiguration(String tags) {
-        if (StringUtil.isEmpty(tags)) {
-            return Collections.emptyList();
+    public PartialMetadata parseMetadata(Model model) {
+        if (model.isRecord()) {
+            String group = "stream-default";
+            if (model.isSuperDataset()) {
+                // for superDataset, we should use separate group
+                group = "stream-" + model.getName();
+            }
+            return new PartialMetadata(group, model.getName(), Kind.STREAM);
         }
-        String[] tagsArray = tags.split(",");
-        if (tagsArray.length == 0) {
-            return Collections.emptyList();
+        return new PartialMetadata("measure-default", model.getName(), Kind.MEASURE);
+    }
+
+    @RequiredArgsConstructor
+    @Data
+    public static class PartialMetadata {
+        private final String group;
+        private final String name;
+        private final Kind kind;
+
+        public Optional<NamedSchema<?>> findRemoteSchema(BanyanDBClient client) throws BanyanDBException {
+            try {
+                switch (kind) {
+                    case STREAM:
+                        return Optional.ofNullable(client.findStream(this.group, this.name));
+                    case MEASURE:
+                        return Optional.ofNullable(client.findMeasure(this.group, this.name));
+                    default:
+                        throw new IllegalStateException("should not reach here");
+                }
+            } catch (BanyanDBException ex) {
+                if (ex.getStatus().equals(Status.Code.NOT_FOUND)) {
+                    return Optional.empty();
+                }
+
+                throw ex;
+            }
+        }
+
+        public Group getOrCreateGroup(BanyanDBClient client) throws BanyanDBException {
+            Group g = client.findGroup(this.group);
+            if (g != null) {
+                return g;
+            }
+            switch (kind) {
+                case STREAM:
+                    return client.define(Group.create(this.group, Catalog.STREAM, 2, 0, Duration.ofDays(7)));
+                case MEASURE:
+                    return client.define(Group.create(this.group, Catalog.MEASURE, 2, 12, Duration.ofDays(7)));
+                default:
+                    throw new IllegalStateException("should not reach here");
+            }
+        }
+
+        public String indexFamily() {
+            switch (kind) {
+                case MEASURE:
+                    return "default";
+                case STREAM:
+                    return "searchable";
+                default:
+                    throw new IllegalStateException("should not reach here");
+            }
+        }
+
+        public String nonIndexFamily() {
+            switch (kind) {
+                case MEASURE:
+                    return null;
+                case STREAM:
+                    return "binary";
+                default:
+                    throw new IllegalStateException("should not reach here");
+            }
         }
-        return Arrays.stream(tagsArray)
-                .map(tagName -> BanyandbDatabase.TagSpec.newBuilder().setName(tagName)
-                        .setType(BanyandbDatabase.TagType.TAG_TYPE_STRING).build())
-                .collect(Collectors.toList());
     }
 
-    private BanyandbDatabase.IndexRule parseIndexRuleFromTagSpec(BanyandbCommon.Metadata metadata, TagFamilySpec.TagSpec tagSpec) {
-        // In SkyWalking, only "trace_id" should be stored as a global index
-        BanyandbDatabase.IndexRule.Location loc = "trace_id".equals(tagSpec.getTagName()) ?
-                BanyandbDatabase.IndexRule.Location.LOCATION_GLOBAL :
-                BanyandbDatabase.IndexRule.Location.LOCATION_SERIES;
-
-        return BanyandbDatabase.IndexRule.newBuilder()
-                .setMetadata(BanyandbCommon.Metadata.newBuilder()
-                        .setName(tagSpec.getTagName()).setGroup(metadata.getGroup()))
-                .setLocation(loc)
-                .addTags(tagSpec.getTagName())
-                // TODO: support TYPE_TREE
-                .setType(BanyandbDatabase.IndexRule.Type.TYPE_INVERTED)
-                .build();
+    public enum Kind {
+        MEASURE, STREAM;
     }
 }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/StreamMetadata.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/StreamMetadata.java
deleted file mode 100644
index 814f5f62e2..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/StreamMetadata.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb;
-
-import lombok.Builder;
-import lombok.Data;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.skywalking.banyandb.v1.client.metadata.IndexRule;
-import org.apache.skywalking.banyandb.v1.client.metadata.Stream;
-import org.apache.skywalking.banyandb.v1.client.metadata.TagFamilySpec;
-import org.apache.skywalking.oap.server.core.storage.model.Model;
-
-import java.util.List;
-import java.util.Map;
-
-@Getter
-@Builder
-@Slf4j
-public class StreamMetadata {
-    public static final String TAG_FAMILY_SEARCHABLE = "searchable";
-    public static final String TAG_FAMILY_DATA = "data";
-
-    public static final String ID = "id";
-
-    private final Model model;
-
-    private final Map<String, TagMetadata> tagDefinition;
-
-    /**
-     * Group of the stream
-     */
-    private final String group;
-    /**
-     * Spec of the stream
-     */
-    private final Stream stream;
-    /**
-     * Index rules attached to the stream
-     */
-    private final List<IndexRule> indexRules;
-
-    private final int dataFamilySize;
-    private final int searchableFamilySize;
-
-    private final boolean useIdAsEntity;
-
-    @Getter
-    @Data
-    public static class TagMetadata {
-        private final String tagFamilyName;
-        private final TagFamilySpec.TagSpec tagSpec;
-        private final int tagIndex;
-    }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java
index 5833c98c56..bff4fc00be 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java
@@ -55,10 +55,15 @@ public class BanyanDBMetadataQueryDAO implements IMetadataQueryDAO {
     }
 
     @Override
-    public List<Process> listProcesses(String serviceId, String instanceId) throws IOException {
+    public List<Process> listProcesses(String serviceId, String instanceId, String agentId, long lastPingStartTimeBucket, long lastPingEndTimeBucket) throws IOException {
         return Collections.emptyList();
     }
 
+    @Override
+    public long getProcessesCount(String serviceId, String instanceId, String agentId, long lastPingStartTimeBucket, long lastPingEndTimeBucket) throws IOException {
+        return 0;
+    }
+
     @Override
     public Process getProcess(String processId) throws IOException {
         return null;
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBProfileTaskQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBProfileTaskQueryDAO.java
index 0c6e1ebfd1..690bc6640d 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBProfileTaskQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBProfileTaskQueryDAO.java
@@ -19,7 +19,7 @@
 package org.apache.skywalking.oap.server.storage.plugin.banyandb.measure;
 
 import org.apache.skywalking.oap.server.core.query.type.ProfileTask;
-import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO;
+import org.apache.skywalking.oap.server.core.storage.profiling.trace.IProfileTaskQueryDAO;
 
 import java.io.IOException;
 import java.util.Collections;
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBProfileTaskQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBServiceLabelDAO.java
similarity index 67%
copy from oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBProfileTaskQueryDAO.java
copy to oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBServiceLabelDAO.java
index 0c6e1ebfd1..295af6f75a 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBProfileTaskQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBServiceLabelDAO.java
@@ -18,21 +18,15 @@
 
 package org.apache.skywalking.oap.server.storage.plugin.banyandb.measure;
 
-import org.apache.skywalking.oap.server.core.query.type.ProfileTask;
-import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO;
+import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IServiceLabelDAO;
 
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 
-public class BanyanDBProfileTaskQueryDAO implements IProfileTaskQueryDAO {
+public class BanyanDBServiceLabelDAO implements IServiceLabelDAO {
     @Override
-    public List<ProfileTask> getTaskList(String serviceId, String endpointName, Long startTimeBucket, Long endTimeBucket, Integer limit) throws IOException {
+    public List<String> queryAllLabels(String serviceId) throws IOException {
         return Collections.emptyList();
     }
-
-    @Override
-    public ProfileTask getById(String id) throws IOException {
-        return null;
-    }
 }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java
index 0f093e396b..c40ac35924 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java
@@ -22,12 +22,14 @@ import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
 import org.apache.skywalking.banyandb.v1.client.StreamQuery;
 import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
 import org.apache.skywalking.banyandb.v1.client.TimestampRange;
+import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
 import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.StreamMetadata;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.MetadataRegistry;
 
+import java.io.IOException;
 import java.time.Instant;
-import java.util.List;
+import java.util.Set;
 
 public abstract class AbstractBanyanDBDAO extends AbstractDAO<BanyanDBStorageClient> {
     private static final Instant UPPER_BOUND = Instant.ofEpochSecond(0, Long.MAX_VALUE);
@@ -38,41 +40,45 @@ public abstract class AbstractBanyanDBDAO extends AbstractDAO<BanyanDBStorageCli
         super(client);
     }
 
-    protected StreamQueryResponse query(StreamMetadata metadata, List<String> searchableTags, QueryBuilder builder) {
+    protected StreamQueryResponse query(MetadataRegistry.PartialMetadata metadata, Set<String> searchableTags, QueryBuilder builder) throws IOException {
         return this.query(metadata, searchableTags, null, builder);
     }
 
-    protected StreamQueryResponse query(StreamMetadata metadata, List<String> searchableTags, TimestampRange timestampRange,
-                                        QueryBuilder builder) {
+    protected StreamQueryResponse query(MetadataRegistry.PartialMetadata metadata, Set<String> searchableTags, TimestampRange timestampRange,
+                                        QueryBuilder builder) throws IOException {
         final StreamQuery query;
         if (timestampRange == null) {
-            query = new StreamQuery(metadata.getGroup(), metadata.getModel().getName(), LARGEST_TIME_RANGE, searchableTags);
+            query = new StreamQuery(metadata.getGroup(), metadata.getName(), LARGEST_TIME_RANGE, searchableTags);
         } else {
-            query = new StreamQuery(metadata.getGroup(), metadata.getModel().getName(), timestampRange, searchableTags);
+            query = new StreamQuery(metadata.getGroup(), metadata.getName(), timestampRange, searchableTags);
         }
 
         builder.apply(query);
 
-        return getClient().query(query);
+        try {
+            return getClient().query(query);
+        } catch (BanyanDBException ex) {
+            throw new IOException(ex);
+        }
     }
 
     protected abstract static class QueryBuilder {
         abstract void apply(final StreamQuery query);
 
         protected PairQueryCondition<Long> eq(String name, long value) {
-            return PairQueryCondition.LongQueryCondition.eq(StreamMetadata.TAG_FAMILY_SEARCHABLE, name, value);
+            return PairQueryCondition.LongQueryCondition.eq(name, value);
         }
 
         protected PairQueryCondition<Long> lte(String name, long value) {
-            return PairQueryCondition.LongQueryCondition.le(StreamMetadata.TAG_FAMILY_SEARCHABLE, name, value);
+            return PairQueryCondition.LongQueryCondition.le(name, value);
         }
 
         protected PairQueryCondition<Long> gte(String name, long value) {
-            return PairQueryCondition.LongQueryCondition.ge(StreamMetadata.TAG_FAMILY_SEARCHABLE, name, value);
+            return PairQueryCondition.LongQueryCondition.ge(name, value);
         }
 
         protected PairQueryCondition<String> eq(String name, String value) {
-            return PairQueryCondition.StringQueryCondition.eq(StreamMetadata.TAG_FAMILY_SEARCHABLE, name, value);
+            return PairQueryCondition.StringQueryCondition.eq(name, value);
         }
     }
 }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java
index 0b7693d16c..b2a43abff7 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java
@@ -18,7 +18,7 @@
 
 package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
 
-import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import org.apache.skywalking.banyandb.v1.client.RowEntity;
 import org.apache.skywalking.banyandb.v1.client.StreamQuery;
 import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
@@ -34,7 +34,6 @@ import org.apache.skywalking.oap.server.library.util.CollectionUtils;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBConverter;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.MetadataRegistry;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.StreamMetadata;
 
 import java.io.IOException;
 import java.util.List;
@@ -45,8 +44,8 @@ import java.util.Objects;
  * which can be used to build a {@link org.apache.skywalking.oap.server.core.query.type.AlarmMessage}
  */
 public class BanyanDBAlarmQueryDAO extends AbstractBanyanDBDAO implements IAlarmQueryDAO {
-    private final StreamMetadata alarmRecordMetadata =
-            MetadataRegistry.INSTANCE.findStreamMetadata(AlarmRecord.INDEX_NAME);
+    private final MetadataRegistry.PartialMetadata alarmRecordMetadata =
+            MetadataRegistry.INSTANCE.findSchema(AlarmRecord.INDEX_NAME);
 
     public BanyanDBAlarmQueryDAO(BanyanDBStorageClient client) {
         super(client);
@@ -60,13 +59,11 @@ public class BanyanDBAlarmQueryDAO extends AbstractBanyanDBDAO implements IAlarm
         }
 
         StreamQueryResponse resp = query(alarmRecordMetadata,
-                ImmutableList.of(AlarmRecord.SCOPE, AlarmRecord.START_TIME),
+                ImmutableSet.of(AlarmRecord.SCOPE, AlarmRecord.START_TIME, AlarmRecord.ID0, AlarmRecord.ID1, AlarmRecord.ALARM_MESSAGE, AlarmRecord.TAGS_RAW_DATA),
                 tsRange,
                 new QueryBuilder() {
                     @Override
                     public void apply(StreamQuery query) {
-                        query.setDataProjections(ImmutableList.of(AlarmRecord.ID0, AlarmRecord.ID1, AlarmRecord.ALARM_MESSAGE, AlarmRecord.TAGS_RAW_DATA));
-
                         if (Objects.nonNull(scopeId)) {
                             query.appendCondition(eq(AlarmRecord.SCOPE, (long) scopeId));
                         }
@@ -90,7 +87,7 @@ public class BanyanDBAlarmQueryDAO extends AbstractBanyanDBDAO implements IAlarm
         for (final RowEntity rowEntity : resp.getElements()) {
             AlarmRecord.Builder builder = new AlarmRecord.Builder();
             AlarmRecord alarmRecord = builder.storage2Entity(
-                    new BanyanDBConverter.StreamToEntity(this.alarmRecordMetadata, rowEntity)
+                    new BanyanDBConverter.StreamToEntity(rowEntity)
             );
 
             AlarmMessage message = new AlarmMessage();
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBatchDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBatchDAO.java
index 040b031828..e17b8b66ac 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBatchDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBatchDAO.java
@@ -31,7 +31,6 @@ import java.util.concurrent.CompletableFuture;
 
 public class BanyanDBBatchDAO extends AbstractDAO<BanyanDBStorageClient> implements IBatchDAO {
     private StreamBulkWriteProcessor bulkProcessor;
-
     private final int maxBulkSize;
     private final int flushInterval;
     private final int concurrency;
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBrowserLogQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBrowserLogQueryDAO.java
index d384210cd9..2c412cf7dc 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBrowserLogQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBrowserLogQueryDAO.java
@@ -18,7 +18,7 @@
 
 package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
 
-import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.skywalking.banyandb.v1.client.RowEntity;
 import org.apache.skywalking.banyandb.v1.client.StreamQuery;
@@ -34,18 +34,16 @@ import org.apache.skywalking.oap.server.core.storage.query.IBrowserLogQueryDAO;
 import org.apache.skywalking.oap.server.library.util.StringUtil;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.MetadataRegistry;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.StreamMetadata;
 
 import java.io.IOException;
-import java.util.Collections;
 import java.util.Objects;
 
 /**
  * {@link org.apache.skywalking.oap.server.core.browser.manual.errorlog.BrowserErrorLogRecord} is a stream
  */
 public class BanyanDBBrowserLogQueryDAO extends AbstractBanyanDBDAO implements IBrowserLogQueryDAO {
-    private final StreamMetadata browserErrorLogRecordMetadata =
-            MetadataRegistry.INSTANCE.findStreamMetadata(BrowserErrorLogRecord.INDEX_NAME);
+    private final MetadataRegistry.PartialMetadata browserErrorLogRecordMetadata =
+            MetadataRegistry.INSTANCE.findSchema(BrowserErrorLogRecord.INDEX_NAME);
 
     public BanyanDBBrowserLogQueryDAO(BanyanDBStorageClient client) {
         super(client);
@@ -58,13 +56,12 @@ public class BanyanDBBrowserLogQueryDAO extends AbstractBanyanDBDAO implements I
             tsRange = new TimestampRange(TimeBucket.getTimestamp(startSecondTB), TimeBucket.getTimestamp(endSecondTB));
         }
 
-        StreamQueryResponse resp = query(browserErrorLogRecordMetadata, ImmutableList.of(BrowserErrorLogRecord.SERVICE_ID,
+        StreamQueryResponse resp = query(browserErrorLogRecordMetadata, ImmutableSet.of(BrowserErrorLogRecord.SERVICE_ID,
                 BrowserErrorLogRecord.SERVICE_VERSION_ID,
                 BrowserErrorLogRecord.PAGE_PATH_ID,
-                BrowserErrorLogRecord.ERROR_CATEGORY), tsRange, new QueryBuilder() {
+                BrowserErrorLogRecord.ERROR_CATEGORY, BrowserErrorLogRecord.DATA_BINARY), tsRange, new QueryBuilder() {
             @Override
             public void apply(StreamQuery query) {
-                query.setDataProjections(Collections.singletonList(BrowserErrorLogRecord.DATA_BINARY));
                 query.appendCondition(eq(BrowserErrorLogRecord.SERVICE_ID, serviceId));
 
                 if (StringUtil.isNotEmpty(serviceVersionId)) {
@@ -88,8 +85,7 @@ public class BanyanDBBrowserLogQueryDAO extends AbstractBanyanDBDAO implements I
         logs.setTotal(resp.size());
 
         for (final RowEntity rowEntity : resp.getElements()) {
-            final byte[] dataBinary =
-                    rowEntity.getValue(StreamMetadata.TAG_FAMILY_DATA, BrowserErrorLogRecord.DATA_BINARY);
+            final byte[] dataBinary = rowEntity.getTagValue(BrowserErrorLogRecord.DATA_BINARY);
             if (dataBinary != null && dataBinary.length > 0) {
                 BrowserErrorLog log = parserDataBinary(dataBinary);
                 logs.getLogs().add(log);
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBProfileTaskQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBEBPFProfilingDataDAO.java
similarity index 67%
copy from oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBProfileTaskQueryDAO.java
copy to oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBEBPFProfilingDataDAO.java
index 0c6e1ebfd1..4036fa771c 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBProfileTaskQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBEBPFProfilingDataDAO.java
@@ -16,23 +16,18 @@
  *
  */
 
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.measure;
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
 
-import org.apache.skywalking.oap.server.core.query.type.ProfileTask;
-import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO;
+import org.apache.skywalking.oap.server.core.profiling.ebpf.storage.EBPFProfilingDataRecord;
+import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IEBPFProfilingDataDAO;
 
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 
-public class BanyanDBProfileTaskQueryDAO implements IProfileTaskQueryDAO {
+public class BanyanDBEBPFProfilingDataDAO implements IEBPFProfilingDataDAO {
     @Override
-    public List<ProfileTask> getTaskList(String serviceId, String endpointName, Long startTimeBucket, Long endTimeBucket, Integer limit) throws IOException {
+    public List<EBPFProfilingDataRecord> queryData(List<String> scheduleIdList, long beginTime, long endTime) throws IOException {
         return Collections.emptyList();
     }
-
-    @Override
-    public ProfileTask getById(String id) throws IOException {
-        return null;
-    }
 }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBProfileTaskQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBEBPFProfilingScheduleQueryDAO.java
similarity index 67%
copy from oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBProfileTaskQueryDAO.java
copy to oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBEBPFProfilingScheduleQueryDAO.java
index 0c6e1ebfd1..e6e4d529e4 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBProfileTaskQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBEBPFProfilingScheduleQueryDAO.java
@@ -16,23 +16,18 @@
  *
  */
 
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.measure;
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
 
-import org.apache.skywalking.oap.server.core.query.type.ProfileTask;
-import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO;
+import org.apache.skywalking.oap.server.core.query.type.EBPFProfilingSchedule;
+import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IEBPFProfilingScheduleDAO;
 
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 
-public class BanyanDBProfileTaskQueryDAO implements IProfileTaskQueryDAO {
+public class BanyanDBEBPFProfilingScheduleQueryDAO implements IEBPFProfilingScheduleDAO {
     @Override
-    public List<ProfileTask> getTaskList(String serviceId, String endpointName, Long startTimeBucket, Long endTimeBucket, Integer limit) throws IOException {
+    public List<EBPFProfilingSchedule> querySchedules(String taskId, long startTimeBucket, long endTimeBucket) throws IOException {
         return Collections.emptyList();
     }
-
-    @Override
-    public ProfileTask getById(String id) throws IOException {
-        return null;
-    }
 }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBProfileTaskQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBEBPFProfilingTaskDAO.java
similarity index 66%
copy from oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBProfileTaskQueryDAO.java
copy to oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBEBPFProfilingTaskDAO.java
index 0c6e1ebfd1..b0a59f3251 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBProfileTaskQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBEBPFProfilingTaskDAO.java
@@ -16,23 +16,19 @@
  *
  */
 
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.measure;
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
 
-import org.apache.skywalking.oap.server.core.query.type.ProfileTask;
-import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO;
+import org.apache.skywalking.oap.server.core.profiling.ebpf.storage.EBPFProfilingTargetType;
+import org.apache.skywalking.oap.server.core.query.type.EBPFProfilingTask;
+import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IEBPFProfilingTaskDAO;
 
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 
-public class BanyanDBProfileTaskQueryDAO implements IProfileTaskQueryDAO {
+public class BanyanDBEBPFProfilingTaskDAO implements IEBPFProfilingTaskDAO {
     @Override
-    public List<ProfileTask> getTaskList(String serviceId, String endpointName, Long startTimeBucket, Long endTimeBucket, Integer limit) throws IOException {
+    public List<EBPFProfilingTask> queryTasks(List<String> serviceIdList, EBPFProfilingTargetType targetType, long taskStartTime, long latestUpdateTime) throws IOException {
         return Collections.emptyList();
     }
-
-    @Override
-    public ProfileTask getById(String id) throws IOException {
-        return null;
-    }
 }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBLogQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBLogQueryDAO.java
index 5cef08a726..c6097539c9 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBLogQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBLogQueryDAO.java
@@ -18,7 +18,7 @@
 
 package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
 
-import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.skywalking.apm.network.logging.v3.LogTags;
 import org.apache.skywalking.banyandb.v1.client.RowEntity;
@@ -41,7 +41,6 @@ import org.apache.skywalking.oap.server.library.util.CollectionUtils;
 import org.apache.skywalking.oap.server.library.util.StringUtil;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.MetadataRegistry;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.StreamMetadata;
 
 import java.io.IOException;
 import java.util.List;
@@ -51,8 +50,8 @@ import java.util.Objects;
  * {@link org.apache.skywalking.oap.server.core.analysis.manual.log.LogRecord} is a stream
  */
 public class BanyanDBLogQueryDAO extends AbstractBanyanDBDAO implements ILogQueryDAO {
-    private final StreamMetadata logRecordMetadata =
-            MetadataRegistry.INSTANCE.findStreamMetadata(LogRecord.INDEX_NAME);
+    private final MetadataRegistry.PartialMetadata logRecordMetadata =
+            MetadataRegistry.INSTANCE.findSchema(LogRecord.INDEX_NAME);
 
     public BanyanDBLogQueryDAO(BanyanDBStorageClient client) {
         super(client);
@@ -66,8 +65,6 @@ public class BanyanDBLogQueryDAO extends AbstractBanyanDBDAO implements ILogQuer
         final QueryBuilder query = new QueryBuilder() {
             @Override
             public void apply(StreamQuery query) {
-                query.setDataProjections(ImmutableList.of(AbstractLogRecord.CONTENT_TYPE, AbstractLogRecord.CONTENT, AbstractLogRecord.TAGS_RAW_DATA));
-
                 if (StringUtil.isNotEmpty(serviceId)) {
                     query.appendCondition(eq(AbstractLogRecord.SERVICE_ID, serviceId));
                 }
@@ -105,32 +102,31 @@ public class BanyanDBLogQueryDAO extends AbstractBanyanDBDAO implements ILogQuer
         }
 
         StreamQueryResponse resp = query(logRecordMetadata,
-                ImmutableList.of(AbstractLogRecord.SERVICE_ID, AbstractLogRecord.SERVICE_INSTANCE_ID,
+                ImmutableSet.of(AbstractLogRecord.SERVICE_ID, AbstractLogRecord.SERVICE_INSTANCE_ID,
                         AbstractLogRecord.ENDPOINT_ID, AbstractLogRecord.TRACE_ID, AbstractLogRecord.TRACE_SEGMENT_ID,
-                        AbstractLogRecord.SPAN_ID), tsRange, query);
+                        AbstractLogRecord.SPAN_ID, AbstractLogRecord.CONTENT_TYPE, AbstractLogRecord.CONTENT,
+                        AbstractLogRecord.TAGS_RAW_DATA), tsRange, query);
 
         Logs logs = new Logs();
         logs.setTotal(resp.size());
 
         for (final RowEntity rowEntity : resp.getElements()) {
             Log log = new Log();
-            log.setServiceId(rowEntity.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, AbstractLogRecord.SERVICE_ID));
+            log.setServiceId(rowEntity.getTagValue(AbstractLogRecord.SERVICE_ID));
             log.setServiceInstanceId(
-                    rowEntity.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, AbstractLogRecord.SERVICE_INSTANCE_ID));
+                    rowEntity.getTagValue(AbstractLogRecord.SERVICE_INSTANCE_ID));
             log.setEndpointId(
-                    rowEntity.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, AbstractLogRecord.ENDPOINT_ID));
+                    rowEntity.getTagValue(AbstractLogRecord.ENDPOINT_ID));
             if (log.getEndpointId() != null) {
                 log.setEndpointName(
                         IDManager.EndpointID.analysisId(log.getEndpointId()).getEndpointName());
             }
-            log.setTraceId(rowEntity.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, AbstractLogRecord.TRACE_ID));
-            log.setTimestamp(((Number) rowEntity.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE,
-                    AbstractLogRecord.TIMESTAMP)).longValue());
+            log.setTraceId(rowEntity.getTagValue(AbstractLogRecord.TRACE_ID));
+            log.setTimestamp(((Number) rowEntity.getTagValue(AbstractLogRecord.TIMESTAMP)).longValue());
             log.setContentType(ContentType.instanceOf(
-                    ((Number) rowEntity.getValue(StreamMetadata.TAG_FAMILY_DATA,
-                            AbstractLogRecord.CONTENT_TYPE)).intValue()));
-            log.setContent(rowEntity.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, AbstractLogRecord.CONTENT));
-            byte[] dataBinary = rowEntity.getValue(StreamMetadata.TAG_FAMILY_DATA, AbstractLogRecord.TAGS_RAW_DATA);
+                    ((Number) rowEntity.getTagValue(AbstractLogRecord.CONTENT_TYPE)).intValue()));
+            log.setContent(rowEntity.getTagValue(AbstractLogRecord.CONTENT));
+            byte[] dataBinary = rowEntity.getTagValue(AbstractLogRecord.TAGS_RAW_DATA);
             if (dataBinary != null && dataBinary.length > 0) {
                 parserDataBinary(dataBinary, log.getTags());
             }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java
index 4411957f69..6749118a25 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java
@@ -18,28 +18,27 @@
 
 package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
 
-import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import org.apache.skywalking.banyandb.v1.client.RowEntity;
 import org.apache.skywalking.banyandb.v1.client.StreamQuery;
 import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
-import org.apache.skywalking.oap.server.core.profile.ProfileTaskLogRecord;
+import org.apache.skywalking.oap.server.core.profiling.trace.ProfileTaskLogRecord;
 import org.apache.skywalking.oap.server.core.query.type.ProfileTaskLog;
 import org.apache.skywalking.oap.server.core.query.type.ProfileTaskLogOperationType;
-import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskLogQueryDAO;
+import org.apache.skywalking.oap.server.core.storage.profiling.trace.IProfileTaskLogQueryDAO;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.MetadataRegistry;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.StreamMetadata;
 
 import java.io.IOException;
 import java.util.LinkedList;
 import java.util.List;
 
 /**
- * {@link org.apache.skywalking.oap.server.core.profile.ProfileTaskLogRecord} is a stream
+ * {@link ProfileTaskLogRecord} is a stream
  */
 public class BanyanDBProfileTaskLogQueryDAO extends AbstractBanyanDBDAO implements IProfileTaskLogQueryDAO {
-    private final StreamMetadata profileTaskLogRecord =
-            MetadataRegistry.INSTANCE.findStreamMetadata(ProfileTaskLogRecord.INDEX_NAME);
+    private final MetadataRegistry.PartialMetadata profileTaskLogRecord =
+            MetadataRegistry.INSTANCE.findSchema(ProfileTaskLogRecord.INDEX_NAME);
 
     private final int queryMaxSize;
 
@@ -51,12 +50,11 @@ public class BanyanDBProfileTaskLogQueryDAO extends AbstractBanyanDBDAO implemen
     @Override
     public List<ProfileTaskLog> getTaskLogList() throws IOException {
         StreamQueryResponse resp = query(profileTaskLogRecord,
-                ImmutableList.of(ProfileTaskLogRecord.OPERATION_TIME, ProfileTaskLogRecord.INSTANCE_ID),
+                ImmutableSet.of(ProfileTaskLogRecord.OPERATION_TIME, ProfileTaskLogRecord.INSTANCE_ID,
+                        ProfileTaskLogRecord.TASK_ID, ProfileTaskLogRecord.OPERATION_TYPE),
                 new QueryBuilder() {
                     @Override
                     public void apply(StreamQuery query) {
-                        query.setDataProjections(ImmutableList.of(ProfileTaskLogRecord.TASK_ID,
-                                ProfileTaskLogRecord.OPERATION_TYPE));
                         query.setLimit(BanyanDBProfileTaskLogQueryDAO.this.queryMaxSize);
                     }
                 });
@@ -72,15 +70,13 @@ public class BanyanDBProfileTaskLogQueryDAO extends AbstractBanyanDBDAO implemen
     private ProfileTaskLog parseTaskLog(RowEntity data) {
         return ProfileTaskLog.builder()
                 .id(data.getId())
-                .taskId(data.getValue(StreamMetadata.TAG_FAMILY_DATA, ProfileTaskLogRecord.TASK_ID))
+                .taskId(data.getTagValue(ProfileTaskLogRecord.TASK_ID))
                 .instanceId(
-                        data.getValue(StreamMetadata.TAG_FAMILY_DATA, ProfileTaskLogRecord.INSTANCE_ID))
+                        data.getTagValue(ProfileTaskLogRecord.INSTANCE_ID))
                 .operationType(ProfileTaskLogOperationType.parse(
-                        ((Number) data.getValue(StreamMetadata.TAG_FAMILY_DATA,
-                                ProfileTaskLogRecord.OPERATION_TYPE)).intValue()))
+                        ((Number) data.getTagValue(ProfileTaskLogRecord.OPERATION_TYPE)).intValue()))
                 .operationTime(
-                        ((Number) data.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE,
-                                ProfileTaskLogRecord.OPERATION_TIME)).longValue())
+                        ((Number) data.getTagValue(ProfileTaskLogRecord.OPERATION_TIME)).longValue())
                 .build();
     }
 }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java
index 4c6ac7ddcd..a0cacd0c39 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java
@@ -18,20 +18,19 @@
 
 package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
 
-import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import org.apache.skywalking.banyandb.v1.client.RowEntity;
 import org.apache.skywalking.banyandb.v1.client.StreamQuery;
 import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
 import org.apache.skywalking.oap.server.core.analysis.IDManager;
 import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
-import org.apache.skywalking.oap.server.core.profile.ProfileThreadSnapshotRecord;
+import org.apache.skywalking.oap.server.core.profiling.trace.ProfileThreadSnapshotRecord;
 import org.apache.skywalking.oap.server.core.query.type.BasicTrace;
-import org.apache.skywalking.oap.server.core.storage.profile.IProfileThreadSnapshotQueryDAO;
+import org.apache.skywalking.oap.server.core.storage.profiling.trace.IProfileThreadSnapshotQueryDAO;
 import org.apache.skywalking.oap.server.library.util.BooleanUtils;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBConverter;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.MetadataRegistry;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.StreamMetadata;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -49,11 +48,11 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractBanyanDBDAO i
     protected final ProfileThreadSnapshotRecord.Builder builder =
             new ProfileThreadSnapshotRecord.Builder();
 
-    private final StreamMetadata profileThreadSnapshotMetadata =
-            MetadataRegistry.INSTANCE.findStreamMetadata(ProfileThreadSnapshotRecord.INDEX_NAME);
+    private final MetadataRegistry.PartialMetadata profileThreadSnapshotMetadata =
+            MetadataRegistry.INSTANCE.findSchema(ProfileThreadSnapshotRecord.INDEX_NAME);
 
-    private final StreamMetadata segmentRecordMetadata =
-            MetadataRegistry.INSTANCE.findStreamMetadata(SegmentRecord.INDEX_NAME);
+    private final MetadataRegistry.PartialMetadata segmentRecordMetadata =
+            MetadataRegistry.INSTANCE.findSchema(SegmentRecord.INDEX_NAME);
 
     public BanyanDBProfileThreadSnapshotQueryDAO(BanyanDBStorageClient client) {
         super(client);
@@ -62,7 +61,7 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractBanyanDBDAO i
     @Override
     public List<BasicTrace> queryProfiledSegments(String taskId) throws IOException {
         StreamQueryResponse resp = query(profileThreadSnapshotMetadata,
-                ImmutableList.of(ProfileThreadSnapshotRecord.TASK_ID, ProfileThreadSnapshotRecord.SEGMENT_ID,
+                ImmutableSet.of(ProfileThreadSnapshotRecord.TASK_ID, ProfileThreadSnapshotRecord.SEGMENT_ID,
                         ProfileThreadSnapshotRecord.DUMP_TIME, ProfileThreadSnapshotRecord.SEQUENCE),
                 new QueryBuilder() {
                     @Override
@@ -78,14 +77,14 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractBanyanDBDAO i
 
         final List<String> segmentIds = new LinkedList<>();
         for (final RowEntity rowEntity : resp.getElements()) {
-            segmentIds.add(rowEntity.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, ProfileThreadSnapshotRecord.SEGMENT_ID));
+            segmentIds.add(rowEntity.getTagValue(ProfileThreadSnapshotRecord.SEGMENT_ID));
         }
 
         // TODO: support `IN` or `OR` logic operation in BanyanDB
         List<BasicTrace> basicTraces = new ArrayList<>();
         for (String segmentID : segmentIds) {
             final StreamQueryResponse segmentRecordResp = query(segmentRecordMetadata,
-                    ImmutableList.of(SegmentRecord.TRACE_ID, SegmentRecord.IS_ERROR, SegmentRecord.ENDPOINT_ID, SegmentRecord.LATENCY, SegmentRecord.START_TIME),
+                    ImmutableSet.of(SegmentRecord.TRACE_ID, SegmentRecord.IS_ERROR, SegmentRecord.ENDPOINT_ID, SegmentRecord.LATENCY, SegmentRecord.START_TIME),
                     new QueryBuilder() {
                         @Override
                         public void apply(StreamQuery traceQuery) {
@@ -97,15 +96,15 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractBanyanDBDAO i
                 BasicTrace basicTrace = new BasicTrace();
 
                 basicTrace.setSegmentId(row.getId());
-                basicTrace.setStart(String.valueOf(row.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.START_TIME)));
+                basicTrace.setStart(String.valueOf(row.getTagValue(SegmentRecord.START_TIME)));
                 basicTrace.getEndpointNames().add(IDManager.EndpointID.analysisId(
-                        row.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.ENDPOINT_ID)
+                        row.getTagValue(SegmentRecord.ENDPOINT_ID)
                 ).getEndpointName());
-                basicTrace.setDuration(((Number) row.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.LATENCY)).intValue());
+                basicTrace.setDuration(((Number) row.getTagValue(SegmentRecord.LATENCY)).intValue());
                 basicTrace.setError(BooleanUtils.valueToBoolean(
-                        ((Number) row.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.IS_ERROR)).intValue()
+                        ((Number) row.getTagValue(SegmentRecord.IS_ERROR)).intValue()
                 ));
-                basicTrace.getTraceIds().add(row.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.TRACE_ID));
+                basicTrace.getTraceIds().add(row.getTagValue(SegmentRecord.TRACE_ID));
 
                 basicTraces.add(basicTrace);
             }
@@ -134,13 +133,12 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractBanyanDBDAO i
     @Override
     public List<ProfileThreadSnapshotRecord> queryRecords(String segmentId, int minSequence, int maxSequence) throws IOException {
         StreamQueryResponse resp = query(profileThreadSnapshotMetadata,
-                ImmutableList.of(ProfileThreadSnapshotRecord.TASK_ID, ProfileThreadSnapshotRecord.SEGMENT_ID,
-                        ProfileThreadSnapshotRecord.DUMP_TIME, ProfileThreadSnapshotRecord.SEQUENCE),
+                ImmutableSet.of(ProfileThreadSnapshotRecord.TASK_ID, ProfileThreadSnapshotRecord.SEGMENT_ID,
+                        ProfileThreadSnapshotRecord.DUMP_TIME, ProfileThreadSnapshotRecord.SEQUENCE,
+                        ProfileThreadSnapshotRecord.STACK_BINARY),
                 new QueryBuilder() {
                     @Override
                     public void apply(StreamQuery query) {
-                        query.setDataProjections(Collections.singletonList(ProfileThreadSnapshotRecord.STACK_BINARY));
-
                         query.appendCondition(eq(ProfileThreadSnapshotRecord.SEGMENT_ID, segmentId))
                                 .appendCondition(lte(ProfileThreadSnapshotRecord.SEQUENCE, maxSequence))
                                 .appendCondition(gte(ProfileThreadSnapshotRecord.SEQUENCE, minSequence));
@@ -150,7 +148,7 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractBanyanDBDAO i
         List<ProfileThreadSnapshotRecord> result = new ArrayList<>(maxSequence - minSequence);
         for (final RowEntity rowEntity : resp.getElements()) {
             ProfileThreadSnapshotRecord record = this.builder.storage2Entity(
-                    new BanyanDBConverter.StreamToEntity(profileThreadSnapshotMetadata, rowEntity));
+                    new BanyanDBConverter.StreamToEntity(rowEntity));
 
             result.add(record);
         }
@@ -160,11 +158,12 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractBanyanDBDAO i
     @Override
     public SegmentRecord getProfiledSegment(String segmentId) throws IOException {
         StreamQueryResponse resp = query(segmentRecordMetadata,
-                ImmutableList.of(SegmentRecord.TRACE_ID, SegmentRecord.IS_ERROR, SegmentRecord.SERVICE_ID, SegmentRecord.SERVICE_INSTANCE_ID, SegmentRecord.ENDPOINT_ID, SegmentRecord.LATENCY, SegmentRecord.START_TIME),
+                ImmutableSet.of(SegmentRecord.TRACE_ID, SegmentRecord.IS_ERROR, SegmentRecord.SERVICE_ID,
+                        SegmentRecord.SERVICE_INSTANCE_ID, SegmentRecord.ENDPOINT_ID, SegmentRecord.LATENCY,
+                        SegmentRecord.START_TIME, SegmentRecord.DATA_BINARY),
                 new QueryBuilder() {
                     @Override
                     public void apply(StreamQuery query) {
-                        query.setDataProjections(Collections.singletonList(SegmentRecord.DATA_BINARY));
                         query.appendCondition(eq(SegmentRecord.INDEX_NAME, segmentId));
                     }
                 });
@@ -175,31 +174,30 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractBanyanDBDAO i
 
         final RowEntity rowEntity = resp.getElements().iterator().next();
         final SegmentRecord segmentRecord = new SegmentRecord();
-        segmentRecord.setSegmentId(rowEntity.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.SEGMENT_ID));
-        segmentRecord.setTraceId(rowEntity.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.TRACE_ID));
-        segmentRecord.setServiceId(rowEntity.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.SERVICE_ID));
+        segmentRecord.setSegmentId(rowEntity.getTagValue(SegmentRecord.SEGMENT_ID));
+        segmentRecord.setTraceId(rowEntity.getTagValue(SegmentRecord.TRACE_ID));
+        segmentRecord.setServiceId(rowEntity.getTagValue(SegmentRecord.SERVICE_ID));
         segmentRecord.setStartTime(
-                ((Number) rowEntity.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.START_TIME)).longValue());
+                ((Number) rowEntity.getTagValue(SegmentRecord.START_TIME)).longValue());
         segmentRecord.setLatency(
-                ((Number) rowEntity.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.LATENCY)).intValue());
+                ((Number) rowEntity.getTagValue(SegmentRecord.LATENCY)).intValue());
         segmentRecord.setIsError(
-                ((Number) rowEntity.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.IS_ERROR)).intValue());
-        byte[] dataBinary = rowEntity.getValue(StreamMetadata.TAG_FAMILY_DATA, SegmentRecord.DATA_BINARY);
+                ((Number) rowEntity.getTagValue(SegmentRecord.IS_ERROR)).intValue());
+        byte[] dataBinary = rowEntity.getTagValue(SegmentRecord.DATA_BINARY);
         if (dataBinary != null && dataBinary.length > 0) {
             segmentRecord.setDataBinary(dataBinary);
         }
         return segmentRecord;
     }
 
-    private int querySequenceWithAgg(AggType aggType, String segmentId, long start, long end) {
+    private int querySequenceWithAgg(AggType aggType, String segmentId, long start, long end) throws IOException {
         StreamQueryResponse resp = query(profileThreadSnapshotMetadata,
-                ImmutableList.of(ProfileThreadSnapshotRecord.TASK_ID, ProfileThreadSnapshotRecord.SEGMENT_ID,
-                        ProfileThreadSnapshotRecord.DUMP_TIME, ProfileThreadSnapshotRecord.SEQUENCE),
+                ImmutableSet.of(ProfileThreadSnapshotRecord.TASK_ID, ProfileThreadSnapshotRecord.SEGMENT_ID,
+                        ProfileThreadSnapshotRecord.DUMP_TIME, ProfileThreadSnapshotRecord.SEQUENCE,
+                        ProfileThreadSnapshotRecord.STACK_BINARY),
                 new QueryBuilder() {
                     @Override
                     public void apply(StreamQuery query) {
-                        query.setDataProjections(Collections.singletonList(ProfileThreadSnapshotRecord.STACK_BINARY));
-
                         query.appendCondition(eq(ProfileThreadSnapshotRecord.SEGMENT_ID, segmentId))
                                 .appendCondition(lte(ProfileThreadSnapshotRecord.DUMP_TIME, end))
                                 .appendCondition(gte(ProfileThreadSnapshotRecord.DUMP_TIME, start));
@@ -209,7 +207,7 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractBanyanDBDAO i
         List<ProfileThreadSnapshotRecord> records = new ArrayList<>();
         for (final RowEntity rowEntity : resp.getElements()) {
             ProfileThreadSnapshotRecord record = this.builder.storage2Entity(
-                    new BanyanDBConverter.StreamToEntity(profileThreadSnapshotMetadata, rowEntity));
+                    new BanyanDBConverter.StreamToEntity(rowEntity));
 
             records.add(record);
         }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java
index a1cf73edcd..3fe0e03ef7 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java
@@ -29,7 +29,6 @@ import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
 import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBConverter;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.MetadataRegistry;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.StreamMetadata;
 
 import java.io.IOException;
 
@@ -39,17 +38,15 @@ public class BanyanDBRecordDAO implements IRecordDAO {
 
     @Override
     public InsertRequest prepareBatchInsert(Model model, Record record) throws IOException {
-        StreamMetadata metadata = MetadataRegistry.INSTANCE.findStreamMetadata(model.getName());
+        MetadataRegistry.PartialMetadata metadata = MetadataRegistry.INSTANCE.findSchema(model.getName());
         if (metadata == null) {
             throw new IOException(model.getName() + " is not registered");
         }
         StreamWrite streamWrite = new StreamWrite(metadata.getGroup(), // group name
                 model.getName(), // index-name
                 record.id(), // identity
-                TimeBucket.getTimestamp(record.getTimeBucket(), model.getDownsampling()), // timestamp
-                metadata.getDataFamilySize(), // length of the "data" tag family
-                metadata.getSearchableFamilySize()); // length of the "searchable" tag family
-        Convert2Storage<StreamWrite> convert2Storage = new BanyanDBConverter.StreamToStorage(metadata, streamWrite);
+                TimeBucket.getTimestamp(record.getTimeBucket(), model.getDownsampling())); // timestamp
+        Convert2Storage<StreamWrite> convert2Storage = new BanyanDBConverter.StreamToStorage(streamWrite);
         storageBuilder.entity2Storage(record, convert2Storage);
 
         return new BanyanDBStreamInsertRequest(convert2Storage.obtain());
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java
index fc0d458c29..e149b5ab27 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java
@@ -19,7 +19,7 @@
 package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
 
 import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import org.apache.skywalking.banyandb.v1.client.RowEntity;
 import org.apache.skywalking.banyandb.v1.client.StreamQuery;
 import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
@@ -39,7 +39,6 @@ import org.apache.skywalking.oap.server.library.util.CollectionUtils;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBConverter;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.MetadataRegistry;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.StreamMetadata;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -47,8 +46,8 @@ import java.util.Collections;
 import java.util.List;
 
 public class BanyanDBTraceQueryDAO extends AbstractBanyanDBDAO implements ITraceQueryDAO {
-    private final StreamMetadata segmentRecordMetadata =
-            MetadataRegistry.INSTANCE.findStreamMetadata(SegmentRecord.INDEX_NAME);
+    private final MetadataRegistry.PartialMetadata segmentRecordMetadata =
+            MetadataRegistry.INSTANCE.findSchema(SegmentRecord.INDEX_NAME);
 
     public BanyanDBTraceQueryDAO(BanyanDBStorageClient client) {
         super(client);
@@ -117,7 +116,7 @@ public class BanyanDBTraceQueryDAO extends AbstractBanyanDBDAO implements ITrace
         }
 
         StreamQueryResponse resp = query(segmentRecordMetadata,
-                ImmutableList.of(SegmentRecord.TRACE_ID, // 0 - trace_id
+                ImmutableSet.of(SegmentRecord.TRACE_ID, // 0 - trace_id
                         SegmentRecord.IS_ERROR, // 1 - is_error
                         SegmentRecord.SERVICE_ID, // 2 - service_id
                         SegmentRecord.SERVICE_INSTANCE_ID, // 3 - service_instance_id
@@ -133,15 +132,15 @@ public class BanyanDBTraceQueryDAO extends AbstractBanyanDBDAO implements ITrace
             BasicTrace basicTrace = new BasicTrace();
 
             basicTrace.setSegmentId(row.getId());
-            basicTrace.setStart(String.valueOf(row.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.START_TIME)));
+            basicTrace.setStart(String.valueOf(row.getTagValue(SegmentRecord.START_TIME)));
             basicTrace.getEndpointNames().add(IDManager.EndpointID.analysisId(
-                    row.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.ENDPOINT_ID)
+                    row.getTagValue(SegmentRecord.ENDPOINT_ID)
             ).getEndpointName());
-            basicTrace.setDuration(((Number) row.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.LATENCY)).intValue());
+            basicTrace.setDuration(((Number) row.getTagValue(SegmentRecord.LATENCY)).intValue());
             basicTrace.setError(BooleanUtils.valueToBoolean(
-                    ((Number) row.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.IS_ERROR)).intValue()
+                    ((Number) row.getTagValue(SegmentRecord.IS_ERROR)).intValue()
             ));
-            basicTrace.getTraceIds().add(row.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.TRACE_ID));
+            basicTrace.getTraceIds().add(row.getTagValue(SegmentRecord.TRACE_ID));
 
             traceBrief.getTraces().add(basicTrace);
         }
@@ -152,17 +151,17 @@ public class BanyanDBTraceQueryDAO extends AbstractBanyanDBDAO implements ITrace
     @Override
     public List<SegmentRecord> queryByTraceId(String traceId) throws IOException {
         StreamQueryResponse resp = query(segmentRecordMetadata,
-                ImmutableList.of(SegmentRecord.TRACE_ID,
+                ImmutableSet.of(SegmentRecord.TRACE_ID,
                         SegmentRecord.IS_ERROR,
                         SegmentRecord.SERVICE_ID,
                         SegmentRecord.SERVICE_INSTANCE_ID,
                         SegmentRecord.ENDPOINT_ID,
                         SegmentRecord.LATENCY,
-                        SegmentRecord.START_TIME),
+                        SegmentRecord.START_TIME,
+                        SegmentRecord.DATA_BINARY),
                 new QueryBuilder() {
                     @Override
                     public void apply(StreamQuery query) {
-                        query.setDataProjections(Collections.singletonList(SegmentRecord.DATA_BINARY));
                         query.appendCondition(eq(SegmentRecord.TRACE_ID, traceId));
                     }
                 });
@@ -171,7 +170,7 @@ public class BanyanDBTraceQueryDAO extends AbstractBanyanDBDAO implements ITrace
 
         for (final RowEntity rowEntity : resp.getElements()) {
             SegmentRecord segmentRecord = new SegmentRecord.Builder().storage2Entity(
-                    new BanyanDBConverter.StreamToEntity(segmentRecordMetadata, rowEntity));
+                    new BanyanDBConverter.StreamToEntity(rowEntity));
             segmentRecords.add(segmentRecord);
         }