Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2022/08/05 20:22:23 UTC

[GitHub] [nifi] exceptionfactory commented on a diff in pull request #6265: NIFI-10234 Implement PutIoTDB

exceptionfactory commented on code in PR #6265:
URL: https://github.com/apache/nifi/pull/6265#discussion_r939172193


##########
nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/pom.xml:
##########
@@ -0,0 +1,116 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>nifi-iotdb-bundle</artifactId>
+        <groupId>org.apache.nifi</groupId>
+        <version>1.18.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>org.apache.nifi</groupId>
+    <artifactId>nifi-iotdb-processors</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>iotdb-session</artifactId>
+            <version>${iotdb.sdk.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>iotdb-server</artifactId>
+            <version>${iotdb.sdk.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>iotdb-server</artifactId>
+            <version>${iotdb.sdk.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock-record-utils</artifactId>
+            <version>${project.version}</version>

Review Comment:
   This dependency should be marked with a `test` scope.
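   For reference, a minimal sketch of the declaration with the suggested `test` scope (reusing the coordinates already shown in the diff above):
   ```xml
   <dependency>
       <groupId>org.apache.nifi</groupId>
       <artifactId>nifi-mock-record-utils</artifactId>
       <version>${project.version}</version>
       <scope>test</scope>
   </dependency>
   ```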



##########
nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/pom.xml:
##########
@@ -0,0 +1,116 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>nifi-iotdb-bundle</artifactId>
+        <groupId>org.apache.nifi</groupId>
+        <version>1.18.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>org.apache.nifi</groupId>
+    <artifactId>nifi-iotdb-processors</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>iotdb-session</artifactId>
+            <version>${iotdb.sdk.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>iotdb-server</artifactId>
+            <version>${iotdb.sdk.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>iotdb-server</artifactId>
+            <version>${iotdb.sdk.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock-record-utils</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-dbcp-service-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-standard-services-api-nar</artifactId>
+            <version>${project.version}</version>
+            <type>nar</type>
+        </dependency>

Review Comment:
   This dependency must be moved to `nifi-iotdb-nar` instead of being declared in this `jar` module.



##########
nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-nar/pom.xml:
##########
@@ -0,0 +1,37 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>nifi-iotdb-bundle</artifactId>
+        <groupId>org.apache.nifi</groupId>
+        <version>1.18.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>org.apache.nifi</groupId>
+    <artifactId>nifi-iotdb-nar</artifactId>
+    <packaging>nar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-iotdb-processors</artifactId>
+        </dependency>

Review Comment:
   This module needs a dependency on `nifi-standard-services-api-nar` of type `nar` in order to access standard NiFi Controller Services at runtime.
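   For reference, a sketch of how that declaration could look in `nifi-iotdb-nar/pom.xml`, using the same coordinates currently declared in the processors module:
   ```xml
   <dependency>
       <groupId>org.apache.nifi</groupId>
       <artifactId>nifi-standard-services-api-nar</artifactId>
       <version>${project.version}</version>
       <type>nar</type>
   </dependency>
   ```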



##########
nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/AbstractIoTDB.java:
##########
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors;
+
+import java.text.SimpleDateFormat;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.processors.model.Field;
+import org.apache.nifi.processors.model.Schema;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.Tuple;
+
+public abstract class AbstractIoTDB extends AbstractProcessor {
+    private static final int DEFAULT_IOTDB_PORT = 6667;
+
+    private static Map<RecordFieldType, TSDataType> typeMap =
+            new HashMap<>();
+
+    static final Set<RecordFieldType> supportedType =
+            new HashSet<>();
+
+    static final Set<RecordFieldType> supportedTimeType =
+            new HashSet<>();
+
+    private static final String[] STRING_TIME_FORMAT =
+            new String[]{
+                    "yyyy-MM-dd HH:mm:ss.SSSX",
+                    "yyyy/MM/dd HH:mm:ss.SSSX",
+                    "yyyy.MM.dd HH:mm:ss.SSSX",
+                    "yyyy-MM-dd HH:mm:ssX",
+                    "yyyy/MM/dd HH:mm:ssX",
+                    "yyyy.MM.dd HH:mm:ssX",
+                    "yyyy-MM-dd HH:mm:ss.SSSz",
+                    "yyyy/MM/dd HH:mm:ss.SSSz",
+                    "yyyy.MM.dd HH:mm:ss.SSSz",
+                    "yyyy-MM-dd HH:mm:ssz",
+                    "yyyy/MM/dd HH:mm:ssz",
+                    "yyyy.MM.dd HH:mm:ssz",
+                    "yyyy-MM-dd HH:mm:ss.SSS",
+                    "yyyy/MM/dd HH:mm:ss.SSS",
+                    "yyyy.MM.dd HH:mm:ss.SSS",
+                    "yyyy-MM-dd HH:mm:ss",
+                    "yyyy/MM/dd HH:mm:ss",
+                    "yyyy.MM.dd HH:mm:ss",
+                    "yyyy-MM-dd'T'HH:mm:ss.SSSX",
+                    "yyyy/MM/dd'T'HH:mm:ss.SSSX",
+                    "yyyy.MM.dd'T'HH:mm:ss.SSSX",
+                    "yyyy-MM-dd'T'HH:mm:ssX",
+                    "yyyy/MM/dd'T'HH:mm:ssX",
+                    "yyyy.MM.dd'T'HH:mm:ssX",
+                    "yyyy-MM-dd'T'HH:mm:ss.SSSz",
+                    "yyyy/MM/dd'T'HH:mm:ss.SSSz",
+                    "yyyy.MM.dd'T'HH:mm:ss.SSSz",
+                    "yyyy-MM-dd'T'HH:mm:ssz",
+                    "yyyy/MM/dd'T'HH:mm:ssz",
+                    "yyyy.MM.dd'T'HH:mm:ssz",
+                    "yyyy-MM-dd'T'HH:mm:ss.SSS",
+                    "yyyy/MM/dd'T'HH:mm:ss.SSS",
+                    "yyyy.MM.dd'T'HH:mm:ss.SSS",
+                    "yyyy-MM-dd'T'HH:mm:ss",
+                    "yyyy/MM/dd'T'HH:mm:ss",
+                    "yyyy.MM.dd'T'HH:mm:ss"
+            };
+
+    static final PropertyDescriptor IOTDB_HOST =
+            new PropertyDescriptor.Builder()
+                    .name("Host")
+                    .description("The host of IoTDB.")
+                    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                    .required(true)
+                    .build();
+
+    static final PropertyDescriptor IOTDB_PORT =
+            new PropertyDescriptor.Builder()
+                    .name("Port")
+                    .description("The port of IoTDB.")
+                    .defaultValue(String.valueOf(DEFAULT_IOTDB_PORT))
+                    .addValidator(StandardValidators.PORT_VALIDATOR)
+                    .required(true)
+                    .build();
+
+    static final PropertyDescriptor USERNAME =
+            new PropertyDescriptor.Builder()
+                    .name("Username")
+                    .description("Username to access the IoTDB.")
+                    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                    .required(true)
+                    .build();
+
+    static final PropertyDescriptor PASSWORD =
+            new PropertyDescriptor.Builder()
+                    .name("Password")
+                    .description("Password to access the IoTDB.")
+                    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                    .required(true)
+                    .sensitive(true)
+                    .build();
+
+    protected static List<PropertyDescriptor> descriptors = new ArrayList<>();
+
+    protected static final Relationship REL_SUCCESS =
+            new Relationship.Builder()
+                    .name("success")
+                    .description("files that were successfully processed")
+                    .build();
+    protected static final Relationship REL_FAILURE =
+            new Relationship.Builder()
+                    .name("failure")
+                    .description("files that were not successfully processed")
+                    .build();
+
+    protected static Set<Relationship> relationships = new HashSet<>();
+
+    static {
+        descriptors.add(IOTDB_HOST);
+        descriptors.add(IOTDB_PORT);
+        descriptors.add(USERNAME);
+        descriptors.add(PASSWORD);
+
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+
+        typeMap.put(RecordFieldType.STRING, TSDataType.TEXT);
+        typeMap.put(RecordFieldType.BOOLEAN, TSDataType.BOOLEAN);
+        typeMap.put(RecordFieldType.INT, TSDataType.INT32);
+        typeMap.put(RecordFieldType.LONG, TSDataType.INT64);
+        typeMap.put(RecordFieldType.FLOAT, TSDataType.FLOAT);
+        typeMap.put(RecordFieldType.DOUBLE, TSDataType.DOUBLE);
+
+        supportedType.add(RecordFieldType.BOOLEAN);
+        supportedType.add(RecordFieldType.STRING);
+        supportedType.add(RecordFieldType.INT);
+        supportedType.add(RecordFieldType.LONG);
+        supportedType.add(RecordFieldType.FLOAT);
+        supportedType.add(RecordFieldType.DOUBLE);
+
+        supportedTimeType.add(RecordFieldType.STRING);
+        supportedTimeType.add(RecordFieldType.LONG);
+    }
+
+    protected final AtomicReference<Session> session = new AtomicReference<>(null);
+
+    @OnScheduled
+    public void onScheduled(ProcessContext context) {
+        connectToIoTDB(context);
+    }
+
+    void connectToIoTDB(ProcessContext context) {
+        if (session.get() == null) {
+            final String host, username, password;
+            final int port;
+
+            host = context.getProperty(IOTDB_HOST).getValue();
+            port = Integer.parseInt(context.getProperty(IOTDB_PORT).getValue());
+            username = context.getProperty(USERNAME).getValue();
+            password = context.getProperty(PASSWORD).getValue();
+
+            session.set(
+                    new Session.Builder()
+                            .host(host)
+                            .port(port)
+                            .username(username)
+                            .password(password)
+                            .build());
+            try {
+                session.get().open();
+            } catch (IoTDBConnectionException e) {
+                getLogger().error(e.getMessage());

Review Comment:
   This should be changed to log a message with the exception:
   ```suggestion
                   getLogger().error("IoTDB connection failed", e);
   ```



##########
nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/PutIoTDB.java:
##########
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors;
+
+import java.io.InputStream;
+import java.text.SimpleDateFormat;
+import java.util.Set;
+import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.processors.model.Schema;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.util.Tuple;
+
+@Tags({"iotdb", "insert", "tablet"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription(
+        "This is a record aware processor that reads the content of the incoming FlowFile as individual records using the "
+                + "configured 'Record Reader' and writes them to Apache IoTDB using native interface.")
+public class PutIoTDB extends AbstractIoTDB {
+
+    static final PropertyDescriptor RECORD_READER_FACTORY =
+            new PropertyDescriptor.Builder()
+                    .name("Record Reader")
+                    .description(
+                            "Specifies the type of Record Reader controller service to use for parsing the incoming data "
+                                    + "and determining the schema")
+                    .identifiesControllerService(RecordReaderFactory.class)
+                    .required(true)
+                    .build();
+
+    static final PropertyDescriptor TIME_FIELD =
+            new PropertyDescriptor.Builder()
+                    .name("Time Field")
+                    .description(
+                            "The field name which represents time. It can be updated by expression language.")
+                    .defaultValue("Time")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                    .required(false)
+                    .build();
+
+    static final PropertyDescriptor SCHEMA =
+            new PropertyDescriptor.Builder()
+                    .name("Schema")
+                    .description(
+                            "The schema that IoTDB needs doesn't support good by NiFi.\n"
+                                    + "Therefore, you can define the schema here.\n"
+                                    + "Besides, you can set encoding type and compression type by this method.\n"
+                                    + "If you don't set this property, the inferred schema will be used.\n"
+                                    + "It can be updated by expression language.")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .required(false)
+                    .build();
+
+    static final PropertyDescriptor ALIGNED =
+            new PropertyDescriptor.Builder()
+                    .name("Aligned")
+                    .description("Whether using aligned interface? It can be updated by expression language.")
+                    .allowableValues(new String[] {"true", "false"})
+                    .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+                    .defaultValue("false")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .required(false)
+                    .build();
+
+    static final PropertyDescriptor MAX_ROW_NUMBER =
+            new PropertyDescriptor.Builder()
+                    .name("Max Row Number")
+                    .description(
+                            "Specifies the max row number of each tablet. It can be updated by expression language.")

Review Comment:
   ```suggestion
                               "Specifies the max row number of each Apache IoTDB Tablet")
   ```



##########
nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/PutIoTDB.java:
##########
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors;
+
+import java.io.InputStream;
+import java.text.SimpleDateFormat;
+import java.util.Set;
+import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.processors.model.Schema;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.util.Tuple;
+
+@Tags({"iotdb", "insert", "tablet"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription(
+        "This is a record aware processor that reads the content of the incoming FlowFile as individual records using the "
+                + "configured 'Record Reader' and writes them to Apache IoTDB using native interface.")
+public class PutIoTDB extends AbstractIoTDB {
+
+    static final PropertyDescriptor RECORD_READER_FACTORY =
+            new PropertyDescriptor.Builder()
+                    .name("Record Reader")
+                    .description(
+                            "Specifies the type of Record Reader controller service to use for parsing the incoming data "
+                                    + "and determining the schema")
+                    .identifiesControllerService(RecordReaderFactory.class)
+                    .required(true)
+                    .build();
+
+    static final PropertyDescriptor TIME_FIELD =
+            new PropertyDescriptor.Builder()
+                    .name("Time Field")
+                    .description(
+                            "The field name which represents time. It can be updated by expression language.")
+                    .defaultValue("Time")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                    .required(false)
+                    .build();
+
+    static final PropertyDescriptor SCHEMA =
+            new PropertyDescriptor.Builder()
+                    .name("Schema")
+                    .description(
+                            "The schema that IoTDB needs doesn't support good by NiFi.\n"
+                                    + "Therefore, you can define the schema here.\n"
+                                    + "Besides, you can set encoding type and compression type by this method.\n"
+                                    + "If you don't set this property, the inferred schema will be used.\n"
+                                    + "It can be updated by expression language.")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .required(false)
+                    .build();
+
+    static final PropertyDescriptor ALIGNED =
+            new PropertyDescriptor.Builder()
+                    .name("Aligned")
+                    .description("Whether using aligned interface? It can be updated by expression language.")
+                    .allowableValues(new String[] {"true", "false"})
+                    .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+                    .defaultValue("false")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .required(false)
+                    .build();
+
+    static final PropertyDescriptor MAX_ROW_NUMBER =
+            new PropertyDescriptor.Builder()
+                    .name("Max Row Number")
+                    .description(
+                            "Specifies the max row number of each tablet. It can be updated by expression language.")
+                    .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+                    .defaultValue("false")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .required(false)
+                    .build();
+
+    static {
+        descriptors.add(RECORD_READER_FACTORY);
+        descriptors.add(TIME_FIELD);
+        descriptors.add(SCHEMA);
+        descriptors.add(ALIGNED);
+        descriptors.add(MAX_ROW_NUMBER);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext processContext, ProcessSession processSession)
+            throws ProcessException {
+        final RecordReaderFactory recordParserFactory =
+                processContext
+                        .getProperty(RECORD_READER_FACTORY)
+                        .asControllerService(RecordReaderFactory.class);
+
+        String timeFieldProperty = processContext.getProperty(TIME_FIELD).getValue();
+        String schemaProperty = processContext.getProperty(SCHEMA).getValue();
+        String alignedProperty = processContext.getProperty(ALIGNED).getValue();
+        String maxRowNumberProperty = processContext.getProperty(MAX_ROW_NUMBER).getValue();
+
+        FlowFile flowFile = processSession.get();
+
+        String timeField = timeFieldProperty != null ? timeFieldProperty : "Time";
+        Boolean aligned = alignedProperty != null ? Boolean.valueOf(alignedProperty) : false;
+        int maxRowNumber = maxRowNumberProperty != null ? Integer.valueOf(maxRowNumberProperty) : 1024;
+
+        try (final InputStream inputStream = processSession.read(flowFile);
+             final RecordReader recordReader =
+                     recordParserFactory.createRecordReader(flowFile, inputStream, getLogger())) {
+            if (flowFile == null) {
+                inputStream.close();
+                recordReader.close();
+                processSession.transfer(flowFile, REL_SUCCESS);
+                return;
+            }
+
+            HashMap<String, Tablet> tablets;
+            boolean needInitFormatter;
+            Schema schema;
+            Tuple<Boolean, String> result;
+
+            result =
+                    schemaProperty != null
+                            ? validateSchemaAttribute(schemaProperty)
+                            : validateSchema(recordReader.getSchema());
+
+            if (!result.getKey()) {
+                getLogger().error(result.getValue());
+                inputStream.close();
+                recordReader.close();
+                processSession.transfer(flowFile, REL_FAILURE);
+                return;
+            } else {
+                if (result.getValue() != null) {
+                    getLogger().warn(result.getValue());
+                }
+            }
+
+            schema =
+                    schemaProperty != null
+                            ? new ObjectMapper().readValue(schemaProperty, Schema.class)

Review Comment:
   `ObjectMapper` should be declared as a static member and reused.
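   For example, a minimal sketch of that change (Jackson's `ObjectMapper` is thread-safe for reads once configured, so one shared instance can serve all invocations):
   ```java
   // Shared, reusable mapper instead of constructing a new ObjectMapper per FlowFile
   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

   // ... later, in onTrigger():
   schema = schemaProperty != null
           ? OBJECT_MAPPER.readValue(schemaProperty, Schema.class)
           : convertSchema(recordReader.getSchema());
   ```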



##########
nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/model/Schema.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.model;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.stream.Collectors;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+
+public class Schema {

Review Comment:
   It would be helpful to name this class more specifically given the various types of schemas in the context of Apache NiFi. What do you think of `DatabaseSchema` or `TimeDatabaseSchema`?



##########
nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/PutIoTDB.java:
##########
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors;
+
+import java.io.InputStream;
+import java.text.SimpleDateFormat;
+import java.util.Set;
+import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.processors.model.Schema;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.util.Tuple;
+
+@Tags({"iotdb", "insert", "tablet"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription(
+        "This is a record aware processor that reads the content of the incoming FlowFile as individual records using the "
+                + "configured 'Record Reader' and writes them to Apache IoTDB using native interface.")
+public class PutIoTDB extends AbstractIoTDB {
+
+    static final PropertyDescriptor RECORD_READER_FACTORY =
+            new PropertyDescriptor.Builder()
+                    .name("Record Reader")
+                    .description(
+                            "Specifies the type of Record Reader controller service to use for parsing the incoming data "
+                                    + "and determining the schema")
+                    .identifiesControllerService(RecordReaderFactory.class)
+                    .required(true)
+                    .build();
+
+    static final PropertyDescriptor TIME_FIELD =
+            new PropertyDescriptor.Builder()
+                    .name("Time Field")
+                    .description(
+                            "The field name which represents time. It can be updated by expression language.")
+                    .defaultValue("Time")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                    .required(false)
+                    .build();
+
+    static final PropertyDescriptor SCHEMA =
+            new PropertyDescriptor.Builder()
+                    .name("Schema")
+                    .description(
+                            "The schema that IoTDB needs doesn't support good by NiFi.\n"
+                                    + "Therefore, you can define the schema here.\n"
+                                    + "Besides, you can set encoding type and compression type by this method.\n"
+                                    + "If you don't set this property, the inferred schema will be used.\n"
+                                    + "It can be updated by expression language.")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .required(false)
+                    .build();
+
+    static final PropertyDescriptor ALIGNED =
+            new PropertyDescriptor.Builder()
+                    .name("Aligned")
+                    .description("Whether using aligned interface? It can be updated by expression language.")
+                    .allowableValues(new String[] {"true", "false"})
+                    .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+                    .defaultValue("false")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .required(false)
+                    .build();
+
+    static final PropertyDescriptor MAX_ROW_NUMBER =
+            new PropertyDescriptor.Builder()
+                    .name("Max Row Number")
+                    .description(
+                            "Specifies the max row number of each tablet. It can be updated by expression language.")
+                    .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+                    .defaultValue("false")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .required(false)
+                    .build();
+
+    static {
+        descriptors.add(RECORD_READER_FACTORY);
+        descriptors.add(TIME_FIELD);
+        descriptors.add(SCHEMA);
+        descriptors.add(ALIGNED);
+        descriptors.add(MAX_ROW_NUMBER);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext processContext, ProcessSession processSession)
+            throws ProcessException {
+        final RecordReaderFactory recordParserFactory =
+                processContext
+                        .getProperty(RECORD_READER_FACTORY)
+                        .asControllerService(RecordReaderFactory.class);
+
+        String timeFieldProperty = processContext.getProperty(TIME_FIELD).getValue();
+        String schemaProperty = processContext.getProperty(SCHEMA).getValue();
+        String alignedProperty = processContext.getProperty(ALIGNED).getValue();
+        String maxRowNumberProperty = processContext.getProperty(MAX_ROW_NUMBER).getValue();
+
+        FlowFile flowFile = processSession.get();
+
+        String timeField = timeFieldProperty != null ? timeFieldProperty : "Time";
+        Boolean aligned = alignedProperty != null ? Boolean.valueOf(alignedProperty) : false;
+        int maxRowNumber = maxRowNumberProperty != null ? Integer.valueOf(maxRowNumberProperty) : 1024;
+
+        try (final InputStream inputStream = processSession.read(flowFile);
+             final RecordReader recordReader =
+                     recordParserFactory.createRecordReader(flowFile, inputStream, getLogger())) {
+            if (flowFile == null) {
+                inputStream.close();
+                recordReader.close();
+                processSession.transfer(flowFile, REL_SUCCESS);
+                return;
+            }
+
+            HashMap<String, Tablet> tablets;
+            boolean needInitFormatter;
+            Schema schema;
+            Tuple<Boolean, String> result;
+
+            result =
+                    schemaProperty != null
+                            ? validateSchemaAttribute(schemaProperty)
+                            : validateSchema(recordReader.getSchema());
+
+            if (!result.getKey()) {
+                getLogger().error(result.getValue());
+                inputStream.close();
+                recordReader.close();
+                processSession.transfer(flowFile, REL_FAILURE);
+                return;
+            } else {
+                if (result.getValue() != null) {
+                    getLogger().warn(result.getValue());
+                }
+            }

Review Comment:
   All log messages should have a readable message, in addition to passing the relevant value.
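   For example, a hedged sketch for the validation branch above, using the parameterized logging that `ComponentLog` supports (the message wording is only illustrative):
   ```java
   if (!result.getKey()) {
       // Describe what failed and pass the detail as an argument
       getLogger().error("Record schema validation failed: {}", result.getValue());
       processSession.transfer(flowFile, REL_FAILURE);
       return;
   } else if (result.getValue() != null) {
       getLogger().warn("Record schema validation produced a warning: {}", result.getValue());
   }
   ```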



##########
nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/PutIoTDB.java:
##########
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors;
+
+import java.io.InputStream;
+import java.text.SimpleDateFormat;
+import java.util.Set;
+import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.processors.model.Schema;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.util.Tuple;
+
+@Tags({"iotdb", "insert", "tablet"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription(
+        "This is a record aware processor that reads the content of the incoming FlowFile as individual records using the "
+                + "configured 'Record Reader' and writes them to Apache IoTDB using native interface.")
+public class PutIoTDB extends AbstractIoTDB {
+
+    static final PropertyDescriptor RECORD_READER_FACTORY =
+            new PropertyDescriptor.Builder()
+                    .name("Record Reader")
+                    .description(
+                            "Specifies the type of Record Reader controller service to use for parsing the incoming data "
+                                    + "and determining the schema")
+                    .identifiesControllerService(RecordReaderFactory.class)
+                    .required(true)
+                    .build();
+
+    static final PropertyDescriptor TIME_FIELD =
+            new PropertyDescriptor.Builder()
+                    .name("Time Field")
+                    .description(
+                            "The field name which represents time. It can be updated by expression language.")
+                    .defaultValue("Time")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                    .required(false)
+                    .build();
+
+    static final PropertyDescriptor SCHEMA =
+            new PropertyDescriptor.Builder()
+                    .name("Schema")
+                    .description(
+                            "The schema that IoTDB needs doesn't support good by NiFi.\n"
+                                    + "Therefore, you can define the schema here.\n"
+                                    + "Besides, you can set encoding type and compression type by this method.\n"
+                                    + "If you don't set this property, the inferred schema will be used.\n"
+                                    + "It can be updated by expression language.")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .required(false)
+                    .build();
+
+    static final PropertyDescriptor ALIGNED =
+            new PropertyDescriptor.Builder()
+                    .name("Aligned")
+                    .description("Whether using aligned interface? It can be updated by expression language.")
+                    .allowableValues(new String[] {"true", "false"})
+                    .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+                    .defaultValue("false")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .required(false)
+                    .build();
+
+    static final PropertyDescriptor MAX_ROW_NUMBER =
+            new PropertyDescriptor.Builder()
+                    .name("Max Row Number")
+                    .description(
+                            "Specifies the max row number of each tablet. It can be updated by expression language.")
+                    .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+                    .defaultValue("false")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .required(false)
+                    .build();
+
+    static {
+        descriptors.add(RECORD_READER_FACTORY);
+        descriptors.add(TIME_FIELD);
+        descriptors.add(SCHEMA);
+        descriptors.add(ALIGNED);
+        descriptors.add(MAX_ROW_NUMBER);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext processContext, ProcessSession processSession)
+            throws ProcessException {
+        final RecordReaderFactory recordParserFactory =
+                processContext
+                        .getProperty(RECORD_READER_FACTORY)
+                        .asControllerService(RecordReaderFactory.class);
+
+        String timeFieldProperty = processContext.getProperty(TIME_FIELD).getValue();
+        String schemaProperty = processContext.getProperty(SCHEMA).getValue();
+        String alignedProperty = processContext.getProperty(ALIGNED).getValue();
+        String maxRowNumberProperty = processContext.getProperty(MAX_ROW_NUMBER).getValue();
+
+        FlowFile flowFile = processSession.get();
+
+        String timeField = timeFieldProperty != null ? timeFieldProperty : "Time";
+        Boolean aligned = alignedProperty != null ? Boolean.valueOf(alignedProperty) : false;
+        int maxRowNumber = maxRowNumberProperty != null ? Integer.valueOf(maxRowNumberProperty) : 1024;
+
+        try (final InputStream inputStream = processSession.read(flowFile);
+             final RecordReader recordReader =
+                     recordParserFactory.createRecordReader(flowFile, inputStream, getLogger())) {
+            if (flowFile == null) {
+                inputStream.close();
+                recordReader.close();
+                processSession.transfer(flowFile, REL_SUCCESS);
+                return;
+            }
+
+            HashMap<String, Tablet> tablets;
+            boolean needInitFormatter;
+            Schema schema;
+            Tuple<Boolean, String> result;
+
+            result =
+                    schemaProperty != null
+                            ? validateSchemaAttribute(schemaProperty)
+                            : validateSchema(recordReader.getSchema());
+
+            if (!result.getKey()) {
+                getLogger().error(result.getValue());
+                inputStream.close();
+                recordReader.close();
+                processSession.transfer(flowFile, REL_FAILURE);
+                return;
+            } else {
+                if (result.getValue() != null) {
+                    getLogger().warn(result.getValue());
+                }
+            }
+
+            schema =
+                    schemaProperty != null
+                            ? new ObjectMapper().readValue(schemaProperty, Schema.class)
+                            : convertSchema(recordReader.getSchema());
+
+            List<String> fieldNames = schema.getFieldNames();
+
+            needInitFormatter = schema.getTimeType() != Schema.TimeType.LONG;
+
+            tablets = generateTablets(schema, maxRowNumber);
+            SimpleDateFormat timeFormatter = null;
+
+            Record record;
+
+            while ((record = recordReader.nextRecord()) != null) {
+                Object[] values = record.getValues();
+                if (timeFormatter == null && needInitFormatter) {
+                    timeFormatter = initFormatter((String) values[0]);
+                    if (timeFormatter == null) {
+                        getLogger().error("The format of time is not supported.");
+                        inputStream.close();
+                        recordReader.close();
+                        processSession.transfer(flowFile, REL_FAILURE);
+                        return;
+                    }
+                }
+
+                long timestamp;
+                if (needInitFormatter) {
+                    timestamp = timeFormatter.parse((String) values[0]).getTime();
+                } else {
+                    timestamp = (Long) values[0];
+                }
+
+                boolean flag = false;

Review Comment:
   The purpose of this variable is somewhat unclear based on the name.



##########
nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/AbstractIoTDB.java:
##########
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors;
+
+import java.text.SimpleDateFormat;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.processors.model.Field;
+import org.apache.nifi.processors.model.Schema;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.Tuple;
+
+public abstract class AbstractIoTDB extends AbstractProcessor {
+    private static final int DEFAULT_IOTDB_PORT = 6667;
+
+    private static Map<RecordFieldType, TSDataType> typeMap =
+            new HashMap<>();
+
+    static final Set<RecordFieldType> supportedType =
+            new HashSet<>();
+
+    static final Set<RecordFieldType> supportedTimeType =
+            new HashSet<>();
+
+    private static final String[] STRING_TIME_FORMAT =
+            new String[]{
+                    "yyyy-MM-dd HH:mm:ss.SSSX",
+                    "yyyy/MM/dd HH:mm:ss.SSSX",
+                    "yyyy.MM.dd HH:mm:ss.SSSX",
+                    "yyyy-MM-dd HH:mm:ssX",
+                    "yyyy/MM/dd HH:mm:ssX",
+                    "yyyy.MM.dd HH:mm:ssX",
+                    "yyyy-MM-dd HH:mm:ss.SSSz",
+                    "yyyy/MM/dd HH:mm:ss.SSSz",
+                    "yyyy.MM.dd HH:mm:ss.SSSz",
+                    "yyyy-MM-dd HH:mm:ssz",
+                    "yyyy/MM/dd HH:mm:ssz",
+                    "yyyy.MM.dd HH:mm:ssz",
+                    "yyyy-MM-dd HH:mm:ss.SSS",
+                    "yyyy/MM/dd HH:mm:ss.SSS",
+                    "yyyy.MM.dd HH:mm:ss.SSS",
+                    "yyyy-MM-dd HH:mm:ss",
+                    "yyyy/MM/dd HH:mm:ss",
+                    "yyyy.MM.dd HH:mm:ss",
+                    "yyyy-MM-dd'T'HH:mm:ss.SSSX",
+                    "yyyy/MM/dd'T'HH:mm:ss.SSSX",
+                    "yyyy.MM.dd'T'HH:mm:ss.SSSX",
+                    "yyyy-MM-dd'T'HH:mm:ssX",
+                    "yyyy/MM/dd'T'HH:mm:ssX",
+                    "yyyy.MM.dd'T'HH:mm:ssX",
+                    "yyyy-MM-dd'T'HH:mm:ss.SSSz",
+                    "yyyy/MM/dd'T'HH:mm:ss.SSSz",
+                    "yyyy.MM.dd'T'HH:mm:ss.SSSz",
+                    "yyyy-MM-dd'T'HH:mm:ssz",
+                    "yyyy/MM/dd'T'HH:mm:ssz",
+                    "yyyy.MM.dd'T'HH:mm:ssz",
+                    "yyyy-MM-dd'T'HH:mm:ss.SSS",
+                    "yyyy/MM/dd'T'HH:mm:ss.SSS",
+                    "yyyy.MM.dd'T'HH:mm:ss.SSS",
+                    "yyyy-MM-dd'T'HH:mm:ss",
+                    "yyyy/MM/dd'T'HH:mm:ss",
+                    "yyyy.MM.dd'T'HH:mm:ss"
+            };
+
+    static final PropertyDescriptor IOTDB_HOST =
+            new PropertyDescriptor.Builder()
+                    .name("Host")
+                    .description("The host of IoTDB.")
+                    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                    .required(true)
+                    .build();
+
+    static final PropertyDescriptor IOTDB_PORT =
+            new PropertyDescriptor.Builder()
+                    .name("Port")
+                    .description("The port of IoTDB.")
+                    .defaultValue(String.valueOf(DEFAULT_IOTDB_PORT))
+                    .addValidator(StandardValidators.PORT_VALIDATOR)
+                    .required(true)
+                    .build();
+
+    static final PropertyDescriptor USERNAME =
+            new PropertyDescriptor.Builder()
+                    .name("Username")
+                    .description("Username to access the IoTDB.")
+                    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                    .required(true)
+                    .build();
+
+    static final PropertyDescriptor PASSWORD =
+            new PropertyDescriptor.Builder()
+                    .name("Password")
+                    .description("Password to access the IoTDB.")
+                    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                    .required(true)
+                    .sensitive(true)
+                    .build();
+
+    protected static List<PropertyDescriptor> descriptors = new ArrayList<>();
+
+    protected static final Relationship REL_SUCCESS =
+            new Relationship.Builder()
+                    .name("success")
+                    .description("files that were successfully processed")
+                    .build();
+    protected static final Relationship REL_FAILURE =
+            new Relationship.Builder()
+                    .name("failure")
+                    .description("files that were not successfully processed")
+                    .build();
+
+    protected static Set<Relationship> relationships = new HashSet<>();
+
+    static {
+        descriptors.add(IOTDB_HOST);
+        descriptors.add(IOTDB_PORT);
+        descriptors.add(USERNAME);
+        descriptors.add(PASSWORD);
+
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+
+        typeMap.put(RecordFieldType.STRING, TSDataType.TEXT);
+        typeMap.put(RecordFieldType.BOOLEAN, TSDataType.BOOLEAN);
+        typeMap.put(RecordFieldType.INT, TSDataType.INT32);
+        typeMap.put(RecordFieldType.LONG, TSDataType.INT64);
+        typeMap.put(RecordFieldType.FLOAT, TSDataType.FLOAT);
+        typeMap.put(RecordFieldType.DOUBLE, TSDataType.DOUBLE);
+
+        supportedType.add(RecordFieldType.BOOLEAN);
+        supportedType.add(RecordFieldType.STRING);
+        supportedType.add(RecordFieldType.INT);
+        supportedType.add(RecordFieldType.LONG);
+        supportedType.add(RecordFieldType.FLOAT);
+        supportedType.add(RecordFieldType.DOUBLE);
+
+        supportedTimeType.add(RecordFieldType.STRING);
+        supportedTimeType.add(RecordFieldType.LONG);
+    }
+
+    protected final AtomicReference<Session> session = new AtomicReference<>(null);
+
+    @OnScheduled
+    public void onScheduled(ProcessContext context) {
+        connectToIoTDB(context);
+    }
+
+    void connectToIoTDB(ProcessContext context) {
+        if (session.get() == null) {
+            final String host, username, password;
+            final int port;
+
+            host = context.getProperty(IOTDB_HOST).getValue();
+            port = Integer.parseInt(context.getProperty(IOTDB_PORT).getValue());
+            username = context.getProperty(USERNAME).getValue();
+            password = context.getProperty(PASSWORD).getValue();
+
+            session.set(
+                    new Session.Builder()
+                            .host(host)
+                            .port(port)
+                            .username(username)
+                            .password(password)
+                            .build());
+            try {
+                session.get().open();
+            } catch (IoTDBConnectionException e) {

Review Comment:
   In addition to logging the message, it would be best to throw an exception, because the processor will not be functional without a working connection to IoTDB.
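   For example, a minimal sketch of that approach (assuming `org.apache.nifi.processor.exception.ProcessException` is an acceptable exception type here):
   ```java
   try {
       session.get().open();
   } catch (final IoTDBConnectionException e) {
       // surfacing the failure lets the framework retry scheduling instead of
       // leaving the processor running without a usable session
       throw new ProcessException("Opening IoTDB session [" + host + ":" + port + "] failed", e);
   }
   ```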



##########
nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/pom.xml:
##########
@@ -0,0 +1,116 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>nifi-iotdb-bundle</artifactId>
+        <groupId>org.apache.nifi</groupId>
+        <version>1.18.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>org.apache.nifi</groupId>
+    <artifactId>nifi-iotdb-processors</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>iotdb-session</artifactId>
+            <version>${iotdb.sdk.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>iotdb-server</artifactId>
+            <version>${iotdb.sdk.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>iotdb-server</artifactId>
+            <version>${iotdb.sdk.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock-record-utils</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-dbcp-service-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-standard-services-api-nar</artifactId>
+            <version>${project.version}</version>
+            <type>nar</type>
+        </dependency>
+
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+            <version>${jackson.bom.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-annotations</artifactId>
+            <version>${jackson.bom.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+            <version>${jackson.bom.version}</version>

Review Comment:
   The `version` property can be removed since the Bill-of-Materials dependency in the root configuration manages the version.



##########
nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/AbstractIoTDB.java:
##########
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors;
+
+import java.text.SimpleDateFormat;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.processors.model.Field;
+import org.apache.nifi.processors.model.Schema;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.Tuple;
+
+public abstract class AbstractIoTDB extends AbstractProcessor {
+    private static final int DEFAULT_IOTDB_PORT = 6667;
+
+    private static Map<RecordFieldType, TSDataType> typeMap =
+            new HashMap<>();
+
+    static final Set<RecordFieldType> supportedType =
+            new HashSet<>();
+
+    static final Set<RecordFieldType> supportedTimeType =
+            new HashSet<>();
+
+    private static final String[] STRING_TIME_FORMAT =
+            new String[]{
+                    "yyyy-MM-dd HH:mm:ss.SSSX",
+                    "yyyy/MM/dd HH:mm:ss.SSSX",
+                    "yyyy.MM.dd HH:mm:ss.SSSX",
+                    "yyyy-MM-dd HH:mm:ssX",
+                    "yyyy/MM/dd HH:mm:ssX",
+                    "yyyy.MM.dd HH:mm:ssX",
+                    "yyyy-MM-dd HH:mm:ss.SSSz",
+                    "yyyy/MM/dd HH:mm:ss.SSSz",
+                    "yyyy.MM.dd HH:mm:ss.SSSz",
+                    "yyyy-MM-dd HH:mm:ssz",
+                    "yyyy/MM/dd HH:mm:ssz",
+                    "yyyy.MM.dd HH:mm:ssz",
+                    "yyyy-MM-dd HH:mm:ss.SSS",
+                    "yyyy/MM/dd HH:mm:ss.SSS",
+                    "yyyy.MM.dd HH:mm:ss.SSS",
+                    "yyyy-MM-dd HH:mm:ss",
+                    "yyyy/MM/dd HH:mm:ss",
+                    "yyyy.MM.dd HH:mm:ss",
+                    "yyyy-MM-dd'T'HH:mm:ss.SSSX",
+                    "yyyy/MM/dd'T'HH:mm:ss.SSSX",
+                    "yyyy.MM.dd'T'HH:mm:ss.SSSX",
+                    "yyyy-MM-dd'T'HH:mm:ssX",
+                    "yyyy/MM/dd'T'HH:mm:ssX",
+                    "yyyy.MM.dd'T'HH:mm:ssX",
+                    "yyyy-MM-dd'T'HH:mm:ss.SSSz",
+                    "yyyy/MM/dd'T'HH:mm:ss.SSSz",
+                    "yyyy.MM.dd'T'HH:mm:ss.SSSz",
+                    "yyyy-MM-dd'T'HH:mm:ssz",
+                    "yyyy/MM/dd'T'HH:mm:ssz",
+                    "yyyy.MM.dd'T'HH:mm:ssz",
+                    "yyyy-MM-dd'T'HH:mm:ss.SSS",
+                    "yyyy/MM/dd'T'HH:mm:ss.SSS",
+                    "yyyy.MM.dd'T'HH:mm:ss.SSS",
+                    "yyyy-MM-dd'T'HH:mm:ss",
+                    "yyyy/MM/dd'T'HH:mm:ss",
+                    "yyyy.MM.dd'T'HH:mm:ss"
+            };
+
+    static final PropertyDescriptor IOTDB_HOST =
+            new PropertyDescriptor.Builder()
+                    .name("Host")
+                    .description("The host of IoTDB.")
+                    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                    .required(true)
+                    .build();
+
+    static final PropertyDescriptor IOTDB_PORT =
+            new PropertyDescriptor.Builder()
+                    .name("Port")
+                    .description("The port of IoTDB.")
+                    .defaultValue(String.valueOf(DEFAULT_IOTDB_PORT))
+                    .addValidator(StandardValidators.PORT_VALIDATOR)
+                    .required(true)
+                    .build();
+
+    static final PropertyDescriptor USERNAME =
+            new PropertyDescriptor.Builder()
+                    .name("Username")
+                    .description("Username to access the IoTDB.")
+                    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                    .required(true)
+                    .build();
+
+    static final PropertyDescriptor PASSWORD =
+            new PropertyDescriptor.Builder()
+                    .name("Password")
+                    .description("Password to access the IoTDB.")
+                    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                    .required(true)
+                    .sensitive(true)
+                    .build();
+
+    protected static List<PropertyDescriptor> descriptors = new ArrayList<>();
+
+    protected static final Relationship REL_SUCCESS =
+            new Relationship.Builder()
+                    .name("success")
+                    .description("files that were successfully processed")
+                    .build();
+    protected static final Relationship REL_FAILURE =
+            new Relationship.Builder()
+                    .name("failure")
+                    .description("files that were not successfully processed")
+                    .build();
+
+    protected static Set<Relationship> relationships = new HashSet<>();
+
+    static {
+        descriptors.add(IOTDB_HOST);
+        descriptors.add(IOTDB_PORT);
+        descriptors.add(USERNAME);
+        descriptors.add(PASSWORD);
+
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+
+        typeMap.put(RecordFieldType.STRING, TSDataType.TEXT);
+        typeMap.put(RecordFieldType.BOOLEAN, TSDataType.BOOLEAN);
+        typeMap.put(RecordFieldType.INT, TSDataType.INT32);
+        typeMap.put(RecordFieldType.LONG, TSDataType.INT64);
+        typeMap.put(RecordFieldType.FLOAT, TSDataType.FLOAT);
+        typeMap.put(RecordFieldType.DOUBLE, TSDataType.DOUBLE);
+
+        supportedType.add(RecordFieldType.BOOLEAN);
+        supportedType.add(RecordFieldType.STRING);
+        supportedType.add(RecordFieldType.INT);
+        supportedType.add(RecordFieldType.LONG);
+        supportedType.add(RecordFieldType.FLOAT);
+        supportedType.add(RecordFieldType.DOUBLE);
+
+        supportedTimeType.add(RecordFieldType.STRING);
+        supportedTimeType.add(RecordFieldType.LONG);
+    }
+
+    protected final AtomicReference<Session> session = new AtomicReference<>(null);
+
+    @OnScheduled
+    public void onScheduled(ProcessContext context) {
+        connectToIoTDB(context);
+    }
+
+    void connectToIoTDB(ProcessContext context) {
+        if (session.get() == null) {
+            final String host, username, password;
+            final int port;
+
+            host = context.getProperty(IOTDB_HOST).getValue();
+            port = Integer.parseInt(context.getProperty(IOTDB_PORT).getValue());
+            username = context.getProperty(USERNAME).getValue();
+            password = context.getProperty(PASSWORD).getValue();
+
+            session.set(
+                    new Session.Builder()
+                            .host(host)
+                            .port(port)
+                            .username(username)
+                            .password(password)
+                            .build());
+            try {
+                session.get().open();
+            } catch (IoTDBConnectionException e) {
+                getLogger().error(e.getMessage());
+                e.printStackTrace();
+            }
+        }
+    }
+
+    public void stop(ProcessContext context) {
+        if (session.get() != null) {
+            try {
+                session.get().close();
+            } catch (IoTDBConnectionException e) {
+                e.printStackTrace();
+            }
+            session.set(null);
+        }
+    }
+
+    protected TSDataType getType(RecordFieldType type) {
+        return typeMap.get(type);
+    }
+
+    protected Tuple<Boolean, String> validateSchemaAttribute(String schemaAttribute) {
+        ObjectMapper mapper = new ObjectMapper();
+        JsonNode schema = null;
+        try {
+            schema = mapper.readTree(schemaAttribute);
+        } catch (JsonProcessingException e) {
+            return new Tuple<>(false, e.getMessage());
+        }
+        Set<String> keySet = new HashSet<>();
+        schema.fieldNames().forEachRemaining(field -> keySet.add(field));
+
+        if (!keySet.contains("timeType") || !keySet.contains("fields")) {
+            String msg = "The JSON of schema must contain `timeType` and `fields`.";
+            return new Tuple<>(false, msg);
+        }
+
+        if (!Schema.getSupportedTimeType().contains(schema.get("timeType").asText().toLowerCase())) {
+            String msg =
+                    String.format(
+                            "Unknown `timeType`: %s, there are only two options `long` and `string` for this property.",
+                            schema.get("timeType").asText());
+            return new Tuple<>(false, msg);
+        }
+
+        for (int i = 0; i < schema.get("fields").size(); i++) {
+            JsonNode field = schema.get("fields").get(i);
+            Set<String> fieldKeySet = new HashSet<>();
+
+            field.fieldNames().forEachRemaining(fieldName -> fieldKeySet.add(fieldName));
+            if (!fieldKeySet.contains("tsName") || !fieldKeySet.contains("dataType")) {
+                String msg = "`tsName` or `dataType` has not been set.";
+                return new Tuple<>(false, msg);
+            }
+
+            if (!field.get("tsName").asText().startsWith("root.")) {
+                String msg =
+                        String.format("The tsName `%s` is not start with 'root.'.", field.get("tsName").asText());
+                return new Tuple<>(false, msg);
+            }
+
+            if (!Field.getSupportedDataType().contains(field.get("dataType").asText())) {
+                String msg =
+                        String.format(
+                                "Unknown `dataType`: %s. The supported dataTypes are %s",
+                                field.get("dataType").asText(), Field.getSupportedDataType());
+                return new Tuple<>(false, msg);
+            }
+
+            Set<String> supportedKeySet = new HashSet<>();
+            supportedKeySet.add("tsName");
+            supportedKeySet.add("dataType");
+            supportedKeySet.add("encoding");
+            supportedKeySet.add("compressionType");
+
+            HashSet<String> tmpKetSet = new HashSet<>();
+            tmpKetSet.addAll(supportedKeySet);
+            tmpKetSet.addAll(fieldKeySet);
+            tmpKetSet.removeAll(supportedKeySet);
+            if (!tmpKetSet.isEmpty()) {
+                String msg = "Unknown property or properties: " + tmpKetSet;
+                return new Tuple<>(false, msg);
+            }
+
+            if (fieldKeySet.contains("compressionType") && !fieldKeySet.contains("encoding")) {
+                String msg =
+                        "The `compressionType` has been set, but the `encoding` has not. The property `compressionType` will not take effect.";
+                return new Tuple<>(true, msg);
+            }
+
+            if (field.get("encoding") != null
+                    && !Field.getSupportedEncoding().contains(field.get("encoding").asText())) {
+                String msg =
+                        String.format(
+                                "Unknown `encoding`: %s, The supported encoding types are %s",
+                                field.get("encoding").asText(), Field.getSupportedEncoding());
+                return new Tuple<>(false, msg);
+            }
+
+            if (field.get("compressionType") != null
+                    && !Field.getSupportedCompressionType().contains(field.get("compressionType"))) {
+                String msg =
+                        String.format(
+                                "Unknown `compressionType`: %s, The supported compressionType are %s",
+                                field.get("compressionType").asText(), Field.getSupportedCompressionType());
+                return new Tuple<>(false, msg);
+            }
+        }
+
+        return new Tuple<>(true, null);
+    }
+
+    protected Tuple<Boolean, String> validateSchema(RecordSchema recordSchema) {
+        List<String> fieldNames = recordSchema.getFieldNames();
+        List<DataType> dataTypes = recordSchema.getDataTypes();
+        if (!fieldNames.contains("Time")) {
+            return new Tuple<>(false, "The fields must contain `Time`.");

Review Comment:
   It is best to avoid the period character in messages:
   ```suggestion
               return new Tuple<>(false, "The fields must contain `Time`");
   ```



##########
nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/pom.xml:
##########
@@ -0,0 +1,116 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>nifi-iotdb-bundle</artifactId>
+        <groupId>org.apache.nifi</groupId>
+        <version>1.18.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>org.apache.nifi</groupId>
+    <artifactId>nifi-iotdb-processors</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>iotdb-session</artifactId>
+            <version>${iotdb.sdk.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>iotdb-server</artifactId>
+            <version>${iotdb.sdk.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>iotdb-server</artifactId>
+            <version>${iotdb.sdk.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock-record-utils</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-dbcp-service-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-standard-services-api-nar</artifactId>
+            <version>${project.version}</version>
+            <type>nar</type>
+        </dependency>
+
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+            <version>${jackson.bom.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-annotations</artifactId>
+            <version>${jackson.bom.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+            <version>${jackson.bom.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+        </dependency>

Review Comment:
   JUnit 5 is included as a default dependency, so this dependency on JUnit 4 should be removed.



##########
nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/AbstractIoTDB.java:
##########
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors;
+
+import java.text.SimpleDateFormat;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.processors.model.Field;
+import org.apache.nifi.processors.model.Schema;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.Tuple;
+
+public abstract class AbstractIoTDB extends AbstractProcessor {
+    private static final int DEFAULT_IOTDB_PORT = 6667;
+
+    private static Map<RecordFieldType, TSDataType> typeMap =
+            new HashMap<>();
+
+    static final Set<RecordFieldType> supportedType =
+            new HashSet<>();
+
+    static final Set<RecordFieldType> supportedTimeType =
+            new HashSet<>();
+
+    private static final String[] STRING_TIME_FORMAT =
+            new String[]{
+                    "yyyy-MM-dd HH:mm:ss.SSSX",
+                    "yyyy/MM/dd HH:mm:ss.SSSX",
+                    "yyyy.MM.dd HH:mm:ss.SSSX",
+                    "yyyy-MM-dd HH:mm:ssX",
+                    "yyyy/MM/dd HH:mm:ssX",
+                    "yyyy.MM.dd HH:mm:ssX",
+                    "yyyy-MM-dd HH:mm:ss.SSSz",
+                    "yyyy/MM/dd HH:mm:ss.SSSz",
+                    "yyyy.MM.dd HH:mm:ss.SSSz",
+                    "yyyy-MM-dd HH:mm:ssz",
+                    "yyyy/MM/dd HH:mm:ssz",
+                    "yyyy.MM.dd HH:mm:ssz",
+                    "yyyy-MM-dd HH:mm:ss.SSS",
+                    "yyyy/MM/dd HH:mm:ss.SSS",
+                    "yyyy.MM.dd HH:mm:ss.SSS",
+                    "yyyy-MM-dd HH:mm:ss",
+                    "yyyy/MM/dd HH:mm:ss",
+                    "yyyy.MM.dd HH:mm:ss",
+                    "yyyy-MM-dd'T'HH:mm:ss.SSSX",
+                    "yyyy/MM/dd'T'HH:mm:ss.SSSX",
+                    "yyyy.MM.dd'T'HH:mm:ss.SSSX",
+                    "yyyy-MM-dd'T'HH:mm:ssX",
+                    "yyyy/MM/dd'T'HH:mm:ssX",
+                    "yyyy.MM.dd'T'HH:mm:ssX",
+                    "yyyy-MM-dd'T'HH:mm:ss.SSSz",
+                    "yyyy/MM/dd'T'HH:mm:ss.SSSz",
+                    "yyyy.MM.dd'T'HH:mm:ss.SSSz",
+                    "yyyy-MM-dd'T'HH:mm:ssz",
+                    "yyyy/MM/dd'T'HH:mm:ssz",
+                    "yyyy.MM.dd'T'HH:mm:ssz",
+                    "yyyy-MM-dd'T'HH:mm:ss.SSS",
+                    "yyyy/MM/dd'T'HH:mm:ss.SSS",
+                    "yyyy.MM.dd'T'HH:mm:ss.SSS",
+                    "yyyy-MM-dd'T'HH:mm:ss",
+                    "yyyy/MM/dd'T'HH:mm:ss",
+                    "yyyy.MM.dd'T'HH:mm:ss"
+            };
+
+    static final PropertyDescriptor IOTDB_HOST =
+            new PropertyDescriptor.Builder()
+                    .name("Host")
+                    .description("The host of IoTDB.")
+                    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                    .required(true)
+                    .build();
+
+    static final PropertyDescriptor IOTDB_PORT =
+            new PropertyDescriptor.Builder()
+                    .name("Port")
+                    .description("The port of IoTDB.")
+                    .defaultValue(String.valueOf(DEFAULT_IOTDB_PORT))
+                    .addValidator(StandardValidators.PORT_VALIDATOR)
+                    .required(true)
+                    .build();
+
+    static final PropertyDescriptor USERNAME =
+            new PropertyDescriptor.Builder()
+                    .name("Username")
+                    .description("Username to access the IoTDB.")
+                    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                    .required(true)
+                    .build();
+
+    static final PropertyDescriptor PASSWORD =
+            new PropertyDescriptor.Builder()
+                    .name("Password")
+                    .description("Password to access the IoTDB.")
+                    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                    .required(true)
+                    .sensitive(true)
+                    .build();
+
+    protected static List<PropertyDescriptor> descriptors = new ArrayList<>();
+
+    protected static final Relationship REL_SUCCESS =
+            new Relationship.Builder()
+                    .name("success")
+                    .description("files that were successfully processed")
+                    .build();
+    protected static final Relationship REL_FAILURE =
+            new Relationship.Builder()
+                    .name("failure")
+                    .description("files that were not successfully processed")
+                    .build();
+
+    protected static Set<Relationship> relationships = new HashSet<>();
+
+    static {
+        descriptors.add(IOTDB_HOST);
+        descriptors.add(IOTDB_PORT);
+        descriptors.add(USERNAME);
+        descriptors.add(PASSWORD);
+
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+
+        typeMap.put(RecordFieldType.STRING, TSDataType.TEXT);
+        typeMap.put(RecordFieldType.BOOLEAN, TSDataType.BOOLEAN);
+        typeMap.put(RecordFieldType.INT, TSDataType.INT32);
+        typeMap.put(RecordFieldType.LONG, TSDataType.INT64);
+        typeMap.put(RecordFieldType.FLOAT, TSDataType.FLOAT);
+        typeMap.put(RecordFieldType.DOUBLE, TSDataType.DOUBLE);
+
+        supportedType.add(RecordFieldType.BOOLEAN);
+        supportedType.add(RecordFieldType.STRING);
+        supportedType.add(RecordFieldType.INT);
+        supportedType.add(RecordFieldType.LONG);
+        supportedType.add(RecordFieldType.FLOAT);
+        supportedType.add(RecordFieldType.DOUBLE);
+
+        supportedTimeType.add(RecordFieldType.STRING);
+        supportedTimeType.add(RecordFieldType.LONG);
+    }
+
+    protected final AtomicReference<Session> session = new AtomicReference<>(null);
+
+    @OnScheduled
+    public void onScheduled(ProcessContext context) {
+        connectToIoTDB(context);
+    }
+
+    void connectToIoTDB(ProcessContext context) {
+        if (session.get() == null) {
+            final String host, username, password;
+            final int port;
+
+            host = context.getProperty(IOTDB_HOST).getValue();
+            port = Integer.parseInt(context.getProperty(IOTDB_PORT).getValue());
+            username = context.getProperty(USERNAME).getValue();
+            password = context.getProperty(PASSWORD).getValue();
+
+            session.set(
+                    new Session.Builder()
+                            .host(host)
+                            .port(port)
+                            .username(username)
+                            .password(password)
+                            .build());
+            try {
+                session.get().open();
+            } catch (IoTDBConnectionException e) {
+                getLogger().error(e.getMessage());
+                e.printStackTrace();
+            }
+        }
+    }
+
+    public void stop(ProcessContext context) {
+        if (session.get() != null) {
+            try {
+                session.get().close();
+            } catch (IoTDBConnectionException e) {
+                e.printStackTrace();
+            }
+            session.set(null);
+        }
+    }
+
+    protected TSDataType getType(RecordFieldType type) {
+        return typeMap.get(type);
+    }
+
+    protected Tuple<Boolean, String> validateSchemaAttribute(String schemaAttribute) {
+        ObjectMapper mapper = new ObjectMapper();
+        JsonNode schema = null;
+        try {
+            schema = mapper.readTree(schemaAttribute);
+        } catch (JsonProcessingException e) {
+            return new Tuple<>(false, e.getMessage());
+        }
+        Set<String> keySet = new HashSet<>();
+        schema.fieldNames().forEachRemaining(field -> keySet.add(field));
+
+        if (!keySet.contains("timeType") || !keySet.contains("fields")) {
+            String msg = "The JSON of schema must contain `timeType` and `fields`.";
+            return new Tuple<>(false, msg);
+        }
+
+        if (!Schema.getSupportedTimeType().contains(schema.get("timeType").asText().toLowerCase())) {
+            String msg =
+                    String.format(
+                            "Unknown `timeType`: %s, there are only two options `long` and `string` for this property.",
+                            schema.get("timeType").asText());
+            return new Tuple<>(false, msg);
+        }
+
+        for (int i = 0; i < schema.get("fields").size(); i++) {
+            JsonNode field = schema.get("fields").get(i);
+            Set<String> fieldKeySet = new HashSet<>();
+
+            field.fieldNames().forEachRemaining(fieldName -> fieldKeySet.add(fieldName));
+            if (!fieldKeySet.contains("tsName") || !fieldKeySet.contains("dataType")) {
+                String msg = "`tsName` or `dataType` has not been set.";
+                return new Tuple<>(false, msg);
+            }
+
+            if (!field.get("tsName").asText().startsWith("root.")) {
+                String msg =
+                        String.format("The tsName `%s` is not start with 'root.'.", field.get("tsName").asText());
+                return new Tuple<>(false, msg);
+            }
+
+            if (!Field.getSupportedDataType().contains(field.get("dataType").asText())) {
+                String msg =
+                        String.format(
+                                "Unknown `dataType`: %s. The supported dataTypes are %s",
+                                field.get("dataType").asText(), Field.getSupportedDataType());
+                return new Tuple<>(false, msg);
+            }
+
+            Set<String> supportedKeySet = new HashSet<>();
+            supportedKeySet.add("tsName");
+            supportedKeySet.add("dataType");
+            supportedKeySet.add("encoding");
+            supportedKeySet.add("compressionType");
+
+            HashSet<String> tmpKetSet = new HashSet<>();
+            tmpKetSet.addAll(supportedKeySet);
+            tmpKetSet.addAll(fieldKeySet);
+            tmpKetSet.removeAll(supportedKeySet);
+            if (!tmpKetSet.isEmpty()) {
+                String msg = "Unknown property or properties: " + tmpKetSet;
+                return new Tuple<>(false, msg);
+            }
+
+            if (fieldKeySet.contains("compressionType") && !fieldKeySet.contains("encoding")) {
+                String msg =
+                        "The `compressionType` has been set, but the `encoding` has not. The property `compressionType` will not take effect.";
+                return new Tuple<>(true, msg);
+            }
+
+            if (field.get("encoding") != null
+                    && !Field.getSupportedEncoding().contains(field.get("encoding").asText())) {
+                String msg =
+                        String.format(
+                                "Unknown `encoding`: %s, The supported encoding types are %s",
+                                field.get("encoding").asText(), Field.getSupportedEncoding());
+                return new Tuple<>(false, msg);
+            }
+
+            if (field.get("compressionType") != null
+                    && !Field.getSupportedCompressionType().contains(field.get("compressionType"))) {
+                String msg =
+                        String.format(
+                                "Unknown `compressionType`: %s, The supported compressionType are %s",
+                                field.get("compressionType").asText(), Field.getSupportedCompressionType());
+                return new Tuple<>(false, msg);
+            }
+        }
+
+        return new Tuple<>(true, null);
+    }
+
+    protected Tuple<Boolean, String> validateSchema(RecordSchema recordSchema) {
+        List<String> fieldNames = recordSchema.getFieldNames();
+        List<DataType> dataTypes = recordSchema.getDataTypes();
+        if (!fieldNames.contains("Time")) {

Review Comment:
   As with other strings, `Time` should be declared statically and reused.
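   For instance, a sketch of that change (the constant name is only an illustration):
   ```java
   private static final String FIELD_TIME = "Time";

   // ...

   if (!fieldNames.contains(FIELD_TIME)) {
       return new Tuple<>(false, "The fields must contain `" + FIELD_TIME + "`");
   }
   ```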



##########
nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/AbstractIoTDB.java:
##########
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors;
+
+import java.text.SimpleDateFormat;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.processors.model.Field;
+import org.apache.nifi.processors.model.Schema;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.Tuple;
+
+public abstract class AbstractIoTDB extends AbstractProcessor {
+    private static final int DEFAULT_IOTDB_PORT = 6667;
+
+    private static Map<RecordFieldType, TSDataType> typeMap =
+            new HashMap<>();
+
+    static final Set<RecordFieldType> supportedType =
+            new HashSet<>();
+
+    static final Set<RecordFieldType> supportedTimeType =
+            new HashSet<>();
+
+    private static final String[] STRING_TIME_FORMAT =
+            new String[]{
+                    "yyyy-MM-dd HH:mm:ss.SSSX",
+                    "yyyy/MM/dd HH:mm:ss.SSSX",
+                    "yyyy.MM.dd HH:mm:ss.SSSX",
+                    "yyyy-MM-dd HH:mm:ssX",
+                    "yyyy/MM/dd HH:mm:ssX",
+                    "yyyy.MM.dd HH:mm:ssX",
+                    "yyyy-MM-dd HH:mm:ss.SSSz",
+                    "yyyy/MM/dd HH:mm:ss.SSSz",
+                    "yyyy.MM.dd HH:mm:ss.SSSz",
+                    "yyyy-MM-dd HH:mm:ssz",
+                    "yyyy/MM/dd HH:mm:ssz",
+                    "yyyy.MM.dd HH:mm:ssz",
+                    "yyyy-MM-dd HH:mm:ss.SSS",
+                    "yyyy/MM/dd HH:mm:ss.SSS",
+                    "yyyy.MM.dd HH:mm:ss.SSS",
+                    "yyyy-MM-dd HH:mm:ss",
+                    "yyyy/MM/dd HH:mm:ss",
+                    "yyyy.MM.dd HH:mm:ss",
+                    "yyyy-MM-dd'T'HH:mm:ss.SSSX",
+                    "yyyy/MM/dd'T'HH:mm:ss.SSSX",
+                    "yyyy.MM.dd'T'HH:mm:ss.SSSX",
+                    "yyyy-MM-dd'T'HH:mm:ssX",
+                    "yyyy/MM/dd'T'HH:mm:ssX",
+                    "yyyy.MM.dd'T'HH:mm:ssX",
+                    "yyyy-MM-dd'T'HH:mm:ss.SSSz",
+                    "yyyy/MM/dd'T'HH:mm:ss.SSSz",
+                    "yyyy.MM.dd'T'HH:mm:ss.SSSz",
+                    "yyyy-MM-dd'T'HH:mm:ssz",
+                    "yyyy/MM/dd'T'HH:mm:ssz",
+                    "yyyy.MM.dd'T'HH:mm:ssz",
+                    "yyyy-MM-dd'T'HH:mm:ss.SSS",
+                    "yyyy/MM/dd'T'HH:mm:ss.SSS",
+                    "yyyy.MM.dd'T'HH:mm:ss.SSS",
+                    "yyyy-MM-dd'T'HH:mm:ss",
+                    "yyyy/MM/dd'T'HH:mm:ss",
+                    "yyyy.MM.dd'T'HH:mm:ss"
+            };
+
+    static final PropertyDescriptor IOTDB_HOST =
+            new PropertyDescriptor.Builder()
+                    .name("Host")
+                    .description("The host of IoTDB.")
+                    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                    .required(true)
+                    .build();
+
+    static final PropertyDescriptor IOTDB_PORT =
+            new PropertyDescriptor.Builder()
+                    .name("Port")
+                    .description("The port of IoTDB.")
+                    .defaultValue(String.valueOf(DEFAULT_IOTDB_PORT))
+                    .addValidator(StandardValidators.PORT_VALIDATOR)
+                    .required(true)
+                    .build();
+
+    static final PropertyDescriptor USERNAME =
+            new PropertyDescriptor.Builder()
+                    .name("Username")
+                    .description("Username to access the IoTDB.")
+                    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                    .required(true)
+                    .build();
+
+    static final PropertyDescriptor PASSWORD =
+            new PropertyDescriptor.Builder()
+                    .name("Password")
+                    .description("Password to access the IoTDB.")
+                    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                    .required(true)
+                    .sensitive(true)
+                    .build();
+
+    protected static List<PropertyDescriptor> descriptors = new ArrayList<>();
+
+    protected static final Relationship REL_SUCCESS =
+            new Relationship.Builder()
+                    .name("success")
+                    .description("files that were successfully processed")
+                    .build();
+    protected static final Relationship REL_FAILURE =
+            new Relationship.Builder()
+                    .name("failure")
+                    .description("files that were not successfully processed")
+                    .build();
+
+    protected static Set<Relationship> relationships = new HashSet<>();
+
+    static {
+        descriptors.add(IOTDB_HOST);
+        descriptors.add(IOTDB_PORT);
+        descriptors.add(USERNAME);
+        descriptors.add(PASSWORD);
+
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+
+        typeMap.put(RecordFieldType.STRING, TSDataType.TEXT);
+        typeMap.put(RecordFieldType.BOOLEAN, TSDataType.BOOLEAN);
+        typeMap.put(RecordFieldType.INT, TSDataType.INT32);
+        typeMap.put(RecordFieldType.LONG, TSDataType.INT64);
+        typeMap.put(RecordFieldType.FLOAT, TSDataType.FLOAT);
+        typeMap.put(RecordFieldType.DOUBLE, TSDataType.DOUBLE);
+
+        supportedType.add(RecordFieldType.BOOLEAN);
+        supportedType.add(RecordFieldType.STRING);
+        supportedType.add(RecordFieldType.INT);
+        supportedType.add(RecordFieldType.LONG);
+        supportedType.add(RecordFieldType.FLOAT);
+        supportedType.add(RecordFieldType.DOUBLE);
+
+        supportedTimeType.add(RecordFieldType.STRING);
+        supportedTimeType.add(RecordFieldType.LONG);
+    }
+
+    protected final AtomicReference<Session> session = new AtomicReference<>(null);
+
+    @OnScheduled
+    public void onScheduled(ProcessContext context) {
+        connectToIoTDB(context);
+    }
+
+    void connectToIoTDB(ProcessContext context) {
+        if (session.get() == null) {
+            final String host, username, password;
+            final int port;
+
+            host = context.getProperty(IOTDB_HOST).getValue();
+            port = Integer.parseInt(context.getProperty(IOTDB_PORT).getValue());
+            username = context.getProperty(USERNAME).getValue();
+            password = context.getProperty(PASSWORD).getValue();
+
+            session.set(
+                    new Session.Builder()
+                            .host(host)
+                            .port(port)
+                            .username(username)
+                            .password(password)
+                            .build());
+            try {
+                session.get().open();
+            } catch (IoTDBConnectionException e) {
+                getLogger().error(e.getMessage());
+                e.printStackTrace();
+            }
+        }
+    }
+
+    public void stop(ProcessContext context) {
+        if (session.get() != null) {
+            try {
+                session.get().close();
+            } catch (IoTDBConnectionException e) {
+                e.printStackTrace();

Review Comment:
   `printStackTrace` should be replaced with an error log.
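   For example, something along these lines (the log message wording is just a placeholder):
   ```java
   } catch (final IoTDBConnectionException e) {
       getLogger().error("Closing IoTDB session failed", e);
   }
   ```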



##########
nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/AbstractIoTDB.java:
##########
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors;
+
+import java.text.SimpleDateFormat;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.processors.model.Field;
+import org.apache.nifi.processors.model.Schema;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.Tuple;
+
+public abstract class AbstractIoTDB extends AbstractProcessor {
+    private static final int DEFAULT_IOTDB_PORT = 6667;
+
+    private static Map<RecordFieldType, TSDataType> typeMap =
+            new HashMap<>();
+
+    static final Set<RecordFieldType> supportedType =
+            new HashSet<>();
+
+    static final Set<RecordFieldType> supportedTimeType =
+            new HashSet<>();
+
+    private static final String[] STRING_TIME_FORMAT =
+            new String[]{
+                    "yyyy-MM-dd HH:mm:ss.SSSX",
+                    "yyyy/MM/dd HH:mm:ss.SSSX",
+                    "yyyy.MM.dd HH:mm:ss.SSSX",
+                    "yyyy-MM-dd HH:mm:ssX",
+                    "yyyy/MM/dd HH:mm:ssX",
+                    "yyyy.MM.dd HH:mm:ssX",
+                    "yyyy-MM-dd HH:mm:ss.SSSz",
+                    "yyyy/MM/dd HH:mm:ss.SSSz",
+                    "yyyy.MM.dd HH:mm:ss.SSSz",
+                    "yyyy-MM-dd HH:mm:ssz",
+                    "yyyy/MM/dd HH:mm:ssz",
+                    "yyyy.MM.dd HH:mm:ssz",
+                    "yyyy-MM-dd HH:mm:ss.SSS",
+                    "yyyy/MM/dd HH:mm:ss.SSS",
+                    "yyyy.MM.dd HH:mm:ss.SSS",
+                    "yyyy-MM-dd HH:mm:ss",
+                    "yyyy/MM/dd HH:mm:ss",
+                    "yyyy.MM.dd HH:mm:ss",
+                    "yyyy-MM-dd'T'HH:mm:ss.SSSX",
+                    "yyyy/MM/dd'T'HH:mm:ss.SSSX",
+                    "yyyy.MM.dd'T'HH:mm:ss.SSSX",
+                    "yyyy-MM-dd'T'HH:mm:ssX",
+                    "yyyy/MM/dd'T'HH:mm:ssX",
+                    "yyyy.MM.dd'T'HH:mm:ssX",
+                    "yyyy-MM-dd'T'HH:mm:ss.SSSz",
+                    "yyyy/MM/dd'T'HH:mm:ss.SSSz",
+                    "yyyy.MM.dd'T'HH:mm:ss.SSSz",
+                    "yyyy-MM-dd'T'HH:mm:ssz",
+                    "yyyy/MM/dd'T'HH:mm:ssz",
+                    "yyyy.MM.dd'T'HH:mm:ssz",
+                    "yyyy-MM-dd'T'HH:mm:ss.SSS",
+                    "yyyy/MM/dd'T'HH:mm:ss.SSS",
+                    "yyyy.MM.dd'T'HH:mm:ss.SSS",
+                    "yyyy-MM-dd'T'HH:mm:ss",
+                    "yyyy/MM/dd'T'HH:mm:ss",
+                    "yyyy.MM.dd'T'HH:mm:ss"
+            };
+
+    static final PropertyDescriptor IOTDB_HOST =
+            new PropertyDescriptor.Builder()
+                    .name("Host")
+                    .description("The host of IoTDB.")
+                    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                    .required(true)
+                    .build();
+
+    static final PropertyDescriptor IOTDB_PORT =
+            new PropertyDescriptor.Builder()
+                    .name("Port")
+                    .description("The port of IoTDB.")
+                    .defaultValue(String.valueOf(DEFAULT_IOTDB_PORT))
+                    .addValidator(StandardValidators.PORT_VALIDATOR)
+                    .required(true)
+                    .build();
+
+    static final PropertyDescriptor USERNAME =
+            new PropertyDescriptor.Builder()
+                    .name("Username")
+                    .description("Username to access the IoTDB.")
+                    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                    .required(true)
+                    .build();
+
+    static final PropertyDescriptor PASSWORD =
+            new PropertyDescriptor.Builder()
+                    .name("Password")
+                    .description("Password to access the IoTDB.")
+                    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                    .required(true)
+                    .sensitive(true)
+                    .build();
+
+    protected static List<PropertyDescriptor> descriptors = new ArrayList<>();
+
+    protected static final Relationship REL_SUCCESS =
+            new Relationship.Builder()
+                    .name("success")
+                    .description("files that were successfully processed")
+                    .build();
+    protected static final Relationship REL_FAILURE =
+            new Relationship.Builder()
+                    .name("failure")
+                    .description("files that were not successfully processed")
+                    .build();
+
+    protected static Set<Relationship> relationships = new HashSet<>();
+
+    static {
+        descriptors.add(IOTDB_HOST);
+        descriptors.add(IOTDB_PORT);
+        descriptors.add(USERNAME);
+        descriptors.add(PASSWORD);
+
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+
+        typeMap.put(RecordFieldType.STRING, TSDataType.TEXT);
+        typeMap.put(RecordFieldType.BOOLEAN, TSDataType.BOOLEAN);
+        typeMap.put(RecordFieldType.INT, TSDataType.INT32);
+        typeMap.put(RecordFieldType.LONG, TSDataType.INT64);
+        typeMap.put(RecordFieldType.FLOAT, TSDataType.FLOAT);
+        typeMap.put(RecordFieldType.DOUBLE, TSDataType.DOUBLE);
+
+        supportedType.add(RecordFieldType.BOOLEAN);
+        supportedType.add(RecordFieldType.STRING);
+        supportedType.add(RecordFieldType.INT);
+        supportedType.add(RecordFieldType.LONG);
+        supportedType.add(RecordFieldType.FLOAT);
+        supportedType.add(RecordFieldType.DOUBLE);
+
+        supportedTimeType.add(RecordFieldType.STRING);
+        supportedTimeType.add(RecordFieldType.LONG);
+    }
+
+    protected final AtomicReference<Session> session = new AtomicReference<>(null);
+
+    @OnScheduled
+    public void onScheduled(ProcessContext context) {
+        connectToIoTDB(context);
+    }
+
+    void connectToIoTDB(ProcessContext context) {
+        if (session.get() == null) {
+            final String host, username, password;
+            final int port;
+
+            host = context.getProperty(IOTDB_HOST).getValue();
+            port = Integer.parseInt(context.getProperty(IOTDB_PORT).getValue());
+            username = context.getProperty(USERNAME).getValue();
+            password = context.getProperty(PASSWORD).getValue();

Review Comment:
   Variable declaration and assignment should be combined into single lines as follows:
   ```suggestion
               final String host = context.getProperty(IOTDB_HOST).getValue();
               final int port = Integer.parseInt(context.getProperty(IOTDB_PORT).getValue());
               final String username = context.getProperty(USERNAME).getValue();
               final String password = context.getProperty(PASSWORD).getValue();
   ```



##########
nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/AbstractIoTDB.java:
##########
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors;
+
+import java.text.SimpleDateFormat;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.processors.model.Field;
+import org.apache.nifi.processors.model.Schema;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.Tuple;
+
+public abstract class AbstractIoTDB extends AbstractProcessor {
+    private static final int DEFAULT_IOTDB_PORT = 6667;
+
+    private static Map<RecordFieldType, TSDataType> typeMap =
+            new HashMap<>();
+
+    static final Set<RecordFieldType> supportedType =
+            new HashSet<>();
+
+    static final Set<RecordFieldType> supportedTimeType =
+            new HashSet<>();
+
+    private static final String[] STRING_TIME_FORMAT =
+            new String[]{
+                    "yyyy-MM-dd HH:mm:ss.SSSX",
+                    "yyyy/MM/dd HH:mm:ss.SSSX",
+                    "yyyy.MM.dd HH:mm:ss.SSSX",
+                    "yyyy-MM-dd HH:mm:ssX",
+                    "yyyy/MM/dd HH:mm:ssX",
+                    "yyyy.MM.dd HH:mm:ssX",
+                    "yyyy-MM-dd HH:mm:ss.SSSz",
+                    "yyyy/MM/dd HH:mm:ss.SSSz",
+                    "yyyy.MM.dd HH:mm:ss.SSSz",
+                    "yyyy-MM-dd HH:mm:ssz",
+                    "yyyy/MM/dd HH:mm:ssz",
+                    "yyyy.MM.dd HH:mm:ssz",
+                    "yyyy-MM-dd HH:mm:ss.SSS",
+                    "yyyy/MM/dd HH:mm:ss.SSS",
+                    "yyyy.MM.dd HH:mm:ss.SSS",
+                    "yyyy-MM-dd HH:mm:ss",
+                    "yyyy/MM/dd HH:mm:ss",
+                    "yyyy.MM.dd HH:mm:ss",
+                    "yyyy-MM-dd'T'HH:mm:ss.SSSX",
+                    "yyyy/MM/dd'T'HH:mm:ss.SSSX",
+                    "yyyy.MM.dd'T'HH:mm:ss.SSSX",
+                    "yyyy-MM-dd'T'HH:mm:ssX",
+                    "yyyy/MM/dd'T'HH:mm:ssX",
+                    "yyyy.MM.dd'T'HH:mm:ssX",
+                    "yyyy-MM-dd'T'HH:mm:ss.SSSz",
+                    "yyyy/MM/dd'T'HH:mm:ss.SSSz",
+                    "yyyy.MM.dd'T'HH:mm:ss.SSSz",
+                    "yyyy-MM-dd'T'HH:mm:ssz",
+                    "yyyy/MM/dd'T'HH:mm:ssz",
+                    "yyyy.MM.dd'T'HH:mm:ssz",
+                    "yyyy-MM-dd'T'HH:mm:ss.SSS",
+                    "yyyy/MM/dd'T'HH:mm:ss.SSS",
+                    "yyyy.MM.dd'T'HH:mm:ss.SSS",
+                    "yyyy-MM-dd'T'HH:mm:ss",
+                    "yyyy/MM/dd'T'HH:mm:ss",
+                    "yyyy.MM.dd'T'HH:mm:ss"
+            };
+
+    static final PropertyDescriptor IOTDB_HOST =
+            new PropertyDescriptor.Builder()
+                    .name("Host")
+                    .description("The host of IoTDB.")
+                    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                    .required(true)
+                    .build();
+
+    static final PropertyDescriptor IOTDB_PORT =
+            new PropertyDescriptor.Builder()
+                    .name("Port")
+                    .description("The port of IoTDB.")
+                    .defaultValue(String.valueOf(DEFAULT_IOTDB_PORT))
+                    .addValidator(StandardValidators.PORT_VALIDATOR)
+                    .required(true)
+                    .build();
+
+    static final PropertyDescriptor USERNAME =
+            new PropertyDescriptor.Builder()
+                    .name("Username")
+                    .description("Username to access the IoTDB.")
+                    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                    .required(true)
+                    .build();
+
+    static final PropertyDescriptor PASSWORD =
+            new PropertyDescriptor.Builder()
+                    .name("Password")
+                    .description("Password to access the IoTDB.")
+                    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                    .required(true)
+                    .sensitive(true)
+                    .build();
+
+    protected static List<PropertyDescriptor> descriptors = new ArrayList<>();
+
+    protected static final Relationship REL_SUCCESS =
+            new Relationship.Builder()
+                    .name("success")
+                    .description("files that were successfully processed")
+                    .build();
+    protected static final Relationship REL_FAILURE =
+            new Relationship.Builder()
+                    .name("failure")
+                    .description("files that were not successfully processed")
+                    .build();
+
+    protected static Set<Relationship> relationships = new HashSet<>();
+
+    static {
+        descriptors.add(IOTDB_HOST);
+        descriptors.add(IOTDB_PORT);
+        descriptors.add(USERNAME);
+        descriptors.add(PASSWORD);
+
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+
+        typeMap.put(RecordFieldType.STRING, TSDataType.TEXT);
+        typeMap.put(RecordFieldType.BOOLEAN, TSDataType.BOOLEAN);
+        typeMap.put(RecordFieldType.INT, TSDataType.INT32);
+        typeMap.put(RecordFieldType.LONG, TSDataType.INT64);
+        typeMap.put(RecordFieldType.FLOAT, TSDataType.FLOAT);
+        typeMap.put(RecordFieldType.DOUBLE, TSDataType.DOUBLE);
+
+        supportedType.add(RecordFieldType.BOOLEAN);
+        supportedType.add(RecordFieldType.STRING);
+        supportedType.add(RecordFieldType.INT);
+        supportedType.add(RecordFieldType.LONG);
+        supportedType.add(RecordFieldType.FLOAT);
+        supportedType.add(RecordFieldType.DOUBLE);
+
+        supportedTimeType.add(RecordFieldType.STRING);
+        supportedTimeType.add(RecordFieldType.LONG);
+    }
+
+    protected final AtomicReference<Session> session = new AtomicReference<>(null);
+
+    @OnScheduled
+    public void onScheduled(ProcessContext context) {
+        connectToIoTDB(context);
+    }
+
+    void connectToIoTDB(ProcessContext context) {
+        if (session.get() == null) {
+            final String host, username, password;
+            final int port;
+
+            host = context.getProperty(IOTDB_HOST).getValue();
+            port = Integer.parseInt(context.getProperty(IOTDB_PORT).getValue());
+            username = context.getProperty(USERNAME).getValue();
+            password = context.getProperty(PASSWORD).getValue();
+
+            session.set(
+                    new Session.Builder()
+                            .host(host)
+                            .port(port)
+                            .username(username)
+                            .password(password)
+                            .build());
+            try {
+                session.get().open();
+            } catch (IoTDBConnectionException e) {
+                getLogger().error(e.getMessage());
+                e.printStackTrace();
+            }
+        }
+    }
+
+    public void stop(ProcessContext context) {
+        if (session.get() != null) {
+            try {
+                session.get().close();
+            } catch (IoTDBConnectionException e) {
+                e.printStackTrace();
+            }
+            session.set(null);
+        }
+    }
+
+    protected TSDataType getType(RecordFieldType type) {
+        return typeMap.get(type);
+    }
+
+    protected Tuple<Boolean, String> validateSchemaAttribute(String schemaAttribute) {
+        ObjectMapper mapper = new ObjectMapper();
+        JsonNode schema = null;
+        try {
+            schema = mapper.readTree(schemaAttribute);
+        } catch (JsonProcessingException e) {
+            return new Tuple<>(false, e.getMessage());
+        }
+        Set<String> keySet = new HashSet<>();
+        schema.fieldNames().forEachRemaining(field -> keySet.add(field));
+
+        if (!keySet.contains("timeType") || !keySet.contains("fields")) {

Review Comment:
   It would be helpful to declare `timeType` and `fields` as static variables so that they can be reused in multiple locations.
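   A minimal sketch of such constants (the names are illustrative, not taken from the PR):
   ```java
   // Hypothetical constant names for the required schema keys
   private static final String TIME_TYPE_KEY = "timeType";
   private static final String FIELDS_KEY = "fields";

   // Usage in validateSchemaAttribute:
   // if (!keySet.contains(TIME_TYPE_KEY) || !keySet.contains(FIELDS_KEY)) { ... }
   ```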



##########
nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/AbstractIoTDB.java:
##########
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors;
+
+import java.text.SimpleDateFormat;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.processors.model.Field;
+import org.apache.nifi.processors.model.Schema;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.Tuple;
+
+public abstract class AbstractIoTDB extends AbstractProcessor {
+    private static final int DEFAULT_IOTDB_PORT = 6667;
+
+    private static Map<RecordFieldType, TSDataType> typeMap =
+            new HashMap<>();
+
+    static final Set<RecordFieldType> supportedType =
+            new HashSet<>();
+
+    static final Set<RecordFieldType> supportedTimeType =
+            new HashSet<>();
+
+    private static final String[] STRING_TIME_FORMAT =
+            new String[]{
+                    "yyyy-MM-dd HH:mm:ss.SSSX",
+                    "yyyy/MM/dd HH:mm:ss.SSSX",
+                    "yyyy.MM.dd HH:mm:ss.SSSX",
+                    "yyyy-MM-dd HH:mm:ssX",
+                    "yyyy/MM/dd HH:mm:ssX",
+                    "yyyy.MM.dd HH:mm:ssX",
+                    "yyyy-MM-dd HH:mm:ss.SSSz",
+                    "yyyy/MM/dd HH:mm:ss.SSSz",
+                    "yyyy.MM.dd HH:mm:ss.SSSz",
+                    "yyyy-MM-dd HH:mm:ssz",
+                    "yyyy/MM/dd HH:mm:ssz",
+                    "yyyy.MM.dd HH:mm:ssz",
+                    "yyyy-MM-dd HH:mm:ss.SSS",
+                    "yyyy/MM/dd HH:mm:ss.SSS",
+                    "yyyy.MM.dd HH:mm:ss.SSS",
+                    "yyyy-MM-dd HH:mm:ss",
+                    "yyyy/MM/dd HH:mm:ss",
+                    "yyyy.MM.dd HH:mm:ss",
+                    "yyyy-MM-dd'T'HH:mm:ss.SSSX",
+                    "yyyy/MM/dd'T'HH:mm:ss.SSSX",
+                    "yyyy.MM.dd'T'HH:mm:ss.SSSX",
+                    "yyyy-MM-dd'T'HH:mm:ssX",
+                    "yyyy/MM/dd'T'HH:mm:ssX",
+                    "yyyy.MM.dd'T'HH:mm:ssX",
+                    "yyyy-MM-dd'T'HH:mm:ss.SSSz",
+                    "yyyy/MM/dd'T'HH:mm:ss.SSSz",
+                    "yyyy.MM.dd'T'HH:mm:ss.SSSz",
+                    "yyyy-MM-dd'T'HH:mm:ssz",
+                    "yyyy/MM/dd'T'HH:mm:ssz",
+                    "yyyy.MM.dd'T'HH:mm:ssz",
+                    "yyyy-MM-dd'T'HH:mm:ss.SSS",
+                    "yyyy/MM/dd'T'HH:mm:ss.SSS",
+                    "yyyy.MM.dd'T'HH:mm:ss.SSS",
+                    "yyyy-MM-dd'T'HH:mm:ss",
+                    "yyyy/MM/dd'T'HH:mm:ss",
+                    "yyyy.MM.dd'T'HH:mm:ss"
+            };
+
+    static final PropertyDescriptor IOTDB_HOST =
+            new PropertyDescriptor.Builder()
+                    .name("Host")
+                    .description("The host of IoTDB.")
+                    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                    .required(true)
+                    .build();
+
+    static final PropertyDescriptor IOTDB_PORT =
+            new PropertyDescriptor.Builder()
+                    .name("Port")
+                    .description("The port of IoTDB.")
+                    .defaultValue(String.valueOf(DEFAULT_IOTDB_PORT))
+                    .addValidator(StandardValidators.PORT_VALIDATOR)
+                    .required(true)
+                    .build();
+
+    static final PropertyDescriptor USERNAME =
+            new PropertyDescriptor.Builder()
+                    .name("Username")
+                    .description("Username to access the IoTDB.")
+                    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                    .required(true)
+                    .build();
+
+    static final PropertyDescriptor PASSWORD =
+            new PropertyDescriptor.Builder()
+                    .name("Password")
+                    .description("Password to access the IoTDB.")
+                    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                    .required(true)
+                    .sensitive(true)
+                    .build();
+
+    protected static List<PropertyDescriptor> descriptors = new ArrayList<>();
+
+    protected static final Relationship REL_SUCCESS =
+            new Relationship.Builder()
+                    .name("success")
+                    .description("files that were successfully processed")
+                    .build();
+    protected static final Relationship REL_FAILURE =
+            new Relationship.Builder()
+                    .name("failure")
+                    .description("files that were not successfully processed")
+                    .build();
+
+    protected static Set<Relationship> relationships = new HashSet<>();
+
+    static {
+        descriptors.add(IOTDB_HOST);
+        descriptors.add(IOTDB_PORT);
+        descriptors.add(USERNAME);
+        descriptors.add(PASSWORD);
+
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+
+        typeMap.put(RecordFieldType.STRING, TSDataType.TEXT);
+        typeMap.put(RecordFieldType.BOOLEAN, TSDataType.BOOLEAN);
+        typeMap.put(RecordFieldType.INT, TSDataType.INT32);
+        typeMap.put(RecordFieldType.LONG, TSDataType.INT64);
+        typeMap.put(RecordFieldType.FLOAT, TSDataType.FLOAT);
+        typeMap.put(RecordFieldType.DOUBLE, TSDataType.DOUBLE);
+
+        supportedType.add(RecordFieldType.BOOLEAN);
+        supportedType.add(RecordFieldType.STRING);
+        supportedType.add(RecordFieldType.INT);
+        supportedType.add(RecordFieldType.LONG);
+        supportedType.add(RecordFieldType.FLOAT);
+        supportedType.add(RecordFieldType.DOUBLE);
+
+        supportedTimeType.add(RecordFieldType.STRING);
+        supportedTimeType.add(RecordFieldType.LONG);
+    }
+
+    protected final AtomicReference<Session> session = new AtomicReference<>(null);
+
+    @OnScheduled
+    public void onScheduled(ProcessContext context) {
+        connectToIoTDB(context);
+    }
+
+    void connectToIoTDB(ProcessContext context) {
+        if (session.get() == null) {
+            final String host, username, password;
+            final int port;
+
+            host = context.getProperty(IOTDB_HOST).getValue();
+            port = Integer.parseInt(context.getProperty(IOTDB_PORT).getValue());
+            username = context.getProperty(USERNAME).getValue();
+            password = context.getProperty(PASSWORD).getValue();
+
+            session.set(
+                    new Session.Builder()
+                            .host(host)
+                            .port(port)
+                            .username(username)
+                            .password(password)
+                            .build());
+            try {
+                session.get().open();
+            } catch (IoTDBConnectionException e) {
+                getLogger().error(e.getMessage());
+                e.printStackTrace();
+            }
+        }
+    }
+
+    public void stop(ProcessContext context) {
+        if (session.get() != null) {
+            try {
+                session.get().close();
+            } catch (IoTDBConnectionException e) {
+                e.printStackTrace();
+            }
+            session.set(null);
+        }
+    }
+
+    protected TSDataType getType(RecordFieldType type) {
+        return typeMap.get(type);
+    }
+
+    protected Tuple<Boolean, String> validateSchemaAttribute(String schemaAttribute) {
+        ObjectMapper mapper = new ObjectMapper();

Review Comment:
   An instance of `ObjectMapper` is thread-safe, so it can be declared once as a static member and reused across invocations.
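   A minimal sketch, assuming a single shared instance is acceptable for this processor:
   ```java
   // Jackson's ObjectMapper is thread-safe once configured, so one shared
   // instance can be reused for all schema parsing
   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
   ```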



##########
nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/AbstractIoTDB.java:
##########
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors;
+
+import java.text.SimpleDateFormat;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.processors.model.Field;
+import org.apache.nifi.processors.model.Schema;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.Tuple;
+
+public abstract class AbstractIoTDB extends AbstractProcessor {
+    private static final int DEFAULT_IOTDB_PORT = 6667;
+
+    private static Map<RecordFieldType, TSDataType> typeMap =
+            new HashMap<>();
+
+    static final Set<RecordFieldType> supportedType =
+            new HashSet<>();
+
+    static final Set<RecordFieldType> supportedTimeType =
+            new HashSet<>();
+
+    private static final String[] STRING_TIME_FORMAT =
+            new String[]{
+                    "yyyy-MM-dd HH:mm:ss.SSSX",
+                    "yyyy/MM/dd HH:mm:ss.SSSX",
+                    "yyyy.MM.dd HH:mm:ss.SSSX",
+                    "yyyy-MM-dd HH:mm:ssX",
+                    "yyyy/MM/dd HH:mm:ssX",
+                    "yyyy.MM.dd HH:mm:ssX",
+                    "yyyy-MM-dd HH:mm:ss.SSSz",
+                    "yyyy/MM/dd HH:mm:ss.SSSz",
+                    "yyyy.MM.dd HH:mm:ss.SSSz",
+                    "yyyy-MM-dd HH:mm:ssz",
+                    "yyyy/MM/dd HH:mm:ssz",
+                    "yyyy.MM.dd HH:mm:ssz",
+                    "yyyy-MM-dd HH:mm:ss.SSS",
+                    "yyyy/MM/dd HH:mm:ss.SSS",
+                    "yyyy.MM.dd HH:mm:ss.SSS",
+                    "yyyy-MM-dd HH:mm:ss",
+                    "yyyy/MM/dd HH:mm:ss",
+                    "yyyy.MM.dd HH:mm:ss",
+                    "yyyy-MM-dd'T'HH:mm:ss.SSSX",
+                    "yyyy/MM/dd'T'HH:mm:ss.SSSX",
+                    "yyyy.MM.dd'T'HH:mm:ss.SSSX",
+                    "yyyy-MM-dd'T'HH:mm:ssX",
+                    "yyyy/MM/dd'T'HH:mm:ssX",
+                    "yyyy.MM.dd'T'HH:mm:ssX",
+                    "yyyy-MM-dd'T'HH:mm:ss.SSSz",
+                    "yyyy/MM/dd'T'HH:mm:ss.SSSz",
+                    "yyyy.MM.dd'T'HH:mm:ss.SSSz",
+                    "yyyy-MM-dd'T'HH:mm:ssz",
+                    "yyyy/MM/dd'T'HH:mm:ssz",
+                    "yyyy.MM.dd'T'HH:mm:ssz",
+                    "yyyy-MM-dd'T'HH:mm:ss.SSS",
+                    "yyyy/MM/dd'T'HH:mm:ss.SSS",
+                    "yyyy.MM.dd'T'HH:mm:ss.SSS",
+                    "yyyy-MM-dd'T'HH:mm:ss",
+                    "yyyy/MM/dd'T'HH:mm:ss",
+                    "yyyy.MM.dd'T'HH:mm:ss"
+            };
+
+    static final PropertyDescriptor IOTDB_HOST =
+            new PropertyDescriptor.Builder()
+                    .name("Host")
+                    .description("The host of IoTDB.")
+                    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                    .required(true)
+                    .build();
+
+    static final PropertyDescriptor IOTDB_PORT =
+            new PropertyDescriptor.Builder()
+                    .name("Port")
+                    .description("The port of IoTDB.")
+                    .defaultValue(String.valueOf(DEFAULT_IOTDB_PORT))
+                    .addValidator(StandardValidators.PORT_VALIDATOR)
+                    .required(true)
+                    .build();
+
+    static final PropertyDescriptor USERNAME =
+            new PropertyDescriptor.Builder()
+                    .name("Username")
+                    .description("Username to access the IoTDB.")
+                    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                    .required(true)
+                    .build();
+
+    static final PropertyDescriptor PASSWORD =
+            new PropertyDescriptor.Builder()
+                    .name("Password")
+                    .description("Password to access the IoTDB.")
+                    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                    .required(true)
+                    .sensitive(true)
+                    .build();
+
+    protected static List<PropertyDescriptor> descriptors = new ArrayList<>();
+
+    protected static final Relationship REL_SUCCESS =
+            new Relationship.Builder()
+                    .name("success")
+                    .description("files that were successfully processed")
+                    .build();
+    protected static final Relationship REL_FAILURE =
+            new Relationship.Builder()
+                    .name("failure")
+                    .description("files that were not successfully processed")
+                    .build();
+
+    protected static Set<Relationship> relationships = new HashSet<>();
+
+    static {
+        descriptors.add(IOTDB_HOST);
+        descriptors.add(IOTDB_PORT);
+        descriptors.add(USERNAME);
+        descriptors.add(PASSWORD);
+
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+
+        typeMap.put(RecordFieldType.STRING, TSDataType.TEXT);
+        typeMap.put(RecordFieldType.BOOLEAN, TSDataType.BOOLEAN);
+        typeMap.put(RecordFieldType.INT, TSDataType.INT32);
+        typeMap.put(RecordFieldType.LONG, TSDataType.INT64);
+        typeMap.put(RecordFieldType.FLOAT, TSDataType.FLOAT);
+        typeMap.put(RecordFieldType.DOUBLE, TSDataType.DOUBLE);
+
+        supportedType.add(RecordFieldType.BOOLEAN);
+        supportedType.add(RecordFieldType.STRING);
+        supportedType.add(RecordFieldType.INT);
+        supportedType.add(RecordFieldType.LONG);
+        supportedType.add(RecordFieldType.FLOAT);
+        supportedType.add(RecordFieldType.DOUBLE);
+
+        supportedTimeType.add(RecordFieldType.STRING);
+        supportedTimeType.add(RecordFieldType.LONG);
+    }
+
+    protected final AtomicReference<Session> session = new AtomicReference<>(null);
+
+    @OnScheduled
+    public void onScheduled(ProcessContext context) {
+        connectToIoTDB(context);
+    }
+
+    void connectToIoTDB(ProcessContext context) {
+        if (session.get() == null) {
+            final String host, username, password;
+            final int port;
+
+            host = context.getProperty(IOTDB_HOST).getValue();
+            port = Integer.parseInt(context.getProperty(IOTDB_PORT).getValue());
+            username = context.getProperty(USERNAME).getValue();
+            password = context.getProperty(PASSWORD).getValue();
+
+            session.set(
+                    new Session.Builder()
+                            .host(host)
+                            .port(port)
+                            .username(username)
+                            .password(password)
+                            .build());
+            try {
+                session.get().open();
+            } catch (IoTDBConnectionException e) {
+                getLogger().error(e.getMessage());
+                e.printStackTrace();
+            }
+        }
+    }
+
+    public void stop(ProcessContext context) {
+        if (session.get() != null) {
+            try {
+                session.get().close();
+            } catch (IoTDBConnectionException e) {
+                e.printStackTrace();
+            }
+            session.set(null);
+        }
+    }
+
+    protected TSDataType getType(RecordFieldType type) {
+        return typeMap.get(type);
+    }
+
+    protected Tuple<Boolean, String> validateSchemaAttribute(String schemaAttribute) {

Review Comment:
   Although `Tuple` is a useful utility class, it does not convey much meaning about what its elements represent. It would be better to define a small custom result object and reuse it instead of `Tuple`.
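   A minimal sketch of such a result object (the class and method names are illustrative, not part of the PR):
   ```java
   // Illustrative value object replacing Tuple<Boolean, String>
   public class ValidationResult {
       private final boolean valid;
       private final String message;

       public ValidationResult(final boolean valid, final String message) {
           this.valid = valid;
           this.message = message;
       }

       public boolean isValid() {
           return valid;
       }

       public String getMessage() {
           return message;
       }
   }
   ```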



##########
nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/PutIoTDB.java:
##########
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors;
+
+import java.io.InputStream;
+import java.text.SimpleDateFormat;
+import java.util.Set;
+import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.processors.model.Schema;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.util.Tuple;
+
+@Tags({"iotdb", "insert", "tablet"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription(
+        "This is a record aware processor that reads the content of the incoming FlowFile as individual records using the "
+                + "configured 'Record Reader' and writes them to Apache IoTDB using native interface.")
+public class PutIoTDB extends AbstractIoTDB {
+
+    static final PropertyDescriptor RECORD_READER_FACTORY =
+            new PropertyDescriptor.Builder()
+                    .name("Record Reader")
+                    .description(
+                            "Specifies the type of Record Reader controller service to use for parsing the incoming data "
+                                    + "and determining the schema")
+                    .identifiesControllerService(RecordReaderFactory.class)
+                    .required(true)
+                    .build();
+
+    static final PropertyDescriptor TIME_FIELD =
+            new PropertyDescriptor.Builder()
+                    .name("Time Field")
+                    .description(
+                            "The field name which represents time. It can be updated by expression language.")

Review Comment:
   The second sentence can be removed since support for FlowFile attributes and expression language will be indicated automatically in generated documentation.
   ```suggestion
                               "The field name which represents time")
   ```



##########
nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/PutIoTDB.java:
##########
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors;
+
+import java.io.InputStream;
+import java.text.SimpleDateFormat;
+import java.util.Set;
+import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.processors.model.Schema;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.util.Tuple;
+
+@Tags({"iotdb", "insert", "tablet"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription(
+        "This is a record aware processor that reads the content of the incoming FlowFile as individual records using the "
+                + "configured 'Record Reader' and writes them to Apache IoTDB using native interface.")
+public class PutIoTDB extends AbstractIoTDB {
+
+    static final PropertyDescriptor RECORD_READER_FACTORY =
+            new PropertyDescriptor.Builder()
+                    .name("Record Reader")
+                    .description(
+                            "Specifies the type of Record Reader controller service to use for parsing the incoming data "
+                                    + "and determining the schema")
+                    .identifiesControllerService(RecordReaderFactory.class)
+                    .required(true)
+                    .build();
+
+    static final PropertyDescriptor TIME_FIELD =
+            new PropertyDescriptor.Builder()
+                    .name("Time Field")
+                    .description(
+                            "The field name which represents time. It can be updated by expression language.")
+                    .defaultValue("Time")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                    .required(false)
+                    .build();
+
+    static final PropertyDescriptor SCHEMA =
+            new PropertyDescriptor.Builder()
+                    .name("Schema")
+                    .description(
+                            "The schema that IoTDB needs doesn't support good by NiFi.\n"
+                                    + "Therefore, you can define the schema here.\n"
+                                    + "Besides, you can set encoding type and compression type by this method.\n"
+                                    + "If you don't set this property, the inferred schema will be used.\n"
+                                    + "It can be updated by expression language.")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .required(false)
+                    .build();
+
+    static final PropertyDescriptor ALIGNED =
+            new PropertyDescriptor.Builder()
+                    .name("Aligned")
+                    .description("Whether using aligned interface? It can be updated by expression language.")
+                    .allowableValues(new String[] {"true", "false"})

Review Comment:
   ```suggestion
                       .allowableValues("true", "false")
   ```



##########
nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/AbstractIoTDB.java:
##########
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors;
+
+import java.text.SimpleDateFormat;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.processors.model.Field;
+import org.apache.nifi.processors.model.Schema;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.Tuple;
+
+public abstract class AbstractIoTDB extends AbstractProcessor {
+    private static final int DEFAULT_IOTDB_PORT = 6667;
+
+    private static Map<RecordFieldType, TSDataType> typeMap =
+            new HashMap<>();
+
+    static final Set<RecordFieldType> supportedType =
+            new HashSet<>();
+
+    static final Set<RecordFieldType> supportedTimeType =
+            new HashSet<>();
+
+    private static final String[] STRING_TIME_FORMAT =
+            new String[]{
+                    "yyyy-MM-dd HH:mm:ss.SSSX",
+                    "yyyy/MM/dd HH:mm:ss.SSSX",
+                    "yyyy.MM.dd HH:mm:ss.SSSX",
+                    "yyyy-MM-dd HH:mm:ssX",
+                    "yyyy/MM/dd HH:mm:ssX",
+                    "yyyy.MM.dd HH:mm:ssX",
+                    "yyyy-MM-dd HH:mm:ss.SSSz",
+                    "yyyy/MM/dd HH:mm:ss.SSSz",
+                    "yyyy.MM.dd HH:mm:ss.SSSz",
+                    "yyyy-MM-dd HH:mm:ssz",
+                    "yyyy/MM/dd HH:mm:ssz",
+                    "yyyy.MM.dd HH:mm:ssz",
+                    "yyyy-MM-dd HH:mm:ss.SSS",
+                    "yyyy/MM/dd HH:mm:ss.SSS",
+                    "yyyy.MM.dd HH:mm:ss.SSS",
+                    "yyyy-MM-dd HH:mm:ss",
+                    "yyyy/MM/dd HH:mm:ss",
+                    "yyyy.MM.dd HH:mm:ss",
+                    "yyyy-MM-dd'T'HH:mm:ss.SSSX",
+                    "yyyy/MM/dd'T'HH:mm:ss.SSSX",
+                    "yyyy.MM.dd'T'HH:mm:ss.SSSX",
+                    "yyyy-MM-dd'T'HH:mm:ssX",
+                    "yyyy/MM/dd'T'HH:mm:ssX",
+                    "yyyy.MM.dd'T'HH:mm:ssX",
+                    "yyyy-MM-dd'T'HH:mm:ss.SSSz",
+                    "yyyy/MM/dd'T'HH:mm:ss.SSSz",
+                    "yyyy.MM.dd'T'HH:mm:ss.SSSz",
+                    "yyyy-MM-dd'T'HH:mm:ssz",
+                    "yyyy/MM/dd'T'HH:mm:ssz",
+                    "yyyy.MM.dd'T'HH:mm:ssz",
+                    "yyyy-MM-dd'T'HH:mm:ss.SSS",
+                    "yyyy/MM/dd'T'HH:mm:ss.SSS",
+                    "yyyy.MM.dd'T'HH:mm:ss.SSS",
+                    "yyyy-MM-dd'T'HH:mm:ss",
+                    "yyyy/MM/dd'T'HH:mm:ss",
+                    "yyyy.MM.dd'T'HH:mm:ss"
+            };
+
+    static final PropertyDescriptor IOTDB_HOST =
+            new PropertyDescriptor.Builder()
+                    .name("Host")
+                    .description("The host of IoTDB.")
+                    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                    .required(true)
+                    .build();
+
+    static final PropertyDescriptor IOTDB_PORT =
+            new PropertyDescriptor.Builder()
+                    .name("Port")
+                    .description("The port of IoTDB.")
+                    .defaultValue(String.valueOf(DEFAULT_IOTDB_PORT))
+                    .addValidator(StandardValidators.PORT_VALIDATOR)
+                    .required(true)
+                    .build();
+
+    static final PropertyDescriptor USERNAME =
+            new PropertyDescriptor.Builder()
+                    .name("Username")
+                    .description("Username to access the IoTDB.")
+                    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                    .required(true)
+                    .build();
+
+    static final PropertyDescriptor PASSWORD =
+            new PropertyDescriptor.Builder()
+                    .name("Password")
+                    .description("Password to access the IoTDB.")
+                    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                    .required(true)
+                    .sensitive(true)
+                    .build();
+
+    protected static List<PropertyDescriptor> descriptors = new ArrayList<>();
+
+    protected static final Relationship REL_SUCCESS =
+            new Relationship.Builder()
+                    .name("success")
+                    .description("files that were successfully processed")
+                    .build();
+    protected static final Relationship REL_FAILURE =
+            new Relationship.Builder()
+                    .name("failure")
+                    .description("files that were not successfully processed")
+                    .build();
+
+    protected static Set<Relationship> relationships = new HashSet<>();
+
+    static {
+        descriptors.add(IOTDB_HOST);
+        descriptors.add(IOTDB_PORT);
+        descriptors.add(USERNAME);
+        descriptors.add(PASSWORD);
+
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+
+        typeMap.put(RecordFieldType.STRING, TSDataType.TEXT);
+        typeMap.put(RecordFieldType.BOOLEAN, TSDataType.BOOLEAN);
+        typeMap.put(RecordFieldType.INT, TSDataType.INT32);
+        typeMap.put(RecordFieldType.LONG, TSDataType.INT64);
+        typeMap.put(RecordFieldType.FLOAT, TSDataType.FLOAT);
+        typeMap.put(RecordFieldType.DOUBLE, TSDataType.DOUBLE);
+
+        supportedType.add(RecordFieldType.BOOLEAN);
+        supportedType.add(RecordFieldType.STRING);
+        supportedType.add(RecordFieldType.INT);
+        supportedType.add(RecordFieldType.LONG);
+        supportedType.add(RecordFieldType.FLOAT);
+        supportedType.add(RecordFieldType.DOUBLE);
+
+        supportedTimeType.add(RecordFieldType.STRING);
+        supportedTimeType.add(RecordFieldType.LONG);
+    }
+
+    protected final AtomicReference<Session> session = new AtomicReference<>(null);
+
+    @OnScheduled
+    public void onScheduled(ProcessContext context) {
+        connectToIoTDB(context);
+    }
+
+    void connectToIoTDB(ProcessContext context) {
+        if (session.get() == null) {
+            final String host, username, password;
+            final int port;
+
+            host = context.getProperty(IOTDB_HOST).getValue();
+            port = Integer.parseInt(context.getProperty(IOTDB_PORT).getValue());
+            username = context.getProperty(USERNAME).getValue();
+            password = context.getProperty(PASSWORD).getValue();
+
+            session.set(
+                    new Session.Builder()
+                            .host(host)
+                            .port(port)
+                            .username(username)
+                            .password(password)
+                            .build());
+            try {
+                session.get().open();
+            } catch (IoTDBConnectionException e) {
+                getLogger().error(e.getMessage());
+                e.printStackTrace();
+            }
+        }
+    }
+
+    public void stop(ProcessContext context) {
+        if (session.get() != null) {
+            try {
+                session.get().close();
+            } catch (IoTDBConnectionException e) {
+                e.printStackTrace();
+            }
+            session.set(null);
+        }
+    }
+
+    protected TSDataType getType(RecordFieldType type) {
+        return typeMap.get(type);
+    }
+
+    protected Tuple<Boolean, String> validateSchemaAttribute(String schemaAttribute) {
+        ObjectMapper mapper = new ObjectMapper();
+        JsonNode schema = null;
+        try {
+            schema = mapper.readTree(schemaAttribute);
+        } catch (JsonProcessingException e) {
+            return new Tuple<>(false, e.getMessage());
+        }
+        Set<String> keySet = new HashSet<>();
+        schema.fieldNames().forEachRemaining(field -> keySet.add(field));
+
+        if (!keySet.contains("timeType") || !keySet.contains("fields")) {
+            String msg = "The JSON of schema must contain `timeType` and `fields`.";
+            return new Tuple<>(false, msg);
+        }
+
+        if (!Schema.getSupportedTimeType().contains(schema.get("timeType").asText().toLowerCase())) {
+            String msg =
+                    String.format(
+                            "Unknown `timeType`: %s, there are only two options `long` and `string` for this property.",
+                            schema.get("timeType").asText());
+            return new Tuple<>(false, msg);
+        }
+
+        for (int i = 0; i < schema.get("fields").size(); i++) {
+            JsonNode field = schema.get("fields").get(i);
+            Set<String> fieldKeySet = new HashSet<>();
+
+            field.fieldNames().forEachRemaining(fieldName -> fieldKeySet.add(fieldName));
+            if (!fieldKeySet.contains("tsName") || !fieldKeySet.contains("dataType")) {
+                String msg = "`tsName` or `dataType` has not been set.";
+                return new Tuple<>(false, msg);
+            }
+
+            if (!field.get("tsName").asText().startsWith("root.")) {
+                String msg =
+                        String.format("The tsName `%s` is not start with 'root.'.", field.get("tsName").asText());
+                return new Tuple<>(false, msg);
+            }
+
+            if (!Field.getSupportedDataType().contains(field.get("dataType").asText())) {
+                String msg =
+                        String.format(
+                                "Unknown `dataType`: %s. The supported dataTypes are %s",
+                                field.get("dataType").asText(), Field.getSupportedDataType());
+                return new Tuple<>(false, msg);
+            }
+
+            Set<String> supportedKeySet = new HashSet<>();
+            supportedKeySet.add("tsName");
+            supportedKeySet.add("dataType");
+            supportedKeySet.add("encoding");
+            supportedKeySet.add("compressionType");
+
+            HashSet<String> tmpKetSet = new HashSet<>();
+            tmpKetSet.addAll(supportedKeySet);
+            tmpKetSet.addAll(fieldKeySet);
+            tmpKetSet.removeAll(supportedKeySet);
+            if (!tmpKetSet.isEmpty()) {
+                String msg = "Unknown property or properties: " + tmpKetSet;
+                return new Tuple<>(false, msg);
+            }
+
+            if (fieldKeySet.contains("compressionType") && !fieldKeySet.contains("encoding")) {
+                String msg =
+                        "The `compressionType` has been set, but the `encoding` has not. The property `compressionType` will not take effect.";
+                return new Tuple<>(true, msg);
+            }
+
+            if (field.get("encoding") != null
+                    && !Field.getSupportedEncoding().contains(field.get("encoding").asText())) {
+                String msg =
+                        String.format(
+                                "Unknown `encoding`: %s, The supported encoding types are %s",
+                                field.get("encoding").asText(), Field.getSupportedEncoding());
+                return new Tuple<>(false, msg);
+            }
+
+            if (field.get("compressionType") != null
+                    && !Field.getSupportedCompressionType().contains(field.get("compressionType"))) {
+                String msg =
+                        String.format(
+                                "Unknown `compressionType`: %s, The supported compressionType are %s",
+                                field.get("compressionType").asText(), Field.getSupportedCompressionType());
+                return new Tuple<>(false, msg);
+            }
+        }
+
+        return new Tuple<>(true, null);
+    }
+
+    protected Tuple<Boolean, String> validateSchema(RecordSchema recordSchema) {
+        List<String> fieldNames = recordSchema.getFieldNames();
+        List<DataType> dataTypes = recordSchema.getDataTypes();
+        if (!fieldNames.contains("Time")) {
+            return new Tuple<>(false, "The fields must contain `Time`.");
+        }
+        if (!supportedTimeType.contains(recordSchema.getDataType("Time").get().getFieldType())) {
+            return new Tuple<>(false, "The dataType of `Time` must be `STRING` or `LONG`.");
+        }
+        fieldNames.remove("Time");
+        for (String fieldName : fieldNames) {
+            if (!fieldName.startsWith("root.")) {
+                String msg = String.format("The tsName `%s` is not start with 'root.'.", fieldName);
+                return new Tuple<>(false, msg);
+            }
+        }
+        for (DataType type : dataTypes) {
+            RecordFieldType dataType = type.getFieldType();
+            if (!supportedType.contains(dataType)) {
+                String msg =
+                        String.format(
+                                "Unknown `dataType`: %s. The supported dataTypes are %s",
+                                dataType.toString(), supportedType);
+                return new Tuple<>(false, msg);
+            }
+        }
+
+        return new Tuple<>(true, null);
+    }
+
+    protected Map<String, List<String>> parseSchema(List<String> filedNames) {
+        HashMap<String, List<String>> deviceMeasurementMap = new HashMap<>();
+        filedNames.stream()
+                .forEach(
+                        filed -> {
+                            String[] paths = filed.split("\\.");
+                            String device = StringUtils.join(paths, ".", 0, paths.length - 1);
+
+                            if (!deviceMeasurementMap.containsKey(device)) {
+                                deviceMeasurementMap.put(device, new ArrayList<>());
+                            }
+                            deviceMeasurementMap.get(device).add(paths[paths.length - 1]);
+                        });
+
+        return deviceMeasurementMap;
+    }
+
+    protected HashMap<String, Tablet> generateTablets(Schema schema, int maxRowNumber) {
+        Map<String, List<String>> deviceMeasurementMap = parseSchema(schema.getFieldNames());
+        HashMap<String, Tablet> tablets = new HashMap<>();
+        deviceMeasurementMap.forEach(
+                (device, measurements) -> {
+                    ArrayList<MeasurementSchema> schemas = new ArrayList<>();
+                    for (String measurement : measurements) {
+                        String tsName = device + "." + measurement;
+                        TSDataType dataType = schema.getDataType(tsName);
+                        TSEncoding encoding = schema.getEncodingType(tsName);
+                        CompressionType compressionType = schema.getCompressionType(tsName);
+                        if (encoding == null) {
+                            schemas.add(new MeasurementSchema(measurement, dataType));
+                        } else if (compressionType == null) {
+                            schemas.add(new MeasurementSchema(measurement, dataType, encoding));
+                        } else {
+                            schemas.add(new MeasurementSchema(measurement, dataType, encoding, compressionType));
+                        }
+                    }
+                    Tablet tablet = new Tablet(device, schemas, maxRowNumber);
+                    tablets.put(device, tablet);
+                });
+        return tablets;
+    }
+
+    protected SimpleDateFormat initFormatter(String time) {

Review Comment:
   `SimpleDateFormat` is not thread-safe, and Java 8 provides a thread-safe alternative in `DateTimeFormatter`. Recommend replacing all references to `SimpleDateFormat` with `DateTimeFormatter`.
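
   A minimal sketch of what that could look like, assuming the same "try each supported pattern" approach as the current `initFormatter` (the patterns below are illustrative, not the exact ones in this PR):
   ```java
   import java.time.LocalDateTime;
   import java.time.ZoneId;
   import java.time.format.DateTimeFormatter;
   import java.time.format.DateTimeParseException;

   // DateTimeFormatter is immutable and thread-safe, so instances can be cached
   private static final DateTimeFormatter[] SUPPORTED_FORMATTERS = {
           DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"),
           DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss")
   };

   protected DateTimeFormatter initFormatter(String time) {
       for (DateTimeFormatter formatter : SUPPORTED_FORMATTERS) {
           try {
               LocalDateTime.parse(time, formatter);
               return formatter;
           } catch (DateTimeParseException e) {
               // try the next supported pattern
           }
       }
       return null;
   }

   // In onTrigger, converting the record's time field to an epoch-millisecond timestamp:
   long timestamp = LocalDateTime.parse((String) values[0], timeFormatter)
           .atZone(ZoneId.systemDefault())
           .toInstant()
           .toEpochMilli();
   ```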



##########
nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/PutIoTDB.java:
##########
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors;
+
+import java.io.InputStream;
+import java.text.SimpleDateFormat;
+import java.util.Set;
+import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.processors.model.Schema;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.util.Tuple;
+
+@Tags({"iotdb", "insert", "tablet"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription(
+        "This is a record aware processor that reads the content of the incoming FlowFile as individual records using the "
+                + "configured 'Record Reader' and writes them to Apache IoTDB using native interface.")
+public class PutIoTDB extends AbstractIoTDB {
+
+    static final PropertyDescriptor RECORD_READER_FACTORY =
+            new PropertyDescriptor.Builder()
+                    .name("Record Reader")
+                    .description(
+                            "Specifies the type of Record Reader controller service to use for parsing the incoming data "
+                                    + "and determining the schema")
+                    .identifiesControllerService(RecordReaderFactory.class)
+                    .required(true)
+                    .build();
+
+    static final PropertyDescriptor TIME_FIELD =
+            new PropertyDescriptor.Builder()
+                    .name("Time Field")
+                    .description(
+                            "The field name which represents time. It can be updated by expression language.")
+                    .defaultValue("Time")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                    .required(false)
+                    .build();
+
+    static final PropertyDescriptor SCHEMA =
+            new PropertyDescriptor.Builder()
+                    .name("Schema")
+                    .description(
+                            "The schema that IoTDB needs doesn't support good by NiFi.\n"
+                                    + "Therefore, you can define the schema here.\n"
+                                    + "Besides, you can set encoding type and compression type by this method.\n"
+                                    + "If you don't set this property, the inferred schema will be used.\n"
+                                    + "It can be updated by expression language.")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .required(false)
+                    .build();
+
+    static final PropertyDescriptor ALIGNED =
+            new PropertyDescriptor.Builder()
+                    .name("Aligned")
+                    .description("Whether using aligned interface? It can be updated by expression language.")
+                    .allowableValues(new String[] {"true", "false"})
+                    .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+                    .defaultValue("false")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .required(false)
+                    .build();
+
+    static final PropertyDescriptor MAX_ROW_NUMBER =
+            new PropertyDescriptor.Builder()
+                    .name("Max Row Number")
+                    .description(
+                            "Specifies the max row number of each tablet. It can be updated by expression language.")
+                    .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+                    .defaultValue("false")

Review Comment:
   The default value should be removed if this property is not required.
   ```suggestion
   ```
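
   Alternatively, if a default is desired, it should at least be a valid positive integer; a sketch that matches the 1024 fallback already used in `onTrigger` (illustrative only):
   ```java
   static final PropertyDescriptor MAX_ROW_NUMBER =
           new PropertyDescriptor.Builder()
                   .name("Max Row Number")
                   .description("Specifies the max row number of each tablet. It can be updated by expression language.")
                   .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
                   .defaultValue("1024")
                   .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
                   .required(false)
                   .build();
   ```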



##########
nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/PutIoTDB.java:
##########
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors;
+
+import java.io.InputStream;
+import java.text.SimpleDateFormat;
+import java.util.Set;
+import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.processors.model.Schema;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.util.Tuple;
+
+@Tags({"iotdb", "insert", "tablet"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription(
+        "This is a record aware processor that reads the content of the incoming FlowFile as individual records using the "
+                + "configured 'Record Reader' and writes them to Apache IoTDB using native interface.")
+public class PutIoTDB extends AbstractIoTDB {
+
+    static final PropertyDescriptor RECORD_READER_FACTORY =
+            new PropertyDescriptor.Builder()
+                    .name("Record Reader")
+                    .description(
+                            "Specifies the type of Record Reader controller service to use for parsing the incoming data "
+                                    + "and determining the schema")
+                    .identifiesControllerService(RecordReaderFactory.class)
+                    .required(true)
+                    .build();
+
+    static final PropertyDescriptor TIME_FIELD =
+            new PropertyDescriptor.Builder()
+                    .name("Time Field")
+                    .description(
+                            "The field name which represents time. It can be updated by expression language.")
+                    .defaultValue("Time")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                    .required(false)
+                    .build();
+
+    static final PropertyDescriptor SCHEMA =
+            new PropertyDescriptor.Builder()
+                    .name("Schema")
+                    .description(
+                            "The schema that IoTDB needs doesn't support good by NiFi.\n"
+                                    + "Therefore, you can define the schema here.\n"
+                                    + "Besides, you can set encoding type and compression type by this method.\n"
+                                    + "If you don't set this property, the inferred schema will be used.\n"
+                                    + "It can be updated by expression language.")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .required(false)
+                    .build();
+
+    static final PropertyDescriptor ALIGNED =
+            new PropertyDescriptor.Builder()
+                    .name("Aligned")
+                    .description("Whether using aligned interface? It can be updated by expression language.")

Review Comment:
   ```suggestion
                       .description("Whether to use the Apache IoTDB Aligned Timeseries interface")
   ```



##########
nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/PutIoTDB.java:
##########
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors;
+
+import java.io.InputStream;
+import java.text.SimpleDateFormat;
+import java.util.Set;
+import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.processors.model.Schema;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.util.Tuple;
+
+@Tags({"iotdb", "insert", "tablet"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription(
+        "This is a record aware processor that reads the content of the incoming FlowFile as individual records using the "
+                + "configured 'Record Reader' and writes them to Apache IoTDB using native interface.")
+public class PutIoTDB extends AbstractIoTDB {
+
+    static final PropertyDescriptor RECORD_READER_FACTORY =
+            new PropertyDescriptor.Builder()
+                    .name("Record Reader")
+                    .description(
+                            "Specifies the type of Record Reader controller service to use for parsing the incoming data "
+                                    + "and determining the schema")
+                    .identifiesControllerService(RecordReaderFactory.class)
+                    .required(true)
+                    .build();
+
+    static final PropertyDescriptor TIME_FIELD =
+            new PropertyDescriptor.Builder()
+                    .name("Time Field")
+                    .description(
+                            "The field name which represents time. It can be updated by expression language.")
+                    .defaultValue("Time")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                    .required(false)
+                    .build();
+
+    static final PropertyDescriptor SCHEMA =
+            new PropertyDescriptor.Builder()
+                    .name("Schema")
+                    .description(
+                            "The schema that IoTDB needs doesn't support good by NiFi.\n"
+                                    + "Therefore, you can define the schema here.\n"
+                                    + "Besides, you can set encoding type and compression type by this method.\n"
+                                    + "If you don't set this property, the inferred schema will be used.\n"
+                                    + "It can be updated by expression language.")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .required(false)
+                    .build();
+
+    static final PropertyDescriptor ALIGNED =
+            new PropertyDescriptor.Builder()
+                    .name("Aligned")
+                    .description("Whether using aligned interface? It can be updated by expression language.")
+                    .allowableValues(new String[] {"true", "false"})
+                    .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+                    .defaultValue("false")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .required(false)
+                    .build();
+
+    static final PropertyDescriptor MAX_ROW_NUMBER =
+            new PropertyDescriptor.Builder()
+                    .name("Max Row Number")
+                    .description(
+                            "Specifies the max row number of each tablet. It can be updated by expression language.")
+                    .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+                    .defaultValue("false")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .required(false)
+                    .build();
+
+    static {
+        descriptors.add(RECORD_READER_FACTORY);
+        descriptors.add(TIME_FIELD);
+        descriptors.add(SCHEMA);
+        descriptors.add(ALIGNED);
+        descriptors.add(MAX_ROW_NUMBER);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext processContext, ProcessSession processSession)
+            throws ProcessException {
+        final RecordReaderFactory recordParserFactory =
+                processContext
+                        .getProperty(RECORD_READER_FACTORY)
+                        .asControllerService(RecordReaderFactory.class);
+
+        String timeFieldProperty = processContext.getProperty(TIME_FIELD).getValue();
+        String schemaProperty = processContext.getProperty(SCHEMA).getValue();
+        String alignedProperty = processContext.getProperty(ALIGNED).getValue();
+        String maxRowNumberProperty = processContext.getProperty(MAX_ROW_NUMBER).getValue();
+
+        FlowFile flowFile = processSession.get();
+
+        String timeField = timeFieldProperty != null ? timeFieldProperty : "Time";
+        Boolean aligned = alignedProperty != null ? Boolean.valueOf(alignedProperty) : false;
+        int maxRowNumber = maxRowNumberProperty != null ? Integer.valueOf(maxRowNumberProperty) : 1024;
+
+        try (final InputStream inputStream = processSession.read(flowFile);
+             final RecordReader recordReader =
+                     recordParserFactory.createRecordReader(flowFile, inputStream, getLogger())) {
+            if (flowFile == null) {

Review Comment:
   This null check should be moved to immediately after `processSession.get()`, instead of inside this try-with-resources block, where `processSession.read(flowFile)` has already been called on a potentially null FlowFile.
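
   A rough sketch of the reordering (using the names from the current patch):
   ```java
   FlowFile flowFile = processSession.get();
   if (flowFile == null) {
       // Nothing to process on this trigger
       return;
   }

   try (final InputStream inputStream = processSession.read(flowFile);
        final RecordReader recordReader =
                recordParserFactory.createRecordReader(flowFile, inputStream, getLogger())) {
       // ... existing record handling ...
   } catch (final Exception e) {
       getLogger().error("Record processing failed", e);
       processSession.transfer(flowFile, REL_FAILURE);
   }
   ```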



##########
nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/PutIoTDB.java:
##########
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors;
+
+import java.io.InputStream;
+import java.text.SimpleDateFormat;
+import java.util.Set;
+import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.processors.model.Schema;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.util.Tuple;
+
+@Tags({"iotdb", "insert", "tablet"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription(
+        "This is a record aware processor that reads the content of the incoming FlowFile as individual records using the "
+                + "configured 'Record Reader' and writes them to Apache IoTDB using native interface.")
+public class PutIoTDB extends AbstractIoTDB {
+
+    static final PropertyDescriptor RECORD_READER_FACTORY =
+            new PropertyDescriptor.Builder()
+                    .name("Record Reader")
+                    .description(
+                            "Specifies the type of Record Reader controller service to use for parsing the incoming data "
+                                    + "and determining the schema")
+                    .identifiesControllerService(RecordReaderFactory.class)
+                    .required(true)
+                    .build();
+
+    static final PropertyDescriptor TIME_FIELD =
+            new PropertyDescriptor.Builder()
+                    .name("Time Field")
+                    .description(
+                            "The field name which represents time. It can be updated by expression language.")
+                    .defaultValue("Time")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                    .required(false)
+                    .build();
+
+    static final PropertyDescriptor SCHEMA =
+            new PropertyDescriptor.Builder()
+                    .name("Schema")
+                    .description(
+                            "The schema that IoTDB needs doesn't support good by NiFi.\n"
+                                    + "Therefore, you can define the schema here.\n"
+                                    + "Besides, you can set encoding type and compression type by this method.\n"
+                                    + "If you don't set this property, the inferred schema will be used.\n"
+                                    + "It can be updated by expression language.")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .required(false)
+                    .build();
+
+    static final PropertyDescriptor ALIGNED =
+            new PropertyDescriptor.Builder()
+                    .name("Aligned")
+                    .description("Whether using aligned interface? It can be updated by expression language.")
+                    .allowableValues(new String[] {"true", "false"})
+                    .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+                    .defaultValue("false")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .required(false)
+                    .build();
+
+    static final PropertyDescriptor MAX_ROW_NUMBER =
+            new PropertyDescriptor.Builder()
+                    .name("Max Row Number")
+                    .description(
+                            "Specifies the max row number of each tablet. It can be updated by expression language.")
+                    .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+                    .defaultValue("false")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .required(false)
+                    .build();
+
+    static {
+        descriptors.add(RECORD_READER_FACTORY);
+        descriptors.add(TIME_FIELD);
+        descriptors.add(SCHEMA);
+        descriptors.add(ALIGNED);
+        descriptors.add(MAX_ROW_NUMBER);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext processContext, ProcessSession processSession)
+            throws ProcessException {
+        final RecordReaderFactory recordParserFactory =
+                processContext
+                        .getProperty(RECORD_READER_FACTORY)
+                        .asControllerService(RecordReaderFactory.class);
+
+        String timeFieldProperty = processContext.getProperty(TIME_FIELD).getValue();
+        String schemaProperty = processContext.getProperty(SCHEMA).getValue();
+        String alignedProperty = processContext.getProperty(ALIGNED).getValue();
+        String maxRowNumberProperty = processContext.getProperty(MAX_ROW_NUMBER).getValue();

Review Comment:
   These `getProperty()` calls need to include `evaluateAttributeExpressions(flowFile)` in order to evaluate Expression Language references against FlowFile attributes.
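
   For example (sketch only; this also implies obtaining the FlowFile via `processSession.get()` before reading the properties):
   ```java
   FlowFile flowFile = processSession.get();
   if (flowFile == null) {
       return;
   }

   final String timeFieldProperty = processContext.getProperty(TIME_FIELD)
           .evaluateAttributeExpressions(flowFile).getValue();
   final String schemaProperty = processContext.getProperty(SCHEMA)
           .evaluateAttributeExpressions(flowFile).getValue();
   final String alignedProperty = processContext.getProperty(ALIGNED)
           .evaluateAttributeExpressions(flowFile).getValue();
   final String maxRowNumberProperty = processContext.getProperty(MAX_ROW_NUMBER)
           .evaluateAttributeExpressions(flowFile).getValue();
   ```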



##########
nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/PutIoTDB.java:
##########
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors;
+
+import java.io.InputStream;
+import java.text.SimpleDateFormat;
+import java.util.Set;
+import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.processors.model.Schema;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.util.Tuple;
+
+@Tags({"iotdb", "insert", "tablet"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription(
+        "This is a record aware processor that reads the content of the incoming FlowFile as individual records using the "
+                + "configured 'Record Reader' and writes them to Apache IoTDB using native interface.")
+public class PutIoTDB extends AbstractIoTDB {
+
+    static final PropertyDescriptor RECORD_READER_FACTORY =
+            new PropertyDescriptor.Builder()
+                    .name("Record Reader")
+                    .description(
+                            "Specifies the type of Record Reader controller service to use for parsing the incoming data "
+                                    + "and determining the schema")
+                    .identifiesControllerService(RecordReaderFactory.class)
+                    .required(true)
+                    .build();
+
+    static final PropertyDescriptor TIME_FIELD =
+            new PropertyDescriptor.Builder()
+                    .name("Time Field")
+                    .description(
+                            "The field name which represents time. It can be updated by expression language.")
+                    .defaultValue("Time")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                    .required(false)
+                    .build();
+
+    static final PropertyDescriptor SCHEMA =
+            new PropertyDescriptor.Builder()
+                    .name("Schema")
+                    .description(
+                            "The schema that IoTDB needs doesn't support good by NiFi.\n"
+                                    + "Therefore, you can define the schema here.\n"
+                                    + "Besides, you can set encoding type and compression type by this method.\n"
+                                    + "If you don't set this property, the inferred schema will be used.\n"
+                                    + "It can be updated by expression language.")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .required(false)
+                    .build();
+
+    static final PropertyDescriptor ALIGNED =
+            new PropertyDescriptor.Builder()
+                    .name("Aligned")
+                    .description("Whether using aligned interface? It can be updated by expression language.")
+                    .allowableValues(new String[] {"true", "false"})
+                    .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+                    .defaultValue("false")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .required(false)
+                    .build();
+
+    static final PropertyDescriptor MAX_ROW_NUMBER =
+            new PropertyDescriptor.Builder()
+                    .name("Max Row Number")
+                    .description(
+                            "Specifies the max row number of each tablet. It can be updated by expression language.")
+                    .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+                    .defaultValue("false")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .required(false)
+                    .build();
+
+    static {
+        descriptors.add(RECORD_READER_FACTORY);
+        descriptors.add(TIME_FIELD);
+        descriptors.add(SCHEMA);
+        descriptors.add(ALIGNED);
+        descriptors.add(MAX_ROW_NUMBER);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext processContext, ProcessSession processSession)
+            throws ProcessException {
+        final RecordReaderFactory recordParserFactory =
+                processContext
+                        .getProperty(RECORD_READER_FACTORY)
+                        .asControllerService(RecordReaderFactory.class);
+
+        String timeFieldProperty = processContext.getProperty(TIME_FIELD).getValue();
+        String schemaProperty = processContext.getProperty(SCHEMA).getValue();
+        String alignedProperty = processContext.getProperty(ALIGNED).getValue();
+        String maxRowNumberProperty = processContext.getProperty(MAX_ROW_NUMBER).getValue();
+
+        FlowFile flowFile = processSession.get();
+
+        String timeField = timeFieldProperty != null ? timeFieldProperty : "Time";
+        Boolean aligned = alignedProperty != null ? Boolean.valueOf(alignedProperty) : false;
+        int maxRowNumber = maxRowNumberProperty != null ? Integer.valueOf(maxRowNumberProperty) : 1024;
+
+        try (final InputStream inputStream = processSession.read(flowFile);
+             final RecordReader recordReader =
+                     recordParserFactory.createRecordReader(flowFile, inputStream, getLogger())) {
+            if (flowFile == null) {
+                inputStream.close();
+                recordReader.close();
+                processSession.transfer(flowFile, REL_SUCCESS);
+                return;
+            }
+
+            HashMap<String, Tablet> tablets;
+            boolean needInitFormatter;
+            Schema schema;
+            Tuple<Boolean, String> result;
+
+            result =
+                    schemaProperty != null
+                            ? validateSchemaAttribute(schemaProperty)
+                            : validateSchema(recordReader.getSchema());
+
+            if (!result.getKey()) {
+                getLogger().error(result.getValue());
+                inputStream.close();
+                recordReader.close();
+                processSession.transfer(flowFile, REL_FAILURE);
+                return;
+            } else {
+                if (result.getValue() != null) {
+                    getLogger().warn(result.getValue());
+                }
+            }
+
+            schema =
+                    schemaProperty != null
+                            ? new ObjectMapper().readValue(schemaProperty, Schema.class)
+                            : convertSchema(recordReader.getSchema());
+
+            List<String> fieldNames = schema.getFieldNames();
+
+            needInitFormatter = schema.getTimeType() != Schema.TimeType.LONG;
+
+            tablets = generateTablets(schema, maxRowNumber);
+            SimpleDateFormat timeFormatter = null;
+
+            Record record;
+
+            while ((record = recordReader.nextRecord()) != null) {
+                Object[] values = record.getValues();
+                if (timeFormatter == null && needInitFormatter) {
+                    timeFormatter = initFormatter((String) values[0]);
+                    if (timeFormatter == null) {
+                        getLogger().error("The format of time is not supported.");

Review Comment:
   This message should include some additional details, such as the FlowFile and record number:
   ```suggestion
                           getLogger().error("{} Record [{}] time format not supported", flowFile, recordNumber);
   ```
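
   Since the current loop does not track a record counter, that would also need something along these lines (sketch):
   ```java
   long recordNumber = 0;
   Record record;
   while ((record = recordReader.nextRecord()) != null) {
       recordNumber++;
       Object[] values = record.getValues();
       if (timeFormatter == null && needInitFormatter) {
           timeFormatter = initFormatter((String) values[0]);
           if (timeFormatter == null) {
               getLogger().error("{} Record [{}] time format not supported", flowFile, recordNumber);
               processSession.transfer(flowFile, REL_FAILURE);
               return;
           }
       }
       // ... remaining tablet population unchanged ...
   }
   ```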



##########
nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/PutIoTDB.java:
##########
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors;
+
+import java.io.InputStream;
+import java.text.SimpleDateFormat;
+import java.util.Set;
+import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.processors.model.Schema;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.util.Tuple;
+
+@Tags({"iotdb", "insert", "tablet"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription(
+        "This is a record aware processor that reads the content of the incoming FlowFile as individual records using the "
+                + "configured 'Record Reader' and writes them to Apache IoTDB using native interface.")
+public class PutIoTDB extends AbstractIoTDB {
+
+    static final PropertyDescriptor RECORD_READER_FACTORY =
+            new PropertyDescriptor.Builder()
+                    .name("Record Reader")
+                    .description(
+                            "Specifies the type of Record Reader controller service to use for parsing the incoming data "
+                                    + "and determining the schema")
+                    .identifiesControllerService(RecordReaderFactory.class)
+                    .required(true)
+                    .build();
+
+    static final PropertyDescriptor TIME_FIELD =
+            new PropertyDescriptor.Builder()
+                    .name("Time Field")
+                    .description(
+                            "The field name which represents time. It can be updated by expression language.")
+                    .defaultValue("Time")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                    .required(false)
+                    .build();
+
+    static final PropertyDescriptor SCHEMA =
+            new PropertyDescriptor.Builder()
+                    .name("Schema")
+                    .description(
+                            "The schema that IoTDB needs doesn't support good by NiFi.\n"
+                                    + "Therefore, you can define the schema here.\n"
+                                    + "Besides, you can set encoding type and compression type by this method.\n"
+                                    + "If you don't set this property, the inferred schema will be used.\n"
+                                    + "It can be updated by expression language.")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .required(false)
+                    .build();
+
+    static final PropertyDescriptor ALIGNED =
+            new PropertyDescriptor.Builder()
+                    .name("Aligned")
+                    .description("Whether using aligned interface? It can be updated by expression language.")
+                    .allowableValues(new String[] {"true", "false"})
+                    .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+                    .defaultValue("false")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .required(false)
+                    .build();
+
+    static final PropertyDescriptor MAX_ROW_NUMBER =
+            new PropertyDescriptor.Builder()
+                    .name("Max Row Number")
+                    .description(
+                            "Specifies the max row number of each tablet. It can be updated by expression language.")
+                    .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+                    .defaultValue("false")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .required(false)
+                    .build();
+
+    static {
+        descriptors.add(RECORD_READER_FACTORY);
+        descriptors.add(TIME_FIELD);
+        descriptors.add(SCHEMA);
+        descriptors.add(ALIGNED);
+        descriptors.add(MAX_ROW_NUMBER);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext processContext, ProcessSession processSession)
+            throws ProcessException {
+        final RecordReaderFactory recordParserFactory =
+                processContext
+                        .getProperty(RECORD_READER_FACTORY)
+                        .asControllerService(RecordReaderFactory.class);
+
+        String timeFieldProperty = processContext.getProperty(TIME_FIELD).getValue();
+        String schemaProperty = processContext.getProperty(SCHEMA).getValue();
+        String alignedProperty = processContext.getProperty(ALIGNED).getValue();
+        String maxRowNumberProperty = processContext.getProperty(MAX_ROW_NUMBER).getValue();
+
+        FlowFile flowFile = processSession.get();
+
+        String timeField = timeFieldProperty != null ? timeFieldProperty : "Time";
+        Boolean aligned = alignedProperty != null ? Boolean.valueOf(alignedProperty) : false;
+        int maxRowNumber = maxRowNumberProperty != null ? Integer.valueOf(maxRowNumberProperty) : 1024;
+
+        try (final InputStream inputStream = processSession.read(flowFile);
+             final RecordReader recordReader =
+                     recordParserFactory.createRecordReader(flowFile, inputStream, getLogger())) {
+            if (flowFile == null) {
+                inputStream.close();
+                recordReader.close();
+                processSession.transfer(flowFile, REL_SUCCESS);
+                return;
+            }
+
+            HashMap<String, Tablet> tablets;
+            boolean needInitFormatter;
+            Schema schema;
+            Tuple<Boolean, String> result;
+
+            result =
+                    schemaProperty != null
+                            ? validateSchemaAttribute(schemaProperty)
+                            : validateSchema(recordReader.getSchema());
+
+            if (!result.getKey()) {
+                getLogger().error(result.getValue());
+                inputStream.close();
+                recordReader.close();
+                processSession.transfer(flowFile, REL_FAILURE);
+                return;
+            } else {
+                if (result.getValue() != null) {
+                    getLogger().warn(result.getValue());
+                }
+            }
+
+            schema =
+                    schemaProperty != null
+                            ? new ObjectMapper().readValue(schemaProperty, Schema.class)
+                            : convertSchema(recordReader.getSchema());
+
+            List<String> fieldNames = schema.getFieldNames();
+
+            needInitFormatter = schema.getTimeType() != Schema.TimeType.LONG;
+
+            tablets = generateTablets(schema, maxRowNumber);
+            SimpleDateFormat timeFormatter = null;
+
+            Record record;
+
+            while ((record = recordReader.nextRecord()) != null) {
+                Object[] values = record.getValues();
+                if (timeFormatter == null && needInitFormatter) {
+                    timeFormatter = initFormatter((String) values[0]);
+                    if (timeFormatter == null) {
+                        getLogger().error("The format of time is not supported.");
+                        inputStream.close();
+                        recordReader.close();
+                        processSession.transfer(flowFile, REL_FAILURE);
+                        return;
+                    }
+                }
+
+                long timestamp;
+                if (needInitFormatter) {
+                    timestamp = timeFormatter.parse((String) values[0]).getTime();
+                } else {
+                    timestamp = (Long) values[0];
+                }
+
+                boolean flag = false;
+
+                for (Map.Entry<String, Tablet> entry : tablets.entrySet()) {
+                    String device = entry.getKey();
+                    Tablet tablet = entry.getValue();
+                    int rowIndex = tablet.rowSize++;
+
+                    tablet.addTimestamp(rowIndex, timestamp);
+                    List<MeasurementSchema> measurements = tablet.getSchemas();
+                    for (MeasurementSchema measurement : measurements) {
+                        String tsName =
+                                new StringBuilder()
+                                        .append(device)
+                                        .append(".")
+                                        .append(measurement.getMeasurementId())
+                                        .toString();
+                        int valueIndex = fieldNames.indexOf(tsName) + 1;
+                        Object value;
+                        try {
+                            TSDataType type = measurement.getType();
+                            value = convertType(values[valueIndex], type);
+                        } catch (Exception e) {

Review Comment:
   It looks like this exception should be logged, at least at the debug level.
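
   For example (sketch; the catch body is cut off in this hunk, so the null fallback below is an assumption):
   ```java
   Object value;
   try {
       TSDataType type = measurement.getType();
       value = convertType(values[valueIndex], type);
   } catch (Exception e) {
       // Record why the conversion failed instead of swallowing the exception
       getLogger().debug(String.format("Value conversion failed for field [%s]", tsName), e);
       value = null;
   }
   ```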



##########
nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/test/java/org/apache/nifi/processors/AbstractIoTDBUT.java:
##########
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.processors.model.Schema;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.util.Tuple;
+import org.junit.Assert;

Review Comment:
   The JUnit 5 `Assertions` static methods should be used instead of the JUnit 4 `Assert` class.
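
   For example, a minimal illustration of the JUnit 5 style (hypothetical test, not the actual assertions in this class):
   ```java
   import org.apache.nifi.util.Tuple;
   import org.junit.jupiter.api.Test;

   import static org.junit.jupiter.api.Assertions.assertNull;
   import static org.junit.jupiter.api.Assertions.assertTrue;

   class ExampleAssertionsTest {

       @Test
       void validResultHasNoMessage() {
           final Tuple<Boolean, String> result = new Tuple<>(true, null);
           assertTrue(result.getKey());
           assertNull(result.getValue());
       }
   }
   ```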



##########
nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/PutIoTDB.java:
##########
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors;
+
+import java.io.InputStream;
+import java.text.SimpleDateFormat;
+import java.util.Set;
+import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.processors.model.Schema;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.util.Tuple;
+
+@Tags({"iotdb", "insert", "tablet"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription(
+        "This is a record aware processor that reads the content of the incoming FlowFile as individual records using the "
+                + "configured 'Record Reader' and writes them to Apache IoTDB using native interface.")
+public class PutIoTDB extends AbstractIoTDB {
+
+    static final PropertyDescriptor RECORD_READER_FACTORY =
+            new PropertyDescriptor.Builder()
+                    .name("Record Reader")
+                    .description(
+                            "Specifies the type of Record Reader controller service to use for parsing the incoming data "
+                                    + "and determining the schema")
+                    .identifiesControllerService(RecordReaderFactory.class)
+                    .required(true)
+                    .build();
+
+    static final PropertyDescriptor TIME_FIELD =
+            new PropertyDescriptor.Builder()
+                    .name("Time Field")
+                    .description(
+                            "The field name which represents time. It can be updated by expression language.")
+                    .defaultValue("Time")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                    .required(false)
+                    .build();
+
+    static final PropertyDescriptor SCHEMA =
+            new PropertyDescriptor.Builder()
+                    .name("Schema")
+                    .description(
+                            "The schema that IoTDB needs doesn't support good by NiFi.\n"
+                                    + "Therefore, you can define the schema here.\n"
+                                    + "Besides, you can set encoding type and compression type by this method.\n"
+                                    + "If you don't set this property, the inferred schema will be used.\n"
+                                    + "It can be updated by expression language.")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .required(false)
+                    .build();
+
+    static final PropertyDescriptor ALIGNED =
+            new PropertyDescriptor.Builder()
+                    .name("Aligned")
+                    .description("Whether using aligned interface? It can be updated by expression language.")
+                    .allowableValues(new String[] {"true", "false"})
+                    .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+                    .defaultValue("false")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .required(false)
+                    .build();
+
+    static final PropertyDescriptor MAX_ROW_NUMBER =
+            new PropertyDescriptor.Builder()
+                    .name("Max Row Number")
+                    .description(
+                            "Specifies the max row number of each tablet. It can be updated by expression language.")
+                    .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+                    .defaultValue("false")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .required(false)
+                    .build();
+
+    static {
+        descriptors.add(RECORD_READER_FACTORY);
+        descriptors.add(TIME_FIELD);
+        descriptors.add(SCHEMA);
+        descriptors.add(ALIGNED);
+        descriptors.add(MAX_ROW_NUMBER);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext processContext, ProcessSession processSession)
+            throws ProcessException {
+        final RecordReaderFactory recordParserFactory =
+                processContext
+                        .getProperty(RECORD_READER_FACTORY)
+                        .asControllerService(RecordReaderFactory.class);
+
+        String timeFieldProperty = processContext.getProperty(TIME_FIELD).getValue();
+        String schemaProperty = processContext.getProperty(SCHEMA).getValue();
+        String alignedProperty = processContext.getProperty(ALIGNED).getValue();
+        String maxRowNumberProperty = processContext.getProperty(MAX_ROW_NUMBER).getValue();
+
+        FlowFile flowFile = processSession.get();
+
+        String timeField = timeFieldProperty != null ? timeFieldProperty : "Time";
+        Boolean aligned = alignedProperty != null ? Boolean.valueOf(alignedProperty) : false;

Review Comment:
   ```suggestion
           final boolean aligned = alignedProperty != null ? Boolean.valueOf(alignedProperty) : false;
   ```



##########
nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/PutIoTDB.java:
##########
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors;
+
+import java.io.InputStream;
+import java.text.SimpleDateFormat;
+import java.util.Set;
+import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.processors.model.Schema;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.util.Tuple;
+
+@Tags({"iotdb", "insert", "tablet"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription(
+        "This is a record aware processor that reads the content of the incoming FlowFile as individual records using the "
+                + "configured 'Record Reader' and writes them to Apache IoTDB using native interface.")
+public class PutIoTDB extends AbstractIoTDB {
+
+    static final PropertyDescriptor RECORD_READER_FACTORY =
+            new PropertyDescriptor.Builder()
+                    .name("Record Reader")
+                    .description(
+                            "Specifies the type of Record Reader controller service to use for parsing the incoming data "
+                                    + "and determining the schema")
+                    .identifiesControllerService(RecordReaderFactory.class)
+                    .required(true)
+                    .build();
+
+    static final PropertyDescriptor TIME_FIELD =
+            new PropertyDescriptor.Builder()
+                    .name("Time Field")
+                    .description(
+                            "The field name which represents time. It can be updated by expression language.")
+                    .defaultValue("Time")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                    .required(false)
+                    .build();
+
+    static final PropertyDescriptor SCHEMA =
+            new PropertyDescriptor.Builder()
+                    .name("Schema")
+                    .description(
+                            "The schema that IoTDB needs doesn't support good by NiFi.\n"
+                                    + "Therefore, you can define the schema here.\n"
+                                    + "Besides, you can set encoding type and compression type by this method.\n"
+                                    + "If you don't set this property, the inferred schema will be used.\n"
+                                    + "It can be updated by expression language.")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .required(false)
+                    .build();
+
+    static final PropertyDescriptor ALIGNED =
+            new PropertyDescriptor.Builder()
+                    .name("Aligned")
+                    .description("Whether using aligned interface? It can be updated by expression language.")
+                    .allowableValues(new String[] {"true", "false"})
+                    .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+                    .defaultValue("false")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .required(false)
+                    .build();
+
+    static final PropertyDescriptor MAX_ROW_NUMBER =
+            new PropertyDescriptor.Builder()
+                    .name("Max Row Number")
+                    .description(
+                            "Specifies the max row number of each tablet. It can be updated by expression language.")
+                    .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+                    .defaultValue("false")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .required(false)
+                    .build();
+
+    static {
+        descriptors.add(RECORD_READER_FACTORY);
+        descriptors.add(TIME_FIELD);
+        descriptors.add(SCHEMA);
+        descriptors.add(ALIGNED);
+        descriptors.add(MAX_ROW_NUMBER);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext processContext, ProcessSession processSession)
+            throws ProcessException {
+        final RecordReaderFactory recordParserFactory =
+                processContext
+                        .getProperty(RECORD_READER_FACTORY)
+                        .asControllerService(RecordReaderFactory.class);
+
+        String timeFieldProperty = processContext.getProperty(TIME_FIELD).getValue();
+        String schemaProperty = processContext.getProperty(SCHEMA).getValue();
+        String alignedProperty = processContext.getProperty(ALIGNED).getValue();
+        String maxRowNumberProperty = processContext.getProperty(MAX_ROW_NUMBER).getValue();
+
+        FlowFile flowFile = processSession.get();
+
+        String timeField = timeFieldProperty != null ? timeFieldProperty : "Time";
+        Boolean aligned = alignedProperty != null ? Boolean.valueOf(alignedProperty) : false;
+        int maxRowNumber = maxRowNumberProperty != null ? Integer.valueOf(maxRowNumberProperty) : 1024;
+
+        try (final InputStream inputStream = processSession.read(flowFile);
+             final RecordReader recordReader =
+                     recordParserFactory.createRecordReader(flowFile, inputStream, getLogger())) {
+            if (flowFile == null) {
+                inputStream.close();
+                recordReader.close();
+                processSession.transfer(flowFile, REL_SUCCESS);
+                return;
+            }
+
+            HashMap<String, Tablet> tablets;
+            boolean needInitFormatter;
+            Schema schema;
+            Tuple<Boolean, String> result;
+
+            result =
+                    schemaProperty != null
+                            ? validateSchemaAttribute(schemaProperty)
+                            : validateSchema(recordReader.getSchema());
+
+            if (!result.getKey()) {
+                getLogger().error(result.getValue());
+                inputStream.close();
+                recordReader.close();
+                processSession.transfer(flowFile, REL_FAILURE);
+                return;
+            } else {
+                if (result.getValue() != null) {
+                    getLogger().warn(result.getValue());
+                }
+            }
+
+            schema =
+                    schemaProperty != null
+                            ? new ObjectMapper().readValue(schemaProperty, Schema.class)
+                            : convertSchema(recordReader.getSchema());
+
+            List<String> fieldNames = schema.getFieldNames();
+
+            needInitFormatter = schema.getTimeType() != Schema.TimeType.LONG;
+
+            tablets = generateTablets(schema, maxRowNumber);
+            SimpleDateFormat timeFormatter = null;
+
+            Record record;
+
+            while ((record = recordReader.nextRecord()) != null) {
+                Object[] values = record.getValues();
+                if (timeFormatter == null && needInitFormatter) {
+                    timeFormatter = initFormatter((String) values[0]);
+                    if (timeFormatter == null) {
+                        getLogger().error("The format of time is not supported.");
+                        inputStream.close();
+                        recordReader.close();
+                        processSession.transfer(flowFile, REL_FAILURE);
+                        return;
+                    }
+                }
+
+                long timestamp;
+                if (needInitFormatter) {
+                    timestamp = timeFormatter.parse((String) values[0]).getTime();
+                } else {
+                    timestamp = (Long) values[0];
+                }
+
+                boolean flag = false;
+
+                for (Map.Entry<String, Tablet> entry : tablets.entrySet()) {
+                    String device = entry.getKey();
+                    Tablet tablet = entry.getValue();
+                    int rowIndex = tablet.rowSize++;
+
+                    tablet.addTimestamp(rowIndex, timestamp);
+                    List<MeasurementSchema> measurements = tablet.getSchemas();
+                    for (MeasurementSchema measurement : measurements) {
+                        String tsName =
+                                new StringBuilder()
+                                        .append(device)
+                                        .append(".")
+                                        .append(measurement.getMeasurementId())
+                                        .toString();
+                        int valueIndex = fieldNames.indexOf(tsName) + 1;
+                        Object value;
+                        try {
+                            TSDataType type = measurement.getType();
+                            value = convertType(values[valueIndex], type);
+                        } catch (Exception e) {
+                            value = null;
+                        }
+                        tablet.addValue(measurement.getMeasurementId(), rowIndex, value);
+                    }
+                    flag = tablet.rowSize == tablet.getMaxRowNumber();
+                }

Review Comment:
   It would be helpful to refactor this method into smaller private methods so the deeply nested loops are easier to follow.
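   As a rough sketch (the method name is hypothetical; `convertType`, `Tablet`, and `MeasurementSchema` are used as in the PR), the per-record tablet population could move into its own method so that `onTrigger` only orchestrates:
   ```java
   // Hypothetical helper extracted from the record loop in onTrigger.
   // Populates one row in each Tablet for the given timestamp and record values.
   private void addRecordToTablets(final Map<String, Tablet> tablets, final List<String> fieldNames,
           final long timestamp, final Object[] values) {
       for (final Map.Entry<String, Tablet> entry : tablets.entrySet()) {
           final Tablet tablet = entry.getValue();
           final int rowIndex = tablet.rowSize++;
           tablet.addTimestamp(rowIndex, timestamp);
           for (final MeasurementSchema measurement : tablet.getSchemas()) {
               // Field names are qualified as <device>.<measurement>; offset by one for the time column.
               final String tsName = entry.getKey() + "." + measurement.getMeasurementId();
               final int valueIndex = fieldNames.indexOf(tsName) + 1;
               Object value;
               try {
                   value = convertType(values[valueIndex], measurement.getType());
               } catch (Exception e) {
                   value = null;
               }
               tablet.addValue(measurement.getMeasurementId(), rowIndex, value);
           }
       }
   }
   ```
   `onTrigger` would then call this once per record and keep only the flush-when-full check at the top level.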



##########
nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/PutIoTDB.java:
##########
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors;
+
+import java.io.InputStream;
+import java.text.SimpleDateFormat;
+import java.util.Set;
+import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.processors.model.Schema;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.util.Tuple;
+
+@Tags({"iotdb", "insert", "tablet"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription(
+        "This is a record aware processor that reads the content of the incoming FlowFile as individual records using the "
+                + "configured 'Record Reader' and writes them to Apache IoTDB using native interface.")
+public class PutIoTDB extends AbstractIoTDB {
+
+    static final PropertyDescriptor RECORD_READER_FACTORY =
+            new PropertyDescriptor.Builder()
+                    .name("Record Reader")
+                    .description(
+                            "Specifies the type of Record Reader controller service to use for parsing the incoming data "
+                                    + "and determining the schema")
+                    .identifiesControllerService(RecordReaderFactory.class)
+                    .required(true)
+                    .build();
+
+    static final PropertyDescriptor TIME_FIELD =
+            new PropertyDescriptor.Builder()
+                    .name("Time Field")
+                    .description(
+                            "The field name which represents time. It can be updated by expression language.")
+                    .defaultValue("Time")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                    .required(false)
+                    .build();
+
+    static final PropertyDescriptor SCHEMA =
+            new PropertyDescriptor.Builder()
+                    .name("Schema")
+                    .description(
+                            "The schema that IoTDB needs doesn't support good by NiFi.\n"
+                                    + "Therefore, you can define the schema here.\n"
+                                    + "Besides, you can set encoding type and compression type by this method.\n"
+                                    + "If you don't set this property, the inferred schema will be used.\n"
+                                    + "It can be updated by expression language.")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .required(false)
+                    .build();
+
+    static final PropertyDescriptor ALIGNED =
+            new PropertyDescriptor.Builder()
+                    .name("Aligned")
+                    .description("Whether using aligned interface? It can be updated by expression language.")
+                    .allowableValues(new String[] {"true", "false"})
+                    .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+                    .defaultValue("false")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .required(false)
+                    .build();
+
+    static final PropertyDescriptor MAX_ROW_NUMBER =
+            new PropertyDescriptor.Builder()
+                    .name("Max Row Number")
+                    .description(
+                            "Specifies the max row number of each tablet. It can be updated by expression language.")
+                    .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+                    .defaultValue("false")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .required(false)
+                    .build();
+
+    static {
+        descriptors.add(RECORD_READER_FACTORY);
+        descriptors.add(TIME_FIELD);
+        descriptors.add(SCHEMA);
+        descriptors.add(ALIGNED);
+        descriptors.add(MAX_ROW_NUMBER);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext processContext, ProcessSession processSession)
+            throws ProcessException {
+        final RecordReaderFactory recordParserFactory =
+                processContext
+                        .getProperty(RECORD_READER_FACTORY)
+                        .asControllerService(RecordReaderFactory.class);
+
+        String timeFieldProperty = processContext.getProperty(TIME_FIELD).getValue();
+        String schemaProperty = processContext.getProperty(SCHEMA).getValue();
+        String alignedProperty = processContext.getProperty(ALIGNED).getValue();
+        String maxRowNumberProperty = processContext.getProperty(MAX_ROW_NUMBER).getValue();
+
+        FlowFile flowFile = processSession.get();
+
+        String timeField = timeFieldProperty != null ? timeFieldProperty : "Time";
+        Boolean aligned = alignedProperty != null ? Boolean.valueOf(alignedProperty) : false;
+        int maxRowNumber = maxRowNumberProperty != null ? Integer.valueOf(maxRowNumberProperty) : 1024;
+
+        try (final InputStream inputStream = processSession.read(flowFile);
+             final RecordReader recordReader =
+                     recordParserFactory.createRecordReader(flowFile, inputStream, getLogger())) {
+            if (flowFile == null) {
+                inputStream.close();
+                recordReader.close();
+                processSession.transfer(flowFile, REL_SUCCESS);
+                return;
+            }
+
+            HashMap<String, Tablet> tablets;
+            boolean needInitFormatter;
+            Schema schema;
+            Tuple<Boolean, String> result;
+
+            result =
+                    schemaProperty != null
+                            ? validateSchemaAttribute(schemaProperty)
+                            : validateSchema(recordReader.getSchema());
+
+            if (!result.getKey()) {
+                getLogger().error(result.getValue());
+                inputStream.close();
+                recordReader.close();
+                processSession.transfer(flowFile, REL_FAILURE);
+                return;
+            } else {
+                if (result.getValue() != null) {
+                    getLogger().warn(result.getValue());
+                }
+            }
+
+            schema =
+                    schemaProperty != null
+                            ? new ObjectMapper().readValue(schemaProperty, Schema.class)
+                            : convertSchema(recordReader.getSchema());
+
+            List<String> fieldNames = schema.getFieldNames();
+
+            needInitFormatter = schema.getTimeType() != Schema.TimeType.LONG;
+
+            tablets = generateTablets(schema, maxRowNumber);
+            SimpleDateFormat timeFormatter = null;
+
+            Record record;
+
+            while ((record = recordReader.nextRecord()) != null) {
+                Object[] values = record.getValues();
+                if (timeFormatter == null && needInitFormatter) {
+                    timeFormatter = initFormatter((String) values[0]);
+                    if (timeFormatter == null) {
+                        getLogger().error("The format of time is not supported.");
+                        inputStream.close();
+                        recordReader.close();
+                        processSession.transfer(flowFile, REL_FAILURE);
+                        return;
+                    }
+                }
+
+                long timestamp;
+                if (needInitFormatter) {
+                    timestamp = timeFormatter.parse((String) values[0]).getTime();
+                } else {
+                    timestamp = (Long) values[0];
+                }
+
+                boolean flag = false;
+
+                for (Map.Entry<String, Tablet> entry : tablets.entrySet()) {
+                    String device = entry.getKey();
+                    Tablet tablet = entry.getValue();
+                    int rowIndex = tablet.rowSize++;
+
+                    tablet.addTimestamp(rowIndex, timestamp);
+                    List<MeasurementSchema> measurements = tablet.getSchemas();
+                    for (MeasurementSchema measurement : measurements) {
+                        String tsName =
+                                new StringBuilder()
+                                        .append(device)
+                                        .append(".")
+                                        .append(measurement.getMeasurementId())
+                                        .toString();
+                        int valueIndex = fieldNames.indexOf(tsName) + 1;
+                        Object value;
+                        try {
+                            TSDataType type = measurement.getType();
+                            value = convertType(values[valueIndex], type);
+                        } catch (Exception e) {
+                            value = null;
+                        }
+                        tablet.addValue(measurement.getMeasurementId(), rowIndex, value);
+                    }
+                    flag = tablet.rowSize == tablet.getMaxRowNumber();
+                }
+                if (flag) {
+                    if (aligned) {
+                        session.get().insertAlignedTablets(tablets);
+                    } else {
+                        session.get().insertTablets(tablets);
+                    }
+                    tablets.values().forEach(tablet -> tablet.reset());
+                }
+            }
+
+            AtomicBoolean hasRest = new AtomicBoolean(false);
+            tablets.forEach(
+                    (device, tablet) -> {
+                        if (hasRest.get() == false && tablet.rowSize != 0) {
+                            hasRest.set(true);
+                        }
+                    });
+            if (hasRest.get()) {
+                if (aligned) {
+                    session.get().insertAlignedTablets(tablets);
+                } else {
+                    session.get().insertTablets(tablets);
+                }
+            }
+
+            inputStream.close();
+            recordReader.close();
+            processSession.transfer(flowFile, REL_SUCCESS);
+        } catch (Exception e) {
+            getLogger().error(e.getMessage());

Review Comment:
   The exception should be logged with a message to ensure the full stack trace is available:
   ```suggestion
               getLogger().error("Processing failed {}", flowFile, e);
   ```



##########
nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/PutIoTDB.java:
##########
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors;
+
+import java.io.InputStream;
+import java.text.SimpleDateFormat;
+import java.util.Set;
+import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.processors.model.Schema;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.util.Tuple;
+
+@Tags({"iotdb", "insert", "tablet"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription(
+        "This is a record aware processor that reads the content of the incoming FlowFile as individual records using the "
+                + "configured 'Record Reader' and writes them to Apache IoTDB using native interface.")
+public class PutIoTDB extends AbstractIoTDB {
+
+    static final PropertyDescriptor RECORD_READER_FACTORY =
+            new PropertyDescriptor.Builder()
+                    .name("Record Reader")
+                    .description(
+                            "Specifies the type of Record Reader controller service to use for parsing the incoming data "
+                                    + "and determining the schema")
+                    .identifiesControllerService(RecordReaderFactory.class)
+                    .required(true)
+                    .build();
+
+    static final PropertyDescriptor TIME_FIELD =
+            new PropertyDescriptor.Builder()
+                    .name("Time Field")
+                    .description(
+                            "The field name which represents time. It can be updated by expression language.")
+                    .defaultValue("Time")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                    .required(false)
+                    .build();
+
+    static final PropertyDescriptor SCHEMA =
+            new PropertyDescriptor.Builder()
+                    .name("Schema")
+                    .description(
+                            "The schema that IoTDB needs doesn't support good by NiFi.\n"
+                                    + "Therefore, you can define the schema here.\n"
+                                    + "Besides, you can set encoding type and compression type by this method.\n"
+                                    + "If you don't set this property, the inferred schema will be used.\n"
+                                    + "It can be updated by expression language.")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .required(false)
+                    .build();
+
+    static final PropertyDescriptor ALIGNED =
+            new PropertyDescriptor.Builder()
+                    .name("Aligned")
+                    .description("Whether using aligned interface? It can be updated by expression language.")
+                    .allowableValues(new String[] {"true", "false"})
+                    .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+                    .defaultValue("false")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .required(false)
+                    .build();
+
+    static final PropertyDescriptor MAX_ROW_NUMBER =
+            new PropertyDescriptor.Builder()
+                    .name("Max Row Number")
+                    .description(
+                            "Specifies the max row number of each tablet. It can be updated by expression language.")
+                    .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+                    .defaultValue("false")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .required(false)
+                    .build();
+
+    static {
+        descriptors.add(RECORD_READER_FACTORY);
+        descriptors.add(TIME_FIELD);
+        descriptors.add(SCHEMA);
+        descriptors.add(ALIGNED);
+        descriptors.add(MAX_ROW_NUMBER);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext processContext, ProcessSession processSession)
+            throws ProcessException {
+        final RecordReaderFactory recordParserFactory =
+                processContext
+                        .getProperty(RECORD_READER_FACTORY)
+                        .asControllerService(RecordReaderFactory.class);
+
+        String timeFieldProperty = processContext.getProperty(TIME_FIELD).getValue();
+        String schemaProperty = processContext.getProperty(SCHEMA).getValue();
+        String alignedProperty = processContext.getProperty(ALIGNED).getValue();
+        String maxRowNumberProperty = processContext.getProperty(MAX_ROW_NUMBER).getValue();
+
+        FlowFile flowFile = processSession.get();
+
+        String timeField = timeFieldProperty != null ? timeFieldProperty : "Time";
+        Boolean aligned = alignedProperty != null ? Boolean.valueOf(alignedProperty) : false;
+        int maxRowNumber = maxRowNumberProperty != null ? Integer.valueOf(maxRowNumberProperty) : 1024;
+
+        try (final InputStream inputStream = processSession.read(flowFile);
+             final RecordReader recordReader =
+                     recordParserFactory.createRecordReader(flowFile, inputStream, getLogger())) {
+            if (flowFile == null) {
+                inputStream.close();
+                recordReader.close();
+                processSession.transfer(flowFile, REL_SUCCESS);
+                return;
+            }
+
+            HashMap<String, Tablet> tablets;
+            boolean needInitFormatter;
+            Schema schema;
+            Tuple<Boolean, String> result;
+
+            result =
+                    schemaProperty != null
+                            ? validateSchemaAttribute(schemaProperty)
+                            : validateSchema(recordReader.getSchema());
+
+            if (!result.getKey()) {
+                getLogger().error(result.getValue());
+                inputStream.close();
+                recordReader.close();

Review Comment:
   Explicitly closing these objects is unnecessary since they are already managed by the try-with-resources statement.
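   A minimal sketch of the failure branch once the explicit close calls are dropped (relationship and variable names as in the PR):
   ```java
   // inputStream and recordReader are declared in the try-with-resources header,
   // so they are closed automatically on every exit path from the block.
   if (!result.getKey()) {
       getLogger().error(result.getValue());
       processSession.transfer(flowFile, REL_FAILURE);
       return;
   }
   ```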



##########
nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/PutIoTDB.java:
##########
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors;
+
+import java.io.InputStream;
+import java.text.SimpleDateFormat;
+import java.util.Set;
+import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.processors.model.Schema;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.util.Tuple;
+
+@Tags({"iotdb", "insert", "tablet"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription(
+        "This is a record aware processor that reads the content of the incoming FlowFile as individual records using the "
+                + "configured 'Record Reader' and writes them to Apache IoTDB using native interface.")
+public class PutIoTDB extends AbstractIoTDB {
+
+    static final PropertyDescriptor RECORD_READER_FACTORY =
+            new PropertyDescriptor.Builder()
+                    .name("Record Reader")
+                    .description(
+                            "Specifies the type of Record Reader controller service to use for parsing the incoming data "
+                                    + "and determining the schema")
+                    .identifiesControllerService(RecordReaderFactory.class)
+                    .required(true)
+                    .build();
+
+    static final PropertyDescriptor TIME_FIELD =
+            new PropertyDescriptor.Builder()
+                    .name("Time Field")
+                    .description(
+                            "The field name which represents time. It can be updated by expression language.")
+                    .defaultValue("Time")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                    .required(false)
+                    .build();
+
+    static final PropertyDescriptor SCHEMA =
+            new PropertyDescriptor.Builder()
+                    .name("Schema")
+                    .description(
+                            "The schema that IoTDB needs doesn't support good by NiFi.\n"
+                                    + "Therefore, you can define the schema here.\n"
+                                    + "Besides, you can set encoding type and compression type by this method.\n"
+                                    + "If you don't set this property, the inferred schema will be used.\n"
+                                    + "It can be updated by expression language.")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .required(false)
+                    .build();
+
+    static final PropertyDescriptor ALIGNED =
+            new PropertyDescriptor.Builder()
+                    .name("Aligned")
+                    .description("Whether using aligned interface? It can be updated by expression language.")
+                    .allowableValues(new String[] {"true", "false"})
+                    .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+                    .defaultValue("false")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .required(false)
+                    .build();
+
+    static final PropertyDescriptor MAX_ROW_NUMBER =
+            new PropertyDescriptor.Builder()
+                    .name("Max Row Number")
+                    .description(
+                            "Specifies the max row number of each tablet. It can be updated by expression language.")
+                    .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+                    .defaultValue("false")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .required(false)
+                    .build();
+
+    static {
+        descriptors.add(RECORD_READER_FACTORY);
+        descriptors.add(TIME_FIELD);
+        descriptors.add(SCHEMA);
+        descriptors.add(ALIGNED);
+        descriptors.add(MAX_ROW_NUMBER);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext processContext, ProcessSession processSession)
+            throws ProcessException {
+        final RecordReaderFactory recordParserFactory =
+                processContext
+                        .getProperty(RECORD_READER_FACTORY)
+                        .asControllerService(RecordReaderFactory.class);
+
+        String timeFieldProperty = processContext.getProperty(TIME_FIELD).getValue();
+        String schemaProperty = processContext.getProperty(SCHEMA).getValue();
+        String alignedProperty = processContext.getProperty(ALIGNED).getValue();
+        String maxRowNumberProperty = processContext.getProperty(MAX_ROW_NUMBER).getValue();
+
+        FlowFile flowFile = processSession.get();
+
+        String timeField = timeFieldProperty != null ? timeFieldProperty : "Time";
+        Boolean aligned = alignedProperty != null ? Boolean.valueOf(alignedProperty) : false;
+        int maxRowNumber = maxRowNumberProperty != null ? Integer.valueOf(maxRowNumberProperty) : 1024;
+
+        try (final InputStream inputStream = processSession.read(flowFile);
+             final RecordReader recordReader =
+                     recordParserFactory.createRecordReader(flowFile, inputStream, getLogger())) {
+            if (flowFile == null) {
+                inputStream.close();
+                recordReader.close();
+                processSession.transfer(flowFile, REL_SUCCESS);
+                return;
+            }
+
+            HashMap<String, Tablet> tablets;

Review Comment:
   This variable, and perhaps others, should be declared and assigned in the same location for better readability.
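   For example, using the same calls as in the PR but moving the declarations to the point of first use:
   ```java
   // Declared and assigned together instead of as a block of uninitialized locals.
   final Schema schema = schemaProperty != null
           ? new ObjectMapper().readValue(schemaProperty, Schema.class)
           : convertSchema(recordReader.getSchema());
   final Map<String, Tablet> tablets = generateTablets(schema, maxRowNumber);
   ```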



##########
nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/test/java/org/apache/nifi/processors/PutIoTDBIT.java:
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors;
+
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.session.SessionDataSet;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.Field;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;

Review Comment:
   This should be changed to the JUnit 5 `Assertions` class, and it would be better to use static imports for the assertion methods.
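   As an illustration only (the test class and assertion below are placeholders, not the actual test), the JUnit 5 style with static imports looks like this:
   ```java
   import static org.junit.jupiter.api.Assertions.assertEquals;

   import org.junit.jupiter.api.Test;

   class ExampleAssertionsTest {
       @Test
       void addsValues() {
           // The static import keeps the assertion concise: no Assertions. prefix needed.
           assertEquals(4, 2 + 2);
       }
   }
   ```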



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org