Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2021/02/18 17:42:30 UTC

[GitHub] [nifi] markap14 commented on a change in pull request #4821: NIFI-8113 Adding persistent status history repository backed by embed…

markap14 commented on a change in pull request #4821:
URL: https://github.com/apache/nifi/pull/4821#discussion_r578557060



##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
##########
@@ -120,11 +120,40 @@ nifi.provenance.repository.concurrent.merge.threads=${nifi.provenance.repository
 # Volatile Provenance Respository Properties
 nifi.provenance.repository.buffer.size=${nifi.provenance.repository.buffer.size}
 
-# Component Status Repository
+# Component and Node Status Repository
+
+# Implementation based specification. Must be the classname of org.apache.nifi.controller.status.history.StatusRepository implementation.
+# This property specifies both the Component and the Node Status Repositories.
+# This approach takes precedence over the builder based specification if presents.
 nifi.components.status.repository.implementation=${nifi.components.status.repository.implementation}
+
+# Builder based specification. Gives the possibility to store Node and Component Status History information in different storage solutions.

Review comment:
       All properties that get added to this file need to be fully documented in `administration-guide.adoc` in `nifi-docs`.

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
##########
@@ -120,11 +120,40 @@ nifi.provenance.repository.concurrent.merge.threads=${nifi.provenance.repository
 # Volatile Provenance Respository Properties
 nifi.provenance.repository.buffer.size=${nifi.provenance.repository.buffer.size}
 
-# Component Status Repository
+# Component and Node Status Repository
+
+# Implementation based specification. Must be the classname of org.apache.nifi.controller.status.history.StatusRepository implementation.
+# This property specifies both the Component and the Node Status Repositories.
+# This approach takes precedence over the builder based specification if presents.
 nifi.components.status.repository.implementation=${nifi.components.status.repository.implementation}
+
+# Builder based specification. Gives the possibility to store Node and Component Status History information in different storage solutions.
+# nifi.status.repository.builder.inmemory=org.apache.nifi.controller.status.history.InMemoryStatusRepositoryBuilder
+# nifi.status.repository.builder.persistent=org.apache.nifi.controller.status.history.EmbeddedQuestDbStatusRepositoryBuilder
+# nifi.status.repository.roles.component=persistent
+# nifi.status.repository.roles.node=inmemory

Review comment:
       I'm not sure that I see the benefit to adding these properties at all. If the user wants to persist the data, it should be persisted. If they want to keep it in memory, it should be kept in memory. These properties become confusing and add dubious value. We should lean more toward simple configuration vs. more raw power when we're able to.
   
   Recommend removing all 4 of these properties. Instead, just allow the QuestDB Status Repository to be configured via the `nifi.components.status.repository.implementation` property, in which case all stats are persistent. If Volatile repo is used, store everything in memory.

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
##########
@@ -1124,20 +1129,77 @@ private ProvenanceRepository createProvenanceRepository(final NiFiProperties pro
         }
     }
 
-    private ComponentStatusRepository createComponentStatusRepository() {
-        final String implementationClassName = nifiProperties.getProperty(NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION, DEFAULT_COMPONENT_STATUS_REPO_IMPLEMENTATION);
-        if (implementationClassName == null) {
-            throw new RuntimeException("Cannot create Component Status Repository because the NiFi Properties is missing the following property: "
-                    + NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION);
+    private StatusRepository createStatusRepositories() {
+        // Creating status repository based on implementation class takes precedence over creation based on builder
+        final String implementationClassName = nifiProperties.getProperty(NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION);
+
+        if (implementationClassName != null) {

Review comment:
       Any time that we fetch a property value from nifi properties, we need to treat `null` the same as empty strings or strings with only white space. If the property name exists but with no value, you'll get back an empty string here instead of null.
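
       To illustrate the point about blank values, here is a minimal sketch in plain Java. It uses `java.util.Properties` as a stand-in for NiFi's property loading, and `isBlank` is a hypothetical helper written for this example rather than an existing NiFi method. A key that is present but has no value comes back as an empty string, so a `!= null` check alone would treat it as configured.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

final class BlankPropertyCheck {

    // Hypothetical helper (not NiFi API): true for null, empty, or whitespace-only values.
    static boolean isBlank(final String value) {
        return value == null || value.trim().isEmpty();
    }

    public static void main(final String[] args) throws IOException {
        final Properties properties = new Properties();
        // The key exists in the file but carries no value.
        properties.load(new StringReader("nifi.components.status.repository.implementation=\n"));

        final String configured = properties.getProperty("nifi.components.status.repository.implementation");
        final String missing = properties.getProperty("some.absent.key");

        System.out.println(configured == null); // false: an empty string is returned, not null
        System.out.println(isBlank(configured)); // true
        System.out.println(isBlank(missing));    // true
        System.out.println(isBlank("   "));      // true
    }
}
```

       With a check of this kind, the implementation-class branch would only be taken when the property actually names a class.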

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
##########
@@ -120,11 +120,40 @@ nifi.provenance.repository.concurrent.merge.threads=${nifi.provenance.repository
 # Volatile Provenance Respository Properties
 nifi.provenance.repository.buffer.size=${nifi.provenance.repository.buffer.size}
 
-# Component Status Repository
+# Component and Node Status Repository
+
+# Implementation based specification. Must be the classname of org.apache.nifi.controller.status.history.StatusRepository implementation.
+# This property specifies both the Component and the Node Status Repositories.
+# This approach takes precedence over the builder based specification if presents.
 nifi.components.status.repository.implementation=${nifi.components.status.repository.implementation}
+
+# Builder based specification. Gives the possibility to store Node and Component Status History information in different storage solutions.
+# nifi.status.repository.builder.inmemory=org.apache.nifi.controller.status.history.InMemoryStatusRepositoryBuilder
+# nifi.status.repository.builder.persistent=org.apache.nifi.controller.status.history.EmbeddedQuestDbStatusRepositoryBuilder
+# nifi.status.repository.roles.component=persistent
+# nifi.status.repository.roles.node=inmemory
+
+# Volatile Status Repository Properties
 nifi.components.status.repository.buffer.size=${nifi.components.status.repository.buffer.size}
 nifi.components.status.snapshot.frequency=${nifi.components.status.snapshot.frequency}
 
+# QuestDB Status Repository Properties
+# nifi.status.repository.questdb.persist.frequency=${nifi.status.repository.questdb.persist.frequency}
+# nifi.status.repository.questdb.persist.roll.frequency=${nifi.status.repository.questdb.persist.roll.frequency}
+# nifi.status.repository.questdb.persist.batch.size=${nifi.status.repository.questdb.persist.batch.size}
+# nifi.status.repository.questdb.persist.node.days=${nifi.status.repository.questdb.persist.node.days}
+# nifi.status.repository.questdb.persist.component.days=${nifi.status.repository.questdb.persist.component.days}
+# nifi.status.repository.questdb.persist.location=${nifi.status.repository.questdb.persist.location}
+
+# The properties below are used for optimize QuesDB performance. For further details please see https://questdb.io/docs/reference/sql/create-table
+# An estimation of the number of components per component type, serves as hint for QuestDB performance optimization
+# nifi.status.repository.questdb.component.id.distinctvalues=${nifi.status.repository.questdb.component.id.distinctvalues}
+# If true, it turns on Java heap based caching for quicker lookup. This increases selection speed but consumes heap memory.
+# nifi.status.repository.questdb.component.id.cached=${nifi.status.repository.questdb.component.id.cached}
+# Turns on indexing of the component id field. For further details please see https://questdb.io/docs/concept/indexes/

Review comment:
       While referencing the QuestDB docs may provide some additional insights, we should not expect users to understand how QuestDB works. That is simply an implementation detail. We need to ensure that we fully document exactly how this property will affect the user, given the context of NiFi. We should do this in the administration guide, though, rather than add too much to the nifi.properties.

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/EmbeddedQuestDbRolloverHandler.java
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.history;
+
+import io.questdb.cairo.sql.Record;
+import io.questdb.cairo.sql.RecordCursor;
+import io.questdb.cairo.sql.RecordCursorFactory;
+import io.questdb.griffin.SqlCompiler;
+import io.questdb.griffin.SqlExecutionContext;
+import org.apache.commons.lang3.time.FastDateFormat;
+import org.apache.nifi.controller.status.history.questdb.QuestDbContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * QuestDB does not provide the possibility for deleting individual lines. Instead there is the option to drop older
+ * partitions. In order to clean up older status information, the partitions are outside of the scope of data we intend
+ * to keep will be deleted.
+ */
+public class EmbeddedQuestDbRolloverHandler implements Runnable {
+    private static final Logger LOGGER = LoggerFactory.getLogger(EmbeddedQuestDbRolloverHandler.class);
+
+    // Drop keyword is intentionally not uppercase as the query parser only recognizes it in this way
+    private static final String DELETION_QUERY = "ALTER TABLE %s drop PARTITION '%s'";
+    // Distinct keyword is not recognized if the date mapping is not within an inner query
+    static final String SELECTION_QUERY = "SELECT DISTINCT * FROM (SELECT (to_str(capturedAt, 'yyyy-MM-dd')) AS partitionName FROM %s)";
+
+    static final FastDateFormat DATE_FORMAT = FastDateFormat.getInstance("yyyy-MM-dd");
+
+    private final QuestDbContext dbContext;
+    private final List<String> tables = new ArrayList<>();
+    private final int daysToKeepData;
+
+    public EmbeddedQuestDbRolloverHandler(final Collection<String> tables, final int daysToKeepData, final QuestDbContext dbContext) {
+        this.tables.addAll(tables);
+        this.dbContext = dbContext;
+        this.daysToKeepData = daysToKeepData;
+    }
+
+    @Override
+    public void run() {
+        LOGGER.debug("Starting rollover");
+        tables.forEach(tableName -> rollOverTable(tableName));
+        LOGGER.debug("Finishing rollover");
+    }
+
+    private void rollOverTable(final CharSequence tableName) {
+        try {
+            final Set<String> partitions = getPartitions(tableName);
+            final Set<String> partitionToKeep = getPartitionsToKeep();
+
+            for (final String partition : partitions) {
+                if (!partitionToKeep.contains(partition)) {
+                    deletePartition(tableName, partition);
+                }
+            }
+        } catch (final Exception e) {
+            LOGGER.error("Could not rollover table " + tableName, e);
+        }
+    }
+
+    private void deletePartition(final CharSequence tableName, final String partition) {
+        try (final SqlCompiler compiler = dbContext.getCompiler()) {
+            compiler.compile(String.format(DELETION_QUERY, new Object[]{tableName, partition}), dbContext.getSqlExecutionContext());
+        } catch (final Exception e) {
+            LOGGER.error("Dropping partition " + partition + " of table " + tableName + " failed", e);
+        }
+    }
+
+    private Set<String> getPartitions(final CharSequence tableName) throws Exception {
+        final SqlExecutionContext executionContext = dbContext.getSqlExecutionContext();
+        final Set<String> result = new HashSet<>();
+
+        try (
+            final SqlCompiler compiler = dbContext.getCompiler();
+            final RecordCursorFactory recordCursorFactory = compiler.compile(String.format(SELECTION_QUERY, new Object[]{tableName}), executionContext).getRecordCursorFactory();
+            final RecordCursor cursor = recordCursorFactory.getCursor(executionContext);
+        ) {
+            while (cursor.hasNext()) {
+                final Record record = cursor.getRecord();
+                result.add(new StringBuilder(record.getStr(0)).toString());

Review comment:
       Why are we creating a StringBuilder here?
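
       If the intent of the wrapper is to copy the value out of a buffer that may be reused, the sketch below suggests it is not needed. It assumes (not verified against the QuestDB API here) that `record.getStr(0)` returns a `CharSequence`; a `StringBuilder` stands in for that mutable sequence. Calling `toString()` directly already yields an independent, immutable `String`.

```java
final class CharSequenceCopy {

    // Takes an immutable snapshot of whatever the sequence currently holds.
    static String snapshot(final CharSequence value) {
        return value.toString();
    }

    public static void main(final String[] args) {
        // Stand-in for a reusable, mutable character buffer.
        final StringBuilder reusable = new StringBuilder("2021-02-18");

        final String direct = snapshot(reusable);
        final String viaBuilder = new StringBuilder(reusable).toString();

        // Overwrite the buffer, as a cursor moving to the next row might.
        reusable.setLength(0);
        reusable.append("2021-02-19");

        System.out.println(direct);                    // 2021-02-18
        System.out.println(viaBuilder);                // 2021-02-18
        System.out.println(direct.equals(viaBuilder)); // true
    }
}
```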

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
##########
@@ -120,11 +120,40 @@ nifi.provenance.repository.concurrent.merge.threads=${nifi.provenance.repository
 # Volatile Provenance Respository Properties
 nifi.provenance.repository.buffer.size=${nifi.provenance.repository.buffer.size}
 
-# Component Status Repository
+# Component and Node Status Repository
+
+# Implementation based specification. Must be the classname of org.apache.nifi.controller.status.history.StatusRepository implementation.
+# This property specifies both the Component and the Node Status Repositories.
+# This approach takes precedence over the builder based specification if presents.
 nifi.components.status.repository.implementation=${nifi.components.status.repository.implementation}
+
+# Builder based specification. Gives the possibility to store Node and Component Status History information in different storage solutions.
+# nifi.status.repository.builder.inmemory=org.apache.nifi.controller.status.history.InMemoryStatusRepositoryBuilder
+# nifi.status.repository.builder.persistent=org.apache.nifi.controller.status.history.EmbeddedQuestDbStatusRepositoryBuilder
+# nifi.status.repository.roles.component=persistent
+# nifi.status.repository.roles.node=inmemory
+
+# Volatile Status Repository Properties
 nifi.components.status.repository.buffer.size=${nifi.components.status.repository.buffer.size}
 nifi.components.status.snapshot.frequency=${nifi.components.status.snapshot.frequency}
 
+# QuestDB Status Repository Properties
+# nifi.status.repository.questdb.persist.frequency=${nifi.status.repository.questdb.persist.frequency}
+# nifi.status.repository.questdb.persist.roll.frequency=${nifi.status.repository.questdb.persist.roll.frequency}
+# nifi.status.repository.questdb.persist.batch.size=${nifi.status.repository.questdb.persist.batch.size}
+# nifi.status.repository.questdb.persist.node.days=${nifi.status.repository.questdb.persist.node.days}
+# nifi.status.repository.questdb.persist.component.days=${nifi.status.repository.questdb.persist.component.days}
+# nifi.status.repository.questdb.persist.location=${nifi.status.repository.questdb.persist.location}
+
+# The properties below are used for optimize QuesDB performance. For further details please see https://questdb.io/docs/reference/sql/create-table

Review comment:
       Typo in "QuestDB" - missing the T. Should also read "properties are used to optimize" rather than "for optimize".

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessGroupStatusDescriptor.java
##########
@@ -113,16 +113,16 @@ public String getField() {
 
 
     private static long calculateTaskMillis(final ProcessGroupStatus status) {
-        long nanos = 0L;

Review comment:
       Why are these nanos being changed to millis? This leads to a lot of rounding errors, resulting in the data being both less precise and less accurate. By holding onto nanos and converting once at the end, it's also more efficient.
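
       The rounding concern can be shown with a small, self-contained sketch. The class and method names are hypothetical and are not taken from `ProcessGroupStatusDescriptor`; the sample values are made up for illustration. Converting each measurement to milliseconds before summing truncates every sub-millisecond value to zero, whereas summing nanoseconds and converting once keeps the total.

```java
import java.util.concurrent.TimeUnit;

final class TaskDurationRounding {

    // Converts every value to millis first: each conversion truncates.
    static long convertThenSum(final long[] nanos) {
        long millis = 0L;
        for (final long value : nanos) {
            millis += TimeUnit.NANOSECONDS.toMillis(value);
        }
        return millis;
    }

    // Accumulates nanos and converts once at the end: a single truncation.
    static long sumThenConvert(final long[] nanos) {
        long total = 0L;
        for (final long value : nanos) {
            total += value;
        }
        return TimeUnit.NANOSECONDS.toMillis(total);
    }

    public static void main(final String[] args) {
        // Five tasks of 0.6 ms each, i.e. 3 ms in total.
        final long[] taskNanos = {600_000L, 600_000L, 600_000L, 600_000L, 600_000L};

        System.out.println(convertThenSum(taskNanos)); // 0
        System.out.println(sumThenConvert(taskNanos)); // 3
    }
}
```

       The second variant also performs one conversion instead of one per component, which is the efficiency point made above.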

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
##########
@@ -120,11 +120,40 @@ nifi.provenance.repository.concurrent.merge.threads=${nifi.provenance.repository
 # Volatile Provenance Respository Properties
 nifi.provenance.repository.buffer.size=${nifi.provenance.repository.buffer.size}
 
-# Component Status Repository
+# Component and Node Status Repository
+
+# Implementation based specification. Must be the classname of org.apache.nifi.controller.status.history.StatusRepository implementation.
+# This property specifies both the Component and the Node Status Repositories.
+# This approach takes precedence over the builder based specification if presents.
 nifi.components.status.repository.implementation=${nifi.components.status.repository.implementation}
+
+# Builder based specification. Gives the possibility to store Node and Component Status History information in different storage solutions.
+# nifi.status.repository.builder.inmemory=org.apache.nifi.controller.status.history.InMemoryStatusRepositoryBuilder
+# nifi.status.repository.builder.persistent=org.apache.nifi.controller.status.history.EmbeddedQuestDbStatusRepositoryBuilder
+# nifi.status.repository.roles.component=persistent
+# nifi.status.repository.roles.node=inmemory
+
+# Volatile Status Repository Properties
 nifi.components.status.repository.buffer.size=${nifi.components.status.repository.buffer.size}
 nifi.components.status.snapshot.frequency=${nifi.components.status.snapshot.frequency}
 
+# QuestDB Status Repository Properties
+# nifi.status.repository.questdb.persist.frequency=${nifi.status.repository.questdb.persist.frequency}
+# nifi.status.repository.questdb.persist.roll.frequency=${nifi.status.repository.questdb.persist.roll.frequency}
+# nifi.status.repository.questdb.persist.batch.size=${nifi.status.repository.questdb.persist.batch.size}

Review comment:
       These properties also seem too complex to me. Admins shouldn't need to guess what an appropriate "batch size" is for storing metrics. We should try to keep this as simple as possible and just configure how frequently we capture a snapshot. We can always add additional properties later, if necessary, for tuning. I just don't want to overwhelm users with 15 additional properties when all the user really cares about is "I want this persisted for longer and across restarts."




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org