You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2022/10/12 11:45:58 UTC

[skywalking-banyandb-java-client] 01/01: Bump up proto files

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch v0.2.0
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb-java-client.git

commit c3ff3529f9edbb905ed0fe160dc54a8b44debc0c
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Wed Oct 12 19:45:23 2022 +0800

    Bump up proto files
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 pom.xml                                            |  23 ++++
 .../banyandb/v1/client/AbstractCriteria.java       |  31 +++++
 .../banyandb/v1/client/AbstractQuery.java          |  76 +++++++----
 .../apache/skywalking/banyandb/v1/client/And.java  |  41 ++++++
 .../banyandb/v1/client/BanyanDBClient.java         |  61 +++++----
 .../banyandb/v1/client/MeasureQuery.java           |  22 ++--
 .../apache/skywalking/banyandb/v1/client/Or.java   |  41 ++++++
 .../banyandb/v1/client/PairQueryCondition.java     |  24 +++-
 .../skywalking/banyandb/v1/client/StreamQuery.java |   8 +-
 .../banyandb/v1/client/grpc/MetadataClient.java    |  10 ++
 .../banyandb/v1/client/metadata/Group.java         |  26 ++--
 .../v1/client/metadata/GroupMetadataRegistry.java  |   9 ++
 .../banyandb/v1/client/metadata/IndexRule.java     |  30 +++++
 .../metadata/IndexRuleBindingMetadataRegistry.java |   9 ++
 .../client/metadata/IndexRuleMetadataRegistry.java |   9 ++
 .../banyandb/v1/client/metadata/IntervalRule.java  |  68 ++++++++++
 .../client/metadata/MeasureMetadataRegistry.java   |   9 ++
 .../banyandb/v1/client/metadata/PropertyStore.java |  69 +++++++---
 .../v1/client/metadata/StreamMetadataRegistry.java |   9 ++
 .../banyandb/v1/client/metadata/TagFamilySpec.java |  13 ++
 src/main/proto/banyandb/v1/banyandb-common.proto   |  30 ++++-
 src/main/proto/banyandb/v1/banyandb-database.proto | 128 ++++++++++++++++++
 src/main/proto/banyandb/v1/banyandb-measure.proto  |  51 ++++----
 src/main/proto/banyandb/v1/banyandb-model.proto    |  27 +++-
 src/main/proto/banyandb/v1/banyandb-property.proto |  40 +++---
 src/main/proto/banyandb/v1/banyandb-stream.proto   |  19 +--
 .../v1/client/BanyanDBClientMeasureQueryTest.java  |  18 ++-
 .../v1/client/BanyanDBClientStreamQueryTest.java   | 145 +++++++++++++++++----
 .../v1/client/ITBanyanDBMeasureQueryTests.java     |  27 +---
 .../v1/client/ITBanyanDBPropertyTests.java         |  18 ++-
 .../v1/client/ITBanyanDBStreamQueryTests.java      |   8 +-
 .../v1/client/metadata/PropertyStoreTest.java      |  37 +++---
 32 files changed, 901 insertions(+), 235 deletions(-)

diff --git a/pom.xml b/pom.xml
index 16de166..7bbbdbd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -93,6 +93,7 @@
         <auto-value.version>1.9</auto-value.version>
         <testcontainers.version>1.16.3</testcontainers.version>
         <awaitility.version>4.2.0</awaitility.version>
+        <bufbuild.protoc-gen-validate.version>0.6.13</bufbuild.protoc-gen-validate.version>
         <!-- necessary for Java 9+ -->
         <org.apache.tomcat.annotations-api.version>6.0.53</org.apache.tomcat.annotations-api.version>
         <slf4j.version>1.7.36</slf4j.version>
@@ -151,6 +152,17 @@
             <version>${org.apache.tomcat.annotations-api.version}</version>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>build.buf.protoc-gen-validate</groupId>
+            <artifactId>protoc-gen-validate</artifactId>
+            <version>${bufbuild.protoc-gen-validate.version}</version>
+            <type>pom</type>
+        </dependency>
+        <dependency>
+            <groupId>build.buf.protoc-gen-validate</groupId>
+            <artifactId>pgv-java-stub</artifactId>
+            <version>${bufbuild.protoc-gen-validate.version}</version>
+        </dependency>
 
         <dependency>
             <groupId>com.google.auto.value</groupId>
@@ -269,6 +281,17 @@
                             <goal>compile-custom</goal>
                         </goals>
                     </execution>
