You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/03/11 17:57:13 UTC

[iotdb] branch master updated: [IOTDB-5656] subscription-api (#9263)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new e9af4e89ae [IOTDB-5656] subscription-api (#9263)
e9af4e89ae is described below

commit e9af4e89aed5e887f04839a3a50f488acae18c90
Author: CritasWang <cr...@outlook.com>
AuthorDate: Sun Mar 12 01:57:07 2023 +0800

    [IOTDB-5656] subscription-api (#9263)
    
    Co-authored-by: Steve Yurong Su <ro...@apache.org>
---
 pom.xml                                            |   1 +
 subscription-api/pom.xml                           |  68 ++++++++++++
 .../api/SubscriptionConfiguration.java             | 114 +++++++++++++++++++++
 .../subscription/api/SubscriptionFactory.java      |  47 +++++++++
 .../iotdb/subscription/api/consumer/Consumer.java  |  56 ++++++++++
 .../api/consumer/pull/PullConsumer.java            |  38 +++++++
 .../api/consumer/push/DataArrivalListener.java     |  37 +++++++
 .../api/consumer/push/ExceptionListener.java       |  34 ++++++
 .../api/consumer/push/PushConsumer.java            |  68 ++++++++++++
 .../api/dataset/SubscriptionDataSet.java           |  22 ++++
 .../api/exception/SubscriptionException.java       |  31 ++++++
 .../SubscriptionStrategyNotValidException.java     |  31 ++++++
 .../api/strategy/SubscriptionStrategy.java         |  29 ++++++
 .../disorder/DisorderHandlingStrategy.java         |  35 +++++++
 .../api/strategy/disorder/IntolerableStrategy.java |  32 ++++++
 .../api/strategy/disorder/WatermarkStrategy.java   |  36 +++++++
 .../strategy/topic/MultipleConnectionStrategy.java |  63 ++++++++++++
 .../api/strategy/topic/SingleTopicStrategy.java    |  52 ++++++++++
 .../api/strategy/topic/TopicsStrategy.java         |  24 +++++
 19 files changed, 818 insertions(+)

diff --git a/pom.xml b/pom.xml
index 71b2c8ffbf..f93eb4a716 100644
--- a/pom.xml
+++ b/pom.xml
@@ -126,6 +126,7 @@
         <module>isession</module>
         <module>mlnode</module>
         <module>pipe-api</module>
+        <module>subscription-api</module>
     </modules>
     <!-- Properties Management -->
     <properties>
diff --git a/subscription-api/pom.xml b/subscription-api/pom.xml
new file mode 100644
index 0000000000..750845fc84
--- /dev/null
+++ b/subscription-api/pom.xml
@@ -0,0 +1,68 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>iotdb-parent</artifactId>
+        <groupId>org.apache.iotdb</groupId>
+        <version>1.2.0-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>subscription-api</artifactId>
+    <profiles>
+        <profile>
+            <id>get-jar-with-dependencies</id>
+            <build>
+                <plugins>
+                    <plugin>
+                        <artifactId>maven-assembly-plugin</artifactId>
+                        <version>${maven.assembly.version}</version>
+                        <configuration>
+                            <descriptorRefs>
+                                <descriptorRef>jar-with-dependencies</descriptorRef>
+                            </descriptorRefs>
+                        </configuration>
+                        <executions>
+                            <execution>
+                                <id>make-assembly</id>
+                                <!-- the id above is used for inheritance merges -->
+                                <phase>package</phase>
+                                <!-- the phase above binds to the packaging phase -->
+                                <goals>
+                                    <goal>single</goal>
+                                </goals>
+                            </execution>
+                        </executions>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+    </profiles>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>tsfile</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/subscription-api/src/main/java/org/apache/iotdb/subscription/api/SubscriptionConfiguration.java b/subscription-api/src/main/java/org/apache/iotdb/subscription/api/SubscriptionConfiguration.java
new file mode 100644
index 0000000000..836202d0cd
--- /dev/null
+++ b/subscription-api/src/main/java/org/apache/iotdb/subscription/api/SubscriptionConfiguration.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.subscription.api;
+
+import org.apache.iotdb.subscription.api.exception.SubscriptionException;
+import org.apache.iotdb.subscription.api.strategy.disorder.DisorderHandlingStrategy;
+import org.apache.iotdb.subscription.api.strategy.topic.TopicsStrategy;
+
+public class SubscriptionConfiguration {
+
+  private String host;
+  private Integer port;
+  private String username;
+  private String password;
+  private String group;
+  private DisorderHandlingStrategy disorderHandlingStrategy;
+  private TopicsStrategy topicStrategy;
+
+  private SubscriptionConfiguration() {}
+
+  private void check() throws SubscriptionException {
+    if (host == null) {
+      throw new SubscriptionException("Host is not set!");
+    }
+    if (port == null) {
+      throw new SubscriptionException("Port is not set!");
+    }
+    if (username == null) {
+      throw new SubscriptionException("Username is not set!");
+    }
+    if (password == null) {
+      throw new SubscriptionException("Password is not set!");
+    }
+    if (group == null) {
+      throw new SubscriptionException("Group is not set!");
+    }
+    // Strategies must be set and must additionally pass their own check().
+    if (disorderHandlingStrategy == null) {
+      throw new SubscriptionException("DisorderHandlingStrategy is not set!");
+    }
+    disorderHandlingStrategy.check();
+
+    if (topicStrategy == null) {
+      throw new SubscriptionException("TopicStrategy is not set!");
+    }
+    topicStrategy.check();
+  }
+
+  public static class Builder {
+
+    private final SubscriptionConfiguration subscriptionConfiguration;
+
+    public Builder() {
+      subscriptionConfiguration = new SubscriptionConfiguration();
+    }
+
+    public Builder host(String host) {
+      subscriptionConfiguration.host = host;
+      return this;
+    }
+
+    public Builder port(int port) {
+      subscriptionConfiguration.port = port;
+      return this;
+    }
+
+    public Builder username(String username) {
+      subscriptionConfiguration.username = username;
+      return this;
+    }
+
+    public Builder password(String password) {
+      subscriptionConfiguration.password = password;
+      return this;
+    }
+
+    public Builder group(String group) {
+      subscriptionConfiguration.group = group;
+      return this;
+    }
+
+    public Builder disorderHandlingStrategy(DisorderHandlingStrategy disorderHandlingStrategy) {
+      subscriptionConfiguration.disorderHandlingStrategy = disorderHandlingStrategy;
+      return this;
+    }
+
+    public Builder topicStrategy(TopicsStrategy topicStrategy) {
+      subscriptionConfiguration.topicStrategy = topicStrategy;
+      return this;
+    }
+
+    public SubscriptionConfiguration build() {
+      subscriptionConfiguration.check();
+      return subscriptionConfiguration;
+    }
+  }
+}
diff --git a/subscription-api/src/main/java/org/apache/iotdb/subscription/api/SubscriptionFactory.java b/subscription-api/src/main/java/org/apache/iotdb/subscription/api/SubscriptionFactory.java
new file mode 100644
index 0000000000..3da98245d6
--- /dev/null
+++ b/subscription-api/src/main/java/org/apache/iotdb/subscription/api/SubscriptionFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.subscription.api;
+
+import org.apache.iotdb.subscription.api.consumer.pull.PullConsumer;
+import org.apache.iotdb.subscription.api.consumer.push.PushConsumer;
+import org.apache.iotdb.subscription.api.exception.SubscriptionException;
+
+public interface SubscriptionFactory {
+
+  /**
+   * Create a push consumer.
+   *
+   * @param subscriptionConfiguration subscription configuration
+   * @return push consumer
+   * @throws SubscriptionException if the subscription configuration is not valid
+   */
+  PushConsumer createPushConsumer(SubscriptionConfiguration subscriptionConfiguration)
+      throws SubscriptionException;
+
+  /**
+   * Create a pull consumer.
+   *
+   * @param subscriptionConfiguration subscription configuration
+   * @return pull consumer
+   * @throws SubscriptionException if the subscription configuration is not valid
+   */
+  PullConsumer createPullConsumer(SubscriptionConfiguration subscriptionConfiguration)
+      throws SubscriptionException;
+}
diff --git a/subscription-api/src/main/java/org/apache/iotdb/subscription/api/consumer/Consumer.java b/subscription-api/src/main/java/org/apache/iotdb/subscription/api/consumer/Consumer.java
new file mode 100644
index 0000000000..bd042d9927
--- /dev/null
+++ b/subscription-api/src/main/java/org/apache/iotdb/subscription/api/consumer/Consumer.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.subscription.api.consumer;
+
+import org.apache.iotdb.subscription.api.exception.SubscriptionException;
+
+import java.util.List;
+
+public interface Consumer extends AutoCloseable {
+
+  /** Open the subscription. */
+  void openSubscription() throws SubscriptionException;
+
+  /** Close the subscription. */
+  void closeSubscription() throws SubscriptionException;
+
+  /**
+   * Check if the subscription is closed.
+   *
+   * @return true if the subscription is closed, false otherwise
+   */
+  boolean isClosed();
+
+  /**
+   * Get the consumer group of the subscription.
+   *
+   * @return the consumer group
+   * @throws SubscriptionException if the consumer group cannot be retrieved
+   */
+  String consumerGroup() throws SubscriptionException;
+
+  /**
+   * Get the topics of the subscription.
+   *
+   * @return the topics
+   * @throws SubscriptionException if the topics cannot be retrieved
+   */
+  List<String> subscription() throws SubscriptionException;
+}
diff --git a/subscription-api/src/main/java/org/apache/iotdb/subscription/api/consumer/pull/PullConsumer.java b/subscription-api/src/main/java/org/apache/iotdb/subscription/api/consumer/pull/PullConsumer.java
new file mode 100644
index 0000000000..ebc7e9e83e
--- /dev/null
+++ b/subscription-api/src/main/java/org/apache/iotdb/subscription/api/consumer/pull/PullConsumer.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.subscription.api.consumer.pull;
+
+import org.apache.iotdb.subscription.api.consumer.Consumer;
+import org.apache.iotdb.subscription.api.dataset.SubscriptionDataSet;
+import org.apache.iotdb.subscription.api.exception.SubscriptionException;
+
+import java.util.List;
+
+public interface PullConsumer extends Consumer {
+
+  /**
+   * Poll data from the subscription server.
+   *
+   * @param timeoutInMs timeout in milliseconds
+   * @return a list of SubscriptionDataSets, or null if no data arrives within the timeout
+   * @throws SubscriptionException if any error occurs
+   */
+  List<SubscriptionDataSet> poll(long timeoutInMs) throws SubscriptionException;
+}
diff --git a/subscription-api/src/main/java/org/apache/iotdb/subscription/api/consumer/push/DataArrivalListener.java b/subscription-api/src/main/java/org/apache/iotdb/subscription/api/consumer/push/DataArrivalListener.java
new file mode 100644
index 0000000000..cd4aee2489
--- /dev/null
+++ b/subscription-api/src/main/java/org/apache/iotdb/subscription/api/consumer/push/DataArrivalListener.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.subscription.api.consumer.push;
+
+import org.apache.iotdb.subscription.api.dataset.SubscriptionDataSet;
+import org.apache.iotdb.subscription.api.exception.SubscriptionException;
+
+import java.util.List;
+
+@FunctionalInterface
+public interface DataArrivalListener {
+
+  /**
+   * Called when data arrives in the subscription.
+   *
+   * @param subscriptionDataSets the data
+   * @throws SubscriptionException if the data cannot be handled
+   */
+  void onDataArrival(List<SubscriptionDataSet> subscriptionDataSets) throws SubscriptionException;
+}
diff --git a/subscription-api/src/main/java/org/apache/iotdb/subscription/api/consumer/push/ExceptionListener.java b/subscription-api/src/main/java/org/apache/iotdb/subscription/api/consumer/push/ExceptionListener.java
new file mode 100644
index 0000000000..108e7d6614
--- /dev/null
+++ b/subscription-api/src/main/java/org/apache/iotdb/subscription/api/consumer/push/ExceptionListener.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.subscription.api.consumer.push;
+
+import org.apache.iotdb.subscription.api.exception.SubscriptionException;
+
+@FunctionalInterface
+public interface ExceptionListener {
+
+  /**
+   * Called when an exception occurs in the subscription.
+   *
+   * @param subscriptionException the exception
+   * @throws SubscriptionException if the exception cannot be handled
+   */
+  void onException(SubscriptionException subscriptionException) throws SubscriptionException;
+}
diff --git a/subscription-api/src/main/java/org/apache/iotdb/subscription/api/consumer/push/PushConsumer.java b/subscription-api/src/main/java/org/apache/iotdb/subscription/api/consumer/push/PushConsumer.java
new file mode 100644
index 0000000000..c8a4d35801
--- /dev/null
+++ b/subscription-api/src/main/java/org/apache/iotdb/subscription/api/consumer/push/PushConsumer.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.subscription.api.consumer.push;
+
+import org.apache.iotdb.subscription.api.consumer.Consumer;
+import org.apache.iotdb.subscription.api.exception.SubscriptionException;
+
+public interface PushConsumer extends Consumer {
+
+  /**
+   * Register a listener to listen to the data arrival. The method should be called before the
+   * consumer is started. The DataArrivalListener can NOT be changed once the consumer is started.
+   *
+   * @param listener the listener to listen to the data arrival.
+   * @throws SubscriptionException if the listener cannot be registered. Mainly because the
+   *     PushConsumer is running.
+   */
+  void registerDataArrivalListener(DataArrivalListener listener) throws SubscriptionException;
+
+  /**
+   * Register a listener to listen to the exception. The method should be called before the consumer
+   * is started. The ExceptionListener can NOT be changed once the consumer is started.
+   *
+   * @param listener the listener to listen to the exception.
+   * @throws SubscriptionException if the listener cannot be registered. Mainly because the
+   *     PushConsumer is running.
+   */
+  void registerExceptionListener(ExceptionListener listener) throws SubscriptionException;
+
+  /**
+   * Start the consumer to listen to the data. If the consumer is already listening, do nothing.
+   *
+   * @throws SubscriptionException if the consumer cannot start, e.g. the DataArrivalListener is not
+   *     registered or the ExceptionListener is not registered.
+   */
+  void start() throws SubscriptionException;
+
+  /**
+   * Stop the consumer from listening to the data. If the consumer is not listening, do nothing.
+   *
+   * @throws SubscriptionException if the consumer cannot stop.
+   */
+  void stop() throws SubscriptionException;
+
+  /**
+   * Check if the consumer is listening to the data.
+   *
+   * @return true if the consumer is listening to the data, false otherwise.
+   */
+  boolean isListening();
+}
diff --git a/subscription-api/src/main/java/org/apache/iotdb/subscription/api/dataset/SubscriptionDataSet.java b/subscription-api/src/main/java/org/apache/iotdb/subscription/api/dataset/SubscriptionDataSet.java
new file mode 100644
index 0000000000..0192192898
--- /dev/null
+++ b/subscription-api/src/main/java/org/apache/iotdb/subscription/api/dataset/SubscriptionDataSet.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.subscription.api.dataset;
+
+public interface SubscriptionDataSet {}
diff --git a/subscription-api/src/main/java/org/apache/iotdb/subscription/api/exception/SubscriptionException.java b/subscription-api/src/main/java/org/apache/iotdb/subscription/api/exception/SubscriptionException.java
new file mode 100644
index 0000000000..3cb660c24c
--- /dev/null
+++ b/subscription-api/src/main/java/org/apache/iotdb/subscription/api/exception/SubscriptionException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.subscription.api.exception;
+
+public class SubscriptionException extends RuntimeException {
+
+  public SubscriptionException(String message) {
+    super(message);
+  }
+
+  public SubscriptionException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}
diff --git a/subscription-api/src/main/java/org/apache/iotdb/subscription/api/exception/SubscriptionStrategyNotValidException.java b/subscription-api/src/main/java/org/apache/iotdb/subscription/api/exception/SubscriptionStrategyNotValidException.java
new file mode 100644
index 0000000000..171f6c95a6
--- /dev/null
+++ b/subscription-api/src/main/java/org/apache/iotdb/subscription/api/exception/SubscriptionStrategyNotValidException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.subscription.api.exception;
+
+public class SubscriptionStrategyNotValidException extends SubscriptionException {
+
+  public SubscriptionStrategyNotValidException(String message) {
+    super(message);
+  }
+
+  public SubscriptionStrategyNotValidException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}
diff --git a/subscription-api/src/main/java/org/apache/iotdb/subscription/api/strategy/SubscriptionStrategy.java b/subscription-api/src/main/java/org/apache/iotdb/subscription/api/strategy/SubscriptionStrategy.java
new file mode 100644
index 0000000000..0fbfe56dad
--- /dev/null
+++ b/subscription-api/src/main/java/org/apache/iotdb/subscription/api/strategy/SubscriptionStrategy.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.subscription.api.strategy;
+
+import org.apache.iotdb.subscription.api.exception.SubscriptionStrategyNotValidException;
+
+/** Subscription strategy interface. */
+public interface SubscriptionStrategy {
+
+  /** @throws SubscriptionStrategyNotValidException if invalid strategy is set */
+  void check() throws SubscriptionStrategyNotValidException;
+}
diff --git a/subscription-api/src/main/java/org/apache/iotdb/subscription/api/strategy/disorder/DisorderHandlingStrategy.java b/subscription-api/src/main/java/org/apache/iotdb/subscription/api/strategy/disorder/DisorderHandlingStrategy.java
new file mode 100644
index 0000000000..57faca6f89
--- /dev/null
+++ b/subscription-api/src/main/java/org/apache/iotdb/subscription/api/strategy/disorder/DisorderHandlingStrategy.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.subscription.api.strategy.disorder;
+
+import org.apache.iotdb.subscription.api.strategy.SubscriptionStrategy;
+
+public abstract class DisorderHandlingStrategy implements SubscriptionStrategy {
+
+  protected final long watermark;
+
+  protected DisorderHandlingStrategy(long watermark) {
+    this.watermark = watermark;
+  }
+
+  public long getWatermark() {
+    return watermark;
+  }
+}
diff --git a/subscription-api/src/main/java/org/apache/iotdb/subscription/api/strategy/disorder/IntolerableStrategy.java b/subscription-api/src/main/java/org/apache/iotdb/subscription/api/strategy/disorder/IntolerableStrategy.java
new file mode 100644
index 0000000000..e358974b9e
--- /dev/null
+++ b/subscription-api/src/main/java/org/apache/iotdb/subscription/api/strategy/disorder/IntolerableStrategy.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.subscription.api.strategy.disorder;
+
+import org.apache.iotdb.subscription.api.exception.SubscriptionStrategyNotValidException;
+
+public class IntolerableStrategy extends DisorderHandlingStrategy {
+  /** Creates a strategy with the watermark fixed at 0; public so API users can instantiate it. */
+  public IntolerableStrategy() {
+    super(0);
+  }
+
+  @Override
+  public void check() throws SubscriptionStrategyNotValidException {} // nothing to validate
+}
diff --git a/subscription-api/src/main/java/org/apache/iotdb/subscription/api/strategy/disorder/WatermarkStrategy.java b/subscription-api/src/main/java/org/apache/iotdb/subscription/api/strategy/disorder/WatermarkStrategy.java
new file mode 100644
index 0000000000..7af3fc5ac3
--- /dev/null
+++ b/subscription-api/src/main/java/org/apache/iotdb/subscription/api/strategy/disorder/WatermarkStrategy.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.subscription.api.strategy.disorder;
+
+import org.apache.iotdb.subscription.api.exception.SubscriptionStrategyNotValidException;
+
+/** A {@link DisorderHandlingStrategy} configured with a caller-supplied watermark. */
+public class WatermarkStrategy extends DisorderHandlingStrategy {
+
+  /**
+   * Creates the strategy and forwards {@code watermark} to the superclass unchanged.
+   *
+   * <p>No validation happens here; a negative value is only rejected by {@link #check()}.
+   *
+   * @param watermark the watermark value; its unit is not defined in this class
+   */
+  public WatermarkStrategy(long watermark) {
+    super(watermark);
+  }
+
+  /**
+   * Validates that the watermark is not negative.
+   *
+   * @throws SubscriptionStrategyNotValidException if the watermark is less than zero
+   */
+  @Override
+  public void check() throws SubscriptionStrategyNotValidException {
+    // `watermark` is not declared in this class; it is inherited from the superclass hierarchy.
+    final boolean negative = watermark < 0;
+    if (!negative) {
+      return;
+    }
+    throw new SubscriptionStrategyNotValidException("watermark should be a non-negative number!");
+  }
+}
diff --git a/subscription-api/src/main/java/org/apache/iotdb/subscription/api/strategy/topic/MultipleConnectionStrategy.java b/subscription-api/src/main/java/org/apache/iotdb/subscription/api/strategy/topic/MultipleConnectionStrategy.java
new file mode 100644
index 0000000000..18af9687f2
--- /dev/null
+++ b/subscription-api/src/main/java/org/apache/iotdb/subscription/api/strategy/topic/MultipleConnectionStrategy.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.subscription.api.strategy.topic;
+
+import org.apache.iotdb.subscription.api.exception.SubscriptionStrategyNotValidException;
+import org.apache.iotdb.tsfile.read.common.parser.PathNodesGenerator;
+
+import org.antlr.v4.runtime.misc.ParseCancellationException;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * A {@link TopicsStrategy} that carries a list of topics.
+ *
+ * <p>Construction never validates its input; call {@link #check()} to verify that topics were
+ * supplied and that each of them passes {@link PathNodesGenerator#checkPath}.
+ */
+public class MultipleConnectionStrategy implements TopicsStrategy {
+
+  /** Topics held by this strategy; {@code null} only when a {@code null} list was supplied. */
+  private final List<String> topics;
+
+  /**
+   * Creates a strategy from a list of topics.
+   *
+   * <p>The list is copied, so later changes to the caller's list do not alter this strategy.
+   * This matches the varargs constructor, which also stores its own copy. A {@code null} list
+   * is kept as {@code null} and reported by {@link #check()}.
+   *
+   * @param topics the topics to subscribe to; may be {@code null} (rejected by {@link #check()})
+   */
+  public MultipleConnectionStrategy(List<String> topics) {
+    this.topics = topics == null ? null : new ArrayList<>(topics);
+  }
+
+  /**
+   * Creates a strategy from individual topics.
+   *
+   * <p>A {@code null} array is treated like an empty one instead of failing with a
+   * {@link NullPointerException}; the missing topics are then reported by {@link #check()}.
+   *
+   * @param topics the topics to subscribe to
+   */
+  public MultipleConnectionStrategy(String... topics) {
+    this.topics = topics == null ? new ArrayList<>() : new ArrayList<>(Arrays.asList(topics));
+  }
+
+  /**
+   * Returns the topics held by this strategy.
+   *
+   * @return the internal list itself (not a copy), or {@code null} if this strategy was created
+   *     from a {@code null} list
+   */
+  public List<String> getTopics() {
+    return topics;
+  }
+
+  /**
+   * Validates that at least one topic is set and that every topic is a legal path pattern.
+   *
+   * @throws SubscriptionStrategyNotValidException if the topic list is {@code null} or empty, if
+   *     an entry is {@code null}, or if {@link PathNodesGenerator#checkPath} rejects an entry
+   *     with a {@link ParseCancellationException}
+   */
+  @Override
+  public void check() throws SubscriptionStrategyNotValidException {
+    if (topics == null || topics.isEmpty()) {
+      throw new SubscriptionStrategyNotValidException("topics are not set!");
+    }
+    for (String topic : topics) {
+      if (topic == null) {
+        // A null entry can never be a path; report it through the validation exception rather
+        // than handing it to the parser.
+        throw new SubscriptionStrategyNotValidException(
+            String.format("%s is not a legal path pattern!", topic));
+      }
+      try {
+        PathNodesGenerator.checkPath(topic);
+      } catch (ParseCancellationException e) {
+        // Keep the parser failure as the cause so the offending token stays diagnosable.
+        throw new SubscriptionStrategyNotValidException(
+            String.format("%s is not a legal path pattern!", topic), e);
+      }
+    }
+  }
+}
diff --git a/subscription-api/src/main/java/org/apache/iotdb/subscription/api/strategy/topic/SingleTopicStrategy.java b/subscription-api/src/main/java/org/apache/iotdb/subscription/api/strategy/topic/SingleTopicStrategy.java
new file mode 100644
index 0000000000..07ec534a12
--- /dev/null
+++ b/subscription-api/src/main/java/org/apache/iotdb/subscription/api/strategy/topic/SingleTopicStrategy.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.subscription.api.strategy.topic;
+
+import org.apache.iotdb.subscription.api.exception.SubscriptionStrategyNotValidException;
+import org.apache.iotdb.tsfile.read.common.parser.PathNodesGenerator;
+
+import org.antlr.v4.runtime.misc.ParseCancellationException;
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * A {@link TopicsStrategy} that carries exactly one topic.
+ *
+ * <p>Construction never validates the topic; call {@link #check()} to do so.
+ */
+public class SingleTopicStrategy implements TopicsStrategy {
+
+  /** The topic as supplied to the constructor; may be {@code null} or blank until checked. */
+  private final String topic;
+
+  /**
+   * Creates a strategy for the given topic without validating it.
+   *
+   * @param topic the topic to subscribe to
+   */
+  public SingleTopicStrategy(String topic) {
+    this.topic = topic;
+  }
+
+  /**
+   * Returns the topic exactly as it was passed to the constructor.
+   *
+   * @return the topic, possibly {@code null}
+   */
+  public String getTopic() {
+    return topic;
+  }
+
+  /**
+   * Validates that the topic is set and is a legal path pattern.
+   *
+   * @throws SubscriptionStrategyNotValidException if the topic is {@code null}, empty or
+   *     whitespace-only, or if {@link PathNodesGenerator#checkPath} rejects it with a
+   *     {@link ParseCancellationException}
+   */
+  @Override
+  public void check() throws SubscriptionStrategyNotValidException {
+    final boolean missing = StringUtils.isBlank(topic);
+    if (missing) {
+      throw new SubscriptionStrategyNotValidException("topic is not set!");
+    }
+    try {
+      PathNodesGenerator.checkPath(topic);
+    } catch (ParseCancellationException parseFailure) {
+      // Keep the parser failure as the cause so the original diagnostics are not lost.
+      final String reason = String.format("%s is not a legal path pattern", topic);
+      throw new SubscriptionStrategyNotValidException(reason, parseFailure);
+    }
+  }
+}
diff --git a/subscription-api/src/main/java/org/apache/iotdb/subscription/api/strategy/topic/TopicsStrategy.java b/subscription-api/src/main/java/org/apache/iotdb/subscription/api/strategy/topic/TopicsStrategy.java
new file mode 100644
index 0000000000..6826ca5149
--- /dev/null
+++ b/subscription-api/src/main/java/org/apache/iotdb/subscription/api/strategy/topic/TopicsStrategy.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.subscription.api.strategy.topic;
+
+import org.apache.iotdb.subscription.api.strategy.SubscriptionStrategy;
+
+/**
+ * Marker type for {@link SubscriptionStrategy} implementations that describe topics.
+ *
+ * <p>Declares no members of its own; everything comes from {@link SubscriptionStrategy}.
+ */
+public interface TopicsStrategy extends SubscriptionStrategy {
+  // No additional members.
+}