You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/02/26 12:50:30 UTC

[iotdb] branch master updated: [IOTDB-5575] Pipe SDK: PipeProcessor & PipeConnector (#9131)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new ad93ca04fb [IOTDB-5575] Pipe SDK: PipeProcessor & PipeConnector (#9131)
ad93ca04fb is described below

commit ad93ca04fbbfc76e70e50dc9507e3b1ca3cf0afe
Author: Itami Sho <42...@users.noreply.github.com>
AuthorDate: Sun Feb 26 20:50:23 2023 +0800

    [IOTDB-5575] Pipe SDK: PipeProcessor & PipeConnector (#9131)
    
    Co-authored-by: Steve Yurong Su <ro...@apache.org>
---
 pipe-api/pom.xml                                   |  68 +++++++++
 .../org/apache/iotdb/pipe/api/PipeConnector.java   | 136 ++++++++++++++++++
 .../org/apache/iotdb/pipe/api/PipeProcessor.java   | 117 ++++++++++++++++
 .../java/org/apache/iotdb/pipe/api/access/Row.java | 154 +++++++++++++++++++++
 .../apache/iotdb/pipe/api/access/RowIterator.java  |  75 ++++++++++
 .../iotdb/pipe/api/collector/EventCollector.java   |  62 +++++++++
 .../iotdb/pipe/api/collector/RowCollector.java     |  43 ++++++
 .../api/customizer/PipeParameterValidator.java     |  99 +++++++++++++
 .../iotdb/pipe/api/customizer/PipeParameters.java  | 112 +++++++++++++++
 .../api/customizer/PipeRuntimeConfiguration.java   |  28 ++++
 .../iotdb/pipe/api/customizer/PipeStrategy.java    |  28 ++++
 .../PipeConnectorRuntimeConfiguration.java         |  84 +++++++++++
 .../connector/parallel/ParallelStrategy.java       |  24 ++++
 .../retry/EqualRetryIntervalStrategy.java          |  63 +++++++++
 .../retry/ExponentialRetryIntervalStrategy.java    |  70 ++++++++++
 .../customizer/connector/retry/RetryStrategy.java  |  34 +++++
 .../customizer/connector/reuse/ReuseStrategy.java  |  24 ++++
 .../PipeProcessorRuntimeConfiguration.java         |  35 +++++
 .../org/apache/iotdb/pipe/api/event/Event.java     |  23 +++
 .../pipe/api/event/deletion/DeletionEvent.java     |  42 ++++++
 .../api/event/insertion/TabletInsertionEvent.java  |  56 ++++++++
 .../api/event/insertion/TsFileInsertionEvent.java  |  44 ++++++
 .../PipeAttributeNotProvidedException.java         |  27 ++++
 .../iotdb/pipe/api/exception/PipeException.java    |  31 +++++
 .../exception/PipeParameterNotValidException.java  |  27 ++++
 .../exception/PipeStrategyNotValidException.java   |  27 ++++
 .../org/apache/iotdb/pipe/api/type/Binary.java     | 151 ++++++++++++++++++++
 .../java/org/apache/iotdb/pipe/api/type/Type.java  |  50 +++++++
 pom.xml                                            |   1 +
 29 files changed, 1735 insertions(+)

diff --git a/pipe-api/pom.xml b/pipe-api/pom.xml
new file mode 100644
index 0000000000..9d97ace514
--- /dev/null
+++ b/pipe-api/pom.xml
@@ -0,0 +1,68 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>iotdb-parent</artifactId>
+        <groupId>org.apache.iotdb</groupId>
+        <version>1.1.0-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>pipe-api</artifactId>
+    <profiles>
+        <profile>
+            <id>get-jar-with-dependencies</id>
+            <build>
+                <plugins>
+                    <plugin>
+                        <artifactId>maven-assembly-plugin</artifactId>
+                        <version>${maven.assembly.version}</version>
+                        <configuration>
+                            <descriptorRefs>
+                                <descriptorRef>jar-with-dependencies</descriptorRef>
+                            </descriptorRefs>
+                        </configuration>
+                        <executions>
+                            <execution>
+                                <id>make-assembly</id>
+                                <!-- this is used for inheritance merges -->
+                                <phase>package</phase>
+                                <!-- bind to the packaging phase -->
+                                <goals>
+                                    <goal>single</goal>
+                                </goals>
+                            </execution>
+                        </executions>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+    </profiles>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>tsfile</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java
new file mode 100644
index 0000000000..cc79e19123
--- /dev/null
+++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.api;
+
+import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.PipeParameters;
+import org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.event.deletion.DeletionEvent;
+import org.apache.iotdb.pipe.api.event.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.insertion.TsFileInsertionEvent;
+
+/**
+ * PipeConnector
+ *
+ * <p>PipeConnector is responsible for sending events to sinks.
+ *
+ * <p>Various network protocols can be supported by implementing different PipeConnector classes.
+ *
+ * <p>The lifecycle of a PipeConnector is as follows:
+ *
+ * <ul>
+ *   <li>When a collaboration task is created, the KV pairs of `WITH CONNECTOR` clause in SQL are
+ *       parsed and the validation method {@link PipeConnector#validate(PipeParameterValidator)}
+ *       will be called to validate the parameters.
+ *   <li>Before the collaboration task starts, the method {@link
+ *       PipeConnector#customize(PipeParameters, PipeConnectorRuntimeConfiguration)} will be called
+ *       to configure the runtime behavior of the PipeConnector and the method {@link
+ *       PipeConnector#handshake()} will be called to create a connection with sink.
+ *   <li>While the collaboration task is in progress:
+ *       <ul>
+ *         <li>PipeCollector captures the events and wraps them into three types of Event instances.
+ *         <li>PipeProcessor processes the events and then passes them to the PipeConnector.
+ *         <li>PipeConnector serializes the events into binaries and sends them to sinks. The
+ *             following 3 methods will be called: {@link
+ *             PipeConnector#transfer(TabletInsertionEvent)}, {@link
+ *             PipeConnector#transfer(TsFileInsertionEvent)} and {@link
+ *             PipeConnector#transfer(DeletionEvent)}.
+ *       </ul>
+ *   <li>When the collaboration task is cancelled (the `DROP PIPE` command is executed), the {@link
+ *       PipeConnector#close() } method will be called.
+ * </ul>
+ *
+ * <p>In addition, the method {@link PipeConnector#heartbeat()} will be called periodically to check
+ * whether the connection with sink is still alive. The method {@link PipeConnector#handshake()}
+ * will be called to create a new connection with the sink when the method {@link
+ * PipeConnector#heartbeat()} throws exceptions.
+ */
+public interface PipeConnector extends AutoCloseable {
+
+  /**
+   * This method is mainly used to validate {@link PipeParameters} and it is executed before {@link
+   * PipeConnector#customize(PipeParameters, PipeConnectorRuntimeConfiguration)} is called.
+   *
+   * @param validator the validator used to validate {@link PipeParameters}
+   * @throws Exception if any parameter is not valid
+   */
+  void validate(PipeParameterValidator validator) throws Exception;
+
+  /**
+   * This method is mainly used to customize PipeConnector. In this method, the user can do the
+   * following things:
+   *
+   * <ul>
+   *   <li>Use PipeParameters to parse key-value pair attributes entered by the user.
+   *   <li>Set the running configurations in PipeConnectorRuntimeConfiguration.
+   * </ul>
+   *
+   * <p>This method is called after the method {@link
+   * PipeConnector#validate(PipeParameterValidator)} is called and before the method {@link
+   * PipeConnector#handshake()} is called.
+   *
+   * @param parameters used to parse the input parameters entered by the user
+   * @param configuration used to set the required properties of the running PipeConnector
+   * @throws Exception the user can throw errors if necessary
+   */
+  void customize(PipeParameters parameters, PipeConnectorRuntimeConfiguration configuration)
+      throws Exception;
+
+  /**
+   * This method is used to create a connection with sink. This method will be called after the
+   * method {@link PipeConnector#customize(PipeParameters, PipeConnectorRuntimeConfiguration)} is
+   * called or will be called when the method {@link PipeConnector#heartbeat()} throws exceptions.
+   *
+   * @throws Exception if the connection fails to be created
+   */
+  void handshake() throws Exception;
+
+  /**
+   * This method will be called periodically to check whether the connection with sink is still
+   * alive.
+   *
+   * @throws Exception if the connection dies
+   */
+  void heartbeat() throws Exception;
+
+  /**
+   * This method is used to transfer the TabletInsertionEvent.
+   *
+   * @param tabletInsertionEvent TabletInsertionEvent to be transferred
+   * @throws Exception the user can throw errors if necessary
+   */
+  void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception;
+
+  /**
+   * This method is used to transfer the TsFileInsertionEvent.
+   *
+   * @param tsFileInsertionEvent TsFileInsertionEvent to be transferred
+   * @throws Exception the user can throw errors if necessary
+   */
+  void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception;
+
+  /**
+   * This method is used to transfer the DeletionEvent.
+   *
+   * @param deletionEvent DeletionEvent to be transferred
+   * @throws Exception the user can throw errors if necessary
+   */
+  void transfer(DeletionEvent deletionEvent) throws Exception;
+}
diff --git a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeProcessor.java b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeProcessor.java
new file mode 100644
index 0000000000..5de989d568
--- /dev/null
+++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeProcessor.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.api;
+
+import org.apache.iotdb.pipe.api.collector.EventCollector;
+import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.PipeParameters;
+import org.apache.iotdb.pipe.api.customizer.processor.PipeProcessorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.event.deletion.DeletionEvent;
+import org.apache.iotdb.pipe.api.event.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.insertion.TsFileInsertionEvent;
+
+/**
+ * PipeProcessor
+ *
+ * <p>PipeProcessor is used to filter and transform the Event formed by the PipeCollector.
+ *
+ * <p>The lifecycle of a PipeProcessor is as follows:
+ *
+ * <ul>
+ *   <li>When a collaboration task is created, the KV pairs of `WITH PROCESSOR` clause in SQL are
+ *       parsed and the validation method {@link PipeProcessor#validate(PipeParameterValidator)}
+ *       will be called to validate the parameters.
+ *   <li>Before the collaboration task starts, the method {@link
+ *       PipeProcessor#customize(PipeParameters, PipeProcessorRuntimeConfiguration)} will be called
+ *       to configure the runtime behavior of the PipeProcessor.
+ *   <li>While the collaboration task is in progress:
+ *       <ul>
+ *         <li>PipeCollector captures the events and wraps them into three types of Event instances.
+ *         <li>PipeProcessor processes the events and then passes them to the PipeConnector. The
+ *             following 3 methods will be called: {@link
+ *             PipeProcessor#process(TabletInsertionEvent, EventCollector)}, {@link
+ *             PipeProcessor#process(TsFileInsertionEvent, EventCollector)} and {@link
+ *             PipeProcessor#process(DeletionEvent, EventCollector)}.
+ *         <li>PipeConnector serializes the events into binaries and sends them to sinks.
+ *       </ul>
+ *   <li>When the collaboration task is cancelled (the `DROP PIPE` command is executed), the {@link
+ *       PipeProcessor#close() } method will be called.
+ * </ul>
+ */
+public interface PipeProcessor extends AutoCloseable {
+
+  /**
+   * This method is mainly used to validate {@link PipeParameters} and it is executed before {@link
+   * PipeProcessor#customize(PipeParameters, PipeProcessorRuntimeConfiguration)} is called.
+   *
+   * @param validator the validator used to validate {@link PipeParameters}
+   * @throws Exception if any parameter is not valid
+   */
+  void validate(PipeParameterValidator validator) throws Exception;
+
+  /**
+   * This method is mainly used to customize PipeProcessor. In this method, the user can do the
+   * following things:
+   *
+   * <ul>
+   *   <li>Use PipeParameters to parse key-value pair attributes entered by the user.
+   *   <li>Set the running configurations in PipeProcessorRuntimeConfiguration.
+   * </ul>
+   *
+   * <p>This method is called after the method {@link
+   * PipeProcessor#validate(PipeParameterValidator)} is called and before the beginning of the
+   * events processing.
+   *
+   * @param parameters used to parse the input parameters entered by the user
+   * @param configuration used to set the required properties of the running PipeProcessor
+   * @throws Exception the user can throw errors if necessary
+   */
+  void customize(PipeParameters parameters, PipeProcessorRuntimeConfiguration configuration)
+      throws Exception;
+
+  /**
+   * This method is called to process the TabletInsertionEvent.
+   *
+   * @param tabletInsertionEvent TabletInsertionEvent to be processed
+   * @param eventCollector used to collect result events after processing
+   * @throws Exception the user can throw errors if necessary
+   */
+  void process(TabletInsertionEvent tabletInsertionEvent, EventCollector eventCollector)
+      throws Exception;
+
+  /**
+   * This method is called to process the TsFileInsertionEvent.
+   *
+   * @param tsFileInsertionEvent TsFileInsertionEvent to be processed
+   * @param eventCollector used to collect result events after processing
+   * @throws Exception the user can throw errors if necessary
+   */
+  void process(TsFileInsertionEvent tsFileInsertionEvent, EventCollector eventCollector)
+      throws Exception;
+
+  /**
+   * This method is called to process the DeletionEvent.
+   *
+   * @param deletionEvent DeletionEvent to be processed
+   * @param eventCollector used to collect result events after processing
+   * @throws Exception the user can throw errors if necessary
+   */
+  void process(DeletionEvent deletionEvent, EventCollector eventCollector) throws Exception;
+}
diff --git a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/access/Row.java b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/access/Row.java
new file mode 100644
index 0000000000..5478263748
--- /dev/null
+++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/access/Row.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.api.access;
+
+import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
+import org.apache.iotdb.pipe.api.type.Binary;
+import org.apache.iotdb.pipe.api.type.Type;
+import org.apache.iotdb.tsfile.read.common.Path;
+
+import java.io.IOException;
+import java.util.List;
+
+public interface Row {
+
+  /**
+   * Returns the timestamp of this row.
+   *
+   * @return timestamp
+   */
+  long getTime() throws IOException;
+
+  /**
+   * Returns the int value at the specified column in this row.
+   *
+   * <p>Users need to ensure that the data type of the specified column is {@code Type.INT32}.
+   *
+   * @param columnIndex index of the specified column
+   * @return the int value at the specified column in this row
+   */
+  int getInt(int columnIndex) throws IOException;
+
+  /**
+   * Returns the long value at the specified column in this row.
+   *
+   * <p>Users need to ensure that the data type of the specified column is {@code Type.INT64}.
+   *
+   * @param columnIndex index of the specified column
+   * @return the long value at the specified column in this row
+   */
+  long getLong(int columnIndex) throws IOException;
+
+  /**
+   * Returns the float value at the specified column in this row.
+   *
+   * <p>Users need to ensure that the data type of the specified column is {@code Type.FLOAT}.
+   *
+   * @param columnIndex index of the specified column
+   * @return the float value at the specified column in this row
+   */
+  float getFloat(int columnIndex) throws IOException;
+
+  /**
+   * Returns the double value at the specified column in this row.
+   *
+   * <p>Users need to ensure that the data type of the specified column is {@code Type.DOUBLE}.
+   *
+   * @param columnIndex index of the specified column
+   * @return the double value at the specified column in this row
+   */
+  double getDouble(int columnIndex) throws IOException;
+
+  /**
+   * Returns the boolean value at the specified column in this row.
+   *
+   * <p>Users need to ensure that the data type of the specified column is {@code Type.BOOLEAN}.
+   *
+   * @param columnIndex index of the specified column
+   * @return the boolean value at the specified column in this row
+   */
+  boolean getBoolean(int columnIndex) throws IOException;
+
+  /**
+   * Returns the Binary value at the specified column in this row.
+   *
+   * <p>Users need to ensure that the data type of the specified column is {@code Type.TEXT}.
+   *
+   * @param columnIndex index of the specified column
+   * @return the Binary value at the specified column in this row
+   */
+  Binary getBinary(int columnIndex) throws IOException;
+
+  /**
+   * Returns the String value at the specified column in this row.
+   *
+   * <p>Users need to ensure that the data type of the specified column is {@code Type.TEXT}.
+   *
+   * @param columnIndex index of the specified column
+   * @return the String value at the specified column in this row
+   */
+  String getString(int columnIndex) throws IOException;
+
+  /**
+   * Returns the actual data type of the value at the specified column in this row.
+   *
+   * @param columnIndex index of the specified column
+   * @return the actual data type of the value at the specified column in this row
+   */
+  Type getDataType(int columnIndex);
+
+  /**
+   * Returns {@code true} if the value of the specified column is null.
+   *
+   * @param columnIndex index of the specified column
+   * @return {@code true} if the value of the specified column is null
+   */
+  boolean isNull(int columnIndex);
+
+  /**
+   * Returns the number of columns
+   *
+   * @return the number of columns
+   */
+  int size();
+
+  /**
+   * Returns the actual column index of the given column name.
+   *
+   * @param columnName the column name in Path form
+   * @throws PipeParameterNotValidException if the given column name does not exist in the Row
+   * @return the actual column index of the given column name
+   */
+  int getColumnIndex(Path columnName) throws PipeParameterNotValidException;
+
+  /**
+   * Returns the column names in the Row
+   *
+   * @return the column names in the Row
+   */
+  List<Path> getColumnNames();
+
+  /**
+   * Returns the column data types in the Row
+   *
+   * @return the column data types in the Row
+   */
+  List<Type> getColumnTypes();
+}
diff --git a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/access/RowIterator.java b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/access/RowIterator.java
new file mode 100644
index 0000000000..ae9849eb3c
--- /dev/null
+++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/access/RowIterator.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.api.access;
+
+import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
+import org.apache.iotdb.pipe.api.type.Type;
+import org.apache.iotdb.tsfile.read.common.Path;
+
+import java.io.IOException;
+import java.util.List;
+
+public interface RowIterator {
+
+  /**
+   * Returns {@code true} if the iteration has more rows.
+   *
+   * @return {@code true} if the iteration has more rows
+   */
+  boolean hasNextRow();
+
+  /**
+   * Returns the next row in the iteration.
+   *
+   * <p>Note that the Row instance returned by this method each time is the same instance. In other
+   * words, calling {@code next()} will only change the member variables inside the Row instance,
+   * but will not generate a new Row instance.
+   *
+   * @return the next row in the iteration
+   * @throws IOException if any I/O errors occur
+   */
+  Row next() throws IOException;
+
+  /** Resets the iteration. */
+  void reset();
+
+  /**
+   * Returns the actual column index of the given column name.
+   *
+   * @param columnName the column name in Path form
+   * @throws PipeParameterNotValidException if the given column name does not exist
+   * @return the actual column index of the given column name
+   */
+  int getColumnIndex(Path columnName) throws PipeParameterNotValidException;
+
+  /**
+   * Returns the column names
+   *
+   * @return the column names
+   */
+  List<Path> getColumnNames();
+
+  /**
+   * Returns the column data types
+   *
+   * @return the column data types
+   */
+  List<Type> getColumnTypes();
+}
diff --git a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/EventCollector.java b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/EventCollector.java
new file mode 100644
index 0000000000..17b8421e95
--- /dev/null
+++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/EventCollector.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.api.collector;
+
+import org.apache.iotdb.pipe.api.PipeProcessor;
+import org.apache.iotdb.pipe.api.event.deletion.DeletionEvent;
+import org.apache.iotdb.pipe.api.event.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.insertion.TsFileInsertionEvent;
+
+import java.io.IOException;
+
+/**
+ * Used to collect events generated by {@link PipeProcessor#process(TabletInsertionEvent,
+ * EventCollector)}, {@link PipeProcessor#process(TsFileInsertionEvent, EventCollector)} or {@link
+ * PipeProcessor#process(DeletionEvent, EventCollector)}.
+ */
+public interface EventCollector {
+
+  /**
+   * Collects an insertion event in form of TabletInsertionEvent.
+   *
+   * @param event TabletInsertionEvent to be collected
+   * @throws IOException if any I/O errors occur
+   * @see TabletInsertionEvent
+   */
+  void collectTabletInsertionEvent(TabletInsertionEvent event) throws IOException;
+
+  /**
+   * Collects an insertion event in form of TsFileInsertionEvent.
+   *
+   * @param event TsFileInsertionEvent to be collected
+   * @throws IOException if any I/O errors occur
+   * @see TsFileInsertionEvent
+   */
+  void collectTsFileInsertionEvent(TsFileInsertionEvent event) throws IOException;
+
+  /**
+   * Collects a deletion event.
+   *
+   * @param event DeletionEvent to be collected
+   * @throws IOException if any I/O errors occur
+   * @see DeletionEvent
+   */
+  void collectDeletionEvent(DeletionEvent event) throws IOException;
+}
diff --git a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/RowCollector.java b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/RowCollector.java
new file mode 100644
index 0000000000..ee32de275e
--- /dev/null
+++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/RowCollector.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.api.collector;
+
+import org.apache.iotdb.pipe.api.access.Row;
+import org.apache.iotdb.pipe.api.event.insertion.TabletInsertionEvent;
+
+import java.io.IOException;
+import java.util.function.BiConsumer;
+
+/**
+ * Used to collect rows generated by {@link TabletInsertionEvent#processRowByRow(BiConsumer)},
+ * {@link TabletInsertionEvent#processByIterator(BiConsumer)} or {@link
+ * TabletInsertionEvent#processTablet(BiConsumer)}.
+ */
+public interface RowCollector {
+
+  /**
+   * Collects a row.
+   *
+   * @param row Row to be collected
+   * @throws IOException if any I/O errors occur
+   * @see Row
+   */
+  void collectRow(Row row) throws IOException;
+}
diff --git a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/PipeParameterValidator.java b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/PipeParameterValidator.java
new file mode 100644
index 0000000000..fa0632104b
--- /dev/null
+++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/PipeParameterValidator.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.api.customizer;
+
+import org.apache.iotdb.pipe.api.exception.PipeAttributeNotProvidedException;
+import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
+
+public class PipeParameterValidator {
+
+  private final PipeParameters parameters;
+
+  public PipeParameterValidator(PipeParameters parameters) {
+    this.parameters = parameters;
+  }
+
+  public PipeParameters getParameters() {
+    return parameters;
+  }
+
+  /**
+   * Validates that the user-provided attributes contain an attribute whose key is {@code key}.
+   *
+   * @param key key of the attribute
+   * @return this validator, so that calls can be chained
+   * @throws PipeAttributeNotProvidedException if the attribute is not provided
+   */
+  public PipeParameterValidator validateRequiredAttribute(String key)
+      throws PipeAttributeNotProvidedException {
+    if (!parameters.hasAttribute(key)) {
+      throw new PipeAttributeNotProvidedException(key);
+    }
+    return this;
+  }
+
+  /**
+   * Validates a single argument according to the validation rule given by the user.
+   *
+   * @param validationRule the validation rule, which can be a lambda expression
+   * @param messageToThrow the exception message used when the given argument is not valid
+   * @param argument the given argument
+   * @throws PipeParameterNotValidException if the given argument is not valid
+   */
+  public PipeParameterValidator validate(
+      PipeParameterValidator.SingleObjectValidationRule validationRule,
+      String messageToThrow,
+      Object argument)
+      throws PipeParameterNotValidException {
+    if (!validationRule.validate(argument)) {
+      throw new PipeParameterNotValidException(messageToThrow);
+    }
+    return this;
+  }
+
+  public interface SingleObjectValidationRule {
+
+    boolean validate(Object arg);
+  }
+
+  /**
+   * Validates several arguments together according to the validation rule given by the user.
+   *
+   * @param validationRule the validation rule, which can be a lambda expression
+   * @param messageToThrow the exception message used when the given arguments are not valid
+   * @param arguments the given arguments
+   * @throws PipeParameterNotValidException if the given arguments are not valid
+   */
+  public PipeParameterValidator validate(
+      PipeParameterValidator.MultipleObjectsValidationRule validationRule,
+      String messageToThrow,
+      Object... arguments)
+      throws PipeParameterNotValidException {
+    if (!validationRule.validate(arguments)) {
+      throw new PipeParameterNotValidException(messageToThrow);
+    }
+    return this;
+  }
+
+  public interface MultipleObjectsValidationRule {
+
+    boolean validate(Object... args);
+  }
+}
diff --git a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/PipeParameters.java b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/PipeParameters.java
new file mode 100644
index 0000000000..18dbfdd4ab
--- /dev/null
+++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/PipeParameters.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.api.customizer;
+
+import org.apache.iotdb.pipe.api.PipeConnector;
+import org.apache.iotdb.pipe.api.PipeProcessor;
+import org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.processor.PipeProcessorRuntimeConfiguration;
+
+import java.util.Map;
+
+/**
+ * Used in {@link PipeProcessor#customize(PipeParameters, PipeProcessorRuntimeConfiguration)} and
+ * {@link PipeConnector#customize(PipeParameters, PipeConnectorRuntimeConfiguration)}.
+ *
+ * <p>This class is used to parse the parameters in WITH PROCESSOR and WITH CONNECTOR when creating
+ * a pipe. The input parameters are the key-value pair attributes for customization. A getter
+ * without a default returns {@code null} when the key has no value; an {@code ...OrDefault}
+ * getter returns the supplied default value instead.
+ */
+public class PipeParameters {
+
+  private final Map<String, String> attributes;
+
+  public PipeParameters(Map<String, String> attributes) {
+    this.attributes = attributes;
+  }
+
+  public Map<String, String> getAttribute() {
+    return attributes;
+  }
+
+  public boolean hasAttribute(String key) {
+    return attributes.containsKey(key);
+  }
+
+  public String getString(String key) {
+    return attributes.get(key);
+  }
+
+  public Boolean getBoolean(String key) {
+    String value = attributes.get(key);
+    return value == null ? null : Boolean.parseBoolean(value);
+  }
+
+  public Integer getInt(String key) {
+    String value = attributes.get(key);
+    return value == null ? null : Integer.parseInt(value);
+  }
+
+  public Long getLong(String key) {
+    String value = attributes.get(key);
+    return value == null ? null : Long.parseLong(value);
+  }
+
+  public Float getFloat(String key) {
+    String value = attributes.get(key);
+    return value == null ? null : Float.parseFloat(value);
+  }
+
+  public Double getDouble(String key) {
+    String value = attributes.get(key);
+    return value == null ? null : Double.parseDouble(value);
+  }
+
+  public String getStringOrDefault(String key, String defaultValue) {
+    String value = attributes.get(key);
+    return value == null ? defaultValue : value;
+  }
+
+  public boolean getBooleanOrDefault(String key, boolean defaultValue) {
+    String value = attributes.get(key);
+    return value == null ? defaultValue : Boolean.parseBoolean(value);
+  }
+
+  public int getIntOrDefault(String key, int defaultValue) {
+    String value = attributes.get(key);
+    return value == null ? defaultValue : Integer.parseInt(value);
+  }
+
+  public long getLongOrDefault(String key, long defaultValue) {
+    String value = attributes.get(key);
+    return value == null ? defaultValue : Long.parseLong(value);
+  }
+
+  public float getFloatOrDefault(String key, float defaultValue) {
+    String value = attributes.get(key);
+    return value == null ? defaultValue : Float.parseFloat(value);
+  }
+
+  public double getDoubleOrDefault(String key, double defaultValue) {
+    String value = attributes.get(key);
+    return value == null ? defaultValue : Double.parseDouble(value);
+  }
+}
diff --git a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/PipeRuntimeConfiguration.java b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/PipeRuntimeConfiguration.java
new file mode 100644
index 0000000000..c75f85bb7a
--- /dev/null
+++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/PipeRuntimeConfiguration.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.api.customizer;
+
+import org.apache.iotdb.pipe.api.exception.PipeException;
+
+public interface PipeRuntimeConfiguration {
+
+  /** @throws PipeException if an invalid runtime configuration has been set */
+  void check() throws PipeException;
+}
diff --git a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/PipeStrategy.java b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/PipeStrategy.java
new file mode 100644
index 0000000000..433ccc8351
--- /dev/null
+++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/PipeStrategy.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.api.customizer;
+
+import org.apache.iotdb.pipe.api.exception.PipeStrategyNotValidException;
+
+public interface PipeStrategy {
+
+  /** @throws PipeStrategyNotValidException if an invalid strategy has been set */
+  void check() throws PipeStrategyNotValidException;
+}
diff --git a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/PipeConnectorRuntimeConfiguration.java b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/PipeConnectorRuntimeConfiguration.java
new file mode 100644
index 0000000000..0841780010
--- /dev/null
+++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/PipeConnectorRuntimeConfiguration.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.api.customizer.connector;
+
+import org.apache.iotdb.pipe.api.PipeConnector;
+import org.apache.iotdb.pipe.api.customizer.PipeParameters;
+import org.apache.iotdb.pipe.api.customizer.PipeRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.connector.parallel.ParallelStrategy;
+import org.apache.iotdb.pipe.api.customizer.connector.retry.RetryStrategy;
+import org.apache.iotdb.pipe.api.customizer.connector.reuse.ReuseStrategy;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+
+/**
+ * Used in {@link PipeConnector#customize(PipeParameters, PipeConnectorRuntimeConfiguration)} to
+ * customize the runtime behavior of the PipeConnector.
+ *
+ * <p>Supports calling methods in a chain; all three strategies must be set. Sample code:
+ *
+ * <pre>{@code
+ * @Override
+ * public void customize(PipeParameters params, PipeConnectorRuntimeConfiguration configs) {
+ *   configs
+ *       .reuseStrategy(X)
+ *       .parallelStrategy(Y)
+ *       .retryStrategy(Z);
+ * }
+ * }</pre>
+ */
+public class PipeConnectorRuntimeConfiguration implements PipeRuntimeConfiguration {
+
+  private ReuseStrategy reuseStrategy;
+  private ParallelStrategy parallelStrategy;
+  private RetryStrategy retryStrategy;
+
+  public PipeConnectorRuntimeConfiguration reuseStrategy(ReuseStrategy reuseStrategy) {
+    this.reuseStrategy = reuseStrategy;
+    return this;
+  }
+
+  public PipeConnectorRuntimeConfiguration parallelStrategy(ParallelStrategy parallelStrategy) {
+    this.parallelStrategy = parallelStrategy;
+    return this;
+  }
+
+  public PipeConnectorRuntimeConfiguration retryStrategy(RetryStrategy retryStrategy) {
+    this.retryStrategy = retryStrategy;
+    return this;
+  }
+
+  @Override
+  public void check() throws PipeException {
+    if (reuseStrategy == null) {
+      throw new PipeException("ReuseStrategy is not set!");
+    }
+    reuseStrategy.check();
+
+    if (parallelStrategy == null) {
+      throw new PipeException("ParallelStrategy is not set!");
+    }
+    parallelStrategy.check();
+
+    if (retryStrategy == null) {
+      throw new PipeException("RetryStrategy is not set!");
+    }
+    retryStrategy.check();
+  }
+}
diff --git a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/parallel/ParallelStrategy.java b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/parallel/ParallelStrategy.java
new file mode 100644
index 0000000000..3df93c6c5c
--- /dev/null
+++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/parallel/ParallelStrategy.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.api.customizer.connector.parallel;
+
+import org.apache.iotdb.pipe.api.customizer.PipeStrategy;
+
+public interface ParallelStrategy extends PipeStrategy {} // declares no members of its own
diff --git a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/retry/EqualRetryIntervalStrategy.java b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/retry/EqualRetryIntervalStrategy.java
new file mode 100644
index 0000000000..fe693cc6fa
--- /dev/null
+++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/retry/EqualRetryIntervalStrategy.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.api.customizer.connector.retry;
+
+import org.apache.iotdb.pipe.api.PipeConnector;
+import org.apache.iotdb.pipe.api.customizer.PipeParameters;
+import org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.exception.PipeStrategyNotValidException;
+
+/**
+ * Used in {@link PipeConnector#customize(PipeParameters, PipeConnectorRuntimeConfiguration)}. When
+ * the PipeConnector fails to connect to the sink, it will try to reconnect by the specified
+ * strategy.
+ *
+ * <p>When PipeConnector is set to {@link EqualRetryIntervalStrategy}, the interval of waiting time
+ * for retrying will be the same.
+ *
+ * @see PipeConnector
+ * @see PipeConnectorRuntimeConfiguration
+ */
+public class EqualRetryIntervalStrategy implements RetryStrategy {
+
+  private final int maxRetryTimes;
+  private final long retryInterval;
+
+  /**
+   * @param maxRetryTimes must be greater than zero; validated by {@link #check()}, not here
+   * @param retryInterval must be greater than zero; validated by {@link #check()}, not here
+   */
+  public EqualRetryIntervalStrategy(int maxRetryTimes, long retryInterval) {
+    this.maxRetryTimes = maxRetryTimes;
+    this.retryInterval = retryInterval;
+  }
+
+  @Override
+  public void check() {
+    if (maxRetryTimes <= 0) {
+      throw new PipeStrategyNotValidException(
+          String.format("Parameter maxRetryTimes(%d) should be greater than zero.", maxRetryTimes));
+    }
+    if (retryInterval <= 0) {
+      throw new PipeStrategyNotValidException(
+          String.format("Parameter retryInterval(%d) should be greater than zero.", retryInterval));
+    }
+  }
+}
diff --git a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/retry/ExponentialRetryIntervalStrategy.java b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/retry/ExponentialRetryIntervalStrategy.java
new file mode 100644
index 0000000000..797372158c
--- /dev/null
+++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/retry/ExponentialRetryIntervalStrategy.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.api.customizer.connector.retry;
+
+import org.apache.iotdb.pipe.api.PipeConnector;
+import org.apache.iotdb.pipe.api.customizer.PipeParameters;
+import org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration;
+
+/**
+ * Used in {@link PipeConnector#customize(PipeParameters, PipeConnectorRuntimeConfiguration)}. When
+ * the PipeConnector fails to connect to the sink, it will try to reconnect by the specified
+ * strategy.
+ *
+ * <p>When PipeConnector is set to {@link ExponentialRetryIntervalStrategy}, the interval of waiting
+ * time for retrying will be increased exponentially each time.
+ *
+ * @see PipeConnector
+ * @see PipeConnectorRuntimeConfiguration
+ */
+public class ExponentialRetryIntervalStrategy implements RetryStrategy {
+
+  private final int maxRetryTimes;
+  private final long initInterval;
+  private final double backOffFactor;
+
+  /**
+   * @param maxRetryTimes must be greater than zero, otherwise {@link #check()} throws
+   * @param initInterval must be greater than zero, otherwise {@link #check()} throws
+   * @param backOffFactor must be greater than zero, otherwise {@link #check()} throws
+   */
+  public ExponentialRetryIntervalStrategy(
+      int maxRetryTimes, long initInterval, double backOffFactor) {
+    this.maxRetryTimes = maxRetryTimes;
+    this.initInterval = initInterval;
+    this.backOffFactor = backOffFactor;
+  }
+
+  @Override
+  public void check() {
+    if (maxRetryTimes <= 0) {
+      throw new org.apache.iotdb.pipe.api.exception.PipeStrategyNotValidException(
+          String.format("Parameter maxRetryTimes(%d) should be greater than zero.", maxRetryTimes));
+    }
+    if (initInterval <= 0) {
+      throw new org.apache.iotdb.pipe.api.exception.PipeStrategyNotValidException(
+          String.format("Parameter initInterval(%d) should be greater than zero.", initInterval));
+    }
+    if (backOffFactor <= 0) {
+      throw new org.apache.iotdb.pipe.api.exception.PipeStrategyNotValidException(
+          String.format("Parameter backOffFactor(%f) should be greater than zero.", backOffFactor));
+    }
+  }
+}
diff --git a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/retry/RetryStrategy.java b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/retry/RetryStrategy.java
new file mode 100644
index 0000000000..27e9c08d1f
--- /dev/null
+++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/retry/RetryStrategy.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.api.customizer.connector.retry;
+
+import org.apache.iotdb.pipe.api.PipeConnector;
+import org.apache.iotdb.pipe.api.customizer.PipeParameters;
+import org.apache.iotdb.pipe.api.customizer.PipeStrategy;
+import org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration;
+
+/**
+ * Used to customize the strategy for reconnecting to sinks in {@link
+ * PipeConnector#customize(PipeParameters, PipeConnectorRuntimeConfiguration)}.
+ *
+ * <p>When the PipeConnector fails to connect to the sink, it will try to reconnect according to
+ * the specified strategy.
+ */
+public interface RetryStrategy extends PipeStrategy {}
diff --git a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/reuse/ReuseStrategy.java b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/reuse/ReuseStrategy.java
new file mode 100644
index 0000000000..a103b1db97
--- /dev/null
+++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/reuse/ReuseStrategy.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.api.customizer.connector.reuse;
+
+import org.apache.iotdb.pipe.api.customizer.PipeStrategy;
+
+public interface ReuseStrategy extends PipeStrategy {} // declares no members of its own
diff --git a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/processor/PipeProcessorRuntimeConfiguration.java b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/processor/PipeProcessorRuntimeConfiguration.java
new file mode 100644
index 0000000000..a0bbd23033
--- /dev/null
+++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/processor/PipeProcessorRuntimeConfiguration.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.api.customizer.processor;
+
+import org.apache.iotdb.pipe.api.PipeProcessor;
+import org.apache.iotdb.pipe.api.customizer.PipeParameters;
+import org.apache.iotdb.pipe.api.customizer.PipeRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+
+/**
+ * Used in {@link PipeProcessor#customize(PipeParameters, PipeProcessorRuntimeConfiguration)} to
+ * customize the runtime behavior of the PipeProcessor. It holds no settings; check() is a no-op.
+ */
+public class PipeProcessorRuntimeConfiguration implements PipeRuntimeConfiguration {
+
+  @Override
+  public void check() throws PipeException {}
+}
diff --git a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/Event.java b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/Event.java
new file mode 100644
index 0000000000..74ddf9e47d
--- /dev/null
+++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/Event.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.api.event;
+
+/** This interface is used to abstract events in collaboration tasks. It declares no methods. */
+public interface Event {}
diff --git a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/deletion/DeletionEvent.java b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/deletion/DeletionEvent.java
new file mode 100644
index 0000000000..8a32339cf4
--- /dev/null
+++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/deletion/DeletionEvent.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.api.event.deletion;
+
+import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+
+/** DeletionEvent is used to define the event of deletion. */
+public interface DeletionEvent extends Event {
+
+  /**
+   * Gets the path pattern of the deleted data.
+   *
+   * @return the path pattern of the deleted data
+   */
+  Path getPath();
+
+  /**
+   * Gets the time range of the deleted data.
+   *
+   * @return the time range of the deleted data
+   */
+  TimeRange getTimeRange();
+}
diff --git a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/insertion/TabletInsertionEvent.java b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/insertion/TabletInsertionEvent.java
new file mode 100644
index 0000000000..5880f0fa71
--- /dev/null
+++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/insertion/TabletInsertionEvent.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.api.event.insertion;
+
+import org.apache.iotdb.pipe.api.access.Row;
+import org.apache.iotdb.pipe.api.collector.RowCollector;
+import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+
+import java.util.Iterator;
+import java.util.function.BiConsumer;
+
+/** TabletInsertionEvent is used to define the event of data insertion. */
+public interface TabletInsertionEvent extends Event {
+
+  /**
+   * The consumer processes the data row by row and collects the results by RowCollector.
+   *
+   * @param consumer callback that receives a row and the RowCollector
+   * @return a new TabletInsertionEvent containing the results collected by the RowCollector
+   */
+  TabletInsertionEvent processRowByRow(BiConsumer<Row, RowCollector> consumer);
+
+  /**
+   * The consumer processes the data by the Iterator and collects the results by RowCollector.
+   *
+   * @param consumer callback that receives the row iterator and the RowCollector
+   * @return a new TabletInsertionEvent containing the results collected by the RowCollector
+   */
+  TabletInsertionEvent processByIterator(BiConsumer<Iterator<Row>, RowCollector> consumer);
+
+  /**
+   * The consumer processes the Tablet directly and collects the results by RowCollector.
+   *
+   * @param consumer callback that receives the Tablet and the RowCollector
+   * @return a new TabletInsertionEvent containing the results collected by the RowCollector
+   */
+  TabletInsertionEvent processTablet(BiConsumer<Tablet, RowCollector> consumer);
+}
diff --git a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/insertion/TsFileInsertionEvent.java b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/insertion/TsFileInsertionEvent.java
new file mode 100644
index 0000000000..7f324b8de6
--- /dev/null
+++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/insertion/TsFileInsertionEvent.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.api.event.insertion;
+
+import org.apache.iotdb.pipe.api.event.Event;
+
+/**
+ * TsFileInsertionEvent is used to define the event of writing a TsFile. The event data is stored
+ * on disk, compressed and encoded, so processing it requires IO cost.
+ */
+public interface TsFileInsertionEvent extends Event {
+
+  /**
+   * The method is used to convert the TsFileInsertionEvent into several TabletInsertionEvents.
+   *
+   * @return an Iterable of the resulting TabletInsertionEvents
+   */
+  Iterable<TabletInsertionEvent> toTabletInsertionEvents();
+
+  /**
+   * Compacts several TabletInsertionEvents into one TsFileInsertionEvent backed by a TsFile.
+   *
+   * @param iterable the TabletInsertionEvents whose underlying data will be stored into a TsFile
+   * @return the TsFileInsertionEvent produced from the given TabletInsertionEvents
+   */
+  TsFileInsertionEvent toTsFileInsertionEvent(Iterable<TabletInsertionEvent> iterable);
+}
diff --git a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeAttributeNotProvidedException.java b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeAttributeNotProvidedException.java
new file mode 100644
index 0000000000..2a529bfdee
--- /dev/null
+++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeAttributeNotProvidedException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.api.exception;
+
+public class PipeAttributeNotProvidedException extends PipeParameterNotValidException {
+  /** Builds the message stating that the attribute named {@code key} was not provided. */
+  public PipeAttributeNotProvidedException(String key) {
+    super(String.format("attribute \"%s\" is required but was not provided.", key));
+  }
+}
diff --git a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeException.java b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeException.java
new file mode 100644
index 0000000000..9d1638352e
--- /dev/null
+++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.api.exception;
+
+public class PipeException extends RuntimeException {
+  // Unchecked (RuntimeException) pipe exception: built from a message, optionally with a cause.
+  public PipeException(String message) {
+    super(message);
+  }
+
+  public PipeException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}
diff --git a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeParameterNotValidException.java b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeParameterNotValidException.java
new file mode 100644
index 0000000000..9788bb3f4a
--- /dev/null
+++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeParameterNotValidException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.api.exception;
+
+public class PipeParameterNotValidException extends PipeException {
+  /** Passes {@code message} unchanged to the PipeException constructor. */
+  public PipeParameterNotValidException(String message) {
+    super(message);
+  }
+}
diff --git a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeStrategyNotValidException.java b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeStrategyNotValidException.java
new file mode 100644
index 0000000000..2bf4668414
--- /dev/null
+++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeStrategyNotValidException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.api.exception;
+
+public class PipeStrategyNotValidException extends PipeException {
+  /** Passes {@code message} unchanged to the PipeException constructor. */
+  public PipeStrategyNotValidException(String message) {
+    super(message);
+  }
+}
diff --git a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/type/Binary.java b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/type/Binary.java
new file mode 100644
index 0000000000..bdf871bf84
--- /dev/null
+++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/type/Binary.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.api.type;
+
+import java.io.Serializable;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+
+/**
+ * A byte-array value used to accept Java String type. compareTo() and equals() are overridden
+ * to compare by content. The hash code and the decoded string are cached on first use, so both
+ * become stale if the backing array is modified afterwards.
+ */
+public class Binary implements Comparable<Binary>, Serializable {
+
+  private static final long serialVersionUID = 1250049718612917815L;
+  public static final String STRING_ENCODING = "UTF-8";
+  public static final Charset STRING_CHARSET = Charset.forName(STRING_ENCODING);
+
+  private final byte[] values;
+
+  private int hash;
+
+  // indicate whether hash has been calculated
+  private boolean hasCalculatedHash;
+
+  private String stringCache;
+
+  /** if the bytes v is modified, the modification is visible to this binary. */
+  public Binary(byte[] v) {
+    this.values = v;
+  }
+
+  /** Encodes s with STRING_CHARSET; a null string yields a Binary whose isNull() is true. */
+  public Binary(String s) {
+    this.values = (s == null) ? null : s.getBytes(STRING_CHARSET);
+  }
+
+  /** Null-safe factory: behaves exactly like the String constructor, also for a null value. */
+  public static Binary valueOf(String value) {
+    return new Binary(value);
+  }
+
+  /**
+   * Compares byte by byte (as signed values), then by length. A Binary whose values is null has
+   * length -1 and therefore sorts first; a null argument is treated as equal to such a Binary.
+   */
+  @Override
+  public int compareTo(Binary other) {
+    if (other == null) {
+      return this.values == null ? 0 : 1;
+    }
+
+    int common = Math.min(getLength(), other.getLength());
+    for (int i = 0; i < common; i++) {
+      int diff = this.values[i] - other.values[i];
+      if (diff != 0) {
+        return diff;
+      }
+    }
+    return getLength() - other.getLength();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (this == other) {
+      return true;
+    }
+    if (other == null || getClass() != other.getClass()) {
+      return false;
+    }
+    return compareTo((Binary) other) == 0;
+  }
+
+  @Override
+  public int hashCode() {
+    if (!hasCalculatedHash) {
+      hash = Arrays.hashCode(values);
+      hasCalculatedHash = true;
+    }
+    return hash;
+  }
+
+  /**
+   * Get length of values. Returns -1 if values is null.
+   *
+   * @return length
+   */
+  public int getLength() {
+    return values == null ? -1 : values.length;
+  }
+
+  /** Returns true when this Binary holds no byte array. */
+  public boolean isNull() {
+    return values == null;
+  }
+
+  /** Decodes the bytes with STRING_CHARSET, caching the result; returns null if isNull(). */
+  public String getStringValue() {
+    if (values == null) {
+      return null;
+    }
+    if (stringCache == null) {
+      stringCache = new String(this.values, STRING_CHARSET);
+    }
+    return stringCache;
+  }
+
+  public String getTextEncodingType() {
+    return STRING_ENCODING;
+  }
+
+  /** Same as getStringValue(), so the result is null when isNull(). */
+  @Override
+  public String toString() {
+    return getStringValue();
+  }
+
+  /** Returns the backing byte array itself, not a copy. */
+  public byte[] getValues() {
+    return values;
+  }
+
+  /**
+   * convert string to byte array using UTF-8 encoding.
+   *
+   * @param str input string, must not be null
+   * @return byte array
+   * @throws NullPointerException if str is null
+   */
+  public static byte[] stringToBytes(String str) {
+    return str.getBytes(STRING_CHARSET);
+  }
+}
diff --git a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/type/Type.java b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/type/Type.java
new file mode 100644
index 0000000000..362355bcbc
--- /dev/null
+++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/type/Type.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.api.type;
+
+public enum Type {
+  /** BOOLEAN type; byte code 0. */
+  BOOLEAN((byte) 0),
+
+  /** INT32 type; byte code 1. */
+  INT32((byte) 1),
+
+  /** INT64 type; byte code 2. */
+  INT64((byte) 2),
+
+  /** FLOAT type; byte code 3. */
+  FLOAT((byte) 3),
+
+  /** DOUBLE type; byte code 4. */
+  DOUBLE((byte) 4),
+
+  /** TEXT type; byte code 5. */
+  TEXT((byte) 5);
+
+  private final byte code;
+
+  Type(byte code) {
+    this.code = code;
+  }
+
+  public byte getType() {
+    return code;
+  }
+}
diff --git a/pom.xml b/pom.xml
index d46136dc3c..36f770645e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -125,6 +125,7 @@
         <module>schema-engine-tag</module>
         <module>isession</module>
         <module>mlnode</module>
+        <module>pipe-api</module>
     </modules>
     <!-- Properties Management -->
     <properties>