Posted to commits@nifi.apache.org by ex...@apache.org on 2023/02/19 02:01:01 UTC

[nifi] branch main updated: NIFI-10784 Added QueryIoTDBRecord Processor

This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 7cb86dd42d NIFI-10784 Added QueryIoTDBRecord Processor
7cb86dd42d is described below

commit 7cb86dd42d52923bae57e2dc0763799f682222f5
Author: lizhizhou <li...@gmail.com>
AuthorDate: Thu Jan 12 16:47:51 2023 +0800

    NIFI-10784 Added QueryIoTDBRecord Processor
    
    This closes #6844
    
    Co-authored-by: David Handermann <ex...@apache.org>
    Signed-off-by: David Handermann <ex...@apache.org>
---
 .../nifi-iotdb-processors/pom.xml                  |  12 ++
 .../org/apache/nifi/processors/AbstractIoTDB.java  |  10 ++
 .../apache/nifi/processors/QueryIoTDBRecord.java   | 174 +++++++++++++++++++++
 .../services/org.apache.nifi.processor.Processor   |   1 +
 .../org/apache/nifi/processors/QueryIoTDBIT.java   | 101 ++++++++++++
 5 files changed, 298 insertions(+)
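
At a glance, the new QueryIoTDBRecord processor executes a configurable IoTDB query through the session provided by AbstractIoTDB and streams the results through a Record Writer. A minimal configuration sketch using the NiFi TestRunner, mirroring the integration test included in the patch (property names are taken from the diff below; the host, credentials, and query are placeholder values that assume a reachable IoTDB server, and checked-exception handling is omitted):

    import org.apache.nifi.serialization.record.MockRecordWriter;
    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;

    // Sketch only: configure and run the new processor against a local IoTDB instance.
    final TestRunner runner = TestRunners.newTestRunner(QueryIoTDBRecord.class);
    final MockRecordWriter writer = new MockRecordWriter("header", true);
    runner.addControllerService("writer", writer);
    runner.enableControllerService(writer);
    runner.setProperty(QueryIoTDBRecord.RECORD_WRITER_FACTORY, "writer");
    runner.setProperty(QueryIoTDBRecord.IOTDB_HOST, "127.0.0.1");  // connection properties inherited from AbstractIoTDB
    runner.setProperty(QueryIoTDBRecord.USERNAME, "root");
    runner.setProperty(QueryIoTDBRecord.PASSWORD, "root");
    runner.setProperty(QueryIoTDBRecord.QUERY, "SELECT s0, s1 FROM root.sg7.d1");
    runner.enqueue(new byte[]{});                                  // the processor requires an input FlowFile
    runner.run();
    runner.assertAllFlowFilesTransferred(QueryIoTDBRecord.REL_SUCCESS, 1);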

diff --git a/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/pom.xml b/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/pom.xml
index 3f0b9ed15e..42e6ab5100 100644
--- a/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/pom.xml
@@ -77,6 +77,12 @@
             <artifactId>iotdb-server</artifactId>
             <version>${iotdb.sdk.version}</version>
             <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>ch.qos.logback</groupId>
+                    <artifactId>logback-classic</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.iotdb</groupId>
@@ -84,6 +90,12 @@
             <version>${iotdb.sdk.version}</version>
             <type>test-jar</type>
             <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>ch.qos.logback</groupId>
+                    <artifactId>logback-classic</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.glassfish.jersey.inject</groupId>
diff --git a/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/AbstractIoTDB.java b/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/AbstractIoTDB.java
index f45275a22d..70fa43cd81 100755
--- a/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/AbstractIoTDB.java
+++ b/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/AbstractIoTDB.java
@@ -64,6 +64,9 @@ public abstract class AbstractIoTDB extends AbstractProcessor {
     private static final Map<RecordFieldType, TSDataType> typeMap =
             new HashMap<>();
 
+    private static final Map<String, RecordFieldType> reversedTypeMap =
+            new HashMap<>();
+
     static final Set<RecordFieldType> supportedType =
             new HashSet<>();
 
@@ -126,6 +129,9 @@ public abstract class AbstractIoTDB extends AbstractProcessor {
         typeMap.put(RecordFieldType.LONG, TSDataType.INT64);
         typeMap.put(RecordFieldType.FLOAT, TSDataType.FLOAT);
         typeMap.put(RecordFieldType.DOUBLE, TSDataType.DOUBLE);
+        for (Map.Entry<RecordFieldType, TSDataType> it : typeMap.entrySet()) {
+            reversedTypeMap.put(String.valueOf(it.getValue()),it.getKey());
+        }
 
         supportedType.add(RecordFieldType.BOOLEAN);
         supportedType.add(RecordFieldType.STRING);
@@ -184,6 +190,10 @@ public abstract class AbstractIoTDB extends AbstractProcessor {
         return typeMap.get(type);
     }
 
+    protected RecordFieldType getType(String type) {
+        return reversedTypeMap.get(type);
+    }
+
     protected ValidationResult validateSchemaAttribute(String schemaAttribute) {
         JsonNode schema;
         try {
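
The reversedTypeMap added here supports the new query processor: SessionDataSet reports result column types as plain strings, and getType(String) resolves them back to NiFi RecordFieldType values when the output schema is built. An illustrative sketch (not part of the patch) of the lookups this enables, based on the mappings visible in the static initializer and assuming TSDataType's default enum toString():

    // TSDataType values stringify to names such as "INT64", so the reverse map turns
    // SessionDataSet.getColumnTypes() entries back into NiFi field types.
    RecordFieldType longType   = reversedTypeMap.get("INT64");  // RecordFieldType.LONG
    RecordFieldType floatType  = reversedTypeMap.get("FLOAT");  // RecordFieldType.FLOAT
    RecordFieldType doubleType = reversedTypeMap.get("DOUBLE"); // RecordFieldType.DOUBLE
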
diff --git a/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/QueryIoTDBRecord.java b/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/QueryIoTDBRecord.java
new file mode 100755
index 0000000000..023b46980c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/QueryIoTDBRecord.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors;
+
+import org.apache.iotdb.session.SessionDataSet;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.Field;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+
+import java.io.OutputStream;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Collections;
+
+@SupportsBatching
+@Tags({"IoT", "Timeseries"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Query Apache IoTDB and write results as Records")
+@WritesAttributes({
+        @WritesAttribute(attribute = QueryIoTDBRecord.IOTDB_ERROR_MESSAGE, description = "Error message written on query failures"),
+        @WritesAttribute(attribute = QueryIoTDBRecord.MIME_TYPE, description = "Content Type based on configured Record Set Writer")
+})
+public class QueryIoTDBRecord extends AbstractIoTDB {
+
+    public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
+            .name("Query")
+            .displayName("Query")
+            .description("IoTDB query to be executed")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor FETCH_SIZE = new PropertyDescriptor.Builder()
+            .name("Fetch Size")
+            .displayName("Fetch Size")
+            .description("Maximum number of results to return in a single chunk. Configuring 1 or more enables result set chunking")
+            .defaultValue(String.valueOf(10_000))
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.createLongValidator(0, 100_000, true))
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor RECORD_WRITER_FACTORY = new PropertyDescriptor.Builder()
+            .name("Record Writer")
+            .displayName("Record Writer")
+            .description("Service for writing IoTDB query results as records")
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .required(true)
+            .build();
+
+    public static final String IOTDB_ERROR_MESSAGE = "iotdb.error.message";
+
+    public static final String MIME_TYPE = "mime.type";
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> propertyDescriptors = new ArrayList<>(super.getSupportedPropertyDescriptors());
+        propertyDescriptors.add(QUERY);
+        propertyDescriptors.add(FETCH_SIZE);
+        propertyDescriptors.add(RECORD_WRITER_FACTORY);
+        return Collections.unmodifiableList(propertyDescriptors);
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final String query = context.getProperty(QUERY).evaluateAttributeExpressions(flowFile).getValue();
+        final int fetchSize = context.getProperty(FETCH_SIZE).asInteger();
+        final RecordSetWriterFactory recordSetWriterFactory = context.getProperty(RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class);
+
+        try (
+                SessionDataSet sessionDataSet = this.session.get().executeQueryStatement(query);
+                OutputStream outputStream = session.write(flowFile)
+        ) {
+            sessionDataSet.setFetchSize(fetchSize);
+
+            final RecordSchema recordSchema = getRecordSchema(sessionDataSet);
+            final RecordSetWriter recordSetWriter = recordSetWriterFactory.createWriter(getLogger(), recordSchema, outputStream, flowFile);
+            while (sessionDataSet.hasNext()) {
+                final RowRecord rowRecord = sessionDataSet.next();
+                final Record record = getRecord(recordSchema, rowRecord);
+                recordSetWriter.write(record);
+            }
+
+            recordSetWriter.close();
+            flowFile = session.putAttribute(flowFile, MIME_TYPE, recordSetWriter.getMimeType());
+            session.transfer(flowFile, REL_SUCCESS);
+        } catch (final Exception e) {
+            flowFile = session.putAttribute(flowFile, IOTDB_ERROR_MESSAGE, e.getMessage());
+            getLogger().error("IoTDB query failed {}", flowFile, e);
+            session.transfer(flowFile, REL_FAILURE);
+        }
+    }
+
+    private Record getRecord(final RecordSchema schema, final RowRecord rowRecord) {
+        final Map<String, Object> row = new LinkedHashMap<>();
+        final Iterator<String> recordFieldNames = schema.getFieldNames().iterator();
+
+        // Put Timestamp as first field
+        row.put(recordFieldNames.next(), rowRecord.getTimestamp());
+
+        final Iterator<Field> rowRecordFields = rowRecord.getFields().iterator();
+        while (recordFieldNames.hasNext()) {
+            final String recordFieldName = recordFieldNames.next();
+            if (rowRecordFields.hasNext()) {
+                final Field rowRecordField = rowRecordFields.next();
+                final TSDataType dataType = rowRecordField.getDataType();
+                final Object objectValue = rowRecordField.getObjectValue(dataType);
+                row.put(recordFieldName, objectValue);
+            }
+        }
+        return new MapRecord(schema, row);
+    }
+
+    private RecordSchema getRecordSchema(final SessionDataSet sessionDataSet) {
+        final Iterator<String> columnTypes = sessionDataSet.getColumnTypes().iterator();
+        final Iterator<String> columnNames = sessionDataSet.getColumnNames().iterator();
+
+        final List<RecordField> recordFields = new ArrayList<>();
+        while (columnNames.hasNext()) {
+            final String recordFieldName = columnNames.next();
+            final String columnType = columnTypes.next();
+            final RecordFieldType recordFieldType = getType(columnType);
+            final DataType recordDataType = recordFieldType.getDataType();
+            final RecordField recordField = new RecordField(recordFieldName, recordDataType);
+            recordFields.add(recordField);
+        }
+        return new SimpleRecordSchema(recordFields);
+    }
+}
\ No newline at end of file
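
In short, onTrigger evaluates the Query property against the incoming FlowFile, executes it through the IoTDB session held by AbstractIoTDB, derives a RecordSchema from the result set's column names and types, writes each RowRecord through the configured Record Writer, and routes to success; on any failure the exception message is stored in iotdb.error.message and the FlowFile is routed to failure. For reference, a minimal standalone sketch of the same IoTDB client calls the processor relies on (class name, query, and data are placeholders; the default Session.Builder settings are assumed, as in the integration test):

    import org.apache.iotdb.session.Session;
    import org.apache.iotdb.session.SessionDataSet;
    import org.apache.iotdb.tsfile.read.common.RowRecord;

    public class QueryIoTDBSketch {
        public static void main(String[] args) throws Exception {
            // Sketch only: the same IoTDB client calls QueryIoTDBRecord relies on, outside NiFi.
            Session session = new Session.Builder().build();   // client defaults, as in QueryIoTDBIT
            session.open();
            try (SessionDataSet dataSet = session.executeQueryStatement("SELECT s0, s1 FROM root.sg7.d1")) {
                dataSet.setFetchSize(10_000);
                System.out.println(dataSet.getColumnNames());   // the first column carries the timestamp
                while (dataSet.hasNext()) {
                    RowRecord row = dataSet.next();
                    System.out.println(row.getTimestamp() + " " + row.getFields());
                }
            } finally {
                session.close();
            }
        }
    }
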
diff --git a/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 4a19720633..962f2a5cf3 100755
--- a/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -13,3 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 org.apache.nifi.processors.PutIoTDBRecord
+org.apache.nifi.processors.QueryIoTDBRecord
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/test/java/org/apache/nifi/processors/QueryIoTDBIT.java b/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/test/java/org/apache/nifi/processors/QueryIoTDBIT.java
new file mode 100755
index 0000000000..6e641b268b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/test/java/org/apache/nifi/processors/QueryIoTDBIT.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors;
+
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.Session;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.record.MockRecordWriter;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.apache.nifi.util.MockFlowFile;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class QueryIoTDBIT {
+    private static final String WRITER_SERVICE_ID = "writer";
+
+    private static final String DEVICE_ID = "root.sg7.d1";
+
+    private static final String FIRST_MEASUREMENT = "s0";
+
+    private static final String SECOND_MEASUREMENT = "s1";
+
+    private static final long TIMESTAMP = 1;
+
+    private TestRunner testRunner;
+    private MockRecordWriter recordWriter;
+    private Session session;
+
+    @BeforeEach
+    public void setRunner() throws IoTDBConnectionException, StatementExecutionException {
+        testRunner = TestRunners.newTestRunner(QueryIoTDBRecord.class);
+        recordWriter = new MockRecordWriter("header", true);
+        testRunner.setProperty(QueryIoTDBRecord.RECORD_WRITER_FACTORY, WRITER_SERVICE_ID);
+        testRunner.setProperty(QueryIoTDBRecord.IOTDB_HOST, "127.0.0.1");
+        testRunner.setProperty(QueryIoTDBRecord.USERNAME, "root");
+        testRunner.setProperty(QueryIoTDBRecord.PASSWORD, "root");
+        session = new Session.Builder().build();
+        session.open();
+
+        List<String> measurements = new ArrayList<>(2);
+        measurements.add(FIRST_MEASUREMENT);
+        measurements.add(SECOND_MEASUREMENT);
+
+        List<String> values = new ArrayList<>(2);
+        values.add("5.0");
+        values.add("6.0");
+        session.insertRecord(DEVICE_ID, TIMESTAMP, measurements, values);
+    }
+
+    @AfterEach
+    public void shutdown() throws Exception {
+        testRunner.shutdown();
+        recordWriter.disabled();
+        session.close();
+        EnvironmentUtils.cleanEnv();
+        EnvironmentUtils.shutdownDaemon();
+    }
+
+    @Test
+    public void testQueryIoTDBbyProperty() throws InitializationException {
+        setUpStandardTestConfig();
+
+        final String query = String.format("SELECT %s, %s FROM %s", FIRST_MEASUREMENT, SECOND_MEASUREMENT, DEVICE_ID);
+        testRunner.setProperty(QueryIoTDBRecord.QUERY, query);
+        testRunner.enqueue(new byte[]{});
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(PutIoTDBRecord.REL_SUCCESS, 1);
+
+        final MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(PutIoTDBRecord.REL_SUCCESS).get(0);
+        flowFile.assertContentEquals("header\n\"1\",\"5.0\",\"6.0\"\n");
+        flowFile.assertAttributeExists(CoreAttributes.MIME_TYPE.key());
+    }
+
+    private void setUpStandardTestConfig() throws InitializationException {
+        testRunner.addControllerService(WRITER_SERVICE_ID, recordWriter);
+        testRunner.enableControllerService(recordWriter);
+    }
+}