You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2022/10/07 20:10:30 UTC

[GitHub] [nifi] exceptionfactory commented on a diff in pull request #6416: NIFI-10234 implement PutIoTDB

exceptionfactory commented on code in PR #6416:
URL: https://github.com/apache/nifi/pull/6416#discussion_r990412122


##########
nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/AbstractIoTDB.java:
##########
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors;
+
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeParseException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.processors.model.Field;
+import org.apache.nifi.processors.model.IoTDBSchema;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.model.ValidationResult;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+public abstract class AbstractIoTDB extends AbstractProcessor {
+    private static final int DEFAULT_IOTDB_PORT = 6667;
+
+    protected static ObjectMapper mapper = new ObjectMapper();
+
+    private static final String TIME_TYPE = "timeType";
+    private static final String FIELDS = "fields";
+    private static final String TIME = "Time";
+    private static final String PREFIX = "root.";
+
+    private static Map<RecordFieldType, TSDataType> typeMap =
+            new HashMap<>();
+
+    static final Set<RecordFieldType> supportedType =
+            new HashSet<>();
+
+    static final Set<RecordFieldType> supportedTimeType =
+            new HashSet<>();
+
+    protected static final String[] STRING_TIME_FORMAT =
+            new String[]{
+                    "yyyy-MM-dd HH:mm:ss.SSSX",
+                    "yyyy/MM/dd HH:mm:ss.SSSX",
+                    "yyyy.MM.dd HH:mm:ss.SSSX",
+                    "yyyy-MM-dd HH:mm:ssX",
+                    "yyyy/MM/dd HH:mm:ssX",
+                    "yyyy.MM.dd HH:mm:ssX",
+                    "yyyy-MM-dd HH:mm:ss.SSSz",
+                    "yyyy/MM/dd HH:mm:ss.SSSz",
+                    "yyyy.MM.dd HH:mm:ss.SSSz",
+                    "yyyy-MM-dd HH:mm:ssz",
+                    "yyyy/MM/dd HH:mm:ssz",
+                    "yyyy.MM.dd HH:mm:ssz",
+                    "yyyy-MM-dd HH:mm:ss.SSS",
+                    "yyyy/MM/dd HH:mm:ss.SSS",
+                    "yyyy.MM.dd HH:mm:ss.SSS",
+                    "yyyy-MM-dd HH:mm:ss",
+                    "yyyy/MM/dd HH:mm:ss",
+                    "yyyy.MM.dd HH:mm:ss",
+                    "yyyy-MM-dd'T'HH:mm:ss.SSSX",
+                    "yyyy/MM/dd'T'HH:mm:ss.SSSX",
+                    "yyyy.MM.dd'T'HH:mm:ss.SSSX",
+                    "yyyy-MM-dd'T'HH:mm:ssX",
+                    "yyyy/MM/dd'T'HH:mm:ssX",
+                    "yyyy.MM.dd'T'HH:mm:ssX",
+                    "yyyy-MM-dd'T'HH:mm:ss.SSSz",
+                    "yyyy/MM/dd'T'HH:mm:ss.SSSz",
+                    "yyyy.MM.dd'T'HH:mm:ss.SSSz",
+                    "yyyy-MM-dd'T'HH:mm:ssz",
+                    "yyyy/MM/dd'T'HH:mm:ssz",
+                    "yyyy.MM.dd'T'HH:mm:ssz",
+                    "yyyy-MM-dd'T'HH:mm:ss.SSS",
+                    "yyyy/MM/dd'T'HH:mm:ss.SSS",
+                    "yyyy.MM.dd'T'HH:mm:ss.SSS",
+                    "yyyy-MM-dd'T'HH:mm:ss",
+                    "yyyy/MM/dd'T'HH:mm:ss",
+                    "yyyy.MM.dd'T'HH:mm:ss"
+            };
+
+    static final PropertyDescriptor IOTDB_HOST =
+            new PropertyDescriptor.Builder()
+                    .name("Host")
+                    .description("The host of IoTDB.")
+                    .required(true)
+                    .build();
+
+    static final PropertyDescriptor IOTDB_PORT =
+            new PropertyDescriptor.Builder()
+                    .name("Port")
+                    .description("The port of IoTDB.")
+                    .defaultValue(String.valueOf(DEFAULT_IOTDB_PORT))
+                    .required(true)

Review Comment:
   The `Port` property needs an associated validator; see `StandardValidators.PORT_VALIDATOR`.
   ```suggestion
                       .required(true)
                       .addValidator(StandardValidators.PORT_VALIDATOR)
   ```



##########
nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/AbstractIoTDB.java:
##########
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors;
+
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeParseException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.processors.model.Field;
+import org.apache.nifi.processors.model.IoTDBSchema;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.model.ValidationResult;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+public abstract class AbstractIoTDB extends AbstractProcessor {
+    private static final int DEFAULT_IOTDB_PORT = 6667;
+
+    protected static ObjectMapper mapper = new ObjectMapper();
+
+    private static final String TIME_TYPE = "timeType";
+    private static final String FIELDS = "fields";
+    private static final String TIME = "Time";
+    private static final String PREFIX = "root.";
+
+    private static Map<RecordFieldType, TSDataType> typeMap =
+            new HashMap<>();
+
+    static final Set<RecordFieldType> supportedType =
+            new HashSet<>();
+
+    static final Set<RecordFieldType> supportedTimeType =
+            new HashSet<>();
+
+    protected static final String[] STRING_TIME_FORMAT =
+            new String[]{
+                    "yyyy-MM-dd HH:mm:ss.SSSX",
+                    "yyyy/MM/dd HH:mm:ss.SSSX",
+                    "yyyy.MM.dd HH:mm:ss.SSSX",
+                    "yyyy-MM-dd HH:mm:ssX",
+                    "yyyy/MM/dd HH:mm:ssX",
+                    "yyyy.MM.dd HH:mm:ssX",
+                    "yyyy-MM-dd HH:mm:ss.SSSz",
+                    "yyyy/MM/dd HH:mm:ss.SSSz",
+                    "yyyy.MM.dd HH:mm:ss.SSSz",
+                    "yyyy-MM-dd HH:mm:ssz",
+                    "yyyy/MM/dd HH:mm:ssz",
+                    "yyyy.MM.dd HH:mm:ssz",
+                    "yyyy-MM-dd HH:mm:ss.SSS",
+                    "yyyy/MM/dd HH:mm:ss.SSS",
+                    "yyyy.MM.dd HH:mm:ss.SSS",
+                    "yyyy-MM-dd HH:mm:ss",
+                    "yyyy/MM/dd HH:mm:ss",
+                    "yyyy.MM.dd HH:mm:ss",
+                    "yyyy-MM-dd'T'HH:mm:ss.SSSX",
+                    "yyyy/MM/dd'T'HH:mm:ss.SSSX",
+                    "yyyy.MM.dd'T'HH:mm:ss.SSSX",
+                    "yyyy-MM-dd'T'HH:mm:ssX",
+                    "yyyy/MM/dd'T'HH:mm:ssX",
+                    "yyyy.MM.dd'T'HH:mm:ssX",
+                    "yyyy-MM-dd'T'HH:mm:ss.SSSz",
+                    "yyyy/MM/dd'T'HH:mm:ss.SSSz",
+                    "yyyy.MM.dd'T'HH:mm:ss.SSSz",
+                    "yyyy-MM-dd'T'HH:mm:ssz",
+                    "yyyy/MM/dd'T'HH:mm:ssz",
+                    "yyyy.MM.dd'T'HH:mm:ssz",
+                    "yyyy-MM-dd'T'HH:mm:ss.SSS",
+                    "yyyy/MM/dd'T'HH:mm:ss.SSS",
+                    "yyyy.MM.dd'T'HH:mm:ss.SSS",
+                    "yyyy-MM-dd'T'HH:mm:ss",
+                    "yyyy/MM/dd'T'HH:mm:ss",
+                    "yyyy.MM.dd'T'HH:mm:ss"
+            };
+
+    static final PropertyDescriptor IOTDB_HOST =
+            new PropertyDescriptor.Builder()
+                    .name("Host")
+                    .description("The host of IoTDB.")
+                    .required(true)

Review Comment:
   The `Host` property needs a validator configured; see `StandardValidators.NON_BLANK_VALIDATOR`.



##########
nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/AbstractIoTDB.java:
##########
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors;
+
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeParseException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.processors.model.Field;
+import org.apache.nifi.processors.model.IoTDBSchema;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.model.ValidationResult;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+public abstract class AbstractIoTDB extends AbstractProcessor {
+    private static final int DEFAULT_IOTDB_PORT = 6667;
+
+    protected static ObjectMapper mapper = new ObjectMapper();
+
+    private static final String TIME_TYPE = "timeType";
+    private static final String FIELDS = "fields";
+    private static final String TIME = "Time";
+    private static final String PREFIX = "root.";
+
+    private static Map<RecordFieldType, TSDataType> typeMap =
+            new HashMap<>();
+
+    static final Set<RecordFieldType> supportedType =
+            new HashSet<>();
+
+    static final Set<RecordFieldType> supportedTimeType =
+            new HashSet<>();
+
+    protected static final String[] STRING_TIME_FORMAT =
+            new String[]{
+                    "yyyy-MM-dd HH:mm:ss.SSSX",
+                    "yyyy/MM/dd HH:mm:ss.SSSX",
+                    "yyyy.MM.dd HH:mm:ss.SSSX",
+                    "yyyy-MM-dd HH:mm:ssX",
+                    "yyyy/MM/dd HH:mm:ssX",
+                    "yyyy.MM.dd HH:mm:ssX",
+                    "yyyy-MM-dd HH:mm:ss.SSSz",
+                    "yyyy/MM/dd HH:mm:ss.SSSz",
+                    "yyyy.MM.dd HH:mm:ss.SSSz",
+                    "yyyy-MM-dd HH:mm:ssz",
+                    "yyyy/MM/dd HH:mm:ssz",
+                    "yyyy.MM.dd HH:mm:ssz",
+                    "yyyy-MM-dd HH:mm:ss.SSS",
+                    "yyyy/MM/dd HH:mm:ss.SSS",
+                    "yyyy.MM.dd HH:mm:ss.SSS",
+                    "yyyy-MM-dd HH:mm:ss",
+                    "yyyy/MM/dd HH:mm:ss",
+                    "yyyy.MM.dd HH:mm:ss",
+                    "yyyy-MM-dd'T'HH:mm:ss.SSSX",
+                    "yyyy/MM/dd'T'HH:mm:ss.SSSX",
+                    "yyyy.MM.dd'T'HH:mm:ss.SSSX",
+                    "yyyy-MM-dd'T'HH:mm:ssX",
+                    "yyyy/MM/dd'T'HH:mm:ssX",
+                    "yyyy.MM.dd'T'HH:mm:ssX",
+                    "yyyy-MM-dd'T'HH:mm:ss.SSSz",
+                    "yyyy/MM/dd'T'HH:mm:ss.SSSz",
+                    "yyyy.MM.dd'T'HH:mm:ss.SSSz",
+                    "yyyy-MM-dd'T'HH:mm:ssz",
+                    "yyyy/MM/dd'T'HH:mm:ssz",
+                    "yyyy.MM.dd'T'HH:mm:ssz",
+                    "yyyy-MM-dd'T'HH:mm:ss.SSS",
+                    "yyyy/MM/dd'T'HH:mm:ss.SSS",
+                    "yyyy.MM.dd'T'HH:mm:ss.SSS",
+                    "yyyy-MM-dd'T'HH:mm:ss",
+                    "yyyy/MM/dd'T'HH:mm:ss",
+                    "yyyy.MM.dd'T'HH:mm:ss"
+            };
+
+    static final PropertyDescriptor IOTDB_HOST =
+            new PropertyDescriptor.Builder()
+                    .name("Host")
+                    .description("The host of IoTDB.")
+                    .required(true)
+                    .build();
+
+    static final PropertyDescriptor IOTDB_PORT =
+            new PropertyDescriptor.Builder()
+                    .name("Port")
+                    .description("The port of IoTDB.")
+                    .defaultValue(String.valueOf(DEFAULT_IOTDB_PORT))
+                    .required(true)
+                    .build();
+
+    static final PropertyDescriptor USERNAME =
+            new PropertyDescriptor.Builder()
+                    .name("Username")
+                    .description("Username to access the IoTDB.")
+                    .required(true)
+                    .build();
+
+    static final PropertyDescriptor PASSWORD =
+            new PropertyDescriptor.Builder()
+                    .name("Password")
+                    .description("Password to access the IoTDB.")
+                    .required(true)
+                    .sensitive(true)
+                    .build();
+
+    protected static List<PropertyDescriptor> descriptors = new ArrayList<>();
+
+    protected static final Relationship REL_SUCCESS =
+            new Relationship.Builder()
+                    .name("success")
+                    .description("files that were successfully processed")
+                    .build();
+    protected static final Relationship REL_FAILURE =
+            new Relationship.Builder()
+                    .name("failure")
+                    .description("files that were not successfully processed")
+                    .build();
+
+    protected static Set<Relationship> relationships = new HashSet<>();
+
+    static {
+        descriptors.add(IOTDB_HOST);
+        descriptors.add(IOTDB_PORT);
+        descriptors.add(USERNAME);
+        descriptors.add(PASSWORD);
+
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+
+        typeMap.put(RecordFieldType.STRING, TSDataType.TEXT);
+        typeMap.put(RecordFieldType.BOOLEAN, TSDataType.BOOLEAN);
+        typeMap.put(RecordFieldType.INT, TSDataType.INT32);
+        typeMap.put(RecordFieldType.LONG, TSDataType.INT64);
+        typeMap.put(RecordFieldType.FLOAT, TSDataType.FLOAT);
+        typeMap.put(RecordFieldType.DOUBLE, TSDataType.DOUBLE);
+
+        supportedType.add(RecordFieldType.BOOLEAN);
+        supportedType.add(RecordFieldType.STRING);
+        supportedType.add(RecordFieldType.INT);
+        supportedType.add(RecordFieldType.LONG);
+        supportedType.add(RecordFieldType.FLOAT);
+        supportedType.add(RecordFieldType.DOUBLE);
+
+        supportedTimeType.add(RecordFieldType.STRING);
+        supportedTimeType.add(RecordFieldType.LONG);
+    }
+
+    protected final AtomicReference<Session> session = new AtomicReference<>(null);
+
+    @OnScheduled
+    public void onScheduled(ProcessContext context) throws IoTDBConnectionException {
+        connectToIoTDB(context);
+    }
+
+    void connectToIoTDB(ProcessContext context) throws IoTDBConnectionException {
+        if (session.get() == null) {
+            final String host = context.getProperty(IOTDB_HOST).getValue();
+            final int port = Integer.parseInt(context.getProperty(IOTDB_PORT).getValue());
+            final String username = context.getProperty(USERNAME).getValue();
+            final String password = context.getProperty(PASSWORD).getValue();
+
+            session.set(
+                    new Session.Builder()
+                            .host(host)
+                            .port(port)
+                            .username(username)
+                            .password(password)
+                            .build());
+            session.get().open();
+        }
+    }
+
+    public void stop(ProcessContext context) {
+        if (session.get() != null) {
+            try {
+                session.get().close();
+            } catch (IoTDBConnectionException e) {
+                getLogger().error("IoTDB disconnection failed", e);
+            }
+            session.set(null);
+        }
+    }
+
+    protected TSDataType getType(RecordFieldType type) {
+        return typeMap.get(type);
+    }
+
+    protected ValidationResult validateSchemaAttribute(String schemaAttribute) {
+        JsonNode schema = null;
+        try {
+            schema = mapper.readTree(schemaAttribute);
+        } catch (JsonProcessingException e) {
+            return new ValidationResult(false, e.getMessage());
+        }
+        Set<String> keySet = new HashSet<>();
+        schema.fieldNames().forEachRemaining(field -> keySet.add(field));
+
+        if (!keySet.contains(TIME_TYPE) || !keySet.contains(FIELDS)) {
+            String msg = "The JSON of schema must contain `timeType` and `fields`";
+            return new ValidationResult(false, msg);
+        }
+
+        if (!IoTDBSchema.getSupportedTimeType().contains(schema.get(TIME_TYPE).asText())) {
+            String msg =
+                    String.format(
+                            "Unknown `timeType`: %s, there are only two options `LONG` and `STRING` for this property",
+                            schema.get(TIME_TYPE).asText());
+            return new ValidationResult(false, msg);
+        }
+
+        for (int i = 0; i < schema.get(FIELDS).size(); i++) {
+            JsonNode field = schema.get(FIELDS).get(i);
+            Set<String> fieldKeySet = new HashSet<>();
+
+            field.fieldNames().forEachRemaining(fieldName -> fieldKeySet.add(fieldName));
+            if (!fieldKeySet.contains("tsName") || !fieldKeySet.contains("dataType")) {
+                String msg = "`tsName` or `dataType` has not been set";
+                return new ValidationResult(false, msg);
+            }
+
+            if (!field.get("tsName").asText().startsWith(PREFIX)) {
+                String msg =
+                        String.format("The tsName `%s` is not start with 'root.'", field.get("tsName").asText());
+                return new ValidationResult(false, msg);
+            }
+
+            if (!Field.getSupportedDataType().contains(field.get("dataType").asText())) {
+                String msg =
+                        String.format(
+                                "Unknown `dataType`: %s. The supported dataTypes are %s",
+                                field.get("dataType").asText(), Field.getSupportedDataType());
+                return new ValidationResult(false, msg);
+            }
+
+            Set<String> supportedKeySet = new HashSet<>();
+            supportedKeySet.add("tsName");
+            supportedKeySet.add("dataType");
+            supportedKeySet.add("encoding");
+            supportedKeySet.add("compressionType");
+
+            HashSet<String> tmpKetSet = new HashSet<>();
+            tmpKetSet.addAll(supportedKeySet);
+            tmpKetSet.addAll(fieldKeySet);
+            tmpKetSet.removeAll(supportedKeySet);
+            if (!tmpKetSet.isEmpty()) {
+                String msg = "Unknown property or properties: " + tmpKetSet;
+                return new ValidationResult(false, msg);
+            }
+
+            if (fieldKeySet.contains("compressionType") && !fieldKeySet.contains("encoding")) {
+                String msg =
+                        "The `compressionType` has been set, but the `encoding` has not. The property `compressionType` will not take effect";
+                return new ValidationResult(true, msg);
+            }
+
+            if (field.get("encoding") != null
+                    && !Field.getSupportedEncoding().contains(field.get("encoding").asText())) {
+                String msg =
+                        String.format(
+                                "Unknown `encoding`: %s, The supported encoding types are %s",
+                                field.get("encoding").asText(), Field.getSupportedEncoding());
+                return new ValidationResult(false, msg);
+            }
+
+            if (field.get("compressionType") != null
+                    && !Field.getSupportedCompressionType().contains(field.get("compressionType"))) {
+                String msg =
+                        String.format(
+                                "Unknown `compressionType`: %s, The supported compressionType are %s",
+                                field.get("compressionType").asText(), Field.getSupportedCompressionType());
+                return new ValidationResult(false, msg);
+            }
+        }
+
+        return new ValidationResult(true, null);
+    }
+
+    protected ValidationResult validateSchema(RecordSchema recordSchema) {
+        List<String> fieldNames = recordSchema.getFieldNames();
+        List<DataType> dataTypes = recordSchema.getDataTypes();
+        if (!fieldNames.contains(TIME) || !TIME.equals(fieldNames.get(0))) {
+            return new ValidationResult(false, "The fields must contain `Time`, and it must be the first");
+        }
+        if (!supportedTimeType.contains(recordSchema.getDataType(TIME).get().getFieldType())) {
+            return new ValidationResult(false, "The dataType of `Time` must be `STRING` or `LONG`");
+        }
+        fieldNames.remove(TIME);
+        for (String fieldName : fieldNames) {
+            if (!fieldName.startsWith(PREFIX)) {
+                String msg = String.format("The tsName `%s` is not start with 'root.'", fieldName);
+                return new ValidationResult(false, msg);
+            }
+        }
+        for (DataType type : dataTypes) {
+            RecordFieldType dataType = type.getFieldType();
+            if (!supportedType.contains(dataType)) {
+                String msg =
+                        String.format(
+                                "Unknown `dataType`: %s. The supported dataTypes are %s",
+                                dataType.toString(), supportedType);
+                return new ValidationResult(false, msg);
+            }
+        }
+
+        return new ValidationResult(true, null);
+    }
+
+    protected Map<String, List<String>> parseSchema(List<String> filedNames) {
+        HashMap<String, List<String>> deviceMeasurementMap = new HashMap<>();
+        filedNames.stream()
+                .forEach(
+                        filed -> {
+                            String[] paths = filed.split("\\.");
+                            String device = StringUtils.join(paths, ".", 0, paths.length - 1);
+
+                            if (!deviceMeasurementMap.containsKey(device)) {
+                                deviceMeasurementMap.put(device, new ArrayList<>());
+                            }
+                            deviceMeasurementMap.get(device).add(paths[paths.length - 1]);
+                        });
+
+        return deviceMeasurementMap;
+    }
+
+    protected HashMap<String, Tablet> generateTablets(IoTDBSchema schema, int maxRowNumber) {
+        Map<String, List<String>> deviceMeasurementMap = parseSchema(schema.getFieldNames());
+        HashMap<String, Tablet> tablets = new HashMap<>();
+        deviceMeasurementMap.forEach(
+                (device, measurements) -> {
+                    ArrayList<MeasurementSchema> schemas = new ArrayList<>();
+                    for (String measurement : measurements) {
+                        String tsName = device + "." + measurement;
+                        TSDataType dataType = schema.getDataType(tsName);
+                        TSEncoding encoding = schema.getEncodingType(tsName);
+                        CompressionType compressionType = schema.getCompressionType(tsName);
+                        if (encoding == null) {
+                            schemas.add(new MeasurementSchema(measurement, dataType));
+                        } else if (compressionType == null) {
+                            schemas.add(new MeasurementSchema(measurement, dataType, encoding));
+                        } else {
+                            schemas.add(new MeasurementSchema(measurement, dataType, encoding, compressionType));
+                        }
+                    }
+                    Tablet tablet = new Tablet(device, schemas, maxRowNumber);
+                    tablets.put(device, tablet);
+                });
+        return tablets;
+    }
+
+    protected DateTimeFormatter initFormatter(String time) {
+        for (String format : STRING_TIME_FORMAT) {
+            try {
+                DateTimeFormatter.ofPattern(format).parse(time);
+                return DateTimeFormatter.ofPattern(format);
+            } catch (DateTimeParseException e) {
+
+            }
+        }
+        return null;
+    }
+
+    protected Object convertType(Object value, TSDataType type) {
+        switch (type) {
+            case TEXT:
+                return Binary.valueOf(String.valueOf(value));
+            case INT32:
+                return Integer.parseInt(value.toString());
+            case INT64:
+                return Long.parseLong(value.toString());
+            case FLOAT:
+                return Float.parseFloat(value.toString());
+            case DOUBLE:
+                return Double.parseDouble(value.toString());
+            case BOOLEAN:
+                return Boolean.parseBoolean(value.toString());
+            default:
+                return null;
+        }
+    }
+
+    protected IoTDBSchema convertSchema(RecordSchema recordSchema) {
+        String timeType =
+                recordSchema.getDataType(TIME).get().getFieldType() == RecordFieldType.LONG

Review Comment:
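   The `get()` call on the result of `recordSchema.getDataType(TIME)` is not guarded by an `isPresent()` check, so it looks like this logic should be adjusted to handle the case where the `Time` field is absent.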



##########
nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/AbstractIoTDB.java:
##########
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors;
+
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeParseException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.processors.model.Field;
+import org.apache.nifi.processors.model.IoTDBSchema;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.model.ValidationResult;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+public abstract class AbstractIoTDB extends AbstractProcessor {
+    private static final int DEFAULT_IOTDB_PORT = 6667;
+
+    protected static ObjectMapper mapper = new ObjectMapper();
+
+    private static final String TIME_TYPE = "timeType";
+    private static final String FIELDS = "fields";
+    private static final String TIME = "Time";
+    private static final String PREFIX = "root.";
+
+    private static Map<RecordFieldType, TSDataType> typeMap =
+            new HashMap<>();
+
+    static final Set<RecordFieldType> supportedType =
+            new HashSet<>();
+
+    static final Set<RecordFieldType> supportedTimeType =
+            new HashSet<>();
+
+    protected static final String[] STRING_TIME_FORMAT =
+            new String[]{
+                    "yyyy-MM-dd HH:mm:ss.SSSX",
+                    "yyyy/MM/dd HH:mm:ss.SSSX",
+                    "yyyy.MM.dd HH:mm:ss.SSSX",
+                    "yyyy-MM-dd HH:mm:ssX",
+                    "yyyy/MM/dd HH:mm:ssX",
+                    "yyyy.MM.dd HH:mm:ssX",
+                    "yyyy-MM-dd HH:mm:ss.SSSz",
+                    "yyyy/MM/dd HH:mm:ss.SSSz",
+                    "yyyy.MM.dd HH:mm:ss.SSSz",
+                    "yyyy-MM-dd HH:mm:ssz",
+                    "yyyy/MM/dd HH:mm:ssz",
+                    "yyyy.MM.dd HH:mm:ssz",
+                    "yyyy-MM-dd HH:mm:ss.SSS",
+                    "yyyy/MM/dd HH:mm:ss.SSS",
+                    "yyyy.MM.dd HH:mm:ss.SSS",
+                    "yyyy-MM-dd HH:mm:ss",
+                    "yyyy/MM/dd HH:mm:ss",
+                    "yyyy.MM.dd HH:mm:ss",
+                    "yyyy-MM-dd'T'HH:mm:ss.SSSX",
+                    "yyyy/MM/dd'T'HH:mm:ss.SSSX",
+                    "yyyy.MM.dd'T'HH:mm:ss.SSSX",
+                    "yyyy-MM-dd'T'HH:mm:ssX",
+                    "yyyy/MM/dd'T'HH:mm:ssX",
+                    "yyyy.MM.dd'T'HH:mm:ssX",
+                    "yyyy-MM-dd'T'HH:mm:ss.SSSz",
+                    "yyyy/MM/dd'T'HH:mm:ss.SSSz",
+                    "yyyy.MM.dd'T'HH:mm:ss.SSSz",
+                    "yyyy-MM-dd'T'HH:mm:ssz",
+                    "yyyy/MM/dd'T'HH:mm:ssz",
+                    "yyyy.MM.dd'T'HH:mm:ssz",
+                    "yyyy-MM-dd'T'HH:mm:ss.SSS",
+                    "yyyy/MM/dd'T'HH:mm:ss.SSS",
+                    "yyyy.MM.dd'T'HH:mm:ss.SSS",
+                    "yyyy-MM-dd'T'HH:mm:ss",
+                    "yyyy/MM/dd'T'HH:mm:ss",
+                    "yyyy.MM.dd'T'HH:mm:ss"
+            };
+
+    static final PropertyDescriptor IOTDB_HOST =
+            new PropertyDescriptor.Builder()
+                    .name("Host")
+                    .description("The host of IoTDB.")
+                    .required(true)
+                    .build();
+
+    static final PropertyDescriptor IOTDB_PORT =
+            new PropertyDescriptor.Builder()
+                    .name("Port")
+                    .description("The port of IoTDB.")
+                    .defaultValue(String.valueOf(DEFAULT_IOTDB_PORT))
+                    .required(true)
+                    .build();
+
+    static final PropertyDescriptor USERNAME =
+            new PropertyDescriptor.Builder()
+                    .name("Username")
+                    .description("Username to access the IoTDB.")
+                    .required(true)
+                    .build();
+
+    static final PropertyDescriptor PASSWORD =
+            new PropertyDescriptor.Builder()
+                    .name("Password")
+                    .description("Password to access the IoTDB.")
+                    .required(true)
+                    .sensitive(true)
+                    .build();
+
+    protected static List<PropertyDescriptor> descriptors = new ArrayList<>();
+
+    protected static final Relationship REL_SUCCESS =
+            new Relationship.Builder()
+                    .name("success")
+                    .description("files that were successfully processed")
+                    .build();
+    protected static final Relationship REL_FAILURE =
+            new Relationship.Builder()
+                    .name("failure")
+                    .description("files that were not successfully processed")
+                    .build();
+
+    protected static Set<Relationship> relationships = new HashSet<>();
+
+    static {
+        descriptors.add(IOTDB_HOST);
+        descriptors.add(IOTDB_PORT);
+        descriptors.add(USERNAME);
+        descriptors.add(PASSWORD);
+
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+
+        typeMap.put(RecordFieldType.STRING, TSDataType.TEXT);
+        typeMap.put(RecordFieldType.BOOLEAN, TSDataType.BOOLEAN);
+        typeMap.put(RecordFieldType.INT, TSDataType.INT32);
+        typeMap.put(RecordFieldType.LONG, TSDataType.INT64);
+        typeMap.put(RecordFieldType.FLOAT, TSDataType.FLOAT);
+        typeMap.put(RecordFieldType.DOUBLE, TSDataType.DOUBLE);
+
+        supportedType.add(RecordFieldType.BOOLEAN);
+        supportedType.add(RecordFieldType.STRING);
+        supportedType.add(RecordFieldType.INT);
+        supportedType.add(RecordFieldType.LONG);
+        supportedType.add(RecordFieldType.FLOAT);
+        supportedType.add(RecordFieldType.DOUBLE);
+
+        supportedTimeType.add(RecordFieldType.STRING);
+        supportedTimeType.add(RecordFieldType.LONG);
+    }
+
+    protected final AtomicReference<Session> session = new AtomicReference<>(null);
+
+    /**
+     * Lifecycle hook run when the processor is scheduled; delegates to
+     * {@link #connectToIoTDB(ProcessContext)}.
+     *
+     * @param context process context handed through to the connection setup
+     * @throws IoTDBConnectionException propagated from the connection setup
+     */
+    @OnScheduled
+    public void onScheduled(ProcessContext context) throws IoTDBConnectionException {
+        connectToIoTDB(context);
+    }
+
+    void connectToIoTDB(ProcessContext context) throws IoTDBConnectionException {
+        if (session.get() == null) {
+            final String host = context.getProperty(IOTDB_HOST).getValue();
+            final int port = Integer.parseInt(context.getProperty(IOTDB_PORT).getValue());
+            final String username = context.getProperty(USERNAME).getValue();
+            final String password = context.getProperty(PASSWORD).getValue();
+
+            session.set(
+                    new Session.Builder()
+                            .host(host)
+                            .port(port)
+                            .username(username)
+                            .password(password)
+                            .build());
+            session.get().open();
+        }
+    }
+
+    /**
+     * Closes the cached IoTDB session, if any, and clears the reference.
+     *
+     * <p>A failure while closing is logged and the reference is cleared regardless.
+     *
+     * @param context process context (not used by this method)
+     */
+    public void stop(ProcessContext context) {
+        final Session current = session.get();
+        if (current == null) {
+            return;
+        }
+
+        try {
+            current.close();
+        } catch (IoTDBConnectionException e) {
+            getLogger().error("IoTDB disconnection failed", e);
+        }
+        session.set(null);
+    }
+
+    /**
+     * Looks up the IoTDB data type registered in {@code typeMap} for a NiFi record field type.
+     *
+     * @param type NiFi record field type to translate
+     * @return the mapped {@link TSDataType}, or {@code null} when the map has no entry for {@code type}
+     */
+    protected TSDataType getType(RecordFieldType type) {
+        return typeMap.get(type);
+    }
+
+    protected ValidationResult validateSchemaAttribute(String schemaAttribute) {
+        JsonNode schema = null;
+        try {
+            schema = mapper.readTree(schemaAttribute);
+        } catch (JsonProcessingException e) {
+            return new ValidationResult(false, e.getMessage());
+        }
+        Set<String> keySet = new HashSet<>();
+        schema.fieldNames().forEachRemaining(field -> keySet.add(field));
+
+        if (!keySet.contains(TIME_TYPE) || !keySet.contains(FIELDS)) {
+            String msg = "The JSON of schema must contain `timeType` and `fields`";
+            return new ValidationResult(false, msg);
+        }
+
+        if (!IoTDBSchema.getSupportedTimeType().contains(schema.get(TIME_TYPE).asText())) {
+            String msg =
+                    String.format(
+                            "Unknown `timeType`: %s, there are only two options `LONG` and `STRING` for this property",
+                            schema.get(TIME_TYPE).asText());
+            return new ValidationResult(false, msg);
+        }
+
+        for (int i = 0; i < schema.get(FIELDS).size(); i++) {
+            JsonNode field = schema.get(FIELDS).get(i);
+            Set<String> fieldKeySet = new HashSet<>();
+
+            field.fieldNames().forEachRemaining(fieldName -> fieldKeySet.add(fieldName));
+            if (!fieldKeySet.contains("tsName") || !fieldKeySet.contains("dataType")) {
+                String msg = "`tsName` or `dataType` has not been set";
+                return new ValidationResult(false, msg);
+            }
+
+            if (!field.get("tsName").asText().startsWith(PREFIX)) {
+                String msg =
+                        String.format("The tsName `%s` is not start with 'root.'", field.get("tsName").asText());
+                return new ValidationResult(false, msg);
+            }
+
+            if (!Field.getSupportedDataType().contains(field.get("dataType").asText())) {
+                String msg =
+                        String.format(
+                                "Unknown `dataType`: %s. The supported dataTypes are %s",
+                                field.get("dataType").asText(), Field.getSupportedDataType());
+                return new ValidationResult(false, msg);
+            }
+
+            Set<String> supportedKeySet = new HashSet<>();
+            supportedKeySet.add("tsName");
+            supportedKeySet.add("dataType");
+            supportedKeySet.add("encoding");
+            supportedKeySet.add("compressionType");
+
+            HashSet<String> tmpKetSet = new HashSet<>();
+            tmpKetSet.addAll(supportedKeySet);
+            tmpKetSet.addAll(fieldKeySet);
+            tmpKetSet.removeAll(supportedKeySet);
+            if (!tmpKetSet.isEmpty()) {
+                String msg = "Unknown property or properties: " + tmpKetSet;
+                return new ValidationResult(false, msg);
+            }
+
+            if (fieldKeySet.contains("compressionType") && !fieldKeySet.contains("encoding")) {
+                String msg =
+                        "The `compressionType` has been set, but the `encoding` has not. The property `compressionType` will not take effect";
+                return new ValidationResult(true, msg);
+            }
+
+            if (field.get("encoding") != null
+                    && !Field.getSupportedEncoding().contains(field.get("encoding").asText())) {
+                String msg =
+                        String.format(
+                                "Unknown `encoding`: %s, The supported encoding types are %s",
+                                field.get("encoding").asText(), Field.getSupportedEncoding());
+                return new ValidationResult(false, msg);
+            }
+
+            if (field.get("compressionType") != null
+                    && !Field.getSupportedCompressionType().contains(field.get("compressionType"))) {
+                String msg =
+                        String.format(
+                                "Unknown `compressionType`: %s, The supported compressionType are %s",
+                                field.get("compressionType").asText(), Field.getSupportedCompressionType());
+                return new ValidationResult(false, msg);
+            }
+        }
+
+        return new ValidationResult(true, null);
+    }
+
+    protected ValidationResult validateSchema(RecordSchema recordSchema) {
+        List<String> fieldNames = recordSchema.getFieldNames();
+        List<DataType> dataTypes = recordSchema.getDataTypes();
+        if (!fieldNames.contains(TIME) || !TIME.equals(fieldNames.get(0))) {
+            return new ValidationResult(false, "The fields must contain `Time`, and it must be the first");
+        }
+        if (!supportedTimeType.contains(recordSchema.getDataType(TIME).get().getFieldType())) {
+            return new ValidationResult(false, "The dataType of `Time` must be `STRING` or `LONG`");
+        }
+        fieldNames.remove(TIME);
+        for (String fieldName : fieldNames) {
+            if (!fieldName.startsWith(PREFIX)) {
+                String msg = String.format("The tsName `%s` is not start with 'root.'", fieldName);
+                return new ValidationResult(false, msg);
+            }
+        }
+        for (DataType type : dataTypes) {
+            RecordFieldType dataType = type.getFieldType();
+            if (!supportedType.contains(dataType)) {
+                String msg =
+                        String.format(
+                                "Unknown `dataType`: %s. The supported dataTypes are %s",
+                                dataType.toString(), supportedType);
+                return new ValidationResult(false, msg);
+            }
+        }
+
+        return new ValidationResult(true, null);
+    }
+
+    protected Map<String, List<String>> parseSchema(List<String> filedNames) {
+        HashMap<String, List<String>> deviceMeasurementMap = new HashMap<>();
+        filedNames.stream()
+                .forEach(
+                        filed -> {
+                            String[] paths = filed.split("\\.");
+                            String device = StringUtils.join(paths, ".", 0, paths.length - 1);
+
+                            if (!deviceMeasurementMap.containsKey(device)) {
+                                deviceMeasurementMap.put(device, new ArrayList<>());
+                            }
+                            deviceMeasurementMap.get(device).add(paths[paths.length - 1]);
+                        });
+
+        return deviceMeasurementMap;
+    }
+
+    protected HashMap<String, Tablet> generateTablets(IoTDBSchema schema, int maxRowNumber) {
+        Map<String, List<String>> deviceMeasurementMap = parseSchema(schema.getFieldNames());
+        HashMap<String, Tablet> tablets = new HashMap<>();
+        deviceMeasurementMap.forEach(
+                (device, measurements) -> {
+                    ArrayList<MeasurementSchema> schemas = new ArrayList<>();
+                    for (String measurement : measurements) {
+                        String tsName = device + "." + measurement;
+                        TSDataType dataType = schema.getDataType(tsName);
+                        TSEncoding encoding = schema.getEncodingType(tsName);
+                        CompressionType compressionType = schema.getCompressionType(tsName);
+                        if (encoding == null) {
+                            schemas.add(new MeasurementSchema(measurement, dataType));
+                        } else if (compressionType == null) {
+                            schemas.add(new MeasurementSchema(measurement, dataType, encoding));
+                        } else {
+                            schemas.add(new MeasurementSchema(measurement, dataType, encoding, compressionType));
+                        }
+                    }
+                    Tablet tablet = new Tablet(device, schemas, maxRowNumber);
+                    tablets.put(device, tablet);
+                });
+        return tablets;
+    }
+
+    protected DateTimeFormatter initFormatter(String time) {
+        for (String format : STRING_TIME_FORMAT) {
+            try {
+                DateTimeFormatter.ofPattern(format).parse(time);
+                return DateTimeFormatter.ofPattern(format);
+            } catch (DateTimeParseException e) {
+
+            }
+        }
+        return null;
+    }
+
+    protected Object convertType(Object value, TSDataType type) {
+        switch (type) {
+            case TEXT:
+                return Binary.valueOf(String.valueOf(value));
+            case INT32:
+                return Integer.parseInt(value.toString());
+            case INT64:
+                return Long.parseLong(value.toString());
+            case FLOAT:
+                return Float.parseFloat(value.toString());
+            case DOUBLE:
+                return Double.parseDouble(value.toString());
+            case BOOLEAN:
+                return Boolean.parseBoolean(value.toString());
+            default:
+                return null;
+        }
+    }
+
+    protected IoTDBSchema convertSchema(RecordSchema recordSchema) {
+        String timeType =
+                recordSchema.getDataType(TIME).get().getFieldType() == RecordFieldType.LONG
+                        ? "LONG"
+                        : "STRING";
+        List<String> fieldNames = recordSchema.getFieldNames();
+        fieldNames.remove(TIME);
+
+        ArrayList<Field> fields = new ArrayList<>();
+        fieldNames.forEach(
+                fieldName ->
+                        fields.add(
+                                new Field(
+                                        fieldName, getType(recordSchema.getDataType(fieldName).get().getFieldType()))));

Review Comment:
   As above, `get()` is called without `isPresent()`.



##########
nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/PutIoTDB.java:
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors;
+
+import java.io.InputStream;
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Set;
+import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.nifi.processors.model.IoTDBSchema;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.model.ValidationResult;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+
+@Tags({"iotdb", "insert", "tablet"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription(
+        "This is a record aware processor that reads the content of the incoming FlowFile as individual records using the "
+                + "configured 'Record Reader' and writes them to Apache IoTDB using native interface.")
+public class PutIoTDB extends AbstractIoTDB {
+
+    static final PropertyDescriptor RECORD_READER_FACTORY =
+            new PropertyDescriptor.Builder()
+                    .name("Record Reader")
+                    .description(
+                            "Specifies the type of Record Reader controller service to use for parsing the incoming data "
+                                    + "and determining the schema")
+                    .identifiesControllerService(RecordReaderFactory.class)
+                    .required(true)
+                    .build();
+
+    static final PropertyDescriptor SCHEMA =
+            new PropertyDescriptor.Builder()
+                    .name("Schema")
+                    .description(
+                            "The schema that IoTDB needs doesn't support good by NiFi.\n"
+                                    + "Therefore, you can define the schema here.\n"
+                                    + "Besides, you can set encoding type and compression type by this method.\n"
+                                    + "If you don't set this property, the inferred schema will be used.\n")

Review Comment:
   The newline characters should be removed because they are ignored when rendering the HTML. It seems like the Schema documentation link is missing.
   ```suggestion
                               "The IoTDB Schema definition specified as JSON. The Schema will be inferred when not specified. ")
   ```



##########
nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/PutIoTDB.java:
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors;
+
+import java.io.InputStream;
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Set;
+import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.nifi.processors.model.IoTDBSchema;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.model.ValidationResult;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+
+@Tags({"iotdb", "insert", "tablet"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription(
+        "This is a record aware processor that reads the content of the incoming FlowFile as individual records using the "
+                + "configured 'Record Reader' and writes them to Apache IoTDB using native interface.")
+public class PutIoTDB extends AbstractIoTDB {
+
+    static final PropertyDescriptor RECORD_READER_FACTORY =
+            new PropertyDescriptor.Builder()
+                    .name("Record Reader")
+                    .description(
+                            "Specifies the type of Record Reader controller service to use for parsing the incoming data "
+                                    + "and determining the schema")
+                    .identifiesControllerService(RecordReaderFactory.class)
+                    .required(true)
+                    .build();
+
+    // Optional schema override; supports FlowFile-attribute expression language.
+    static final PropertyDescriptor SCHEMA =
+            new PropertyDescriptor.Builder()
+                    .name("Schema")
+                    .description(
+                            "The IoTDB Schema definition specified as JSON, which can also set the encoding type "
+                                    + "and compression type of each field. "
+                                    + "The Schema will be inferred when not specified.")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .required(false)
+                    .build();
+
+    static final PropertyDescriptor ALIGNED =
+            new PropertyDescriptor.Builder()
+                    .name("Aligned")
+                    .description("Whether to use the Apache IoTDB Aligned Timeseries interface")
+                    .allowableValues("true", "false")
+                    .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+                    .defaultValue("false")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .required(false)
+                    .build();
+
+    static final PropertyDescriptor MAX_ROW_NUMBER =
+            new PropertyDescriptor.Builder()
+                    .name("Max Row Number")
+                    .description(
+                            "Specifies the max row number of each Apache IoTDB Tablet")
+                    .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .required(false)

Review Comment:
   This should be changed to `true` and the default value of `1024` should be added. This makes the behavior more obvious as opposed to the internal hard-coded default of 1024.



##########
nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/AbstractIoTDB.java:
##########
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors;
+
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeParseException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.processors.model.Field;
+import org.apache.nifi.processors.model.IoTDBSchema;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.model.ValidationResult;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+public abstract class AbstractIoTDB extends AbstractProcessor {
+    private static final int DEFAULT_IOTDB_PORT = 6667;
+
+    protected static ObjectMapper mapper = new ObjectMapper();
+
+    private static final String TIME_TYPE = "timeType";
+    private static final String FIELDS = "fields";
+    private static final String TIME = "Time";
+    private static final String PREFIX = "root.";
+
+    private static Map<RecordFieldType, TSDataType> typeMap =

Review Comment:
   This should be declared `final`:
   ```suggestion
       private static final Map<RecordFieldType, TSDataType> typeMap =
   ```



##########
nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/AbstractIoTDB.java:
##########
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors;
+
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeParseException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.processors.model.Field;
+import org.apache.nifi.processors.model.IoTDBSchema;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.model.ValidationResult;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+public abstract class AbstractIoTDB extends AbstractProcessor {
+    private static final int DEFAULT_IOTDB_PORT = 6667;
+
+    protected static ObjectMapper mapper = new ObjectMapper();
+
+    private static final String TIME_TYPE = "timeType";
+    private static final String FIELDS = "fields";
+    private static final String TIME = "Time";
+    private static final String PREFIX = "root.";
+
+    private static Map<RecordFieldType, TSDataType> typeMap =
+            new HashMap<>();
+
+    static final Set<RecordFieldType> supportedType =
+            new HashSet<>();
+
+    static final Set<RecordFieldType> supportedTimeType =
+            new HashSet<>();
+
+    protected static final String[] STRING_TIME_FORMAT =
+            new String[]{
+                    "yyyy-MM-dd HH:mm:ss.SSSX",
+                    "yyyy/MM/dd HH:mm:ss.SSSX",
+                    "yyyy.MM.dd HH:mm:ss.SSSX",
+                    "yyyy-MM-dd HH:mm:ssX",
+                    "yyyy/MM/dd HH:mm:ssX",
+                    "yyyy.MM.dd HH:mm:ssX",
+                    "yyyy-MM-dd HH:mm:ss.SSSz",
+                    "yyyy/MM/dd HH:mm:ss.SSSz",
+                    "yyyy.MM.dd HH:mm:ss.SSSz",
+                    "yyyy-MM-dd HH:mm:ssz",
+                    "yyyy/MM/dd HH:mm:ssz",
+                    "yyyy.MM.dd HH:mm:ssz",
+                    "yyyy-MM-dd HH:mm:ss.SSS",
+                    "yyyy/MM/dd HH:mm:ss.SSS",
+                    "yyyy.MM.dd HH:mm:ss.SSS",
+                    "yyyy-MM-dd HH:mm:ss",
+                    "yyyy/MM/dd HH:mm:ss",
+                    "yyyy.MM.dd HH:mm:ss",
+                    "yyyy-MM-dd'T'HH:mm:ss.SSSX",
+                    "yyyy/MM/dd'T'HH:mm:ss.SSSX",
+                    "yyyy.MM.dd'T'HH:mm:ss.SSSX",
+                    "yyyy-MM-dd'T'HH:mm:ssX",
+                    "yyyy/MM/dd'T'HH:mm:ssX",
+                    "yyyy.MM.dd'T'HH:mm:ssX",
+                    "yyyy-MM-dd'T'HH:mm:ss.SSSz",
+                    "yyyy/MM/dd'T'HH:mm:ss.SSSz",
+                    "yyyy.MM.dd'T'HH:mm:ss.SSSz",
+                    "yyyy-MM-dd'T'HH:mm:ssz",
+                    "yyyy/MM/dd'T'HH:mm:ssz",
+                    "yyyy.MM.dd'T'HH:mm:ssz",
+                    "yyyy-MM-dd'T'HH:mm:ss.SSS",
+                    "yyyy/MM/dd'T'HH:mm:ss.SSS",
+                    "yyyy.MM.dd'T'HH:mm:ss.SSS",
+                    "yyyy-MM-dd'T'HH:mm:ss",
+                    "yyyy/MM/dd'T'HH:mm:ss",
+                    "yyyy.MM.dd'T'HH:mm:ss"
+            };
+
+    static final PropertyDescriptor IOTDB_HOST =
+            new PropertyDescriptor.Builder()
+                    .name("Host")
+                    .description("The host of IoTDB.")
+                    .required(true)
+                    .build();
+
+    static final PropertyDescriptor IOTDB_PORT =
+            new PropertyDescriptor.Builder()
+                    .name("Port")
+                    .description("The port of IoTDB.")
+                    .defaultValue(String.valueOf(DEFAULT_IOTDB_PORT))
+                    .required(true)
+                    .build();
+
+    static final PropertyDescriptor USERNAME =
+            new PropertyDescriptor.Builder()
+                    .name("Username")
+                    .description("Username to access the IoTDB.")
+                    .required(true)

Review Comment:
   The `Username` property needs a validator; see `StandardValidators.NON_BLANK_VALIDATOR`.



##########
nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/AbstractIoTDB.java:
##########
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors;
+
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeParseException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.processors.model.Field;
+import org.apache.nifi.processors.model.IoTDBSchema;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.model.ValidationResult;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+public abstract class AbstractIoTDB extends AbstractProcessor {
+    private static final int DEFAULT_IOTDB_PORT = 6667;
+
+    protected static ObjectMapper mapper = new ObjectMapper();
+
+    private static final String TIME_TYPE = "timeType";
+    private static final String FIELDS = "fields";
+    private static final String TIME = "Time";

Review Comment:
   Is the requirement for a field named `Time` a constraint of IoTDB? It seems like this should be configurable or derived from the incoming NiFi Record Schema.



##########
nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/PutIoTDB.java:
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors;
+
+import java.io.InputStream;
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Set;
+import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.nifi.processors.model.IoTDBSchema;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.model.ValidationResult;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+
+@Tags({"iotdb", "insert", "tablet"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription(
+        "This is a record aware processor that reads the content of the incoming FlowFile as individual records using the "
+                + "configured 'Record Reader' and writes them to Apache IoTDB using native interface.")
+public class PutIoTDB extends AbstractIoTDB {
+
+    static final PropertyDescriptor RECORD_READER_FACTORY =
+            new PropertyDescriptor.Builder()
+                    .name("Record Reader")
+                    .description(
+                            "Specifies the type of Record Reader controller service to use for parsing the incoming data "
+                                    + "and determining the schema")
+                    .identifiesControllerService(RecordReaderFactory.class)
+                    .required(true)
+                    .build();
+
+    static final PropertyDescriptor SCHEMA =
+            new PropertyDescriptor.Builder()
+                    .name("Schema")
+                    .description(
+                            "The schema that IoTDB needs doesn't support good by NiFi.\n"
+                                    + "Therefore, you can define the schema here.\n"
+                                    + "Besides, you can set encoding type and compression type by this method.\n"
+                                    + "If you don't set this property, the inferred schema will be used.\n")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .required(false)
+                    .build();
+
+    static final PropertyDescriptor ALIGNED =
+            new PropertyDescriptor.Builder()
+                    .name("Aligned")
+                    .description("Whether to use the Apache IoTDB Aligned Timeseries interface")
+                    .allowableValues("true", "false")
+                    .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+                    .defaultValue("false")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .required(false)
+                    .build();
+
+    static final PropertyDescriptor MAX_ROW_NUMBER =
+            new PropertyDescriptor.Builder()
+                    .name("Max Row Number")
+                    .description(
+                            "Specifies the max row number of each Apache IoTDB Tablet")
+                    .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .required(false)
+                    .build();
+
+    static {
+        descriptors.add(RECORD_READER_FACTORY);
+        descriptors.add(SCHEMA);
+        descriptors.add(ALIGNED);
+        descriptors.add(MAX_ROW_NUMBER);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext processContext, ProcessSession processSession)
+            throws ProcessException {
+        final RecordReaderFactory recordParserFactory =
+                processContext
+                        .getProperty(RECORD_READER_FACTORY)
+                        .asControllerService(RecordReaderFactory.class);
+
+        FlowFile flowFile = processSession.get();
+
+        if (flowFile == null) {
+            processSession.transfer(flowFile, REL_SUCCESS);
+            return;
+        }
+
+        String schemaProperty = processContext.getProperty(SCHEMA).evaluateAttributeExpressions(flowFile).getValue();
+        String alignedProperty = processContext.getProperty(ALIGNED).evaluateAttributeExpressions(flowFile).getValue();
+        String maxRowNumberProperty = processContext.getProperty(MAX_ROW_NUMBER).evaluateAttributeExpressions(flowFile).getValue();
+
+        final boolean aligned = alignedProperty != null ? Boolean.valueOf(alignedProperty) : false;
+        int maxRowNumber = maxRowNumberProperty != null ? Integer.valueOf(maxRowNumberProperty) : 1024;
+
+        try (final InputStream inputStream = processSession.read(flowFile);
+             final RecordReader recordReader =
+                     recordParserFactory.createRecordReader(flowFile, inputStream, getLogger())) {
+            boolean needInitFormatter;
+            IoTDBSchema schema;
+            ValidationResult result;
+
+            result =
+                    schemaProperty != null
+                            ? validateSchemaAttribute(schemaProperty)
+                            : validateSchema(recordReader.getSchema());
+
+            if (!result.getKey()) {
+                getLogger().error(String.format("The property `schema` has an error: %s", result.getValue()));
+                inputStream.close();
+                recordReader.close();
+                processSession.transfer(flowFile, REL_FAILURE);
+                return;
+            } else {
+                if (result.getValue() != null) {
+                    getLogger().warn(String.format("The property `schema` has a warn: %s", result.getValue()));

Review Comment:
   ```suggestion
                       getLogger().warn("The property `schema` has a warn: {}", result.getValue());
   ```



##########
nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/AbstractIoTDB.java:
##########
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors;
+
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeParseException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.processors.model.Field;
+import org.apache.nifi.processors.model.IoTDBSchema;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.model.ValidationResult;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+public abstract class AbstractIoTDB extends AbstractProcessor {
+    private static final int DEFAULT_IOTDB_PORT = 6667;
+
+    protected static ObjectMapper mapper = new ObjectMapper();
+
+    private static final String TIME_TYPE = "timeType";
+    private static final String FIELDS = "fields";
+    private static final String TIME = "Time";
+    private static final String PREFIX = "root.";
+
+    private static Map<RecordFieldType, TSDataType> typeMap =
+            new HashMap<>();
+
+    static final Set<RecordFieldType> supportedType =
+            new HashSet<>();
+
+    static final Set<RecordFieldType> supportedTimeType =
+            new HashSet<>();
+
+    protected static final String[] STRING_TIME_FORMAT =
+            new String[]{
+                    "yyyy-MM-dd HH:mm:ss.SSSX",
+                    "yyyy/MM/dd HH:mm:ss.SSSX",
+                    "yyyy.MM.dd HH:mm:ss.SSSX",
+                    "yyyy-MM-dd HH:mm:ssX",
+                    "yyyy/MM/dd HH:mm:ssX",
+                    "yyyy.MM.dd HH:mm:ssX",
+                    "yyyy-MM-dd HH:mm:ss.SSSz",
+                    "yyyy/MM/dd HH:mm:ss.SSSz",
+                    "yyyy.MM.dd HH:mm:ss.SSSz",
+                    "yyyy-MM-dd HH:mm:ssz",
+                    "yyyy/MM/dd HH:mm:ssz",
+                    "yyyy.MM.dd HH:mm:ssz",
+                    "yyyy-MM-dd HH:mm:ss.SSS",
+                    "yyyy/MM/dd HH:mm:ss.SSS",
+                    "yyyy.MM.dd HH:mm:ss.SSS",
+                    "yyyy-MM-dd HH:mm:ss",
+                    "yyyy/MM/dd HH:mm:ss",
+                    "yyyy.MM.dd HH:mm:ss",
+                    "yyyy-MM-dd'T'HH:mm:ss.SSSX",
+                    "yyyy/MM/dd'T'HH:mm:ss.SSSX",
+                    "yyyy.MM.dd'T'HH:mm:ss.SSSX",
+                    "yyyy-MM-dd'T'HH:mm:ssX",
+                    "yyyy/MM/dd'T'HH:mm:ssX",
+                    "yyyy.MM.dd'T'HH:mm:ssX",
+                    "yyyy-MM-dd'T'HH:mm:ss.SSSz",
+                    "yyyy/MM/dd'T'HH:mm:ss.SSSz",
+                    "yyyy.MM.dd'T'HH:mm:ss.SSSz",
+                    "yyyy-MM-dd'T'HH:mm:ssz",
+                    "yyyy/MM/dd'T'HH:mm:ssz",
+                    "yyyy.MM.dd'T'HH:mm:ssz",
+                    "yyyy-MM-dd'T'HH:mm:ss.SSS",
+                    "yyyy/MM/dd'T'HH:mm:ss.SSS",
+                    "yyyy.MM.dd'T'HH:mm:ss.SSS",
+                    "yyyy-MM-dd'T'HH:mm:ss",
+                    "yyyy/MM/dd'T'HH:mm:ss",
+                    "yyyy.MM.dd'T'HH:mm:ss"
+            };
+
+    static final PropertyDescriptor IOTDB_HOST =
+            new PropertyDescriptor.Builder()
+                    .name("Host")
+                    .description("The host of IoTDB.")
+                    .required(true)
+                    .build();
+
+    static final PropertyDescriptor IOTDB_PORT =
+            new PropertyDescriptor.Builder()
+                    .name("Port")
+                    .description("The port of IoTDB.")
+                    .defaultValue(String.valueOf(DEFAULT_IOTDB_PORT))
+                    .required(true)
+                    .build();
+
+    static final PropertyDescriptor USERNAME =
+            new PropertyDescriptor.Builder()
+                    .name("Username")
+                    .description("Username to access the IoTDB.")
+                    .required(true)
+                    .build();
+
+    static final PropertyDescriptor PASSWORD =
+            new PropertyDescriptor.Builder()
+                    .name("Password")
+                    .description("Password to access the IoTDB.")
+                    .required(true)
+                    .sensitive(true)

Review Comment:
   The `Password` property needs a validator; see `StandardValidators.NON_BLANK_VALIDATOR`.



##########
nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/PutIoTDB.java:
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors;
+
+import java.io.InputStream;
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Set;
+import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.nifi.processors.model.IoTDBSchema;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.model.ValidationResult;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+
+@Tags({"iotdb", "insert", "tablet"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription(
+        "This is a record aware processor that reads the content of the incoming FlowFile as individual records using the "
+                + "configured 'Record Reader' and writes them to Apache IoTDB using native interface.")
+public class PutIoTDB extends AbstractIoTDB {
+
+    static final PropertyDescriptor RECORD_READER_FACTORY =
+            new PropertyDescriptor.Builder()
+                    .name("Record Reader")
+                    .description(
+                            "Specifies the type of Record Reader controller service to use for parsing the incoming data "
+                                    + "and determining the schema")
+                    .identifiesControllerService(RecordReaderFactory.class)
+                    .required(true)
+                    .build();
+
+    static final PropertyDescriptor SCHEMA =
+            new PropertyDescriptor.Builder()
+                    .name("Schema")
+                    .description(
+                            "The schema that IoTDB needs doesn't support good by NiFi.\n"
+                                    + "Therefore, you can define the schema here.\n"
+                                    + "Besides, you can set encoding type and compression type by this method.\n"
+                                    + "If you don't set this property, the inferred schema will be used.\n")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .required(false)
+                    .build();
+
+    static final PropertyDescriptor ALIGNED =
+            new PropertyDescriptor.Builder()
+                    .name("Aligned")
+                    .description("Whether to use the Apache IoTDB Aligned Timeseries interface")
+                    .allowableValues("true", "false")
+                    .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+                    .defaultValue("false")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .required(false)
+                    .build();
+
+    static final PropertyDescriptor MAX_ROW_NUMBER =
+            new PropertyDescriptor.Builder()
+                    .name("Max Row Number")
+                    .description(
+                            "Specifies the max row number of each Apache IoTDB Tablet")
+                    .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)

Review Comment:
   Is it necessary for the `Max Row Number` property to support FlowFile attributes in Expression Language?



##########
nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/PutIoTDB.java:
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors;
+
+import java.io.InputStream;
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Set;
+import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.nifi.processors.model.IoTDBSchema;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.model.ValidationResult;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+
+@Tags({"iotdb", "insert", "tablet"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription(
+        "This is a record aware processor that reads the content of the incoming FlowFile as individual records using the "
+                + "configured 'Record Reader' and writes them to Apache IoTDB using native interface.")
+public class PutIoTDB extends AbstractIoTDB {
+
+    static final PropertyDescriptor RECORD_READER_FACTORY =
+            new PropertyDescriptor.Builder()
+                    .name("Record Reader")
+                    .description(
+                            "Specifies the type of Record Reader controller service to use for parsing the incoming data "
+                                    + "and determining the schema")
+                    .identifiesControllerService(RecordReaderFactory.class)
+                    .required(true)
+                    .build();
+
+    static final PropertyDescriptor SCHEMA =
+            new PropertyDescriptor.Builder()
+                    .name("Schema")
+                    .description(
+                            "The schema that IoTDB needs doesn't support good by NiFi.\n"
+                                    + "Therefore, you can define the schema here.\n"
+                                    + "Besides, you can set encoding type and compression type by this method.\n"
+                                    + "If you don't set this property, the inferred schema will be used.\n")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .required(false)
+                    .build();
+
+    static final PropertyDescriptor ALIGNED =
+            new PropertyDescriptor.Builder()
+                    .name("Aligned")
+                    .description("Whether to use the Apache IoTDB Aligned Timeseries interface")
+                    .allowableValues("true", "false")
+                    .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+                    .defaultValue("false")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .required(false)
+                    .build();
+
+    static final PropertyDescriptor MAX_ROW_NUMBER =
+            new PropertyDescriptor.Builder()
+                    .name("Max Row Number")
+                    .description(
+                            "Specifies the max row number of each Apache IoTDB Tablet")
+                    .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .required(false)
+                    .build();
+
+    static {
+        descriptors.add(RECORD_READER_FACTORY);
+        descriptors.add(SCHEMA);
+        descriptors.add(ALIGNED);
+        descriptors.add(MAX_ROW_NUMBER);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext processContext, ProcessSession processSession)
+            throws ProcessException {
+        final RecordReaderFactory recordParserFactory =
+                processContext
+                        .getProperty(RECORD_READER_FACTORY)
+                        .asControllerService(RecordReaderFactory.class);
+
+        FlowFile flowFile = processSession.get();
+
+        if (flowFile == null) {
+            processSession.transfer(flowFile, REL_SUCCESS);
+            return;
+        }
+
+        String schemaProperty = processContext.getProperty(SCHEMA).evaluateAttributeExpressions(flowFile).getValue();
+        String alignedProperty = processContext.getProperty(ALIGNED).evaluateAttributeExpressions(flowFile).getValue();
+        String maxRowNumberProperty = processContext.getProperty(MAX_ROW_NUMBER).evaluateAttributeExpressions(flowFile).getValue();
+
+        final boolean aligned = alignedProperty != null ? Boolean.valueOf(alignedProperty) : false;
+        int maxRowNumber = maxRowNumberProperty != null ? Integer.valueOf(maxRowNumberProperty) : 1024;
+
+        try (final InputStream inputStream = processSession.read(flowFile);
+             final RecordReader recordReader =
+                     recordParserFactory.createRecordReader(flowFile, inputStream, getLogger())) {
+            boolean needInitFormatter;
+            IoTDBSchema schema;
+            ValidationResult result;
+
+            result =
+                    schemaProperty != null
+                            ? validateSchemaAttribute(schemaProperty)
+                            : validateSchema(recordReader.getSchema());
+
+            if (!result.getKey()) {
+                getLogger().error(String.format("The property `schema` has an error: %s", result.getValue()));

Review Comment:
   The `%s` placeholder should be replaced with `{}` for all logging statements.
   ```suggestion
                   getLogger().error("The property `schema` has an error: {}", result.getValue());
   ```



##########
nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/PutIoTDB.java:
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors;
+
+import java.io.InputStream;
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Set;
+import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.nifi.processors.model.IoTDBSchema;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.model.ValidationResult;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+
+@Tags({"iotdb", "insert", "tablet"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription(
+        "This is a record aware processor that reads the content of the incoming FlowFile as individual records using the "
+                + "configured 'Record Reader' and writes them to Apache IoTDB using native interface.")
+public class PutIoTDB extends AbstractIoTDB {
+
+    static final PropertyDescriptor RECORD_READER_FACTORY =
+            new PropertyDescriptor.Builder()
+                    .name("Record Reader")
+                    .description(
+                            "Specifies the type of Record Reader controller service to use for parsing the incoming data "
+                                    + "and determining the schema")
+                    .identifiesControllerService(RecordReaderFactory.class)
+                    .required(true)
+                    .build();
+
+    static final PropertyDescriptor SCHEMA =
+            new PropertyDescriptor.Builder()
+                    .name("Schema")
+                    .description(
+                            "The schema that IoTDB needs doesn't support good by NiFi.\n"
+                                    + "Therefore, you can define the schema here.\n"
+                                    + "Besides, you can set encoding type and compression type by this method.\n"
+                                    + "If you don't set this property, the inferred schema will be used.\n")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .required(false)
+                    .build();
+
+    static final PropertyDescriptor ALIGNED =
+            new PropertyDescriptor.Builder()
+                    .name("Aligned")
+                    .description("Whether to use the Apache IoTDB Aligned Timeseries interface")
+                    .allowableValues("true", "false")
+                    .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+                    .defaultValue("false")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .required(false)
+                    .build();
+
+    static final PropertyDescriptor MAX_ROW_NUMBER =
+            new PropertyDescriptor.Builder()
+                    .name("Max Row Number")
+                    .description(
+                            "Specifies the max row number of each Apache IoTDB Tablet")
+                    .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .required(false)
+                    .build();
+
+    static {
+        descriptors.add(RECORD_READER_FACTORY);
+        descriptors.add(SCHEMA);
+        descriptors.add(ALIGNED);
+        descriptors.add(MAX_ROW_NUMBER);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext processContext, ProcessSession processSession)
+            throws ProcessException {
+        final RecordReaderFactory recordParserFactory =
+                processContext
+                        .getProperty(RECORD_READER_FACTORY)
+                        .asControllerService(RecordReaderFactory.class);
+
+        FlowFile flowFile = processSession.get();
+
+        if (flowFile == null) {
+            processSession.transfer(flowFile, REL_SUCCESS);
+            return;
+        }
+
+        String schemaProperty = processContext.getProperty(SCHEMA).evaluateAttributeExpressions(flowFile).getValue();
+        String alignedProperty = processContext.getProperty(ALIGNED).evaluateAttributeExpressions(flowFile).getValue();
+        String maxRowNumberProperty = processContext.getProperty(MAX_ROW_NUMBER).evaluateAttributeExpressions(flowFile).getValue();
+
+        final boolean aligned = alignedProperty != null ? Boolean.valueOf(alignedProperty) : false;
+        int maxRowNumber = maxRowNumberProperty != null ? Integer.valueOf(maxRowNumberProperty) : 1024;
+
+        try (final InputStream inputStream = processSession.read(flowFile);
+             final RecordReader recordReader =
+                     recordParserFactory.createRecordReader(flowFile, inputStream, getLogger())) {
+            boolean needInitFormatter;
+            IoTDBSchema schema;
+            ValidationResult result;
+
+            result =
+                    schemaProperty != null
+                            ? validateSchemaAttribute(schemaProperty)
+                            : validateSchema(recordReader.getSchema());
+
+            if (!result.getKey()) {
+                getLogger().error(String.format("The property `schema` has an error: %s", result.getValue()));
+                inputStream.close();
+                recordReader.close();
+                processSession.transfer(flowFile, REL_FAILURE);
+                return;
+            } else {
+                if (result.getValue() != null) {
+                    getLogger().warn(String.format("The property `schema` has a warn: %s", result.getValue()));
+                }
+            }
+
+            schema =
+                    schemaProperty != null
+                            ? mapper.readValue(schemaProperty, IoTDBSchema.class)
+                            : convertSchema(recordReader.getSchema());
+
+            List<String> fieldNames = schema.getFieldNames();
+
+            needInitFormatter = schema.getTimeType() != IoTDBSchema.TimeType.LONG;
+
+            HashMap<String, Tablet> tablets = generateTablets(schema, maxRowNumber);
+            DateTimeFormatter format = null;
+
+            Record record;
+
+            while ((record = recordReader.nextRecord()) != null) {
+                Object[] values = record.getValues();
+                if (format == null && needInitFormatter) {
+                    format = initFormatter((String) values[0]);
+                    if (format == null) {
+                        getLogger().error("{} Record [{}] time format not supported\", flowFile, recordNumber");

Review Comment:
   ```suggestion
                           getLogger().error("{} Record [{}] time format not supported", flowFile, recordNumber);
   ```



##########
nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/PutIoTDB.java:
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors;
+
+import java.io.InputStream;
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Set;
+import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.nifi.processors.model.IoTDBSchema;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.model.ValidationResult;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+
+@Tags({"iotdb", "insert", "tablet"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription(
+        "This is a record aware processor that reads the content of the incoming FlowFile as individual records using the "
+                + "configured 'Record Reader' and writes them to Apache IoTDB using native interface.")
+public class PutIoTDB extends AbstractIoTDB {
+
+    static final PropertyDescriptor RECORD_READER_FACTORY =
+            new PropertyDescriptor.Builder()
+                    .name("Record Reader")
+                    .description(
+                            "Specifies the type of Record Reader controller service to use for parsing the incoming data "
+                                    + "and determining the schema")
+                    .identifiesControllerService(RecordReaderFactory.class)
+                    .required(true)
+                    .build();
+
+    static final PropertyDescriptor SCHEMA =
+            new PropertyDescriptor.Builder()
+                    .name("Schema")
+                    .description(
+                            "The schema that IoTDB needs doesn't support good by NiFi.\n"
+                                    + "Therefore, you can define the schema here.\n"
+                                    + "Besides, you can set encoding type and compression type by this method.\n"
+                                    + "If you don't set this property, the inferred schema will be used.\n")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .required(false)
+                    .build();
+
+    static final PropertyDescriptor ALIGNED =
+            new PropertyDescriptor.Builder()
+                    .name("Aligned")
+                    .description("Whether to use the Apache IoTDB Aligned Timeseries interface")
+                    .allowableValues("true", "false")
+                    .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+                    .defaultValue("false")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .required(false)
+                    .build();
+
+    static final PropertyDescriptor MAX_ROW_NUMBER =
+            new PropertyDescriptor.Builder()
+                    .name("Max Row Number")
+                    .description(
+                            "Specifies the max row number of each Apache IoTDB Tablet")
+                    .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .required(false)
+                    .build();
+
+    static {
+        descriptors.add(RECORD_READER_FACTORY);
+        descriptors.add(SCHEMA);
+        descriptors.add(ALIGNED);
+        descriptors.add(MAX_ROW_NUMBER);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext processContext, ProcessSession processSession)
+            throws ProcessException {
+        final RecordReaderFactory recordParserFactory =
+                processContext
+                        .getProperty(RECORD_READER_FACTORY)
+                        .asControllerService(RecordReaderFactory.class);
+
+        FlowFile flowFile = processSession.get();
+
+        if (flowFile == null) {
+            processSession.transfer(flowFile, REL_SUCCESS);
+            return;
+        }
+
+        String schemaProperty = processContext.getProperty(SCHEMA).evaluateAttributeExpressions(flowFile).getValue();
+        String alignedProperty = processContext.getProperty(ALIGNED).evaluateAttributeExpressions(flowFile).getValue();
+        String maxRowNumberProperty = processContext.getProperty(MAX_ROW_NUMBER).evaluateAttributeExpressions(flowFile).getValue();
+
+        final boolean aligned = alignedProperty != null ? Boolean.valueOf(alignedProperty) : false;
+        int maxRowNumber = maxRowNumberProperty != null ? Integer.valueOf(maxRowNumberProperty) : 1024;
+
+        try (final InputStream inputStream = processSession.read(flowFile);
+             final RecordReader recordReader =
+                     recordParserFactory.createRecordReader(flowFile, inputStream, getLogger())) {
+            boolean needInitFormatter;
+            IoTDBSchema schema;
+            ValidationResult result;
+
+            result =
+                    schemaProperty != null
+                            ? validateSchemaAttribute(schemaProperty)
+                            : validateSchema(recordReader.getSchema());
+
+            if (!result.getKey()) {
+                getLogger().error(String.format("The property `schema` has an error: %s", result.getValue()));
+                inputStream.close();
+                recordReader.close();
+                processSession.transfer(flowFile, REL_FAILURE);
+                return;
+            } else {
+                if (result.getValue() != null) {
+                    getLogger().warn(String.format("The property `schema` has a warn: %s", result.getValue()));
+                }
+            }
+
+            schema =
+                    schemaProperty != null
+                            ? mapper.readValue(schemaProperty, IoTDBSchema.class)
+                            : convertSchema(recordReader.getSchema());
+
+            List<String> fieldNames = schema.getFieldNames();
+
+            needInitFormatter = schema.getTimeType() != IoTDBSchema.TimeType.LONG;
+
+            HashMap<String, Tablet> tablets = generateTablets(schema, maxRowNumber);
+            DateTimeFormatter format = null;
+
+            Record record;
+
+            while ((record = recordReader.nextRecord()) != null) {
+                Object[] values = record.getValues();
+                if (format == null && needInitFormatter) {
+                    format = initFormatter((String) values[0]);
+                    if (format == null) {
+                        getLogger().error("{} Record [{}] time format not supported\", flowFile, recordNumber");
+                        inputStream.close();
+                        recordReader.close();
+                        processSession.transfer(flowFile, REL_FAILURE);
+                        return;
+                    }
+                }
+
+                long timestamp;
+                if (needInitFormatter) {
+                    timestamp = Timestamp.valueOf(LocalDateTime.parse((String) values[0], format)).getTime();
+                } else {
+                    timestamp = (Long) values[0];
+                }
+
+                boolean isFulled = false;
+
+                for (Map.Entry<String, Tablet> entry : tablets.entrySet()) {
+                    String device = entry.getKey();
+                    Tablet tablet = entry.getValue();
+                    int rowIndex = tablet.rowSize++;
+
+                    tablet.addTimestamp(rowIndex, timestamp);
+                    List<MeasurementSchema> measurements = tablet.getSchemas();
+                    for (MeasurementSchema measurement : measurements) {
+                        String tsName =
+                                new StringBuilder()
+                                        .append(device)
+                                        .append(".")
+                                        .append(measurement.getMeasurementId())
+                                        .toString();
+                        int valueIndex = fieldNames.indexOf(tsName) + 1;
+                        Object value;
+                        TSDataType type = measurement.getType();
+                        try {
+                            value = convertType(values[valueIndex], type);
+                        } catch (Exception e) {
+                            getLogger().warn(String.format("The value `%s` can't be converted to the type `%s`", values[valueIndex], type));

Review Comment:
   ```suggestion
                               getLogger().warn("The value [{}] can't be converted to the type [{}]", values[valueIndex], type);
   ```



##########
nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/PutIoTDB.java:
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors;
+
+import java.io.InputStream;
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Set;
+import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.nifi.processors.model.IoTDBSchema;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.model.ValidationResult;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+
+@Tags({"iotdb", "insert", "tablet"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription(
+        "This is a record aware processor that reads the content of the incoming FlowFile as individual records using the "
+                + "configured 'Record Reader' and writes them to Apache IoTDB using native interface.")
+public class PutIoTDB extends AbstractIoTDB {
+
+    static final PropertyDescriptor RECORD_READER_FACTORY =
+            new PropertyDescriptor.Builder()
+                    .name("Record Reader")
+                    .description(
+                            "Specifies the type of Record Reader controller service to use for parsing the incoming data "
+                                    + "and determining the schema")
+                    .identifiesControllerService(RecordReaderFactory.class)
+                    .required(true)
+                    .build();
+
+    static final PropertyDescriptor SCHEMA =
+            new PropertyDescriptor.Builder()
+                    .name("Schema")
+                    .description(
+                            "The schema that IoTDB needs doesn't support good by NiFi.\n"
+                                    + "Therefore, you can define the schema here.\n"
+                                    + "Besides, you can set encoding type and compression type by this method.\n"
+                                    + "If you don't set this property, the inferred schema will be used.\n")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .required(false)
+                    .build();
+
+    static final PropertyDescriptor ALIGNED =
+            new PropertyDescriptor.Builder()
+                    .name("Aligned")
+                    .description("Whether to use the Apache IoTDB Aligned Timeseries interface")
+                    .allowableValues("true", "false")
+                    .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+                    .defaultValue("false")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .required(false)
+                    .build();
+
+    static final PropertyDescriptor MAX_ROW_NUMBER =
+            new PropertyDescriptor.Builder()
+                    .name("Max Row Number")
+                    .description(
+                            "Specifies the max row number of each Apache IoTDB Tablet")
+                    .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .required(false)
+                    .build();
+
+    static {
+        descriptors.add(RECORD_READER_FACTORY);
+        descriptors.add(SCHEMA);
+        descriptors.add(ALIGNED);
+        descriptors.add(MAX_ROW_NUMBER);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext processContext, ProcessSession processSession)
+            throws ProcessException {
+        final RecordReaderFactory recordParserFactory =
+                processContext
+                        .getProperty(RECORD_READER_FACTORY)
+                        .asControllerService(RecordReaderFactory.class);
+
+        FlowFile flowFile = processSession.get();
+
+        if (flowFile == null) {
+            processSession.transfer(flowFile, REL_SUCCESS);
+            return;
+        }
+
+        String schemaProperty = processContext.getProperty(SCHEMA).evaluateAttributeExpressions(flowFile).getValue();
+        String alignedProperty = processContext.getProperty(ALIGNED).evaluateAttributeExpressions(flowFile).getValue();
+        String maxRowNumberProperty = processContext.getProperty(MAX_ROW_NUMBER).evaluateAttributeExpressions(flowFile).getValue();
+
+        final boolean aligned = alignedProperty != null ? Boolean.valueOf(alignedProperty) : false;
+        int maxRowNumber = maxRowNumberProperty != null ? Integer.valueOf(maxRowNumberProperty) : 1024;

Review Comment:
   As mentioned above, changing the properties to required avoids the need to set hard-coded defaults.



##########
nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/AbstractIoTDB.java:
##########
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors;
+
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeParseException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.processors.model.Field;
+import org.apache.nifi.processors.model.IoTDBSchema;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.model.ValidationResult;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+public abstract class AbstractIoTDB extends AbstractProcessor {
+    // Port used as the default value of the "Port" property.
+    private static final int DEFAULT_IOTDB_PORT = 6667;
+
+    // Shared Jackson mapper; validateSchemaAttribute uses it to parse the schema JSON.
+    protected static ObjectMapper mapper = new ObjectMapper();
+
+    // Keys that must be present at the top level of the schema JSON.
+    private static final String TIME_TYPE = "timeType";
+    private static final String FIELDS = "fields";
+    // Name that the first field of a record schema must carry (see validateSchema).
+    private static final String TIME = "Time";
+    // Prefix every timeseries name is required to start with.
+    private static final String PREFIX = "root.";
+
+    // NiFi record field type -> IoTDB data type; filled in by the static initializer.
+    private static Map<RecordFieldType, TSDataType> typeMap =
+            new HashMap<>();
+
+    // Record field types accepted by validateSchema for values; filled in by the static initializer.
+    static final Set<RecordFieldType> supportedType =
+            new HashSet<>();
+
+    // Record field types accepted for the `Time` field; filled in by the static initializer.
+    static final Set<RecordFieldType> supportedTimeType =
+            new HashSet<>();
+
+    // Candidate date-time patterns. initFormatter tries them in this order and returns a
+    // formatter for the first pattern that parses the given text. The list covers '-', '/'
+    // and '.' as date separators, a space or a literal 'T' between date and time, optional
+    // milliseconds, and an optional trailing offset (X) or zone (z).
+    protected static final String[] STRING_TIME_FORMAT =
+            new String[]{
+                    "yyyy-MM-dd HH:mm:ss.SSSX",
+                    "yyyy/MM/dd HH:mm:ss.SSSX",
+                    "yyyy.MM.dd HH:mm:ss.SSSX",
+                    "yyyy-MM-dd HH:mm:ssX",
+                    "yyyy/MM/dd HH:mm:ssX",
+                    "yyyy.MM.dd HH:mm:ssX",
+                    "yyyy-MM-dd HH:mm:ss.SSSz",
+                    "yyyy/MM/dd HH:mm:ss.SSSz",
+                    "yyyy.MM.dd HH:mm:ss.SSSz",
+                    "yyyy-MM-dd HH:mm:ssz",
+                    "yyyy/MM/dd HH:mm:ssz",
+                    "yyyy.MM.dd HH:mm:ssz",
+                    "yyyy-MM-dd HH:mm:ss.SSS",
+                    "yyyy/MM/dd HH:mm:ss.SSS",
+                    "yyyy.MM.dd HH:mm:ss.SSS",
+                    "yyyy-MM-dd HH:mm:ss",
+                    "yyyy/MM/dd HH:mm:ss",
+                    "yyyy.MM.dd HH:mm:ss",
+                    "yyyy-MM-dd'T'HH:mm:ss.SSSX",
+                    "yyyy/MM/dd'T'HH:mm:ss.SSSX",
+                    "yyyy.MM.dd'T'HH:mm:ss.SSSX",
+                    "yyyy-MM-dd'T'HH:mm:ssX",
+                    "yyyy/MM/dd'T'HH:mm:ssX",
+                    "yyyy.MM.dd'T'HH:mm:ssX",
+                    "yyyy-MM-dd'T'HH:mm:ss.SSSz",
+                    "yyyy/MM/dd'T'HH:mm:ss.SSSz",
+                    "yyyy.MM.dd'T'HH:mm:ss.SSSz",
+                    "yyyy-MM-dd'T'HH:mm:ssz",
+                    "yyyy/MM/dd'T'HH:mm:ssz",
+                    "yyyy.MM.dd'T'HH:mm:ssz",
+                    "yyyy-MM-dd'T'HH:mm:ss.SSS",
+                    "yyyy/MM/dd'T'HH:mm:ss.SSS",
+                    "yyyy.MM.dd'T'HH:mm:ss.SSS",
+                    "yyyy-MM-dd'T'HH:mm:ss",
+                    "yyyy/MM/dd'T'HH:mm:ss",
+                    "yyyy.MM.dd'T'HH:mm:ss"
+            };
+
+    // Required host name of the IoTDB server; read by connectToIoTDB.
+    // NOTE(review): no validator is declared on this descriptor - confirm whether one is needed.
+    static final PropertyDescriptor IOTDB_HOST =
+            new PropertyDescriptor.Builder()
+                    .name("Host")
+                    .description("The host of IoTDB.")
+                    .required(true)
+                    .build();
+
+    // Required port of the IoTDB server, defaulting to DEFAULT_IOTDB_PORT.
+    // NOTE(review): connectToIoTDB passes the raw value to Integer.parseInt and no validator is
+    // declared here, so a non-numeric value would only fail at connect time - confirm.
+    static final PropertyDescriptor IOTDB_PORT =
+            new PropertyDescriptor.Builder()
+                    .name("Port")
+                    .description("The port of IoTDB.")
+                    .defaultValue(String.valueOf(DEFAULT_IOTDB_PORT))
+                    .required(true)
+                    .build();
+
+    // Required user name for the IoTDB session.
+    static final PropertyDescriptor USERNAME =
+            new PropertyDescriptor.Builder()
+                    .name("Username")
+                    .description("Username to access the IoTDB.")
+                    .required(true)
+                    .build();
+
+    // Required password for the IoTDB session; flagged as sensitive.
+    static final PropertyDescriptor PASSWORD =
+            new PropertyDescriptor.Builder()
+                    .name("Password")
+                    .description("Password to access the IoTDB.")
+                    .required(true)
+                    .sensitive(true)
+                    .build();
+
+    // Mutable static list of property descriptors; the static initializer below adds the
+    // connection properties to it.
+    protected static List<PropertyDescriptor> descriptors = new ArrayList<>();
+
+    // Relationship described as holding the files that were processed successfully.
+    protected static final Relationship REL_SUCCESS =
+            new Relationship.Builder()
+                    .name("success")
+                    .description("files that were successfully processed")
+                    .build();
+    // Relationship described as holding the files that were not processed successfully.
+    protected static final Relationship REL_FAILURE =
+            new Relationship.Builder()
+                    .name("failure")
+                    .description("files that were not successfully processed")
+                    .build();
+
+    // Mutable static set of relationships; the static initializer below adds both of them.
+    protected static Set<Relationship> relationships = new HashSet<>();
+
+    // One-time population of the static collections declared above.
+    static {
+        // Connection properties.
+        descriptors.add(IOTDB_HOST);
+        descriptors.add(IOTDB_PORT);
+        descriptors.add(USERNAME);
+        descriptors.add(PASSWORD);
+
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+
+        // NiFi record field type -> IoTDB data type, as looked up by getType.
+        typeMap.put(RecordFieldType.STRING, TSDataType.TEXT);
+        typeMap.put(RecordFieldType.BOOLEAN, TSDataType.BOOLEAN);
+        typeMap.put(RecordFieldType.INT, TSDataType.INT32);
+        typeMap.put(RecordFieldType.LONG, TSDataType.INT64);
+        typeMap.put(RecordFieldType.FLOAT, TSDataType.FLOAT);
+        typeMap.put(RecordFieldType.DOUBLE, TSDataType.DOUBLE);
+
+        // Same six field types as the keys of typeMap.
+        supportedType.add(RecordFieldType.BOOLEAN);
+        supportedType.add(RecordFieldType.STRING);
+        supportedType.add(RecordFieldType.INT);
+        supportedType.add(RecordFieldType.LONG);
+        supportedType.add(RecordFieldType.FLOAT);
+        supportedType.add(RecordFieldType.DOUBLE);
+
+        // The `Time` field may only be a string or a long.
+        supportedTimeType.add(RecordFieldType.STRING);
+        supportedTimeType.add(RecordFieldType.LONG);
+    }
+
+    // Holder for the current IoTDB session; null until connectToIoTDB stores one and reset to
+    // null by stop.
+    protected final AtomicReference<Session> session = new AtomicReference<>(null);
+
+    /**
+     * Lifecycle hook annotated with {@code @OnScheduled}; delegates to
+     * {@link #connectToIoTDB(ProcessContext)}.
+     *
+     * @param context the process context supplying the connection properties
+     * @throws IoTDBConnectionException if the connection attempt fails
+     */
+    @OnScheduled
+    public void onScheduled(ProcessContext context) throws IoTDBConnectionException {
+        connectToIoTDB(context);
+    }
+
+    void connectToIoTDB(ProcessContext context) throws IoTDBConnectionException {
+        if (session.get() == null) {
+            final String host = context.getProperty(IOTDB_HOST).getValue();
+            final int port = Integer.parseInt(context.getProperty(IOTDB_PORT).getValue());
+            final String username = context.getProperty(USERNAME).getValue();
+            final String password = context.getProperty(PASSWORD).getValue();
+
+            session.set(
+                    new Session.Builder()
+                            .host(host)
+                            .port(port)
+                            .username(username)
+                            .password(password)
+                            .build());
+            session.get().open();
+        }
+    }
+
+    /**
+     * Closes the session held in {@link #session}, if there is one, and then clears the
+     * reference. A failure while closing is logged and not rethrown.
+     *
+     * @param context the process context; not used by this method
+     */
+    public void stop(ProcessContext context) {
+        final Session current = session.get();
+        if (current == null) {
+            return;
+        }
+
+        try {
+            current.close();
+        } catch (IoTDBConnectionException e) {
+            getLogger().error("IoTDB disconnection failed", e);
+        }
+        session.set(null);
+    }
+
+    /**
+     * Looks up the IoTDB data type registered in {@code typeMap} for a NiFi record field type.
+     *
+     * @param type the NiFi record field type
+     * @return the mapped IoTDB data type, or {@code null} when the map has no entry for it
+     */
+    protected TSDataType getType(RecordFieldType type) {
+        return typeMap.get(type);
+    }
+
+    protected ValidationResult validateSchemaAttribute(String schemaAttribute) {
+        JsonNode schema = null;
+        try {
+            schema = mapper.readTree(schemaAttribute);
+        } catch (JsonProcessingException e) {
+            return new ValidationResult(false, e.getMessage());
+        }
+        Set<String> keySet = new HashSet<>();
+        schema.fieldNames().forEachRemaining(field -> keySet.add(field));
+
+        if (!keySet.contains(TIME_TYPE) || !keySet.contains(FIELDS)) {
+            String msg = "The JSON of schema must contain `timeType` and `fields`";
+            return new ValidationResult(false, msg);
+        }
+
+        if (!IoTDBSchema.getSupportedTimeType().contains(schema.get(TIME_TYPE).asText())) {
+            String msg =
+                    String.format(
+                            "Unknown `timeType`: %s, there are only two options `LONG` and `STRING` for this property",
+                            schema.get(TIME_TYPE).asText());
+            return new ValidationResult(false, msg);
+        }
+
+        for (int i = 0; i < schema.get(FIELDS).size(); i++) {
+            JsonNode field = schema.get(FIELDS).get(i);
+            Set<String> fieldKeySet = new HashSet<>();
+
+            field.fieldNames().forEachRemaining(fieldName -> fieldKeySet.add(fieldName));
+            if (!fieldKeySet.contains("tsName") || !fieldKeySet.contains("dataType")) {
+                String msg = "`tsName` or `dataType` has not been set";
+                return new ValidationResult(false, msg);
+            }
+
+            if (!field.get("tsName").asText().startsWith(PREFIX)) {
+                String msg =
+                        String.format("The tsName `%s` is not start with 'root.'", field.get("tsName").asText());
+                return new ValidationResult(false, msg);
+            }
+
+            if (!Field.getSupportedDataType().contains(field.get("dataType").asText())) {
+                String msg =
+                        String.format(
+                                "Unknown `dataType`: %s. The supported dataTypes are %s",
+                                field.get("dataType").asText(), Field.getSupportedDataType());
+                return new ValidationResult(false, msg);
+            }
+
+            Set<String> supportedKeySet = new HashSet<>();
+            supportedKeySet.add("tsName");
+            supportedKeySet.add("dataType");
+            supportedKeySet.add("encoding");
+            supportedKeySet.add("compressionType");
+
+            HashSet<String> tmpKetSet = new HashSet<>();
+            tmpKetSet.addAll(supportedKeySet);
+            tmpKetSet.addAll(fieldKeySet);
+            tmpKetSet.removeAll(supportedKeySet);
+            if (!tmpKetSet.isEmpty()) {
+                String msg = "Unknown property or properties: " + tmpKetSet;
+                return new ValidationResult(false, msg);
+            }
+
+            if (fieldKeySet.contains("compressionType") && !fieldKeySet.contains("encoding")) {
+                String msg =
+                        "The `compressionType` has been set, but the `encoding` has not. The property `compressionType` will not take effect";
+                return new ValidationResult(true, msg);
+            }
+
+            if (field.get("encoding") != null
+                    && !Field.getSupportedEncoding().contains(field.get("encoding").asText())) {
+                String msg =
+                        String.format(
+                                "Unknown `encoding`: %s, The supported encoding types are %s",
+                                field.get("encoding").asText(), Field.getSupportedEncoding());
+                return new ValidationResult(false, msg);
+            }
+
+            if (field.get("compressionType") != null
+                    && !Field.getSupportedCompressionType().contains(field.get("compressionType"))) {
+                String msg =
+                        String.format(
+                                "Unknown `compressionType`: %s, The supported compressionType are %s",
+                                field.get("compressionType").asText(), Field.getSupportedCompressionType());
+                return new ValidationResult(false, msg);
+            }
+        }
+
+        return new ValidationResult(true, null);
+    }
+
+    protected ValidationResult validateSchema(RecordSchema recordSchema) {
+        List<String> fieldNames = recordSchema.getFieldNames();
+        List<DataType> dataTypes = recordSchema.getDataTypes();
+        if (!fieldNames.contains(TIME) || !TIME.equals(fieldNames.get(0))) {
+            return new ValidationResult(false, "The fields must contain `Time`, and it must be the first");
+        }
+        if (!supportedTimeType.contains(recordSchema.getDataType(TIME).get().getFieldType())) {
+            return new ValidationResult(false, "The dataType of `Time` must be `STRING` or `LONG`");
+        }
+        fieldNames.remove(TIME);
+        for (String fieldName : fieldNames) {
+            if (!fieldName.startsWith(PREFIX)) {
+                String msg = String.format("The tsName `%s` is not start with 'root.'", fieldName);
+                return new ValidationResult(false, msg);
+            }
+        }
+        for (DataType type : dataTypes) {
+            RecordFieldType dataType = type.getFieldType();
+            if (!supportedType.contains(dataType)) {
+                String msg =
+                        String.format(
+                                "Unknown `dataType`: %s. The supported dataTypes are %s",
+                                dataType.toString(), supportedType);
+                return new ValidationResult(false, msg);
+            }
+        }
+
+        return new ValidationResult(true, null);
+    }
+
+    protected Map<String, List<String>> parseSchema(List<String> filedNames) {
+        HashMap<String, List<String>> deviceMeasurementMap = new HashMap<>();
+        filedNames.stream()
+                .forEach(
+                        filed -> {
+                            String[] paths = filed.split("\\.");
+                            String device = StringUtils.join(paths, ".", 0, paths.length - 1);
+
+                            if (!deviceMeasurementMap.containsKey(device)) {
+                                deviceMeasurementMap.put(device, new ArrayList<>());
+                            }
+                            deviceMeasurementMap.get(device).add(paths[paths.length - 1]);
+                        });
+
+        return deviceMeasurementMap;
+    }
+
+    protected HashMap<String, Tablet> generateTablets(IoTDBSchema schema, int maxRowNumber) {
+        Map<String, List<String>> deviceMeasurementMap = parseSchema(schema.getFieldNames());
+        HashMap<String, Tablet> tablets = new HashMap<>();
+        deviceMeasurementMap.forEach(
+                (device, measurements) -> {
+                    ArrayList<MeasurementSchema> schemas = new ArrayList<>();
+                    for (String measurement : measurements) {
+                        String tsName = device + "." + measurement;
+                        TSDataType dataType = schema.getDataType(tsName);
+                        TSEncoding encoding = schema.getEncodingType(tsName);
+                        CompressionType compressionType = schema.getCompressionType(tsName);
+                        if (encoding == null) {
+                            schemas.add(new MeasurementSchema(measurement, dataType));
+                        } else if (compressionType == null) {
+                            schemas.add(new MeasurementSchema(measurement, dataType, encoding));
+                        } else {
+                            schemas.add(new MeasurementSchema(measurement, dataType, encoding, compressionType));
+                        }
+                    }
+                    Tablet tablet = new Tablet(device, schemas, maxRowNumber);
+                    tablets.put(device, tablet);
+                });
+        return tablets;
+    }
+
+    protected DateTimeFormatter initFormatter(String time) {
+        for (String format : STRING_TIME_FORMAT) {
+            try {
+                DateTimeFormatter.ofPattern(format).parse(time);
+                return DateTimeFormatter.ofPattern(format);
+            } catch (DateTimeParseException e) {
+
+            }
+        }
+        return null;
+    }
+
+    protected Object convertType(Object value, TSDataType type) {
+        switch (type) {
+            case TEXT:
+                return Binary.valueOf(String.valueOf(value));
+            case INT32:
+                return Integer.parseInt(value.toString());
+            case INT64:
+                return Long.parseLong(value.toString());
+            case FLOAT:
+                return Float.parseFloat(value.toString());
+            case DOUBLE:
+                return Double.parseDouble(value.toString());
+            case BOOLEAN:
+                return Boolean.parseBoolean(value.toString());
+            default:
+                return null;
+        }
+    }
+
+    protected IoTDBSchema convertSchema(RecordSchema recordSchema) {
+        String timeType =
+                recordSchema.getDataType(TIME).get().getFieldType() == RecordFieldType.LONG
+                        ? "LONG"
+                        : "STRING";
+        List<String> fieldNames = recordSchema.getFieldNames();
+        fieldNames.remove(TIME);
+
+        ArrayList<Field> fields = new ArrayList<>();
+        fieldNames.forEach(
+                fieldName ->
+                        fields.add(
+                                new Field(
+                                        fieldName, getType(recordSchema.getDataType(fieldName).get().getFieldType()))));
+        IoTDBSchema schema = new IoTDBSchema(timeType, fields);
+        return schema;

Review Comment:
   ```suggestion
           return new IoTDBSchema(timeType, fields);
   ```



##########
nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/PutIoTDB.java:
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors;
+
+import java.io.InputStream;
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Set;
+import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.nifi.processors.model.IoTDBSchema;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.model.ValidationResult;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+
+@Tags({"iotdb", "insert", "tablet"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription(
+        "This is a record aware processor that reads the content of the incoming FlowFile as individual records using the "
+                + "configured 'Record Reader' and writes them to Apache IoTDB using native interface.")
+public class PutIoTDB extends AbstractIoTDB {
+
+    // Required property that identifies a RecordReaderFactory controller service.
+    static final PropertyDescriptor RECORD_READER_FACTORY =
+            new PropertyDescriptor.Builder()
+                    .name("Record Reader")
+                    .description(
+                            "Specifies the type of Record Reader controller service to use for parsing the incoming data "
+                                    + "and determining the schema")
+                    .identifiesControllerService(RecordReaderFactory.class)
+                    .required(true)
+                    .build();
+
+    // Optional, user-defined schema. It supports expression language evaluated against
+    // FlowFile attributes; per its description the inferred schema is used when it is unset.
+    static final PropertyDescriptor SCHEMA =
+            new PropertyDescriptor.Builder()
+                    .name("Schema")
+                    .description(
+                            "The schema that IoTDB requires is not well supported by NiFi.\n"
+                                    + "Therefore, you can define the schema here.\n"
+                                    + "Besides, you can set encoding type and compression type by this method.\n"
+                                    + "If you don't set this property, the inferred schema will be used.\n")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .required(false)
+                    .build();
+
+    static final PropertyDescriptor ALIGNED =
+            new PropertyDescriptor.Builder()
+                    .name("Aligned")
+                    .description("Whether to use the Apache IoTDB Aligned Timeseries interface")
+                    .allowableValues("true", "false")
+                    .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+                    .defaultValue("false")
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .required(false)

Review Comment:
   The `required` setting should be changed to `true`, since the value must be either `true` or `false`. That should make the default clearer.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org