You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/03/11 17:57:13 UTC
[iotdb] branch master updated: [IOTDB-5656] subscription-api (#9263)
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new e9af4e89ae [IOTDB-5656] subscription-api (#9263)
e9af4e89ae is described below
commit e9af4e89aed5e887f04839a3a50f488acae18c90
Author: CritasWang <cr...@outlook.com>
AuthorDate: Sun Mar 12 01:57:07 2023 +0800
[IOTDB-5656] subscription-api (#9263)
Co-authored-by: Steve Yurong Su <ro...@apache.org>
---
pom.xml | 1 +
subscription-api/pom.xml | 68 ++++++++++++
.../api/SubscriptionConfiguration.java | 114 +++++++++++++++++++++
.../subscription/api/SubscriptionFactory.java | 47 +++++++++
.../iotdb/subscription/api/consumer/Consumer.java | 56 ++++++++++
.../api/consumer/pull/PullConsumer.java | 38 +++++++
.../api/consumer/push/DataArrivalListener.java | 37 +++++++
.../api/consumer/push/ExceptionListener.java | 34 ++++++
.../api/consumer/push/PushConsumer.java | 68 ++++++++++++
.../api/dataset/SubscriptionDataSet.java | 22 ++++
.../api/exception/SubscriptionException.java | 31 ++++++
.../SubscriptionStrategyNotValidException.java | 31 ++++++
.../api/strategy/SubscriptionStrategy.java | 29 ++++++
.../disorder/DisorderHandlingStrategy.java | 35 +++++++
.../api/strategy/disorder/IntolerableStrategy.java | 32 ++++++
.../api/strategy/disorder/WatermarkStrategy.java | 36 +++++++
.../strategy/topic/MultipleConnectionStrategy.java | 63 ++++++++++++
.../api/strategy/topic/SingleTopicStrategy.java | 52 ++++++++++
.../api/strategy/topic/TopicsStrategy.java | 24 +++++
19 files changed, 818 insertions(+)
diff --git a/pom.xml b/pom.xml
index 71b2c8ffbf..f93eb4a716 100644
--- a/pom.xml
+++ b/pom.xml
@@ -126,6 +126,7 @@
<module>isession</module>
<module>mlnode</module>
<module>pipe-api</module>
+ <module>subscription-api</module>
</modules>
<!-- Properties Management -->
<properties>
diff --git a/subscription-api/pom.xml b/subscription-api/pom.xml
new file mode 100644
index 0000000000..750845fc84
--- /dev/null
+++ b/subscription-api/pom.xml
@@ -0,0 +1,68 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>iotdb-parent</artifactId>
+ <groupId>org.apache.iotdb</groupId>
+ <version>1.2.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>subscription-api</artifactId>
+ <profiles>
+ <!-- Opt-in profile: when activated, the package phase additionally runs the assembly plugin's -->
+ <!-- "single" goal with the predefined jar-with-dependencies descriptor. -->
+ <profile>
+ <id>get-jar-with-dependencies</id>
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>${maven.assembly.version}</version>
+ <configuration>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id>
+ <!-- this is used for inheritance merges -->
+ <phase>package</phase>
+ <!-- bind to the packaging phase -->
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
+ <dependencies>
+ <!-- tsfile is needed at compile time (MultipleConnectionStrategy uses its PathNodesGenerator). -->
+ <!-- Scope "provided": on the compile classpath, but expected to be supplied by the runtime -->
+ <!-- environment and therefore not passed on transitively. -->
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>tsfile</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/subscription-api/src/main/java/org/apache/iotdb/subscription/api/SubscriptionConfiguration.java b/subscription-api/src/main/java/org/apache/iotdb/subscription/api/SubscriptionConfiguration.java
new file mode 100644
index 0000000000..836202d0cd
--- /dev/null
+++ b/subscription-api/src/main/java/org/apache/iotdb/subscription/api/SubscriptionConfiguration.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.subscription.api;
+
+import org.apache.iotdb.subscription.api.exception.SubscriptionException;
+import org.apache.iotdb.subscription.api.strategy.disorder.DisorderHandlingStrategy;
+import org.apache.iotdb.subscription.api.strategy.topic.TopicsStrategy;
+
+public class SubscriptionConfiguration {
+
+ private String host;
+ private Integer port;
+ private String username;
+ private String password;
+ private String group;
+ private DisorderHandlingStrategy disorderHandlingStrategy;
+ private TopicsStrategy topicStrategy;
+
+ private SubscriptionConfiguration() {}
+
+ private void check() throws SubscriptionException {
+ if (host == null) {
+ throw new SubscriptionException("Host is not set!");
+ }
+ if (port == null) {
+ throw new SubscriptionException("Port is not set!");
+ }
+ if (username == null) {
+ throw new SubscriptionException("Username is not set!");
+ }
+ if (password == null) {
+ throw new SubscriptionException("Password is not set!");
+ }
+ if (group == null) {
+ throw new SubscriptionException("Group is not set!");
+ }
+
+ if (disorderHandlingStrategy == null) {
+ throw new SubscriptionException("WatermarkStrategy is not set!");
+ }
+ disorderHandlingStrategy.check();
+
+ if (topicStrategy == null) {
+ throw new SubscriptionException("TopicStrategy is not set!");
+ }
+ topicStrategy.check();
+ }
+
+ public static class Builder {
+
+ private final SubscriptionConfiguration subscriptionConfiguration;
+
+ public Builder() {
+ subscriptionConfiguration = new SubscriptionConfiguration();
+ }
+
+ public Builder host(String host) {
+ subscriptionConfiguration.host = host;
+ return this;
+ }
+
+ public Builder port(int port) {
+ subscriptionConfiguration.port = port;
+ return this;
+ }
+
+ public Builder username(String username) {
+ subscriptionConfiguration.username = username;
+ return this;
+ }
+
+ public Builder password(String password) {
+ subscriptionConfiguration.password = password;
+ return this;
+ }
+
+ public Builder group(String group) {
+ subscriptionConfiguration.group = group;
+ return this;
+ }
+
+ public Builder disorderHandlingStrategy(DisorderHandlingStrategy disorderHandlingStrategy) {
+ subscriptionConfiguration.disorderHandlingStrategy = disorderHandlingStrategy;
+ return this;
+ }
+
+ public Builder topicStrategy(TopicsStrategy topicStrategy) {
+ subscriptionConfiguration.topicStrategy = topicStrategy;
+ return this;
+ }
+
+ public SubscriptionConfiguration build() {
+ subscriptionConfiguration.check();
+ return subscriptionConfiguration;
+ }
+ }
+}
diff --git a/subscription-api/src/main/java/org/apache/iotdb/subscription/api/SubscriptionFactory.java b/subscription-api/src/main/java/org/apache/iotdb/subscription/api/SubscriptionFactory.java
new file mode 100644
index 0000000000..3da98245d6
--- /dev/null
+++ b/subscription-api/src/main/java/org/apache/iotdb/subscription/api/SubscriptionFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.subscription.api;
+
+import org.apache.iotdb.subscription.api.consumer.pull.PullConsumer;
+import org.apache.iotdb.subscription.api.consumer.push.PushConsumer;
+import org.apache.iotdb.subscription.api.exception.SubscriptionException;
+
+/** Factory that creates the two kinds of subscription consumer: push and pull. */
+public interface SubscriptionFactory {
+
+ /**
+ * Create a push consumer.
+ *
+ * @param subscriptionConfiguration subscription configuration
+ * @return push consumer
+ * @throws SubscriptionException if the subscription configuration is not valid
+ */
+ PushConsumer createPushConsumer(SubscriptionConfiguration subscriptionConfiguration)
+ throws SubscriptionException;
+
+ /**
+ * Create a pull consumer.
+ *
+ * @param subscriptionConfiguration subscription configuration
+ * @return pull consumer
+ * @throws SubscriptionException if the subscription configuration is not valid
+ */
+ PullConsumer createPullConsumer(SubscriptionConfiguration subscriptionConfiguration)
+ throws SubscriptionException;
+}
diff --git a/subscription-api/src/main/java/org/apache/iotdb/subscription/api/consumer/Consumer.java b/subscription-api/src/main/java/org/apache/iotdb/subscription/api/consumer/Consumer.java
new file mode 100644
index 0000000000..bd042d9927
--- /dev/null
+++ b/subscription-api/src/main/java/org/apache/iotdb/subscription/api/consumer/Consumer.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.subscription.api.consumer;
+
+import org.apache.iotdb.subscription.api.exception.SubscriptionException;
+
+import java.util.List;
+
+/**
+ * Operations common to all subscription consumers: opening and closing the subscription and
+ * querying its state, consumer group and topics.
+ *
+ * <p>NOTE(review): the interface is {@link AutoCloseable}, but how {@code close()} relates to
+ * {@link #closeSubscription()} is not specified here — confirm with implementations.
+ */
+public interface Consumer extends AutoCloseable {
+
+ /**
+ * Open the subscription.
+ *
+ * @throws SubscriptionException if the subscription cannot be opened
+ */
+ void openSubscription() throws SubscriptionException;
+
+ /**
+ * Close the subscription.
+ *
+ * @throws SubscriptionException if the subscription cannot be closed
+ */
+ void closeSubscription() throws SubscriptionException;
+
+ /**
+ * Check if the subscription is closed.
+ *
+ * @return true if the subscription is closed, false otherwise
+ */
+ boolean isClosed();
+
+ /**
+ * Get the consumer group of the subscription.
+ *
+ * @return the consumer group
+ * @throws SubscriptionException if the consumer group cannot be retrieved
+ */
+ String consumerGroup() throws SubscriptionException;
+
+ /**
+ * Get the topics of the subscription.
+ *
+ * @return the topics
+ * @throws SubscriptionException if the topics cannot be retrieved
+ */
+ List<String> subscription() throws SubscriptionException;
+}
diff --git a/subscription-api/src/main/java/org/apache/iotdb/subscription/api/consumer/pull/PullConsumer.java b/subscription-api/src/main/java/org/apache/iotdb/subscription/api/consumer/pull/PullConsumer.java
new file mode 100644
index 0000000000..ebc7e9e83e
--- /dev/null
+++ b/subscription-api/src/main/java/org/apache/iotdb/subscription/api/consumer/pull/PullConsumer.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.subscription.api.consumer.pull;
+
+import org.apache.iotdb.subscription.api.consumer.Consumer;
+import org.apache.iotdb.subscription.api.dataset.SubscriptionDataSet;
+import org.apache.iotdb.subscription.api.exception.SubscriptionException;
+
+import java.util.List;
+
+/** Consumer whose caller fetches data explicitly by calling {@link #poll(long)}. */
+public interface PullConsumer extends Consumer {
+
+ /**
+ * Poll data from the subscription server.
+ *
+ * @param timeoutInMs maximum time to wait for data, in milliseconds
+ * @return a list of SubscriptionDataSets, or null if no data arrives within the given time
+ * @throws SubscriptionException if any error occurs
+ */
+ List<SubscriptionDataSet> poll(long timeoutInMs) throws SubscriptionException;
+}
diff --git a/subscription-api/src/main/java/org/apache/iotdb/subscription/api/consumer/push/DataArrivalListener.java b/subscription-api/src/main/java/org/apache/iotdb/subscription/api/consumer/push/DataArrivalListener.java
new file mode 100644
index 0000000000..cd4aee2489
--- /dev/null
+++ b/subscription-api/src/main/java/org/apache/iotdb/subscription/api/consumer/push/DataArrivalListener.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.subscription.api.consumer.push;
+
+import org.apache.iotdb.subscription.api.dataset.SubscriptionDataSet;
+import org.apache.iotdb.subscription.api.exception.SubscriptionException;
+
+import java.util.List;
+
+/**
+ * Callback that receives arrived subscription data. It is registered on a push consumer through
+ * {@code PushConsumer#registerDataArrivalListener}.
+ */
+@FunctionalInterface
+public interface DataArrivalListener {
+
+ /**
+ * Called when data arrives in the subscription.
+ *
+ * @param subscriptionDataSets the data
+ * @throws SubscriptionException if the data cannot be handled
+ */
+ void onDataArrival(List<SubscriptionDataSet> subscriptionDataSets) throws SubscriptionException;
+}
diff --git a/subscription-api/src/main/java/org/apache/iotdb/subscription/api/consumer/push/ExceptionListener.java b/subscription-api/src/main/java/org/apache/iotdb/subscription/api/consumer/push/ExceptionListener.java
new file mode 100644
index 0000000000..108e7d6614
--- /dev/null
+++ b/subscription-api/src/main/java/org/apache/iotdb/subscription/api/consumer/push/ExceptionListener.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.subscription.api.consumer.push;
+
+import org.apache.iotdb.subscription.api.exception.SubscriptionException;
+
+/**
+ * Callback that receives exceptions raised in a subscription. It is registered on a push consumer
+ * through {@code PushConsumer#registerExceptionListener}.
+ */
+@FunctionalInterface
+public interface ExceptionListener {
+
+ /**
+ * Called when an exception occurs in the subscription.
+ *
+ * @param subscriptionException the exception
+ * @throws SubscriptionException if the exception cannot be handled
+ */
+ void onException(SubscriptionException subscriptionException) throws SubscriptionException;
+}
diff --git a/subscription-api/src/main/java/org/apache/iotdb/subscription/api/consumer/push/PushConsumer.java b/subscription-api/src/main/java/org/apache/iotdb/subscription/api/consumer/push/PushConsumer.java
new file mode 100644
index 0000000000..c8a4d35801
--- /dev/null
+++ b/subscription-api/src/main/java/org/apache/iotdb/subscription/api/consumer/push/PushConsumer.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.subscription.api.consumer.push;
+
+import org.apache.iotdb.subscription.api.consumer.Consumer;
+import org.apache.iotdb.subscription.api.exception.SubscriptionException;
+
+/**
+ * Consumer that works through registered listeners rather than explicit polling. Register a
+ * {@link DataArrivalListener} and an {@link ExceptionListener} first, then call {@link #start()}.
+ */
+public interface PushConsumer extends Consumer {
+
+ /**
+ * Register a listener to listen to the data arrival. The method should be called before the
+ * consumer is started. The DataArrivalListener can NOT be changed once the consumer is started.
+ *
+ * @param listener the listener to listen to the data arrival.
+ * @throws SubscriptionException if the listener cannot be registered. Mainly because the
+ * PushConsumer is running.
+ */
+ void registerDataArrivalListener(DataArrivalListener listener) throws SubscriptionException;
+
+ /**
+ * Register a listener to listen to the exception. The method should be called before the consumer
+ * is started. The ExceptionListener can NOT be changed once the consumer is started.
+ *
+ * @param listener the listener to listen to the exception.
+ * @throws SubscriptionException if the listener cannot be registered. Mainly because the
+ * PushConsumer is running.
+ */
+ void registerExceptionListener(ExceptionListener listener) throws SubscriptionException;
+
+ /**
+ * Start the consumer to listen to the data. If the consumer is already listening, do nothing.
+ *
+ * @throws SubscriptionException if the consumer cannot start, e.g. the DataArrivalListener is not
+ * registered or the ExceptionListener is not registered.
+ */
+ void start() throws SubscriptionException;
+
+ /**
+ * Stop the consumer to listen to the data. If the consumer is not listening, do nothing.
+ *
+ * @throws SubscriptionException if the consumer cannot stop.
+ */
+ void stop() throws SubscriptionException;
+
+ /**
+ * Check if the consumer is listening to the data.
+ *
+ * @return true if the consumer is listening to the data, false otherwise.
+ */
+ boolean isListening();
+}
diff --git a/subscription-api/src/main/java/org/apache/iotdb/subscription/api/dataset/SubscriptionDataSet.java b/subscription-api/src/main/java/org/apache/iotdb/subscription/api/dataset/SubscriptionDataSet.java
new file mode 100644
index 0000000000..0192192898
--- /dev/null
+++ b/subscription-api/src/main/java/org/apache/iotdb/subscription/api/dataset/SubscriptionDataSet.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.subscription.api.dataset;
+
+/**
+ * Type of the data items handed to consumers: returned in lists by {@code PullConsumer#poll} and
+ * passed in lists to {@code DataArrivalListener#onDataArrival}. It declares no methods.
+ */
+public interface SubscriptionDataSet {}
diff --git a/subscription-api/src/main/java/org/apache/iotdb/subscription/api/exception/SubscriptionException.java b/subscription-api/src/main/java/org/apache/iotdb/subscription/api/exception/SubscriptionException.java
new file mode 100644
index 0000000000..3cb660c24c
--- /dev/null
+++ b/subscription-api/src/main/java/org/apache/iotdb/subscription/api/exception/SubscriptionException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.subscription.api.exception;
+
+/**
+ * Base exception of the subscription API. It extends {@link RuntimeException}, so it is unchecked:
+ * the {@code throws} clauses that mention it are documentation rather than compiler-enforced.
+ */
+public class SubscriptionException extends RuntimeException {
+
+ /** @param message description of the failure */
+ public SubscriptionException(String message) {
+ super(message);
+ }
+
+ /**
+ * @param message description of the failure
+ * @param cause the underlying exception
+ */
+ public SubscriptionException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git a/subscription-api/src/main/java/org/apache/iotdb/subscription/api/exception/SubscriptionStrategyNotValidException.java b/subscription-api/src/main/java/org/apache/iotdb/subscription/api/exception/SubscriptionStrategyNotValidException.java
new file mode 100644
index 0000000000..171f6c95a6
--- /dev/null
+++ b/subscription-api/src/main/java/org/apache/iotdb/subscription/api/exception/SubscriptionStrategyNotValidException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.subscription.api.exception;
+
+/** Thrown by a strategy's {@code check()} when the strategy's settings are not valid. */
+public class SubscriptionStrategyNotValidException extends SubscriptionException {
+
+ /** @param message description of what makes the strategy invalid */
+ public SubscriptionStrategyNotValidException(String message) {
+ super(message);
+ }
+
+ /**
+ * @param message description of what makes the strategy invalid
+ * @param cause the underlying exception
+ */
+ public SubscriptionStrategyNotValidException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git a/subscription-api/src/main/java/org/apache/iotdb/subscription/api/strategy/SubscriptionStrategy.java b/subscription-api/src/main/java/org/apache/iotdb/subscription/api/strategy/SubscriptionStrategy.java
new file mode 100644
index 0000000000..0fbfe56dad
--- /dev/null
+++ b/subscription-api/src/main/java/org/apache/iotdb/subscription/api/strategy/SubscriptionStrategy.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.subscription.api.strategy;
+
+import org.apache.iotdb.subscription.api.exception.SubscriptionStrategyNotValidException;
+
+/** Subscription strategy interface. Every strategy is able to validate its own settings. */
+public interface SubscriptionStrategy {
+
+ /**
+ * Validate the settings of this strategy.
+ *
+ * @throws SubscriptionStrategyNotValidException if invalid strategy is set
+ */
+ void check() throws SubscriptionStrategyNotValidException;
+}
diff --git a/subscription-api/src/main/java/org/apache/iotdb/subscription/api/strategy/disorder/DisorderHandlingStrategy.java b/subscription-api/src/main/java/org/apache/iotdb/subscription/api/strategy/disorder/DisorderHandlingStrategy.java
new file mode 100644
index 0000000000..57faca6f89
--- /dev/null
+++ b/subscription-api/src/main/java/org/apache/iotdb/subscription/api/strategy/disorder/DisorderHandlingStrategy.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.subscription.api.strategy.disorder;
+
+import org.apache.iotdb.subscription.api.strategy.SubscriptionStrategy;
+
+/**
+ * Base class of the disorder-handling strategies. It only stores a watermark value; validating it
+ * is left to subclasses through {@code check()}.
+ */
+public abstract class DisorderHandlingStrategy implements SubscriptionStrategy {
+
+ // NOTE(review): the unit and exact meaning of the watermark are not defined in this class —
+ // confirm against the implementation that consumes it.
+ protected final long watermark;
+
+ /** @param watermark the watermark value; stored as given, without validation */
+ protected DisorderHandlingStrategy(long watermark) {
+ this.watermark = watermark;
+ }
+
+ /** @return the watermark passed at construction */
+ public long getWatermark() {
+ return watermark;
+ }
+}
diff --git a/subscription-api/src/main/java/org/apache/iotdb/subscription/api/strategy/disorder/IntolerableStrategy.java b/subscription-api/src/main/java/org/apache/iotdb/subscription/api/strategy/disorder/IntolerableStrategy.java
new file mode 100644
index 0000000000..e358974b9e
--- /dev/null
+++ b/subscription-api/src/main/java/org/apache/iotdb/subscription/api/strategy/disorder/IntolerableStrategy.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.subscription.api.strategy.disorder;
+
+import org.apache.iotdb.subscription.api.exception.SubscriptionStrategyNotValidException;
+
+/**
+ * Disorder-handling strategy whose watermark is fixed at 0.
+ *
+ * <p>NOTE(review): the only constructor is protected, so this class can be instantiated only from
+ * its own package or through a subclass — confirm that this restriction is intended.
+ */
+public class IntolerableStrategy extends DisorderHandlingStrategy {
+
+ protected IntolerableStrategy() {
+ super(0);
+ }
+
+ /** Nothing to validate: the watermark is always 0. */
+ @Override
+ public void check() throws SubscriptionStrategyNotValidException {}
+}
diff --git a/subscription-api/src/main/java/org/apache/iotdb/subscription/api/strategy/disorder/WatermarkStrategy.java b/subscription-api/src/main/java/org/apache/iotdb/subscription/api/strategy/disorder/WatermarkStrategy.java
new file mode 100644
index 0000000000..7af3fc5ac3
--- /dev/null
+++ b/subscription-api/src/main/java/org/apache/iotdb/subscription/api/strategy/disorder/WatermarkStrategy.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.subscription.api.strategy.disorder;
+
+import org.apache.iotdb.subscription.api.exception.SubscriptionStrategyNotValidException;
+
+/** Disorder-handling strategy with a caller-supplied watermark, which must not be negative. */
+public class WatermarkStrategy extends DisorderHandlingStrategy {
+
+ /** @param watermark the watermark value; validated later by {@link #check()}, not here */
+ public WatermarkStrategy(long watermark) {
+ super(watermark);
+ }
+
+ /** @throws SubscriptionStrategyNotValidException if the watermark is negative */
+ @Override
+ public void check() throws SubscriptionStrategyNotValidException {
+ if (watermark < 0) {
+ throw new SubscriptionStrategyNotValidException("watermark should be a non-negative number!");
+ }
+ }
+}
diff --git a/subscription-api/src/main/java/org/apache/iotdb/subscription/api/strategy/topic/MultipleConnectionStrategy.java b/subscription-api/src/main/java/org/apache/iotdb/subscription/api/strategy/topic/MultipleConnectionStrategy.java
new file mode 100644
index 0000000000..18af9687f2
--- /dev/null
+++ b/subscription-api/src/main/java/org/apache/iotdb/subscription/api/strategy/topic/MultipleConnectionStrategy.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.subscription.api.strategy.topic;
+
+import org.apache.iotdb.subscription.api.exception.SubscriptionStrategyNotValidException;
+import org.apache.iotdb.tsfile.read.common.parser.PathNodesGenerator;
+
+import org.antlr.v4.runtime.misc.ParseCancellationException;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * A {@link TopicsStrategy} that carries a list of topics, each of which has to pass {@link
+ * PathNodesGenerator#checkPath(String)}.
+ *
+ * <p>Neither constructor validates anything; call {@link #check()} to validate the topics.
+ */
+public class MultipleConnectionStrategy implements TopicsStrategy {
+
+  /** The topics; may be {@code null} when the list constructor was given {@code null}. */
+  private final List<String> topics;
+
+  /**
+   * Creates a strategy backed by the given list.
+   *
+   * <p>The list is stored as-is, without copying, so later changes made to it by the caller are
+   * visible through this strategy.
+   *
+   * @param topics the topics; {@code null} is accepted here and reported by {@link #check()}
+   */
+  public MultipleConnectionStrategy(List<String> topics) {
+    this.topics = topics;
+  }
+
+  /**
+   * Creates a strategy from individual topics, which are copied into a new list.
+   *
+   * @param topics the topics; a {@code null} array results in an empty list, so that it is
+   *     reported by {@link #check()} instead of failing here with a {@link NullPointerException}
+   */
+  public MultipleConnectionStrategy(String... topics) {
+    this.topics = new ArrayList<>();
+    // Arrays.asList rejects a null array, so only copy when there is something to copy.
+    if (topics != null) {
+      this.topics.addAll(Arrays.asList(topics));
+    }
+  }
+
+  /**
+   * Returns the list held by this strategy itself, not a copy.
+   *
+   * @return the topics; {@code null} if the list constructor was given {@code null}
+   */
+  public List<String> getTopics() {
+    return topics;
+  }
+
+  /**
+   * Validates the topics in list order and stops at the first invalid one.
+   *
+   * @throws SubscriptionStrategyNotValidException if the list is {@code null} or empty, if it
+   *     contains a {@code null} element, or if {@link PathNodesGenerator#checkPath(String)}
+   *     rejects an element with a {@link ParseCancellationException}
+   */
+  @Override
+  public void check() throws SubscriptionStrategyNotValidException {
+    if (topics == null || topics.isEmpty()) {
+      throw new SubscriptionStrategyNotValidException("topics are not set!");
+    }
+    for (String topic : topics) {
+      checkTopic(topic);
+    }
+  }
+
+  /** Validates a single topic, translating every failure into the validation exception. */
+  private static void checkTopic(String topic) {
+    // How checkPath treats null is not visible from here, so null is rejected up front rather
+    // than relying on the parser to fail with some other exception type.
+    if (topic == null) {
+      throw new SubscriptionStrategyNotValidException("topics should not contain null!");
+    }
+    try {
+      PathNodesGenerator.checkPath(topic);
+    } catch (ParseCancellationException e) {
+      throw new SubscriptionStrategyNotValidException(
+          String.format("%s is not a legal path pattern!", topic), e);
+    }
+  }
+}
diff --git a/subscription-api/src/main/java/org/apache/iotdb/subscription/api/strategy/topic/SingleTopicStrategy.java b/subscription-api/src/main/java/org/apache/iotdb/subscription/api/strategy/topic/SingleTopicStrategy.java
new file mode 100644
index 0000000000..07ec534a12
--- /dev/null
+++ b/subscription-api/src/main/java/org/apache/iotdb/subscription/api/strategy/topic/SingleTopicStrategy.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.subscription.api.strategy.topic;
+
+import org.apache.iotdb.subscription.api.exception.SubscriptionStrategyNotValidException;
+import org.apache.iotdb.tsfile.read.common.parser.PathNodesGenerator;
+
+import org.antlr.v4.runtime.misc.ParseCancellationException;
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * A {@link TopicsStrategy} that carries exactly one topic, which has to be non-blank and has to
+ * pass {@link PathNodesGenerator#checkPath(String)}.
+ *
+ * <p>The constructor stores the topic without validating it; validation happens in {@link
+ * #check()}.
+ */
+public class SingleTopicStrategy implements TopicsStrategy {
+
+  /** The topic exactly as passed to the constructor; may be {@code null} or blank. */
+  private final String topic;
+
+  /**
+   * Creates a strategy for the given topic.
+   *
+   * @param topic the topic to carry; not validated here
+   */
+  public SingleTopicStrategy(String topic) {
+    this.topic = topic;
+  }
+
+  /**
+   * Returns the topic given at construction time, unchanged.
+   *
+   * @return the topic, possibly {@code null}
+   */
+  public String getTopic() {
+    return topic;
+  }
+
+  /**
+   * Validates the topic.
+   *
+   * @throws SubscriptionStrategyNotValidException if the topic is {@code null} or blank, or if
+   *     {@link PathNodesGenerator#checkPath(String)} rejects it with a {@link
+   *     ParseCancellationException}
+   */
+  @Override
+  public void check() throws SubscriptionStrategyNotValidException {
+    if (StringUtils.isBlank(topic)) {
+      throw new SubscriptionStrategyNotValidException("topic is not set!");
+    }
+    verifyPathPattern();
+  }
+
+  /** Calls the path check on the topic and wraps a parse failure in the validation exception. */
+  private void verifyPathPattern() {
+    try {
+      PathNodesGenerator.checkPath(topic);
+    } catch (ParseCancellationException parseFailure) {
+      final String message = String.format("%s is not a legal path pattern", topic);
+      throw new SubscriptionStrategyNotValidException(message, parseFailure);
+    }
+  }
+}
diff --git a/subscription-api/src/main/java/org/apache/iotdb/subscription/api/strategy/topic/TopicsStrategy.java b/subscription-api/src/main/java/org/apache/iotdb/subscription/api/strategy/topic/TopicsStrategy.java
new file mode 100644
index 0000000000..6826ca5149
--- /dev/null
+++ b/subscription-api/src/main/java/org/apache/iotdb/subscription/api/strategy/topic/TopicsStrategy.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.subscription.api.strategy.topic;
+
+import org.apache.iotdb.subscription.api.strategy.SubscriptionStrategy;
+
+/**
+ * Common type for the topic-related subscription strategies in this package.
+ *
+ * <p>It declares no members of its own beyond what it inherits from {@link SubscriptionStrategy}.
+ * The implementations in this package are {@link SingleTopicStrategy} and {@link
+ * MultipleConnectionStrategy}.
+ */
+public interface TopicsStrategy extends SubscriptionStrategy {}