+                    <execution>
+                        <id>protoc-java-pgv</id>
+                        <goals>
+                            <goal>compile-custom</goal>
+                        </goals>
+                        <configuration>
+                            <pluginParameter>lang=java</pluginParameter>
+                            <pluginId>java-pgv</pluginId>
+                            <pluginArtifact>build.buf.protoc-gen-validate:protoc-gen-validate:${bufbuild.protoc-gen-validate.version}:exe:${os.detected.classifier}</pluginArtifact>
+                        </configuration>
+                    </execution>
                 </executions>
             </plugin>
             <plugin>
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractCriteria.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractCriteria.java
new file mode 100644
index 0000000..718c38b
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractCriteria.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client;
+
+import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
+
+public abstract class AbstractCriteria {
+    abstract BanyandbModel.Criteria build();
+}
+
+abstract class LogicalExpression extends AbstractCriteria {
+    abstract AbstractCriteria left();
+
+    abstract AbstractCriteria right();
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractQuery.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractQuery.java
index 829b77c..1de4476 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractQuery.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractQuery.java
@@ -18,6 +18,7 @@
 
 package org.apache.skywalking.banyandb.v1.client;
 
+import com.google.auto.value.AutoValue;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ListMultimap;
 import lombok.AccessLevel;
@@ -30,12 +31,9 @@ import org.apache.skywalking.banyandb.v1.client.grpc.exception.InvalidReferenceE
 import org.apache.skywalking.banyandb.v1.client.metadata.MetadataCache;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 public abstract class AbstractQuery<T> {
     /**
@@ -53,7 +51,7 @@ public abstract class AbstractQuery<T> {
     /**
      * Query conditions.
      */
-    protected final List<PairQueryCondition<?>> conditions;
+    protected final List<LogicalExpression> conditions;
     /**
      * The projections of query result.
      * These should have defined in the schema.
@@ -62,6 +60,10 @@ public abstract class AbstractQuery<T> {
 
     @Getter(AccessLevel.PACKAGE)
     protected final MetadataCache.EntityMetadata metadata;
+    /**
+     * Query criteria.
+     */
+    protected AbstractCriteria criteria;
 
     public AbstractQuery(String group, String name, TimestampRange timestampRange, Set<String> tagProjections) {
         this.group = group;
@@ -73,12 +75,32 @@ public abstract class AbstractQuery<T> {
     }
 
     /**
-     * Fluent API for appending query condition
+     * Fluent API for appending a query condition combined with logical AND
      *
      * @param condition the query condition to be appended
      */
     public AbstractQuery<T> and(PairQueryCondition<?> condition) {
-        this.conditions.add(condition);
+        this.conditions.add(new AutoValue_AbstractQuery_LogicalExpression(BanyandbModel.LogicalExpression.LogicalOp.LOGICAL_OP_AND, condition));
+        return this;
+    }
+
+    /**
+     * Fluent API for appending a query condition combined with logical OR
+     *
+     * @param condition the query condition to be appended
+     */
+    public AbstractQuery<T> or(PairQueryCondition<?> condition) {
+        this.conditions.add(new AutoValue_AbstractQuery_LogicalExpression(BanyandbModel.LogicalExpression.LogicalOp.LOGICAL_OP_OR, condition));
+        return this;
+    }
+
+    /**
+     * Fluent API for setting the query criteria, replacing any criteria set before
+     *
+     * @param criteria the query criteria to be used
+     */
+    public AbstractQuery<T> criteria(AbstractCriteria criteria) {
+        this.criteria = criteria;
         return this;
     }
 
@@ -95,28 +117,23 @@ public abstract class AbstractQuery<T> {
                 .build();
     }
 
-    protected List<BanyandbModel.Criteria> buildCriteria() throws BanyanDBException {
-        List<BanyandbModel.Criteria> criteriaList = new ArrayList<>();
-        // set conditions grouped by tagFamilyName
-        Map<String, List<PairQueryCondition<?>>> groupedConditions = new HashMap<>();
-        for (final PairQueryCondition<?> condition : conditions) {
-            String tagFamilyName = metadata.findTagInfo(condition.getTagName()).orElseThrow(() ->
-                    InvalidReferenceException.fromInvalidTag(condition.getTagName())
-            ).getTagFamilyName();
-            List<PairQueryCondition<?>> conditionList = groupedConditions.computeIfAbsent(tagFamilyName, key -> new ArrayList<>());
-            conditionList.add(condition);
+    protected Optional<BanyandbModel.Criteria> buildCriteria() {
+        if (criteria != null) {
+            return Optional.of(criteria.build());
         }
-
-        for (final Map.Entry<String, List<PairQueryCondition<?>>> tagFamily : groupedConditions.entrySet()) {
-            final List<BanyandbModel.Condition> conditionList = tagFamily.getValue().stream().map(PairQueryCondition::build)
-                    .collect(Collectors.toList());
-            BanyandbModel.Criteria criteria = BanyandbModel.Criteria
-                    .newBuilder()
-                    .setTagFamilyName(tagFamily.getKey())
-                    .addAllConditions(conditionList).build();
-            criteriaList.add(criteria);
+        if (conditions.isEmpty()) {
+            return Optional.empty();
         }
-        return criteriaList;
+        return Optional.of(conditions.stream()
+                .reduce(null, (criteria, logicalExpression) -> {
+                    BanyandbModel.LogicalExpression.Builder b = BanyandbModel.LogicalExpression.newBuilder();
+                    if (criteria != null) {
+                        b.setRight(criteria);
+                    }
+                    return BanyandbModel.Criteria.newBuilder()
+                            .setLe(b.setOp(logicalExpression.op())
+                                    .setLeft(logicalExpression.cond().build())).build();
+                }, (first, second) -> second));
     }
 
     protected BanyandbModel.TagProjection buildTagProjections() throws BanyanDBException {
@@ -168,4 +185,11 @@ public abstract class AbstractQuery<T> {
     public enum Sort {
         ASC, DESC;
     }
+
+    @AutoValue
+    abstract static class LogicalExpression {
+        abstract BanyandbModel.LogicalExpression.LogicalOp op();
+
+        abstract PairQueryCondition<?> cond();
+    }
 }
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/And.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/And.java
new file mode 100644
index 0000000..cdf45b4
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/And.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client;
+
+import com.google.auto.value.AutoValue;
+import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
+
+@AutoValue
+public abstract class And extends LogicalExpression {
+
+    public static And create(AbstractCriteria left, AbstractCriteria right) {
+        return new AutoValue_And(left, right);
+    }
+
+    @Override
+    BanyandbModel.Criteria build() {
+        return BanyandbModel.Criteria.newBuilder()
+                .setLe(BanyandbModel.LogicalExpression.newBuilder()
+                        .setLeft(left().build())
+                        .setRight(right().build())
+                        .setOp(BanyandbModel.LogicalExpression.LogicalOp.LOGICAL_OP_AND)
+                        .build())
+                .build();
+    }
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java
index 86c99c8..45407f4 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java
@@ -23,16 +23,6 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import io.grpc.Channel;
 import io.grpc.ManagedChannel;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.stream.Collectors;
-
 import io.grpc.Status;
 import io.grpc.stub.StreamObserver;
 import lombok.AccessLevel;
@@ -60,6 +50,15 @@ import org.apache.skywalking.banyandb.v1.client.metadata.PropertyStore;
 import org.apache.skywalking.banyandb.v1.client.metadata.Stream;
 import org.apache.skywalking.banyandb.v1.client.metadata.StreamMetadataRegistry;
 
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
@@ -318,23 +317,25 @@ public class BanyanDBClient implements Closeable {
     }
 
     /**
-     * Create or update the property
+     * Apply(Create or update) the property with {@link PropertyStore.Strategy#MERGE}
      *
      * @param property the property to be stored in the BanyanBD
      */
-    public void save(Property property) throws BanyanDBException {
+    public PropertyStore.ApplyResult apply(Property property) throws BanyanDBException {
         PropertyStore store = new PropertyStore(checkNotNull(this.channel));
-        try {
-            store.get(property.group(), property.name(), property.id());
-            store.update(property);
-        } catch (BanyanDBException ex) {
-            // multiple entity can share a single index rule
-            if (ex.getStatus().equals(Status.Code.NOT_FOUND)) {
-                store.create(property);
-                return;
-            }
-            throw ex;
-        }
+        return store.apply(property);
+    }
+
+    /**
+     * Apply(Create or update) the property
+     *
+     * @param property the property to be stored in the BanyanDB
+     * @param strategy dictates how to apply the property
+     */
+    public PropertyStore.ApplyResult apply(Property property, PropertyStore.Strategy strategy) throws
+            BanyanDBException {
+        PropertyStore store = new PropertyStore(checkNotNull(this.channel));
+        return store.apply(property, strategy);
     }
 
     /**
@@ -343,11 +344,12 @@ public class BanyanDBClient implements Closeable {
      * @param group group of the metadata
      * @param name  name of the metadata
      * @param id    identity of the property
+     * @param tags  tags to be returned
      * @return property if it can be found
      */
-    public Property findProperty(String group, String name, String id) throws BanyanDBException {
+    public Property findProperty(String group, String name, String id, String... tags) throws BanyanDBException {
         PropertyStore store = new PropertyStore(checkNotNull(this.channel));
-        return store.get(group, name, id);
+        return store.get(group, name, id, tags);
     }
 
     /**
@@ -368,11 +370,13 @@ public class BanyanDBClient implements Closeable {
      * @param group group of the metadata
      * @param name  name of the metadata
      * @param id    identity of the property
+     * @param tags  tags to be deleted. If null, the property is deleted
      * @return if this property has been deleted
      */
-    public boolean deleteProperty(String group, String name, String id) throws BanyanDBException {
+    public PropertyStore.DeleteResult deleteProperty(String group, String name, String id, String... tags) throws
+            BanyanDBException {
         PropertyStore store = new PropertyStore(checkNotNull(this.channel));
-        return store.delete(group, name, id);
+        return store.delete(group, name, id, tags);
     }
 
     /**
@@ -493,7 +497,8 @@ public class BanyanDBClient implements Closeable {
         return m;
     }
 
-    private List<IndexRule> findIndexRulesByGroupAndBindingName(String group, String bindingName) throws BanyanDBException {
+    private List<IndexRule> findIndexRulesByGroupAndBindingName(String group, String bindingName) throws
+            BanyanDBException {
         IndexRuleBindingMetadataRegistry irbRegistry = new IndexRuleBindingMetadataRegistry(checkNotNull(this.channel));
 
         IndexRuleBinding irb = irbRegistry.get(group, bindingName);
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/MeasureQuery.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/MeasureQuery.java
index 5c6d9aa..14caca0 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/MeasureQuery.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/MeasureQuery.java
@@ -59,6 +59,16 @@ public class MeasureQuery extends AbstractQuery<BanyandbMeasure.QueryRequest> {
         this.fieldProjections = fieldProjections;
     }
 
+    @Override
+    public MeasureQuery and(PairQueryCondition<?> condition) {
+        return (MeasureQuery) super.and(condition);
+    }
+
+    @Override
+    public MeasureQuery or(PairQueryCondition<?> condition) {
+        return (MeasureQuery) super.or(condition);
+    }
+
     public MeasureQuery groupBy(Set<String> groupByKeys) {
         Preconditions.checkArgument(this.tagProjections.containsAll(groupByKeys), "groupBy tags should be selected first");
         this.aggregation = new Aggregation(null, Aggregation.Type.UNSPECIFIED, groupByKeys);
@@ -131,16 +141,6 @@ public class MeasureQuery extends AbstractQuery<BanyandbMeasure.QueryRequest> {
         return this;
     }
 
-    /**
-     * Query ID column with given value.
-     *
-     * @param value candidate value of ID
-     */
-    public MeasureQuery andWithID(String value) {
-        this.and(PairQueryCondition.IDQueryCondition.eq(Measure.ID, value));
-        return this;
-    }
-
     /**
      * @return QueryRequest for gRPC level query.
      */
@@ -189,7 +189,7 @@ public class MeasureQuery extends AbstractQuery<BanyandbMeasure.QueryRequest> {
             builder.setOrderBy(orderBy.build());
         }
         // add all criteria
-        builder.addAllCriteria(buildCriteria());
+        buildCriteria().ifPresent(builder::setCriteria);
         return builder.build();
     }
 
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/Or.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/Or.java
new file mode 100644
index 0000000..ccfe3eb
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/Or.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client;
+
+import com.google.auto.value.AutoValue;
+import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
+
+@AutoValue
+public abstract class Or extends LogicalExpression {
+
+    public static Or create(AbstractCriteria left, AbstractCriteria right) {
+        return new AutoValue_Or(left, right);
+    }
+
+    @Override
+    BanyandbModel.Criteria build() {
+        return BanyandbModel.Criteria.newBuilder()
+                .setLe(BanyandbModel.LogicalExpression.newBuilder()
+                        .setLeft(left().build())
+                        .setRight(right().build())
+                        .setOp(BanyandbModel.LogicalExpression.LogicalOp.LOGICAL_OP_OR)
+                        .build())
+                .build();
+    }
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/PairQueryCondition.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/PairQueryCondition.java
index 01f9ef5..0c71e2c 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/PairQueryCondition.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/PairQueryCondition.java
@@ -25,7 +25,7 @@ import java.util.List;
 /**
  * PairQuery represents a query condition, including tag name, operator, and value(s);
  */
-public abstract class PairQueryCondition<T> {
+public abstract class PairQueryCondition<T> extends AbstractCriteria {
     protected final BanyandbModel.Condition.BinaryOp op;
     private final TagAndValue<T> tagAndValue;
 
@@ -34,11 +34,14 @@ public abstract class PairQueryCondition<T> {
         this.tagAndValue = tagAndValue;
     }
 
-    BanyandbModel.Condition build() {
-        return BanyandbModel.Condition.newBuilder()
+    @Override
+    BanyandbModel.Criteria build() {
+        return BanyandbModel.Criteria.newBuilder()
+                .setCondition(BanyandbModel.Condition.newBuilder()
                 .setName(this.tagAndValue.getTagName())
                 .setOp(this.op)
-                .setValue(this.tagAndValue.buildTypedTagValue()).build();
+                .setValue(this.tagAndValue.buildTypedTagValue()).build())
+                .build();
     }
 
     public String getTagName() {
@@ -124,6 +127,7 @@ public abstract class PairQueryCondition<T> {
         public static PairQueryCondition<Long> le(String tagName, Long val) {
             return new LongQueryCondition(tagName, BanyandbModel.Condition.BinaryOp.BINARY_OP_LE, val);
         }
+
     }
 
     /**
@@ -157,6 +161,18 @@ public abstract class PairQueryCondition<T> {
         public static PairQueryCondition<String> ne(String tagName, String val) {
             return new StringQueryCondition(tagName, BanyandbModel.Condition.BinaryOp.BINARY_OP_NE, val);
         }
+
+        /**
+         * Build a query condition for {@link String} type
+         * and {@link BanyandbModel.Condition.BinaryOp#BINARY_OP_MATCH} as the relation
+         *
+         * @param tagName name of the tag
+         * @param val     value of the tag
+         * @return a query that `String match value`
+         */
+        public static PairQueryCondition<String> match(String tagName, String val) {
+            return new StringQueryCondition(tagName, BanyandbModel.Condition.BinaryOp.BINARY_OP_MATCH, val);
+        }
     }
 
     public static class IDQueryCondition extends PairQueryCondition<String> {
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamQuery.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamQuery.java
index 7012dc4..b46665b 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamQuery.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamQuery.java
@@ -57,6 +57,11 @@ public class StreamQuery extends AbstractQuery<BanyandbStream.QueryRequest> {
         return (StreamQuery) super.and(condition);
     }
 
+    @Override
+    public StreamQuery or(PairQueryCondition<?> condition) {
+        return (StreamQuery) super.or(condition);
+    }
+
     @Override
     BanyandbStream.QueryRequest build() throws BanyanDBException {
         final BanyandbStream.QueryRequest.Builder builder = BanyandbStream.QueryRequest.newBuilder()
@@ -65,8 +70,7 @@ public class StreamQuery extends AbstractQuery<BanyandbStream.QueryRequest> {
             builder.setTimeRange(timestampRange.build());
         }
         builder.setProjection(buildTagProjections());
-        // set conditions grouped by tagFamilyName
-        builder.addAllCriteria(buildCriteria());
+        buildCriteria().ifPresent(builder::setCriteria);
         builder.setOffset(offset);
         builder.setLimit(limit);
         if (orderBy != null) {
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/grpc/MetadataClient.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/grpc/MetadataClient.java
index c4bbbd9..0f017d7 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/grpc/MetadataClient.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/grpc/MetadataClient.java
@@ -74,6 +74,16 @@ public abstract class MetadataClient<STUB extends AbstractBlockingStub<STUB>, P
      */
     public abstract S get(String group, String name) throws BanyanDBException;
 
+    /**
+     * Check whether a schema exists
+     *
+     * @param group the group of the schema to be found
+     * @param name the name of the schema to be found
+     * @return true if the schema exists, false if it is absent
+     * @throws BanyanDBException a wrapped exception to the underlying gRPC calls
+     */
+    public abstract boolean exist(String group, String name) throws BanyanDBException;
+
     /**
      * List all schemas with the same group name
      *
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/metadata/Group.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/metadata/Group.java
index bb349dc..a306ba8 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/metadata/Group.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/metadata/Group.java
@@ -22,7 +22,6 @@ import com.google.auto.value.AutoValue;
 import org.apache.skywalking.banyandb.common.v1.BanyandbCommon;
 import org.apache.skywalking.banyandb.v1.client.util.TimeUtils;
 
-import javax.annotation.Nullable;
 import java.time.ZonedDateTime;
 
 @AutoValue
@@ -37,17 +36,18 @@ public abstract class Group extends NamedSchema<BanyandbCommon.Group> {
      */
     abstract int shardNum();
 
-    @Nullable
-    abstract Integer blockNum();
+    abstract IntervalRule blockInterval();
 
-    abstract Duration ttl();
+    abstract IntervalRule segmentInterval();
 
-    public static Group create(String name, Catalog catalog, int shardNum, int blockNum, Duration ttl) {
-        return new AutoValue_Group(null, name, null, catalog, shardNum, blockNum, ttl);
+    abstract IntervalRule ttl();
+
+    public static Group create(String name, Catalog catalog, int shardNum, IntervalRule blockInterval, IntervalRule segmentInterval, IntervalRule ttl) {
+        return new AutoValue_Group(null, name, null, catalog, shardNum, blockInterval, segmentInterval, ttl);
     }
 
-    public static Group create(String name, Catalog catalog, int shardNum, int blockNum, Duration ttl, ZonedDateTime updatedAt) {
-        return new AutoValue_Group(null, name, updatedAt, catalog, shardNum, blockNum, ttl);
+    public static Group create(String name, Catalog catalog, int shardNum, IntervalRule blockInterval, IntervalRule segmentInterval, IntervalRule ttl, ZonedDateTime updatedAt) {
+        return new AutoValue_Group(null, name, updatedAt, catalog, shardNum, blockInterval, segmentInterval, ttl);
     }
 
     @Override
@@ -58,8 +58,9 @@ public abstract class Group extends NamedSchema<BanyandbCommon.Group> {
                 .setCatalog(catalog().getCatalog())
                 .setResourceOpts(BanyandbCommon.ResourceOpts.newBuilder()
                         .setShardNum(shardNum())
-                        .setBlockNum(blockNum())
-                        .setTtl(ttl().format())
+                        .setBlockInterval(blockInterval().serialize())
+                        .setSegmentInterval(segmentInterval().serialize())
+                        .setTtl(ttl().serialize())
                         .build())
                 .build();
     }
@@ -80,7 +81,8 @@ public abstract class Group extends NamedSchema<BanyandbCommon.Group> {
                 TimeUtils.parseTimestamp(group.getUpdatedAt()),
                 catalog,
                 group.getResourceOpts().getShardNum(),
-                group.getResourceOpts().getBlockNum(),
-                Duration.parse(group.getResourceOpts().getTtl()));
+                IntervalRule.fromProtobuf(group.getResourceOpts().getBlockInterval()),
+                IntervalRule.fromProtobuf(group.getResourceOpts().getSegmentInterval()),
+                IntervalRule.fromProtobuf(group.getResourceOpts().getTtl()));
     }
 }
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/metadata/GroupMetadataRegistry.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/metadata/GroupMetadataRegistry.java
index d5828a8..90a2d47 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/metadata/GroupMetadataRegistry.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/metadata/GroupMetadataRegistry.java
@@ -68,6 +68,15 @@ public class GroupMetadataRegistry extends MetadataClient<GroupRegistryServiceGr
         return Group.fromProtobuf(resp.getGroup());
     }
 
+    @Override
+    public boolean exist(String group, String name) throws BanyanDBException {
+        BanyandbDatabase.GroupRegistryServiceExistResponse resp = execute(() ->
+                stub.exist(BanyandbDatabase.GroupRegistryServiceExistRequest.newBuilder()
+                        .setGroup(name)
+                        .build()));
+        return resp.getHasGroup();
+    }
+
     @Override
     public List<Group> list(String group) throws BanyanDBException {
         BanyandbDatabase.GroupRegistryServiceListResponse resp = execute(() ->
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/metadata/IndexRule.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/metadata/IndexRule.java
index c8bbf3c..35206c7 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/metadata/IndexRule.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/metadata/IndexRule.java
@@ -25,6 +25,7 @@ import lombok.RequiredArgsConstructor;
 import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase;
 import org.apache.skywalking.banyandb.v1.client.util.TimeUtils;
 
+import javax.annotation.Nullable;
 import java.time.ZonedDateTime;
 
 @AutoValue
@@ -46,6 +47,12 @@ public abstract class IndexRule extends NamedSchema<BanyandbDatabase.IndexRule>
      */
     abstract IndexLocation indexLocation();
 
+    /**
+     * analyzer indicates how to analyze the value.
+     */
+    @Nullable
+    abstract Analyzer analyzer();
+
     abstract Builder toBuilder();
 
     public final IndexRule withGroup(String group) {
@@ -81,6 +88,8 @@ public abstract class IndexRule extends NamedSchema<BanyandbDatabase.IndexRule>
 
         abstract Builder setIndexLocation(IndexLocation indexLocation);
 
+        abstract Builder setAnalyzer(Analyzer analyzer);
+
         abstract Builder setUpdatedAt(ZonedDateTime updatedAt);
 
         abstract IndexRule build();
@@ -147,4 +156,25 @@ public abstract class IndexRule extends NamedSchema<BanyandbDatabase.IndexRule>
             }
         }
     }
+
+    @RequiredArgsConstructor
+    public enum Analyzer {
+        KEYWORD(BanyandbDatabase.IndexRule.Analyzer.ANALYZER_KEYWORD), STANDARD(BanyandbDatabase.IndexRule.Analyzer.ANALYZER_STANDARD),
+        SIMPLE(BanyandbDatabase.IndexRule.Analyzer.ANALYZER_SIMPLE);
+
+        private final BanyandbDatabase.IndexRule.Analyzer analyzer;
+
+        private static Analyzer fromProtobuf(BanyandbDatabase.IndexRule.Analyzer analyzer) {
+            switch (analyzer) {
+                case ANALYZER_KEYWORD:
+                    return KEYWORD;
+                case ANALYZER_SIMPLE:
+                    return SIMPLE;
+                case ANALYZER_STANDARD:
+                    return STANDARD;
+                default:
+                    throw new IllegalArgumentException("unrecognized analyzer");
+            }
+        }
+    }
 }
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/metadata/IndexRuleBindingMetadataRegistry.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/metadata/IndexRuleBindingMetadataRegistry.java
index cbb235a..c65c8a1 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/metadata/IndexRuleBindingMetadataRegistry.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/metadata/IndexRuleBindingMetadataRegistry.java
@@ -68,6 +68,15 @@ public class IndexRuleBindingMetadataRegistry extends MetadataClient<IndexRuleBi
         return IndexRuleBinding.fromProtobuf(resp.getIndexRuleBinding());
     }
 
+    @Override
+    public boolean exist(String group, String name) throws BanyanDBException {
+        BanyandbDatabase.IndexRuleBindingRegistryServiceExistResponse resp = execute(() ->
+                stub.exist(BanyandbDatabase.IndexRuleBindingRegistryServiceExistRequest.newBuilder()
+                        .setMetadata(BanyandbCommon.Metadata.newBuilder().setGroup(group).setName(name).build())
+                        .build()));
+        return resp.getHasIndexRuleBinding();
+    }
+
     @Override
     public List<IndexRuleBinding> list(String group) throws BanyanDBException {
         BanyandbDatabase.IndexRuleBindingRegistryServiceListResponse resp = execute(() ->
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/metadata/IndexRuleMetadataRegistry.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/metadata/IndexRuleMetadataRegistry.java
index f9ea810..3b966a7 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/metadata/IndexRuleMetadataRegistry.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/metadata/IndexRuleMetadataRegistry.java
@@ -69,6 +69,15 @@ public class IndexRuleMetadataRegistry extends MetadataClient<IndexRuleRegistryS
         return IndexRule.fromProtobuf(resp.getIndexRule());
     }
 
+    @Override
+    public boolean exist(String group, String name) throws BanyanDBException {
+        BanyandbDatabase.IndexRuleRegistryServiceExistResponse resp = execute(() ->
+                stub.exist(BanyandbDatabase.IndexRuleRegistryServiceExistRequest.newBuilder()
+                        .setMetadata(BanyandbCommon.Metadata.newBuilder().setGroup(group).setName(name).build())
+                        .build()));
+        return resp.getHasIndexRule();
+    }
+
     @Override
     public List<IndexRule> list(String group) throws BanyanDBException {
         BanyandbDatabase.IndexRuleRegistryServiceListResponse resp = execute(() ->
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/metadata/IntervalRule.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/metadata/IntervalRule.java
new file mode 100644
index 0000000..07cbe94
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/metadata/IntervalRule.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client.metadata;
+
+import com.google.auto.value.AutoValue;
+import org.apache.skywalking.banyandb.common.v1.BanyandbCommon;
+
+@AutoValue
+public abstract class IntervalRule implements Serializable<BanyandbCommon.IntervalRule> {
+    public enum Unit {
+        UNSPECIFIED, HOUR, DAY
+    }
+
+    abstract Unit unit();
+
+    abstract int num();
+
+    public static IntervalRule create(Unit unit, int num) {
+        return new AutoValue_IntervalRule(unit, num);
+    }
+
+    @Override
+    public BanyandbCommon.IntervalRule serialize() {
+        BanyandbCommon.IntervalRule.Builder builder = BanyandbCommon.IntervalRule.newBuilder();
+        switch (unit()) {
+            case DAY:
+                builder.setUnit(BanyandbCommon.IntervalRule.Unit.UNIT_DAY);
+                break;
+            case HOUR:
+                builder.setUnit(BanyandbCommon.IntervalRule.Unit.UNIT_HOUR);
+                break;
+        }
+        builder.setNum(num());
+        return builder.build();
+    }
+
+    public static IntervalRule fromProtobuf(BanyandbCommon.IntervalRule intervalRule) {
+        Unit unit;
+        switch (intervalRule.getUnit()) {
+            case UNIT_DAY:
+                unit = Unit.DAY;
+                break;
+            case UNIT_HOUR:
+                unit = Unit.HOUR;
+                break;
+            default:
+                unit = Unit.UNSPECIFIED;
+                break;
+        }
+        return new AutoValue_IntervalRule(unit, intervalRule.getNum());
+    }
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/metadata/MeasureMetadataRegistry.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/metadata/MeasureMetadataRegistry.java
index c3f990d..510879d 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/metadata/MeasureMetadataRegistry.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/metadata/MeasureMetadataRegistry.java
@@ -70,6 +70,15 @@ public class MeasureMetadataRegistry extends MetadataClient<MeasureRegistryServi
         return Measure.fromProtobuf(resp.getMeasure());
     }
 
+    @Override
+    public boolean exist(String group, String name) throws BanyanDBException {
+        BanyandbDatabase.MeasureRegistryServiceExistResponse resp = execute(() ->
+                stub.exist(BanyandbDatabase.MeasureRegistryServiceExistRequest.newBuilder()
+                        .setMetadata(BanyandbCommon.Metadata.newBuilder().setGroup(group).setName(name).build())
+                        .build()));
+        return resp.getHasMeasure();
+    }
+
     @Override
     public List<Measure> list(final String group) throws BanyanDBException {
         BanyandbDatabase.MeasureRegistryServiceListResponse resp = execute(() ->
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/metadata/PropertyStore.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/metadata/PropertyStore.java
index 4023c24..8758746 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/metadata/PropertyStore.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/metadata/PropertyStore.java
@@ -18,6 +18,7 @@
 
 package org.apache.skywalking.banyandb.v1.client.metadata;
 
+import com.google.auto.value.AutoValue;
 import io.grpc.Channel;
 import org.apache.skywalking.banyandb.common.v1.BanyandbCommon;
 import org.apache.skywalking.banyandb.property.v1.BanyandbProperty;
@@ -25,6 +26,7 @@ import org.apache.skywalking.banyandb.property.v1.PropertyServiceGrpc;
 import org.apache.skywalking.banyandb.v1.client.grpc.HandleExceptionsWith;
 import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
 
+import java.util.Arrays;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -35,24 +37,36 @@ public class PropertyStore {
         this.stub = PropertyServiceGrpc.newBlockingStub(channel);
     }
 
-    public void create(Property payload) throws BanyanDBException {
-        HandleExceptionsWith.callAndTranslateApiException(() ->
-                this.stub.create(BanyandbProperty.CreateRequest.newBuilder()
-                        .setProperty(payload.serialize())
-                        .build()));
+    public ApplyResult apply(Property payload) throws BanyanDBException {
+        return apply(payload, Strategy.MERGE);
     }
 
-    public void update(Property payload) throws BanyanDBException {
-        HandleExceptionsWith.callAndTranslateApiException(() ->
-                this.stub.update(BanyandbProperty.UpdateRequest.newBuilder()
-                        .setProperty(payload.serialize())
-                        .build()));
+    public ApplyResult apply(Property payload, Strategy strategy) throws BanyanDBException {
+        BanyandbProperty.ApplyRequest.Strategy s = BanyandbProperty.ApplyRequest.Strategy.STRATEGY_MERGE;
+        switch (strategy) {
+            case MERGE:
+                s = BanyandbProperty.ApplyRequest.Strategy.STRATEGY_MERGE;
+                break;
+            case REPLACE:
+                s = BanyandbProperty.ApplyRequest.Strategy.STRATEGY_REPLACE;
+                break;
+        }
+        BanyandbProperty.ApplyRequest r = BanyandbProperty.ApplyRequest.newBuilder()
+                .setProperty(payload.serialize())
+                .setStrategy(s)
+                .build();
+        BanyandbProperty.ApplyResponse resp = HandleExceptionsWith.callAndTranslateApiException(() ->
+                this.stub.apply(r));
+        return new AutoValue_PropertyStore_ApplyResult(resp.getCreated(), resp.getTagsNum());
     }
 
-    public boolean delete(String group, String name, String id) throws BanyanDBException {
+    public DeleteResult delete(String group, String name, String id, String... tags) throws BanyanDBException {
+        BanyandbProperty.DeleteRequest.Builder b = BanyandbProperty.DeleteRequest.newBuilder();
+        if (tags != null && tags.length > 0) {
+            b.addAllTags(Arrays.asList(tags));
+        }
         BanyandbProperty.DeleteResponse resp = HandleExceptionsWith.callAndTranslateApiException(() ->
-                this.stub.delete(BanyandbProperty.DeleteRequest.newBuilder()
-                        .setMetadata(BanyandbProperty.Metadata
+                this.stub.delete(b.setMetadata(BanyandbProperty.Metadata
                                 .newBuilder()
                                 .setContainer(BanyandbCommon.Metadata.newBuilder()
                                         .setGroup(group)
@@ -61,13 +75,16 @@ public class PropertyStore {
                                 .setId(id)
                                 .build())
                         .build()));
-        return resp != null && resp.getDeleted();
+        return new AutoValue_PropertyStore_DeleteResult(resp.getDeleted(), resp.getTagsNum());
     }
 
-    public Property get(String group, String name, String id) throws BanyanDBException {
+    public Property get(String group, String name, String id, String... tags) throws BanyanDBException {
+        BanyandbProperty.GetRequest.Builder b = BanyandbProperty.GetRequest.newBuilder();
+        if (tags != null && tags.length > 0) {
+            b.addAllTags(Arrays.asList(tags));
+        }
         BanyandbProperty.GetResponse resp = HandleExceptionsWith.callAndTranslateApiException(() ->
-                this.stub.get(BanyandbProperty.GetRequest.newBuilder()
-                        .setMetadata(BanyandbProperty.Metadata
+                this.stub.get(b.setMetadata(BanyandbProperty.Metadata
                                 .newBuilder()
                                 .setContainer(BanyandbCommon.Metadata.newBuilder()
                                         .setGroup(group)
@@ -91,4 +108,22 @@ public class PropertyStore {
 
         return resp.getPropertyList().stream().map(Property::fromProtobuf).collect(Collectors.toList());
     }
+
+    public enum Strategy {
+        MERGE, REPLACE
+    }
+
+    @AutoValue
+    public abstract static class ApplyResult {
+        public abstract boolean created();
+
+        public abstract int tagsNum();
+    }
+
+    @AutoValue
+    public abstract static class DeleteResult {
+        public abstract boolean deleted();
+
+        public abstract int tagsNum();
+    }
 }
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/metadata/StreamMetadataRegistry.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/metadata/StreamMetadataRegistry.java
index d7da85a..046ce29 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/metadata/StreamMetadataRegistry.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/metadata/StreamMetadataRegistry.java
@@ -70,6 +70,15 @@ public class StreamMetadataRegistry extends MetadataClient<StreamRegistryService
         return Stream.fromProtobuf(resp.getStream());
     }
 
+    @Override
+    public boolean exist(String group, String name) throws BanyanDBException {
+        BanyandbDatabase.StreamRegistryServiceExistResponse resp = execute(() ->
+                stub.exist(BanyandbDatabase.StreamRegistryServiceExistRequest.newBuilder()
+                        .setMetadata(BanyandbCommon.Metadata.newBuilder().setGroup(group).setName(name).build())
+                        .build()));
+        return resp.getHasStream();
+    }
+
     @Override
     public List<Stream> list(String group) throws BanyanDBException {
         BanyandbDatabase.StreamRegistryServiceListResponse resp = execute(() ->
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/metadata/TagFamilySpec.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/metadata/TagFamilySpec.java
index e21386b..95ac109 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/metadata/TagFamilySpec.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/metadata/TagFamilySpec.java
@@ -126,6 +126,11 @@ public abstract class TagFamilySpec implements Serializable<BanyandbDatabase.Tag
          */
         private final TagType tagType;
 
+        /**
+         * indexedOnly marks the tag as indexed but not stored; it is serialized into indexed_only
+         */
+        private boolean indexedOnly;
+
         private TagSpec(String tagName, TagType tagType) {
             Preconditions.checkArgument(!Strings.isNullOrEmpty(tagName), "tagName must not be null or empty");
             this.tagName = tagName;
@@ -192,11 +197,19 @@ public abstract class TagFamilySpec implements Serializable<BanyandbDatabase.Tag
             return new TagSpec(name, TagType.ID);
         }
 
+        /**
+         * Mark this tag as indexed only, i.e. indexed but not stored.
+         */
+        public void indexedOnly() {
+            indexedOnly = true;
+        }
+
         @Override
         public BanyandbDatabase.TagSpec serialize() {
             return BanyandbDatabase.TagSpec.newBuilder()
                     .setName(this.tagName)
                     .setType(this.tagType.getTagType())
+                    .setIndexedOnly(this.indexedOnly)
                     .build();
         }
 
diff --git a/src/main/proto/banyandb/v1/banyandb-common.proto b/src/main/proto/banyandb/v1/banyandb-common.proto
index c5f0e26..5fdbea9 100644
--- a/src/main/proto/banyandb/v1/banyandb-common.proto
+++ b/src/main/proto/banyandb/v1/banyandb-common.proto
@@ -22,6 +22,7 @@ option java_package = "org.apache.skywalking.banyandb.common.v1";
 package banyandb.common.v1;
 
 import "google/protobuf/timestamp.proto";
+import "validate/validate.proto";
 
 enum Catalog {
   CATALOG_UNSPECIFIED = 0;
@@ -34,7 +35,7 @@ message Metadata {
   // group contains a set of options, like retention policy, max
   string group = 1;
   // name of the entity
-  string name = 2;
+  string name = 2 [(validate.rules).string.min_len = 1];
   uint32 id = 3;
   // readonly. create_revision is the revision of last creation on this key.
   int64 create_revision = 4;
@@ -42,23 +43,38 @@ message Metadata {
   int64 mod_revision = 5;
 }
 
+// IntervalRule is a structured duration
+message IntervalRule {
+  enum Unit {
+    UNIT_UNSPECIFIED = 0;
+    UNIT_HOUR = 1;
+    UNIT_DAY = 2;
+  }
+  // unit can only be UNIT_HOUR or UNIT_DAY
+  Unit unit = 1 [(validate.rules).enum.defined_only = true];
+  uint32 num = 2 [(validate.rules).uint32.gt = 0];
+}
+
 message ResourceOpts {
   // shard_num is the number of shards
-  uint32 shard_num = 1;
-  // block_num specific how many blocks in a segment
-  uint32 block_num = 2;
+  uint32 shard_num = 1 [(validate.rules).uint32.gt = 0];
+  // block_interval indicates the length of a block
+  // block_interval should be less than or equal to segment_interval
+  IntervalRule block_interval = 2 [(validate.rules).message.required = true];
+  // segment_interval indicates the length of a segment
+  IntervalRule segment_interval = 3 [(validate.rules).message.required = true];;
   // ttl indicates time to live, how long the data will be cached
-  string ttl = 3;
+  IntervalRule ttl = 4 [(validate.rules).message.required = true];
 }
 
 // Group is an internal object for Group management
 message Group {
   // metadata define the group's identity
-  common.v1.Metadata metadata = 1;
+  common.v1.Metadata metadata = 1 [(validate.rules).message.required = true];
   // catalog denotes which type of data the group contains
   common.v1.Catalog catalog = 2;
   // resourceOpts indicates the structure of the underlying kv storage
-  ResourceOpts resource_opts = 3;
+  ResourceOpts resource_opts = 3 [(validate.rules).message.required = true];
   // updated_at indicates when resources of the group are updated
   google.protobuf.Timestamp updated_at = 4;
 }
diff --git a/src/main/proto/banyandb/v1/banyandb-database.proto b/src/main/proto/banyandb/v1/banyandb-database.proto
index 7115aff..cce7229 100644
--- a/src/main/proto/banyandb/v1/banyandb-database.proto
+++ b/src/main/proto/banyandb/v1/banyandb-database.proto
@@ -44,6 +44,10 @@ message TagFamilySpec {
 message TagSpec {
   string name = 1;
   TagType type = 2;
+  // indexed_only indicates whether the tag is stored
+  // True: It's indexed only, but not stored
+  // False: it's stored and indexed
+  bool indexed_only = 3;
 }
 
 // Stream intends to store streaming data, for example, traces or logs
@@ -102,6 +106,7 @@ message Measure {
   // entity indicates which tags will be to generate a series and shard a measure
   Entity entity = 4;
   // interval indicates how frequently to send a data point
+  // valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h", "d".
   string interval = 5;
   // updated_at indicates when the measure is updated
   google.protobuf.Timestamp updated_at = 6;
@@ -156,6 +161,19 @@ message IndexRule {
   Location location = 4;
   // updated_at indicates when the IndexRule is updated
   google.protobuf.Timestamp updated_at = 5;
+  enum Analyzer {
+    ANALYZER_UNSPECIFIED = 0;
+    // Keyword analyzer is a “noop” analyzer which returns the entire input string as a single token.
+    ANALYZER_KEYWORD = 1;
+    // Standard analyzer provides grammar based tokenization
+    ANALYZER_STANDARD = 2;
+    // Simple analyzer breaks text into tokens at any non-letter character,
+    // such as numbers, spaces, hyphens and apostrophes, discards non-letter characters,
+    // and changes uppercase to lowercase.
+    ANALYZER_SIMPLE = 3;
+  }
+  // analyzer analyzes tag value to support the full-text searching for TYPE_INVERTED indices.
+  Analyzer analyzer = 6;
 }
 
 // Subject defines which stream or measure would generate indices
@@ -215,6 +233,15 @@ message StreamRegistryServiceGetResponse {
   banyandb.database.v1.Stream stream = 1;
 }
 
+message StreamRegistryServiceExistRequest {
+  banyandb.common.v1.Metadata metadata = 1;
+}
+
+message StreamRegistryServiceExistResponse {
+  bool has_group = 1;
+  bool has_stream = 2;
+}
+
 message StreamRegistryServiceListRequest {
   string group = 1;
 }
@@ -229,6 +256,8 @@ service StreamRegistryService {
   rpc Delete(StreamRegistryServiceDeleteRequest) returns (StreamRegistryServiceDeleteResponse);
   rpc Get(StreamRegistryServiceGetRequest) returns (StreamRegistryServiceGetResponse);
   rpc List(StreamRegistryServiceListRequest) returns (StreamRegistryServiceListResponse);
+  // Exist doesn't expose an HTTP endpoint. Please use HEAD method to touch Get instead
+  rpc Exist(StreamRegistryServiceExistRequest) returns (StreamRegistryServiceExistResponse);
 }
 
 message IndexRuleBindingRegistryServiceCreateRequest {
@@ -269,12 +298,23 @@ message IndexRuleBindingRegistryServiceListResponse {
   repeated banyandb.database.v1.IndexRuleBinding index_rule_binding = 1;
 }
 
+message IndexRuleBindingRegistryServiceExistRequest {
+  banyandb.common.v1.Metadata metadata = 1;
+}
+
+message IndexRuleBindingRegistryServiceExistResponse {
+  bool has_group = 1;
+  bool has_index_rule_binding = 2;
+}
+
 service IndexRuleBindingRegistryService {
   rpc Create(IndexRuleBindingRegistryServiceCreateRequest) returns (IndexRuleBindingRegistryServiceCreateResponse);
   rpc Update(IndexRuleBindingRegistryServiceUpdateRequest) returns (IndexRuleBindingRegistryServiceUpdateResponse);
   rpc Delete(IndexRuleBindingRegistryServiceDeleteRequest) returns (IndexRuleBindingRegistryServiceDeleteResponse);
   rpc Get(IndexRuleBindingRegistryServiceGetRequest) returns (IndexRuleBindingRegistryServiceGetResponse);
   rpc List(IndexRuleBindingRegistryServiceListRequest) returns (IndexRuleBindingRegistryServiceListResponse);
+  // Exist doesn't expose an HTTP endpoint. Please use HEAD method to touch Get instead
+  rpc Exist(IndexRuleBindingRegistryServiceExistRequest) returns (IndexRuleBindingRegistryServiceExistResponse);
 }
 
 message IndexRuleRegistryServiceCreateRequest {
@@ -315,12 +355,23 @@ message IndexRuleRegistryServiceListResponse {
   repeated banyandb.database.v1.IndexRule index_rule = 1;
 }
 
+message IndexRuleRegistryServiceExistRequest {
+  banyandb.common.v1.Metadata metadata = 1;
+}
+
+message IndexRuleRegistryServiceExistResponse {
+  bool has_group = 1;
+  bool has_index_rule = 2;
+}
+
 service IndexRuleRegistryService {
   rpc Create(IndexRuleRegistryServiceCreateRequest) returns (IndexRuleRegistryServiceCreateResponse);
   rpc Update(IndexRuleRegistryServiceUpdateRequest) returns (IndexRuleRegistryServiceUpdateResponse);
   rpc Delete(IndexRuleRegistryServiceDeleteRequest) returns (IndexRuleRegistryServiceDeleteResponse);
   rpc Get(IndexRuleRegistryServiceGetRequest) returns (IndexRuleRegistryServiceGetResponse);
   rpc List(IndexRuleRegistryServiceListRequest) returns (IndexRuleRegistryServiceListResponse);
+  // Exist doesn't expose an HTTP endpoint. Please use HEAD method to touch Get instead
+  rpc Exist(IndexRuleRegistryServiceExistRequest) returns (IndexRuleRegistryServiceExistResponse);
 }
 
 message MeasureRegistryServiceCreateRequest {
@@ -361,12 +412,23 @@ message MeasureRegistryServiceListResponse {
   repeated banyandb.database.v1.Measure measure = 1;
 }
 
+message MeasureRegistryServiceExistRequest {
+  banyandb.common.v1.Metadata metadata = 1;
+}
+
+message MeasureRegistryServiceExistResponse {
+  bool has_group = 1;
+  bool has_measure = 2;
+}
+
 service MeasureRegistryService {
   rpc Create(MeasureRegistryServiceCreateRequest) returns (MeasureRegistryServiceCreateResponse);
   rpc Update(MeasureRegistryServiceUpdateRequest) returns (MeasureRegistryServiceUpdateResponse);
   rpc Delete(MeasureRegistryServiceDeleteRequest) returns (MeasureRegistryServiceDeleteResponse);
   rpc Get(MeasureRegistryServiceGetRequest) returns (MeasureRegistryServiceGetResponse);
   rpc List(MeasureRegistryServiceListRequest) returns (MeasureRegistryServiceListResponse);
+  // Exist doesn't expose an HTTP endpoint. Please use HEAD method to touch Get instead
+  rpc Exist(MeasureRegistryServiceExistRequest) returns (MeasureRegistryServiceExistResponse);
 }
 
 message GroupRegistryServiceCreateRequest {
@@ -408,10 +470,76 @@ message GroupRegistryServiceListResponse {
   repeated banyandb.common.v1.Group group = 1;
 }
 
+message GroupRegistryServiceExistRequest {
+  string group = 1;
+}
+
+message GroupRegistryServiceExistResponse {
+  bool has_group = 1;
+}
+
 service GroupRegistryService {
   rpc Create(GroupRegistryServiceCreateRequest) returns (GroupRegistryServiceCreateResponse);
   rpc Update(GroupRegistryServiceUpdateRequest) returns (GroupRegistryServiceUpdateResponse);
   rpc Delete(GroupRegistryServiceDeleteRequest) returns (GroupRegistryServiceDeleteResponse);
   rpc Get(GroupRegistryServiceGetRequest) returns (GroupRegistryServiceGetResponse);
   rpc List(GroupRegistryServiceListRequest) returns (GroupRegistryServiceListResponse);
+  // Exist doesn't expose an HTTP endpoint. Please use HEAD method to touch Get instead
+  rpc Exist(GroupRegistryServiceExistRequest) returns (GroupRegistryServiceExistResponse);
+}
+
+message TopNAggregationRegistryServiceCreateRequest {
+  banyandb.database.v1.TopNAggregation top_n_aggregation = 1;
+}
+
+message TopNAggregationRegistryServiceCreateResponse {
+}
+
+message TopNAggregationRegistryServiceUpdateRequest {
+  banyandb.database.v1.TopNAggregation top_n_aggregation = 1;
+}
+
+message TopNAggregationRegistryServiceUpdateResponse {
+}
+
+message TopNAggregationRegistryServiceDeleteRequest {
+  banyandb.common.v1.Metadata metadata = 1;
+}
+
+message TopNAggregationRegistryServiceDeleteResponse {
+  bool deleted = 1;
+}
+
+message TopNAggregationRegistryServiceGetRequest {
+  banyandb.common.v1.Metadata metadata = 1;
+}
+
+message TopNAggregationRegistryServiceGetResponse {
+  banyandb.database.v1.TopNAggregation top_n_aggregation = 1;
+}
+
+message TopNAggregationRegistryServiceListRequest {
+  string group = 1;
+}
+
+message TopNAggregationRegistryServiceListResponse {
+  repeated banyandb.database.v1.TopNAggregation top_n_aggregation = 1;
+}
+
+message TopNAggregationRegistryServiceExistRequest {
+  banyandb.common.v1.Metadata metadata = 1;
+}
+
+message TopNAggregationRegistryServiceExistResponse {
+  bool has_group = 1;
+  bool has_top_n_aggregation = 2;
+}
+
+service TopNAggregationRegistryService {
+  rpc Create(TopNAggregationRegistryServiceCreateRequest) returns (TopNAggregationRegistryServiceCreateResponse);
+  rpc Update(TopNAggregationRegistryServiceUpdateRequest) returns (TopNAggregationRegistryServiceUpdateResponse);
+  rpc Delete(TopNAggregationRegistryServiceDeleteRequest) returns (TopNAggregationRegistryServiceDeleteResponse);
+  rpc Get(TopNAggregationRegistryServiceGetRequest) returns (TopNAggregationRegistryServiceGetResponse);
+  rpc List(TopNAggregationRegistryServiceListRequest) returns (TopNAggregationRegistryServiceListResponse);
+  rpc Exist(TopNAggregationRegistryServiceExistRequest) returns (TopNAggregationRegistryServiceExistResponse);
 }
diff --git a/src/main/proto/banyandb/v1/banyandb-measure.proto b/src/main/proto/banyandb/v1/banyandb-measure.proto
index 2a8214d..e6dcaed 100644
--- a/src/main/proto/banyandb/v1/banyandb-measure.proto
+++ b/src/main/proto/banyandb/v1/banyandb-measure.proto
@@ -22,12 +22,13 @@ option java_package = "org.apache.skywalking.banyandb.measure.v1";
 package banyandb.measure.v1;
 
 import "google/protobuf/timestamp.proto";
+import "validate/validate.proto";
 import "banyandb/v1/banyandb-common.proto";
 import "banyandb/v1/banyandb-model.proto";
 
 // DataPoint is stored in Measures
 message DataPoint {
-  // timestamp is in the timeunit of nanoseconds.
+  // timestamp is in the timeunit of milliseconds.
   google.protobuf.Timestamp timestamp = 1;
   // tag_families contains tags selected in the projection
   repeated model.v1.TagFamily tag_families = 2;
@@ -48,13 +49,13 @@ message QueryResponse {
 // QueryRequest is the request contract for query.
 message QueryRequest {
   // metadata is required
-  common.v1.Metadata metadata = 1;
-  // time_range is a range query with begin/end time of entities in the timeunit of nanoseconds.
-  model.v1.TimeRange time_range = 2;
+  common.v1.Metadata metadata = 1 [(validate.rules).message.required = true];
+  // time_range is a range query with begin/end time of entities in the timeunit of milliseconds.
+  model.v1.TimeRange time_range = 2 [(validate.rules).message.required = true];
   // tag_families are indexed.
-  repeated model.v1.Criteria criteria = 4;
+  model.v1.Criteria criteria = 4;
   // tag_projection can be used to select tags of the data points in the response
-  model.v1.TagProjection tag_projection = 5;
+  model.v1.TagProjection tag_projection = 5 [(validate.rules).message.required = true];
   message FieldProjection {
     repeated string names = 1;
   }
@@ -86,13 +87,14 @@ message QueryRequest {
     // UNSPECIFIED: topN
     model.v1.Sort field_value_sort = 3;
   }
-  // top limit the result based on a particular field
+  // top limits the result based on a particular field.
+  // If order_by is specified, top sorts the dataset based on order_by's output
   Top top = 9;
   // offset is used to support pagination, together with the following limit.
-  // If top is specified, offset processes the dataset based on top's output
+  // If top is specified, offset processes the dataset based on top's output
   uint32 offset = 10;
   // limit is used to impose a boundary on the number of records being returned.
-  // If top is specified, limit processes the dataset based on top's output
+  // If top is sepcificed, limit processes the dataset based on top's output
   uint32 limit = 11;
   // order_by is given to specify the sort for a tag.
   model.v1.QueryOrder order_by = 12;
@@ -100,7 +102,7 @@ message QueryRequest {
 
 //TopNList contains a series of topN items
 message TopNList {
-  // timestamp is in the timeunit of nanoseconds.
+  // timestamp is in the timeunit of milliseconds.
   google.protobuf.Timestamp timestamp = 1;
   message Item {
     string name = 1;
@@ -120,12 +122,13 @@ message TopNResponse {
 // TopNRequest is the request contract for query.
 message TopNRequest {
   // metadata is required
-  common.v1.Metadata metadata = 1;
-  // time_range is a range query with begin/end time of entities in the timeunit of nanoseconds.
-  model.v1.TimeRange time_range = 2;
+  common.v1.Metadata metadata = 1 [(validate.rules).message.required = true];
+  // time_range is a range query with begin/end time of entities in the timeunit of milliseconds.
+  model.v1.TimeRange time_range = 2 [(validate.rules).message.required = true];
   // top_n set the how many items should be returned in each list.
-  int32 top_n = 3;
+  int32 top_n = 3 [(validate.rules).int32.gt = 0];
   // agg aggregates lists grouped by field names in the time_range
+  // TODO validate enum defined_only
   model.v1.AggregationFunction agg = 4;
   // criteria select counters.
   repeated model.v1.Condition conditions = 5;
@@ -133,12 +136,12 @@ message TopNRequest {
   model.v1.Sort field_value_sort = 6;
 }
 
-// DataPointValue is the data point for writing. It only contains values.
+//DataPointValue is the data point for writing. It only contains values.
 message DataPointValue {
-  // timestamp is in the timeunit of nanoseconds.
-  google.protobuf.Timestamp timestamp = 1;
+  // timestamp is in the timeunit of milliseconds.
+  google.protobuf.Timestamp timestamp = 1 [(validate.rules).timestamp.required = true];
   // the order of tag_families' items match the measure schema
-  repeated model.v1.TagFamilyForWrite tag_families = 2;
+  repeated model.v1.TagFamilyForWrite tag_families = 2 [(validate.rules).repeated.min_items = 1];
   // the order of fields match the measure schema
   repeated model.v1.FieldValue fields = 3;
 }
@@ -146,22 +149,16 @@ message DataPointValue {
 // WriteRequest is the request contract for write
 message WriteRequest {
   // the metadata is required.
-  common.v1.Metadata metadata = 1;
+  common.v1.Metadata metadata = 1 [(validate.rules).message.required = true];
   // the data_point is required.
-  DataPointValue data_point = 2;
+  DataPointValue data_point = 2 [(validate.rules).message.required = true];
 }
 
 // WriteResponse is the response contract for write
 message WriteResponse {}
 
-message InternalWriteRequest {
-  uint32 shard_id = 1;
-  bytes series_hash = 2;
-  WriteRequest request = 3;
-}
-
 service MeasureService {
   rpc Query(banyandb.measure.v1.QueryRequest) returns (banyandb.measure.v1.QueryResponse);
   rpc Write(stream banyandb.measure.v1.WriteRequest) returns (stream banyandb.measure.v1.WriteResponse);
   rpc TopN(banyandb.measure.v1.TopNRequest) returns (banyandb.measure.v1.TopNResponse);
-}
\ No newline at end of file
+}
diff --git a/src/main/proto/banyandb/v1/banyandb-model.proto b/src/main/proto/banyandb/v1/banyandb-model.proto
index 23b6eae..32d4d07 100644
--- a/src/main/proto/banyandb/v1/banyandb-model.proto
+++ b/src/main/proto/banyandb/v1/banyandb-model.proto
@@ -23,6 +23,7 @@ package banyandb.model.v1;
 
 import "google/protobuf/timestamp.proto";
 import "google/protobuf/struct.proto";
+import "validate/validate.proto";
 
 message ID {
   string value = 1;
@@ -100,6 +101,9 @@ message Condition {
   // For EQ, NE, LT, GT, LE and GE, only one operand should be given, i.e. one-to-one relationship.
   // HAVING and NOT_HAVING allow multi-value to be the operand such as array/vector, i.e. one-to-many relationship.
   // For example, "keyA" contains "valueA" **and** "valueB"
+  // MATCH performs a full-text search if the tag is analyzed.
+  // The string value applies to the same analyzer as the tag, but string array value does not.
+  // Each item in a string array is seen as a token instead of a query expression.
   enum BinaryOp {
     BINARY_OP_UNSPECIFIED = 0;
     BINARY_OP_EQ = 1;
@@ -112,6 +116,7 @@ message Condition {
     BINARY_OP_NOT_HAVING = 8;
     BINARY_OP_IN = 9;
     BINARY_OP_NOT_IN = 10;
+    BINARY_OP_MATCH = 11;
   }
   string name = 1;
   BinaryOp op = 2;
@@ -120,8 +125,23 @@ message Condition {
 
 // tag_families are indexed.
 message Criteria {
-  string tag_family_name = 1;
-  repeated model.v1.Condition conditions = 2;
+  oneof exp {
+    LogicalExpression le = 1;
+    Condition condition = 2;
+  }
+}
+
+// LogicalExpression supports logical operation
+message LogicalExpression {
+  enum LogicalOp {
+    LOGICAL_OP_UNSPECIFIED = 0;
+    LOGICAL_OP_AND = 1;
+    LOGICAL_OP_OR = 2;
+  }
+  // op is a logical operation
+  LogicalOp op = 1;
+  Criteria left = 2;
+  Criteria right = 3;
 }
 
 enum Sort {
@@ -143,7 +163,7 @@ message TagProjection {
     string name = 1;
     repeated string tags = 2;
   }
-  repeated TagFamily tag_families = 1;
+  repeated TagFamily tag_families = 1 [(validate.rules).repeated.min_items = 1];
 }
 
 // TimeRange is a range query for uint64,
@@ -152,4 +172,3 @@ message TimeRange {
   google.protobuf.Timestamp begin = 1;
   google.protobuf.Timestamp end = 2;
 }
-
diff --git a/src/main/proto/banyandb/v1/banyandb-property.proto b/src/main/proto/banyandb/v1/banyandb-property.proto
index fafe19f..d2bb04c 100644
--- a/src/main/proto/banyandb/v1/banyandb-property.proto
+++ b/src/main/proto/banyandb/v1/banyandb-property.proto
@@ -22,6 +22,7 @@ option java_package = "org.apache.skywalking.banyandb.property.v1";
 package banyandb.property.v1;
 
 import "google/protobuf/timestamp.proto";
+import "validate/validate.proto";
 import "banyandb/v1/banyandb-common.proto";
 import "banyandb/v1/banyandb-model.proto";
 
@@ -43,30 +44,37 @@ message Property {
   google.protobuf.Timestamp updated_at = 3;
 }
 
-message CreateRequest {
-  banyandb.property.v1.Property property = 1;
-}
-
-message CreateResponse {
-}
-
-message UpdateRequest {
-  banyandb.property.v1.Property property = 1;
+message ApplyRequest {
+  banyandb.property.v1.Property property = 1 [(validate.rules).message.required = true];
+  enum Strategy {
+    STRATEGY_UNSPECIFIED=0;
+    STRATEGY_MERGE=1;
+    STRATEGY_REPLACE=2;
+  }
+  // strategy indicates how to update a property. It defaults to STRATEGY_MERGE
+  Strategy strategy = 2;
 }
 
-message UpdateResponse {
+message ApplyResponse {
+  // created indicates whether the property was newly created by this call.
+  // True: the property was absent and has been created. False: the property already existed.
+  bool created = 1;
+  uint32 tags_num = 2;
 }
 
 message DeleteRequest {
-  banyandb.property.v1.Metadata metadata = 1;
+  banyandb.property.v1.Metadata metadata = 1 [(validate.rules).message.required = true];
+  repeated string tags = 2;
 }
 
 message DeleteResponse {
   bool deleted = 1;
+  uint32 tags_num = 2;
 }
 
 message GetRequest {
-  banyandb.property.v1.Metadata metadata = 1;
+  banyandb.property.v1.Metadata metadata = 1 [(validate.rules).message.required = true];
+  repeated string tags = 2;
 }
 
 message GetResponse {
@@ -74,7 +82,9 @@ message GetResponse {
 }
 
 message ListRequest {
-  banyandb.common.v1.Metadata container = 1;
+  banyandb.common.v1.Metadata container = 1 [(validate.rules).message.required = true];
+  repeated string ids = 2;
+  repeated string tags = 3;
 }
 
 message ListResponse {
@@ -82,8 +92,8 @@ message ListResponse {
 }
 
 service PropertyService {
-  rpc Create(CreateRequest) returns (CreateResponse);
-  rpc Update(UpdateRequest) returns (UpdateResponse);
+  // Apply creates a property if it's absent, or updates an existing one based on a strategy.
+  rpc Apply(ApplyRequest) returns (ApplyResponse);
   rpc Delete(DeleteRequest) returns (DeleteResponse);
   rpc Get(GetRequest) returns (GetResponse);
   rpc List(ListRequest) returns (ListResponse);
diff --git a/src/main/proto/banyandb/v1/banyandb-stream.proto b/src/main/proto/banyandb/v1/banyandb-stream.proto
index 6abb63d..84338cf 100644
--- a/src/main/proto/banyandb/v1/banyandb-stream.proto
+++ b/src/main/proto/banyandb/v1/banyandb-stream.proto
@@ -22,6 +22,7 @@ option java_package = "org.apache.skywalking.banyandb.stream.v1";
 package banyandb.stream.v1;
 
 import "google/protobuf/timestamp.proto";
+import "validate/validate.proto";
 import "banyandb/v1/banyandb-common.proto";
 import "banyandb/v1/banyandb-model.proto";
 
@@ -31,7 +32,7 @@ import "banyandb/v1/banyandb-model.proto";
 message Element {
   // element_id could be span_id of a Span or segment_id of a Segment in the context of stream
   string element_id = 1;
-  // timestamp represents
+  // timestamp represents a millisecond
   // 1) either the start time of a Span/Segment,
   // 2) or the timestamp of a log
   google.protobuf.Timestamp timestamp = 2;
@@ -40,7 +41,7 @@ message Element {
   // - duration
   // - service_name
   // - service_instance_id
-  // - end_time_nanoseconds
+  // - end_time_milliseconds
   repeated model.v1.TagFamily tag_families = 3;
 }
 
@@ -53,8 +54,8 @@ message QueryResponse {
 // QueryRequest is the request contract for query.
 message QueryRequest {
   // metadata is required
-  common.v1.Metadata metadata = 1;
-  // time_range is a range query with begin/end time of entities in the timeunit of nanoseconds.
+  common.v1.Metadata metadata = 1 [(validate.rules).message.required = true];
+  // time_range is a range query with begin/end time of entities in the timeunit of milliseconds.
   // In the context of stream, it represents the range of the `startTime` for spans/segments,
   // while in the context of Log, it means the range of the timestamp(s) for logs.
   // it is always recommended to specify time range for performance reason
@@ -66,15 +67,15 @@ message QueryRequest {
   // order_by is given to specify the sort for a field. So far, only fields in the type of Integer are supported
   model.v1.QueryOrder order_by = 5;
   // tag_families are indexed.
-  repeated model.v1.Criteria criteria = 6;
+  model.v1.Criteria criteria = 6;
   // projection can be used to select the key names of the element in the response
-  model.v1.TagProjection projection = 7;
+  model.v1.TagProjection projection = 7 [(validate.rules).message.required = true];
 }
 
 message ElementValue {
   // element_id could be span_id of a Span or segment_id of a Segment in the context of stream
   string element_id = 1;
-  // timestamp_nanoseconds is in the timeunit of nanoseconds. It represents
+  // timestamp is in the timeunit of milliseconds. It represents
   // 1) either the start time of a Span/Segment,
   // 2) or the timestamp of a log
   google.protobuf.Timestamp timestamp = 2;
@@ -84,9 +85,9 @@ message ElementValue {
 
 message WriteRequest {
   // the metadata is only required in the first write.
-  common.v1.Metadata metadata = 1;
+  common.v1.Metadata metadata = 1 [(validate.rules).message.required = true];
   // the element is required.
-  ElementValue element = 2;
+  ElementValue element = 2 [(validate.rules).message.required = true];
 }
 
 message WriteResponse {}
diff --git a/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientMeasureQueryTest.java b/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientMeasureQueryTest.java
index a37b4aa..daba42e 100644
--- a/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientMeasureQueryTest.java
+++ b/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientMeasureQueryTest.java
@@ -108,10 +108,20 @@ public class BanyanDBClientMeasureQueryTest extends AbstractBanyanDBClientTest {
         Assert.assertEquals(end.toEpochMilli() / 1000, request.getTimeRange().getEnd().getSeconds());
         Assert.assertEquals(TimeUnit.MILLISECONDS.toNanos(end.toEpochMilli() % 1000), request.getTimeRange().getEnd().getNanos());
         // assert fields, we only have state as a condition which should be state
-        Assert.assertEquals(1, request.getCriteriaCount());
-        // assert state
-        Assert.assertEquals(BanyandbModel.Condition.BinaryOp.BINARY_OP_EQ, request.getCriteria(0).getConditions(0).getOp());
-        Assert.assertEquals(0L, request.getCriteria(0).getConditions(0).getValue().getInt().getValue());
+        Assert.assertEquals("le {\n" +
+                "  op: LOGICAL_OP_AND\n" +
+                "  left {\n" +
+                "    condition {\n" +
+                "      name: \"entity_id\"\n" +
+                "      op: BINARY_OP_EQ\n" +
+                "      value {\n" +
+                "        str {\n" +
+                "          value: \"abc\"\n" +
+                "        }\n" +
+                "      }\n" +
+                "    }\n" +
+                "  }\n" +
+                "}", request.getCriteria().toString().trim());
         // assert projections
         assertCollectionEqual(Lists.newArrayList("default:id", "default:entity_id"),
                 parseProjectionList(request.getTagProjection()));
diff --git a/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientStreamQueryTest.java b/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientStreamQueryTest.java
index 9656955..89b10c8 100644
--- a/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientStreamQueryTest.java
+++ b/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientStreamQueryTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.skywalking.banyandb.v1.client;
 
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.protobuf.ByteString;
@@ -117,14 +116,23 @@ public class BanyanDBClientStreamQueryTest extends AbstractBanyanDBClientTest {
         Assert.assertEquals(TimeUnit.MILLISECONDS.toNanos(begin.toEpochMilli() % 1000), request.getTimeRange().getBegin().getNanos());
         Assert.assertEquals(end.toEpochMilli() / 1000, request.getTimeRange().getEnd().getSeconds());
         Assert.assertEquals(TimeUnit.MILLISECONDS.toNanos(end.toEpochMilli() % 1000), request.getTimeRange().getEnd().getNanos());
-        // assert fields, we only have state as a condition which should be state
-        Assert.assertEquals(1, request.getCriteriaCount());
+        // assert criteria
+        Assert.assertEquals("le {\n" +
+                "  op: LOGICAL_OP_AND\n" +
+                "  left {\n" +
+                "    condition {\n" +
+                "      name: \"state\"\n" +
+                "      op: BINARY_OP_EQ\n" +
+                "      value {\n" +
+                "        int {\n" +
+                "        }\n" +
+                "      }\n" +
+                "    }\n" +
+                "  }\n" +
+                "}", request.getCriteria().toString().trim());
         // assert orderBy, by default DESC
         Assert.assertEquals(BanyandbModel.Sort.SORT_DESC, request.getOrderBy().getSort());
         Assert.assertEquals("duration", request.getOrderBy().getIndexRuleName());
-        // assert state
-        Assert.assertEquals(BanyandbModel.Condition.BinaryOp.BINARY_OP_EQ, request.getCriteria(0).getConditions(0).getOp());
-        Assert.assertEquals(0L, request.getCriteria(0).getConditions(0).getValue().getInt().getValue());
         // assert projections
         assertCollectionEqual(Lists.newArrayList("searchable:duration", "searchable:state", "searchable:start_time", "searchable:trace_id"),
                 parseProjectionList(request.getProjection()));
@@ -163,22 +171,106 @@ public class BanyanDBClientStreamQueryTest extends AbstractBanyanDBClientTest {
         // assert timeRange
         Assert.assertEquals(begin.getEpochSecond(), request.getTimeRange().getBegin().getSeconds());
         Assert.assertEquals(end.getEpochSecond(), request.getTimeRange().getEnd().getSeconds());
-        // assert fields, we only have state as a condition
-        Assert.assertEquals(6, request.getCriteria(0).getConditionsCount());
+        // assert criteria
+        Assert.assertEquals("le {\n" +
+                "  op: LOGICAL_OP_AND\n" +
+                "  left {\n" +
+                "    condition {\n" +
+                "      name: \"duration\"\n" +
+                "      op: BINARY_OP_LE\n" +
+                "      value {\n" +
+                "        int {\n" +
+                "          value: 100\n" +
+                "        }\n" +
+                "      }\n" +
+                "    }\n" +
+                "  }\n" +
+                "  right {\n" +
+                "    le {\n" +
+                "      op: LOGICAL_OP_AND\n" +
+                "      left {\n" +
+                "        condition {\n" +
+                "          name: \"duration\"\n" +
+                "          op: BINARY_OP_GE\n" +
+                "          value {\n" +
+                "            int {\n" +
+                "              value: 10\n" +
+                "            }\n" +
+                "          }\n" +
+                "        }\n" +
+                "      }\n" +
+                "      right {\n" +
+                "        le {\n" +
+                "          op: LOGICAL_OP_AND\n" +
+                "          left {\n" +
+                "            condition {\n" +
+                "              name: \"endpoint_id\"\n" +
+                "              op: BINARY_OP_EQ\n" +
+                "              value {\n" +
+                "                str {\n" +
+                "                  value: \"/check_0\"\n" +
+                "                }\n" +
+                "              }\n" +
+                "            }\n" +
+                "          }\n" +
+                "          right {\n" +
+                "            le {\n" +
+                "              op: LOGICAL_OP_AND\n" +
+                "              left {\n" +
+                "                condition {\n" +
+                "                  name: \"service_instance_id\"\n" +
+                "                  op: BINARY_OP_EQ\n" +
+                "                  value {\n" +
+                "                    str {\n" +
+                "                      value: \"service_id_b_1\"\n" +
+                "                    }\n" +
+                "                  }\n" +
+                "                }\n" +
+                "              }\n" +
+                "              right {\n" +
+                "                le {\n" +
+                "                  op: LOGICAL_OP_AND\n" +
+                "                  left {\n" +
+                "                    condition {\n" +
+                "                      name: \"service_id\"\n" +
+                "                      op: BINARY_OP_EQ\n" +
+                "                      value {\n" +
+                "                        str {\n" +
+                "                          value: \"service_id_b\"\n" +
+                "                        }\n" +
+                "                      }\n" +
+                "                    }\n" +
+                "                  }\n" +
+                "                  right {\n" +
+                "                    le {\n" +
+                "                      op: LOGICAL_OP_AND\n" +
+                "                      left {\n" +
+                "                        condition {\n" +
+                "                          name: \"state\"\n" +
+                "                          op: BINARY_OP_EQ\n" +
+                "                          value {\n" +
+                "                            int {\n" +
+                "                              value: 1\n" +
+                "                            }\n" +
+                "                          }\n" +
+                "                        }\n" +
+                "                      }\n" +
+                "                    }\n" +
+                "                  }\n" +
+                "                }\n" +
+                "              }\n" +
+                "            }\n" +
+                "          }\n" +
+                "        }\n" +
+                "      }\n" +
+                "    }\n" +
+                "  }\n" +
+                "}\n", request.getCriteria().toString());
         // assert orderBy, by default DESC
         Assert.assertEquals(BanyandbModel.Sort.SORT_ASC, request.getOrderBy().getSort());
         Assert.assertEquals("start_time", request.getOrderBy().getIndexRuleName());
         // assert projections
         assertCollectionEqual(Lists.newArrayList("searchable:duration", "searchable:state", "searchable:start_time", "searchable:trace_id"), parseProjectionList(request.getProjection()));
-        // assert fields
-        assertCollectionEqual(request.getCriteria(0).getConditionsList(), ImmutableList.of(
-                PairQueryCondition.LongQueryCondition.ge("duration", minDuration).build(), // 1 -> duration >= minDuration
-                PairQueryCondition.LongQueryCondition.le("duration", maxDuration).build(), // 2 -> duration <= maxDuration
-                PairQueryCondition.StringQueryCondition.eq("service_id", serviceId).build(), // 3 -> service_id
-                PairQueryCondition.StringQueryCondition.eq("service_instance_id", serviceInstanceId).build(), // 4 -> service_instance_id
-                PairQueryCondition.StringQueryCondition.eq("endpoint_id", endpointId).build(), // 5 -> endpoint_id
-                PairQueryCondition.LongQueryCondition.eq("state", 1L).build() // 7 -> state
-        ));
     }
 
     @Test
@@ -196,11 +288,20 @@ public class BanyanDBClientStreamQueryTest extends AbstractBanyanDBClientTest {
         // assert metadata
         Assert.assertEquals("sw", request.getMetadata().getName());
         Assert.assertEquals("default", request.getMetadata().getGroup());
-        Assert.assertEquals(1, request.getCriteria(0).getConditionsCount());
-        // assert fields
-        assertCollectionEqual(request.getCriteria(0).getConditionsList(), ImmutableList.of(
-                PairQueryCondition.StringQueryCondition.eq("trace_id", traceId).build()
-        ));
+        Assert.assertEquals("le {\n" +
+                "  op: LOGICAL_OP_AND\n" +
+                "  left {\n" +
+                "    condition {\n" +
+                "      name: \"trace_id\"\n" +
+                "      op: BINARY_OP_EQ\n" +
+                "      value {\n" +
+                "        str {\n" +
+                "          value: \"1111.222.333\"\n" +
+                "        }\n" +
+                "      }\n" +
+                "    }\n" +
+                "  }\n" +
+                "}\n", request.getCriteria().toString());
     }
 
     @Test
diff --git a/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBMeasureQueryTests.java b/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBMeasureQueryTests.java
index 3561f78..c12a003 100644
--- a/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBMeasureQueryTests.java
+++ b/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBMeasureQueryTests.java
@@ -24,11 +24,13 @@ import org.apache.skywalking.banyandb.v1.client.metadata.Catalog;
 import org.apache.skywalking.banyandb.v1.client.metadata.Duration;
 import org.apache.skywalking.banyandb.v1.client.metadata.Group;
 import org.apache.skywalking.banyandb.v1.client.metadata.IndexRule;
+import org.apache.skywalking.banyandb.v1.client.metadata.IntervalRule;
 import org.apache.skywalking.banyandb.v1.client.metadata.Measure;
 import org.apache.skywalking.banyandb.v1.client.metadata.TagFamilySpec;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -38,26 +40,16 @@ import java.util.concurrent.TimeUnit;
 
 import static org.awaitility.Awaitility.await;
 
+@Ignore
 public class ITBanyanDBMeasureQueryTests extends BanyanDBClientTestCI {
     private MeasureBulkWriteProcessor processor;
 
     @Before
     public void setUp() throws IOException, BanyanDBException, InterruptedException {
         this.setUpConnection();
-        Group expectedGroup = this.client.define(
-                Group.create("sw_metric", Catalog.MEASURE, 2, 12, Duration.ofDays(7))
-        );
+        Group expectedGroup = this.client.define(Group.create("sw_metric", Catalog.MEASURE, 2, IntervalRule.create(IntervalRule.Unit.HOUR, 4), IntervalRule.create(IntervalRule.Unit.DAY, 1), IntervalRule.create(IntervalRule.Unit.DAY, 7)));
         Assert.assertNotNull(expectedGroup);
-        Measure expectedMeasure = Measure.create("sw_metric", "service_cpm_minute", Duration.ofMinutes(1))
-                .setEntityRelativeTags("entity_id")
-                .addTagFamily(TagFamilySpec.create("default")
-                        .addIDTagSpec()
-                        .addTagSpec(TagFamilySpec.TagSpec.newStringTag("entity_id"))
-                        .build())
-                .addField(Measure.FieldSpec.newIntField("total").compressWithZSTD().encodeWithGorilla().build())
-                .addField(Measure.FieldSpec.newIntField("value").compressWithZSTD().encodeWithGorilla().build())
-                .addIndex(IndexRule.create("scope", IndexRule.IndexType.INVERTED, IndexRule.IndexLocation.SERIES))
-                .build();
+        Measure expectedMeasure = Measure.create("sw_metric", "service_cpm_minute", Duration.ofMinutes(1)).setEntityRelativeTags("entity_id").addTagFamily(TagFamilySpec.create("default").addIDTagSpec().addTagSpec(TagFamilySpec.TagSpec.newStringTag("entity_id")).build()).addField(Measure.FieldSpec.newIntField("total").compressWithZSTD().encodeWithGorilla().build()).addField(Measure.FieldSpec.newIntField("value").compressWithZSTD().encodeWithGorilla().build()).addIndex(IndexRule.create("sc [...]
         client.define(expectedMeasure);
         Assert.assertNotNull(expectedMeasure);
         processor = client.buildMeasureWriteProcessor(1000, 1, 1);
@@ -78,16 +70,11 @@ public class ITBanyanDBMeasureQueryTests extends BanyanDBClientTestCI {
         Instant begin = now.minus(15, ChronoUnit.MINUTES);
 
         MeasureWrite measureWrite = new MeasureWrite("sw_metric", "service_cpm_minute", now.toEpochMilli());
-        measureWrite.tag("id", TagAndValue.idTagValue("1"))
-                .tag("entity_id", TagAndValue.stringTagValue("entity_1"))
-                .field("total", TagAndValue.longFieldValue(100))
-                .field("value", TagAndValue.longFieldValue(1));
+        measureWrite.tag("id", TagAndValue.idTagValue("1")).tag("entity_id", TagAndValue.stringTagValue("entity_1")).field("total", TagAndValue.longFieldValue(100)).field("value", TagAndValue.longFieldValue(1));
 
         processor.add(measureWrite);
 
-        MeasureQuery query = new MeasureQuery("sw_metric", "service_cpm_minute",
-                new TimestampRange(begin.toEpochMilli(), now.plus(1, ChronoUnit.MINUTES).toEpochMilli()),
-                ImmutableSet.of("id", "entity_id"), // tags
+        MeasureQuery query = new MeasureQuery("sw_metric", "service_cpm_minute", new TimestampRange(begin.toEpochMilli(), now.plus(1, ChronoUnit.MINUTES).toEpochMilli()), ImmutableSet.of("id", "entity_id"), // tags
                 ImmutableSet.of("total")); // fields
         client.query(query);
 
diff --git a/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBPropertyTests.java b/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBPropertyTests.java
index 08b8d41..fca4b70 100644
--- a/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBPropertyTests.java
+++ b/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBPropertyTests.java
@@ -21,12 +21,13 @@ package org.apache.skywalking.banyandb.v1.client;
 import io.grpc.Status;
 import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
 import org.apache.skywalking.banyandb.v1.client.metadata.Catalog;
-import org.apache.skywalking.banyandb.v1.client.metadata.Duration;
 import org.apache.skywalking.banyandb.v1.client.metadata.Group;
+import org.apache.skywalking.banyandb.v1.client.metadata.IntervalRule;
 import org.apache.skywalking.banyandb.v1.client.metadata.Property;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -34,12 +35,15 @@ import java.util.concurrent.TimeUnit;
 
 import static org.awaitility.Awaitility.await;
 
+@Ignore
 public class ITBanyanDBPropertyTests extends BanyanDBClientTestCI {
     @Before
     public void setUp() throws IOException, BanyanDBException, InterruptedException {
         super.setUpConnection();
         Group expectedGroup = this.client.define(
-                Group.create("default", Catalog.STREAM, 2, 0, Duration.ofDays(7))
+                Group.create("default", Catalog.STREAM, 2, IntervalRule.create(IntervalRule.Unit.HOUR, 4),
+                        IntervalRule.create(IntervalRule.Unit.DAY, 1),
+                        IntervalRule.create(IntervalRule.Unit.DAY, 7))
         );
         Assert.assertNotNull(expectedGroup);
     }
@@ -54,7 +58,7 @@ public class ITBanyanDBPropertyTests extends BanyanDBClientTestCI {
         Property property = Property.create("default", "sw", "ui_template")
                 .addTag(TagAndValue.newStringTag("name", "hello"))
                 .build();
-        this.client.save(property);
+        Assert.assertTrue(this.client.apply(property).created());
 
         await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
             Property gotProperty = client.findProperty("default", "sw", "ui_template");
@@ -68,7 +72,7 @@ public class ITBanyanDBPropertyTests extends BanyanDBClientTestCI {
         Property property = Property.create("default", "sw", "ui_template")
                 .addTag(TagAndValue.newStringTag("name", "hello"))
                 .build();
-        this.client.save(property);
+        Assert.assertTrue(this.client.apply(property).created());
 
         await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
             Property gotProperty = client.findProperty("default", "sw", "ui_template");
@@ -76,7 +80,7 @@ public class ITBanyanDBPropertyTests extends BanyanDBClientTestCI {
             Assert.assertEquals(property, gotProperty);
         });
 
-        Assert.assertTrue(this.client.deleteProperty("default", "sw", "ui_template"));
+        Assert.assertTrue(this.client.deleteProperty("default", "sw", "ui_template").deleted());
 
         try {
             client.findProperty("default", "sw", "ui_template");
@@ -91,12 +95,12 @@ public class ITBanyanDBPropertyTests extends BanyanDBClientTestCI {
         Property property1 = Property.create("default", "sw", "ui_template")
                 .addTag(TagAndValue.newStringTag("name", "hello"))
                 .build();
-        this.client.save(property1);
+        Assert.assertTrue(this.client.apply(property1).created());
 
         Property property2 = Property.create("default", "sw", "ui_template")
                 .addTag(TagAndValue.newStringTag("name", "world"))
                 .build();
-        this.client.save(property2);
+        Assert.assertFalse(this.client.apply(property2).created());
 
         await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
             Property gotProperty = client.findProperty("default", "sw", "ui_template");
diff --git a/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBStreamQueryTests.java b/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBStreamQueryTests.java
index fa5ae0a..e2c9124 100644
--- a/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBStreamQueryTests.java
+++ b/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBStreamQueryTests.java
@@ -21,14 +21,15 @@ package org.apache.skywalking.banyandb.v1.client;
 import com.google.common.collect.ImmutableSet;
 import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
 import org.apache.skywalking.banyandb.v1.client.metadata.Catalog;
-import org.apache.skywalking.banyandb.v1.client.metadata.Duration;
 import org.apache.skywalking.banyandb.v1.client.metadata.Group;
 import org.apache.skywalking.banyandb.v1.client.metadata.IndexRule;
+import org.apache.skywalking.banyandb.v1.client.metadata.IntervalRule;
 import org.apache.skywalking.banyandb.v1.client.metadata.Stream;
 import org.apache.skywalking.banyandb.v1.client.metadata.TagFamilySpec;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -37,6 +38,7 @@ import java.util.concurrent.TimeUnit;
 
 import static org.awaitility.Awaitility.await;
 
+@Ignore
 public class ITBanyanDBStreamQueryTests extends BanyanDBClientTestCI {
     private StreamBulkWriteProcessor processor;
 
@@ -44,7 +46,9 @@ public class ITBanyanDBStreamQueryTests extends BanyanDBClientTestCI {
     public void setUp() throws IOException, BanyanDBException, InterruptedException {
         this.setUpConnection();
         Group expectedGroup = this.client.define(
-                Group.create("default", Catalog.STREAM, 2, 0, Duration.ofDays(7))
+                Group.create("default", Catalog.STREAM, 2, IntervalRule.create(IntervalRule.Unit.HOUR, 4),
+                        IntervalRule.create(IntervalRule.Unit.DAY, 1),
+                        IntervalRule.create(IntervalRule.Unit.DAY, 7))
         );
         Assert.assertNotNull(expectedGroup);
         Stream expectedStream = Stream.create("default", "sw")
diff --git a/src/test/java/org/apache/skywalking/banyandb/v1/client/metadata/PropertyStoreTest.java b/src/test/java/org/apache/skywalking/banyandb/v1/client/metadata/PropertyStoreTest.java
index 062c932..79e14fe 100644
--- a/src/test/java/org/apache/skywalking/banyandb/v1/client/metadata/PropertyStoreTest.java
+++ b/src/test/java/org/apache/skywalking/banyandb/v1/client/metadata/PropertyStoreTest.java
@@ -51,22 +51,18 @@ public class PropertyStoreTest extends AbstractBanyanDBClientTest {
     private final PropertyServiceGrpc.PropertyServiceImplBase propertyServiceImpl = mock(PropertyServiceGrpc.PropertyServiceImplBase.class, delegatesTo(
             new PropertyServiceGrpc.PropertyServiceImplBase() {
                 @Override
-                public void create(BanyandbProperty.CreateRequest request, StreamObserver<BanyandbProperty.CreateResponse> responseObserver) {
+                public void apply(BanyandbProperty.ApplyRequest request, StreamObserver<BanyandbProperty.ApplyResponse> responseObserver) {
                     BanyandbProperty.Property p = request.getProperty().toBuilder()
                             .setUpdatedAt(TimeUtils.buildTimestamp(ZonedDateTime.now()))
                             .build();
+                    String key = format(p.getMetadata());
+                    BanyandbProperty.Property v = memory.get(key);
                     memory.put(format(p.getMetadata()), p);
-                    responseObserver.onNext(BanyandbProperty.CreateResponse.newBuilder().build());
-                    responseObserver.onCompleted();
-                }
-
-                @Override
-                public void update(BanyandbProperty.UpdateRequest request, StreamObserver<BanyandbProperty.UpdateResponse> responseObserver) {
-                    BanyandbProperty.Property p = request.getProperty().toBuilder()
-                            .setUpdatedAt(TimeUtils.buildTimestamp(ZonedDateTime.now()))
-                            .build();
-                    memory.put(format(p.getMetadata()), p);
-                    responseObserver.onNext(BanyandbProperty.UpdateResponse.newBuilder().build());
+                    if (v == null) {
+                        responseObserver.onNext(BanyandbProperty.ApplyResponse.newBuilder().setCreated(true).setTagsNum(p.getTagsCount()).build());
+                    } else {
+                        responseObserver.onNext(BanyandbProperty.ApplyResponse.newBuilder().setCreated(false).setTagsNum(p.getTagsCount()).build());
+                    }
                     responseObserver.onCompleted();
                 }
 
@@ -99,11 +95,16 @@ public class PropertyStoreTest extends AbstractBanyanDBClientTest {
     }
 
     @Test
-    public void testPropertyStore_create() throws BanyanDBException {
+    public void testPropertyStore_apply() throws BanyanDBException {
         Property property = Property.create("default", "sw", "ui_template")
                 .addTag(TagAndValue.newStringTag("name", "hello"))
                 .build();
-        this.store.create(property);
+        Assert.assertTrue(this.store.apply(property).created());
+        Assert.assertEquals(memory.size(), 1);
+        property = Property.create("default", "sw", "ui_template")
+                .addTag(TagAndValue.newStringTag("name", "hello1"))
+                .build();
+        Assert.assertFalse(this.store.apply(property).created());
         Assert.assertEquals(memory.size(), 1);
     }
 
@@ -112,7 +113,7 @@ public class PropertyStoreTest extends AbstractBanyanDBClientTest {
         Property property = Property.create("default", "sw", "ui_template")
                 .addTag(TagAndValue.newStringTag("name", "hello"))
                 .build();
-        this.store.create(property);
+        Assert.assertTrue(this.store.apply(property).created());
         Property gotProperty = this.store.get("default", "sw", "ui_template");
         Assert.assertNotNull(gotProperty);
         Assert.assertEquals(property, gotProperty);
@@ -124,7 +125,7 @@ public class PropertyStoreTest extends AbstractBanyanDBClientTest {
         Property property = Property.create("default", "sw", "ui_template")
                 .addTag(TagAndValue.newStringTag("name", "hello"))
                 .build();
-        this.store.create(property);
+        Assert.assertTrue(this.store.apply(property, PropertyStore.Strategy.REPLACE).created());
         List<Property> listProperties = this.store.list("default", "sw");
         Assert.assertNotNull(listProperties);
         Assert.assertEquals(1, listProperties.size());
@@ -136,8 +137,8 @@ public class PropertyStoreTest extends AbstractBanyanDBClientTest {
         Property property = Property.create("default", "sw", "ui_template")
                 .addTag(TagAndValue.newStringTag("name", "hello"))
                 .build();
-        this.store.create(property);
-        boolean deleted = this.store.delete("default", "sw", "ui_template");
+        Assert.assertTrue(this.store.apply(property).created());
+        boolean deleted = this.store.delete("default", "sw", "ui_template").deleted();
         Assert.assertTrue(deleted);
         Assert.assertEquals(0, memory.size());
     }