Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2021/02/11 09:14:11 UTC

[GitHub] [nifi] simonbence opened a new pull request #4821: NIFI-8113 Adding persistent status history repository backed by embed…

simonbence opened a new pull request #4821:
URL: https://github.com/apache/nifi/pull/4821


   This is a proposal for persistent status history covering both component-level and node-level metrics. During the implementation I tried to balance adding flexibility in usage against supporting the existing behaviour. As a result, I split the component status repository (now StatusRepository) into two: one responsible for component metrics and another responsible for node-level metrics. They can be configured independently to some extent (for example, data can be stored in a different storage or kept for a different time window), but to support the previous configuration there is a facade over them that provides the same composite service as before (component and node).
   
   As for code organisation, I worked with three concepts. The repository is the top-level entity and provides the service for the clients (FlowController, etc.). In general this is the same as before, only split into two parts: node and component. The storage classes are part of the repositories and are responsible for managing the details of a given type, such as processor status or node status. Finally, the WriterTemplates and ReaderTemplates are merely helpers that deal with the QuestDB API calls.
   
   Some remarks on design decisions:
   - I kept the name VolatileComponentStatusRepository, but the actual repositories use the prefix InMemory to contrast more clearly with QuestDB.
   - The PR contains a small fix for ProcessGroupStatusDescriptor#calculateTaskMillis(): previously every nested level made a nano->milli conversion, and these conversions accumulated, reducing the task time of child groups to 0. This should now be fixed (the QuestDB tests implicitly prove this as well).
   - The configuration contains the new parameters, but they are commented out. At this point I think VolatileComponentStatusRepository should be kept as the default.
   - The implementation depends on the latest 4.X version of QuestDB. QuestDB is currently at 5.0.5, but the 5.X line depends on Java 11. There are some non-vital features in 5.X (all_tables request, dropping partitions using a WHERE clause), but we can do without them.
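
The split into a node repository and a component repository behind a facade, as described above, could be pictured roughly as follows. This is only a sketch: `NodeRepo`, `ComponentRepo`, `CompositeRepo` and the plain `String` payloads are hypothetical stand-ins and do not reflect the actual NiFi types or method signatures.

```java
import java.util.ArrayList;
import java.util.List;

public class StatusFacadeSketch {
    /** Hypothetical stand-in for the node level repository. */
    public interface NodeRepo {
        void capture(String nodeStatus);
        List<String> history();
    }

    /** Hypothetical stand-in for the component level repository. */
    public interface ComponentRepo {
        void capture(String componentStatus);
        List<String> history();
    }

    /** In-memory node repository; a persistent one could be swapped in independently. */
    public static class InMemoryNodeRepo implements NodeRepo {
        private final List<String> entries = new ArrayList<>();
        @Override public void capture(String nodeStatus) { entries.add(nodeStatus); }
        @Override public List<String> history() { return new ArrayList<>(entries); }
    }

    /** In-memory component repository, independent of the node repository. */
    public static class InMemoryComponentRepo implements ComponentRepo {
        private final List<String> entries = new ArrayList<>();
        @Override public void capture(String componentStatus) { entries.add(componentStatus); }
        @Override public List<String> history() { return new ArrayList<>(entries); }
    }

    /** Facade offering a single composite service on top of the two repositories. */
    public static class CompositeRepo {
        private final NodeRepo nodes;
        private final ComponentRepo components;

        public CompositeRepo(NodeRepo nodes, ComponentRepo components) {
            this.nodes = nodes;
            this.components = components;
        }

        /** Forwards one snapshot to both underlying repositories. */
        public void capture(String nodeStatus, String componentStatus) {
            nodes.capture(nodeStatus);
            components.capture(componentStatus);
        }

        public List<String> nodeHistory() { return nodes.history(); }
        public List<String> componentHistory() { return components.history(); }
    }

    public static void main(String[] args) {
        CompositeRepo repo = new CompositeRepo(new InMemoryNodeRepo(), new InMemoryComponentRepo());
        repo.capture("node: heap 512MB", "processor: 10 flowfiles");
        System.out.println(repo.nodeHistory());
        System.out.println(repo.componentHistory());
    }
}
```

Clients that only know the composite service keep working, while either side can be replaced by a different storage without touching the other.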
   
   Thank you for investing your time to my PR!
   In order to streamline the review of the contribution we ask you
   to ensure the following steps have been taken:
   
   ### For all changes:
   - [ ] Is there a JIRA ticket associated with this PR? Is it referenced 
        in the commit message?
   
   - [ ] Does your PR title start with **NIFI-XXXX** where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
   
   - [ ] Has your PR been rebased against the latest commit within the target branch (typically `main`)?
   
   - [ ] Is your initial contribution a single, squashed commit? _Additional commits in response to PR reviewer feedback should be made on this branch and pushed to allow change tracking. Do not `squash` or use `--force` when pushing to allow for clean monitoring of changes._
   
   ### For code changes:
   - [ ] Have you ensured that the full suite of tests is executed via `mvn -Pcontrib-check clean install` at the root `nifi` folder?
   - [ ] Have you written or updated unit tests to verify your changes?
   - [ ] Have you verified that the full build is successful on JDK 8?
   - [ ] Have you verified that the full build is successful on JDK 11?
   - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? 
   - [ ] If applicable, have you updated the `LICENSE` file, including the main `LICENSE` file under `nifi-assembly`?
   - [ ] If applicable, have you updated the `NOTICE` file, including the main `NOTICE` file found under `nifi-assembly`?
   - [ ] If adding new Properties, have you added `.displayName` in addition to .name (programmatic access) for each of the new properties?
   
   ### For documentation related changes:
   - [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
   
   ### Note:
   Please ensure that once the PR is submitted, you check GitHub Actions CI for build issues and submit an update to your PR as soon as possible.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] simonbence commented on a change in pull request #4821: NIFI-8113 Adding persistent status history repository backed by embed…

Posted by GitBox <gi...@apache.org>.
simonbence commented on a change in pull request #4821:
URL: https://github.com/apache/nifi/pull/4821#discussion_r578219487



##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/storage/BufferedNodeStatusStorage.java
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.history.storage;
+
+import org.apache.commons.math3.util.Pair;
+import org.apache.nifi.controller.status.NodeStatus;
+import org.apache.nifi.controller.status.history.StatusHistory;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Decorator in front of a {@link NodeStatusStorage}. It accumulates entries in an internal buffer until
+ * {@link #persist()} is called, at which point the buffer content is drained and sent to the decorated instance.
+ */
+public class BufferedNodeStatusStorage implements NodeStatusStorage, Persistable {
+    private final BlockingQueue<Pair<Date, NodeStatus>> queue = new LinkedBlockingQueue<>();
+    private final NodeStatusStorage payload;

Review comment:
       We had the same idea. I have already pushed a change that makes the whole buffered (batched) writing somewhat clearer.







[GitHub] [nifi] simonbence commented on a change in pull request #4821: NIFI-8113 Adding persistent status history repository backed by embed…

Posted by GitBox <gi...@apache.org>.
simonbence commented on a change in pull request #4821:
URL: https://github.com/apache/nifi/pull/4821#discussion_r579142419



##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
##########
@@ -120,11 +120,40 @@ nifi.provenance.repository.concurrent.merge.threads=${nifi.provenance.repository
 # Volatile Provenance Respository Properties
 nifi.provenance.repository.buffer.size=${nifi.provenance.repository.buffer.size}
 
-# Component Status Repository
+# Component and Node Status Repository
+
+# Implementation based specification. Must be the class name of an org.apache.nifi.controller.status.history.StatusRepository implementation.
+# This property specifies both the Component and the Node Status Repositories.
+# This approach takes precedence over the builder based specification if present.
 nifi.components.status.repository.implementation=${nifi.components.status.repository.implementation}
+
+# Builder based specification. Gives the possibility to store Node and Component Status History information in different storage solutions.

Review comment:
       The guide update is underway; I will add it soon.







[GitHub] [nifi] simonbence commented on a change in pull request #4821: NIFI-8113 Adding persistent status history repository backed by embed…

Posted by GitBox <gi...@apache.org>.
simonbence commented on a change in pull request #4821:
URL: https://github.com/apache/nifi/pull/4821#discussion_r579130587



##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/EmbeddedQuestDbRolloverHandler.java
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.history;
+
+import io.questdb.cairo.sql.Record;
+import io.questdb.cairo.sql.RecordCursor;
+import io.questdb.cairo.sql.RecordCursorFactory;
+import io.questdb.griffin.SqlCompiler;
+import io.questdb.griffin.SqlExecutionContext;
+import org.apache.commons.lang3.time.FastDateFormat;
+import org.apache.nifi.controller.status.history.questdb.QuestDbContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * QuestDB does not support deleting individual rows. Instead, there is the option to drop older
+ * partitions. In order to clean up older status information, the partitions that fall outside the scope of data we intend
+ * to keep will be deleted.
+ */
+public class EmbeddedQuestDbRolloverHandler implements Runnable {
+    private static final Logger LOGGER = LoggerFactory.getLogger(EmbeddedQuestDbRolloverHandler.class);
+
+    // Drop keyword is intentionally not uppercase as the query parser only recognizes it in this way
+    private static final String DELETION_QUERY = "ALTER TABLE %s drop PARTITION '%s'";
+    // Distinct keyword is not recognized if the date mapping is not within an inner query
+    static final String SELECTION_QUERY = "SELECT DISTINCT * FROM (SELECT (to_str(capturedAt, 'yyyy-MM-dd')) AS partitionName FROM %s)";
+
+    static final FastDateFormat DATE_FORMAT = FastDateFormat.getInstance("yyyy-MM-dd");
+
+    private final QuestDbContext dbContext;
+    private final List<String> tables = new ArrayList<>();
+    private final int daysToKeepData;
+
+    public EmbeddedQuestDbRolloverHandler(final Collection<String> tables, final int daysToKeepData, final QuestDbContext dbContext) {
+        this.tables.addAll(tables);
+        this.dbContext = dbContext;
+        this.daysToKeepData = daysToKeepData;
+    }
+
+    @Override
+    public void run() {
+        LOGGER.debug("Starting rollover");
+        tables.forEach(tableName -> rollOverTable(tableName));
+        LOGGER.debug("Finishing rollover");
+    }
+
+    private void rollOverTable(final CharSequence tableName) {
+        try {
+            final Set<String> partitions = getPartitions(tableName);
+            final Set<String> partitionToKeep = getPartitionsToKeep();
+
+            for (final String partition : partitions) {
+                if (!partitionToKeep.contains(partition)) {
+                    deletePartition(tableName, partition);
+                }
+            }
+        } catch (final Exception e) {
+            LOGGER.error("Could not rollover table " + tableName, e);
+        }
+    }
+
+    private void deletePartition(final CharSequence tableName, final String partition) {
+        try (final SqlCompiler compiler = dbContext.getCompiler()) {
+            compiler.compile(String.format(DELETION_QUERY, new Object[]{tableName, partition}), dbContext.getSqlExecutionContext());
+        } catch (final Exception e) {
+            LOGGER.error("Dropping partition " + partition + " of table " + tableName + " failed", e);
+        }
+    }
+
+    private Set<String> getPartitions(final CharSequence tableName) throws Exception {
+        final SqlExecutionContext executionContext = dbContext.getSqlExecutionContext();
+        final Set<String> result = new HashSet<>();
+
+        try (
+            final SqlCompiler compiler = dbContext.getCompiler();
+            final RecordCursorFactory recordCursorFactory = compiler.compile(String.format(SELECTION_QUERY, new Object[]{tableName}), executionContext).getRecordCursorFactory();
+            final RecordCursor cursor = recordCursorFactory.getCursor(executionContext);
+        ) {
+            while (cursor.hasNext()) {
+                final Record record = cursor.getRecord();
+                result.add(new StringBuilder(record.getStr(0)).toString());

Review comment:
       `Record#getStr` returns a `CharSequence` of some kind. I found that copying it through a `StringBuilder` is the most explicit way to ensure the correct content is extracted, since most other approaches would only call `#toString`.







[GitHub] [nifi] simonbence commented on a change in pull request #4821: NIFI-8113 Adding persistent status history repository backed by embed…

Posted by GitBox <gi...@apache.org>.
simonbence commented on a change in pull request #4821:
URL: https://github.com/apache/nifi/pull/4821#discussion_r579139071



##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessGroupStatusDescriptor.java
##########
@@ -113,16 +113,16 @@ public String getField() {
 
 
     private static long calculateTaskMillis(final ProcessGroupStatus status) {
-        long nanos = 0L;

Review comment:
       Something like this would solve both issues:
   
   ```
   private static long calculateTaskMillis(final ProcessGroupStatus status) {
       return TimeUnit.MILLISECONDS.convert(calculateTaskNanos(status), TimeUnit.NANOSECONDS);
   }
   
   private static long calculateTaskNanos(final ProcessGroupStatus status) {
       long nanos = 0L;
   
       for (final ProcessorStatus procStatus : status.getProcessorStatus()) {
           nanos += procStatus.getProcessingNanos();
       }
   
       for (final ProcessGroupStatus childStatus : status.getProcessGroupStatus()) {
           nanos += calculateTaskNanos(childStatus);
       }
   
       return nanos;
   }
   ```
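
To see why converting at every level loses the child time, here is a small self-contained sketch. The `Group` class is a hypothetical, simplified stand-in for `ProcessGroupStatus`, and `convertAtEveryLevel` only models the assumed shape of the old behaviour (a child result already in milliseconds being added to a nanosecond sum); it is not the original NiFi code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class TaskTimeSketch {
    /** Hypothetical, simplified stand-in for ProcessGroupStatus. */
    public static class Group {
        final long ownNanos;
        final List<Group> children = new ArrayList<>();

        public Group(long ownNanos) {
            this.ownNanos = ownNanos;
        }

        public void addChild(Group child) {
            children.add(child);
        }
    }

    /** Assumed old behaviour: each level converts, so child time is divided by 1,000,000 again. */
    public static long convertAtEveryLevel(Group group) {
        long nanos = group.ownNanos;
        for (Group child : group.children) {
            nanos += convertAtEveryLevel(child); // millis added to a nanos sum
        }
        return TimeUnit.MILLISECONDS.convert(nanos, TimeUnit.NANOSECONDS);
    }

    /** Sums nanoseconds over the whole tree without converting. */
    public static long sumNanos(Group group) {
        long nanos = group.ownNanos;
        for (Group child : group.children) {
            nanos += sumNanos(child);
        }
        return nanos;
    }

    /** Converts exactly once, at the top level. */
    public static long convertOnce(Group group) {
        return TimeUnit.MILLISECONDS.convert(sumNanos(group), TimeUnit.NANOSECONDS);
    }

    public static void main(String[] args) {
        Group root = new Group(0L);
        root.addChild(new Group(TimeUnit.MILLISECONDS.toNanos(250)));

        System.out.println(convertAtEveryLevel(root)); // 0
        System.out.println(convertOnce(root));         // 250
    }
}
```

With a single child that ran for 250 ms, the per-level variant reports 0 for the parent because the child's 250 is treated as 250 ns, while the convert-once variant reports 250.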







[GitHub] [nifi] turcsanyip commented on a change in pull request #4821: NIFI-8113 Adding persistent status history repository backed by embed…

Posted by GitBox <gi...@apache.org>.
turcsanyip commented on a change in pull request #4821:
URL: https://github.com/apache/nifi/pull/4821#discussion_r577613897



##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/storage/BufferedNodeStatusStorage.java
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.history.storage;
+
+import org.apache.commons.math3.util.Pair;
+import org.apache.nifi.controller.status.NodeStatus;
+import org.apache.nifi.controller.status.history.StatusHistory;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Decorator in front of a {@link NodeStatusStorage}. It accumulates entries in an internal buffer until
+ * {@link #persist()} is called, at which point the buffer content is drained and sent to the decorated instance.
+ */
+public class BufferedNodeStatusStorage implements NodeStatusStorage, Persistable {
+    private final BlockingQueue<Pair<Date, NodeStatus>> queue = new LinkedBlockingQueue<>();
+    private final NodeStatusStorage payload;

Review comment:
       `BufferedNodeStatusStorage` and `QuestDbNodeStatusStorage` both implement the `NodeStatusStorage` interface, but they cannot be used interchangeably. `EmbeddedQuestDbNodeStatusRepository` needs a `BufferedNodeStatusStorage` (because it implements `Persistable` too), and here it does not really make sense to use a buffered instance (and it simply would not work).
   For this reason, it would be better to separate these two classes (e.g. by removing `BufferedNodeStatusStorage` from the hierarchy).
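
One possible shape of such a separation is sketched below. All names here are hypothetical: `BufferedEntryWriter` is an invented class, and the `Persistable` interface is redeclared with an assumed single `persist()` method purely so the example is self-contained. The buffer is no longer a storage itself; it only collects entries and hands them over in batches.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

public class BufferSketch {
    /** Assumed shape of the Persistable interface mentioned in the review. */
    public interface Persistable {
        void persist();
    }

    /** Hypothetical buffer that is not a storage; it forwards drained batches to a sink. */
    public static class BufferedEntryWriter<T> implements Persistable {
        private final BlockingQueue<T> queue = new LinkedBlockingQueue<>();
        private final Consumer<List<T>> sink;

        public BufferedEntryWriter(Consumer<List<T>> sink) {
            this.sink = sink;
        }

        /** Buffers an entry; nothing reaches the sink until persist() is called. */
        public void collect(T entry) {
            queue.add(entry);
        }

        /** Drains the buffer and passes the batch on, skipping empty batches. */
        @Override
        public void persist() {
            List<T> batch = new ArrayList<>();
            queue.drainTo(batch);
            if (!batch.isEmpty()) {
                sink.accept(batch);
            }
        }
    }

    public static void main(String[] args) {
        List<List<String>> stored = new ArrayList<>();
        BufferedEntryWriter<String> writer = new BufferedEntryWriter<>(stored::add);
        writer.collect("status-1");
        writer.collect("status-2");
        writer.persist();
        System.out.println(stored);
    }
}
```

Because the buffer only implements `Persistable`, it cannot be passed where a real storage is expected, which removes the interchangeability problem by construction.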

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/EmbeddedQuestDbStatusWriter.java
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.history;
+
+import org.apache.nifi.controller.status.history.storage.Persistable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class EmbeddedQuestDbStatusWriter implements Runnable {

Review comment:
       There is no QuestDB-specific logic in this class, so it should be given a more generic name.

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/questdb/QuestDbEntityWritingTemplate.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.history.questdb;
+
+import io.questdb.cairo.TableWriter;
+import org.apache.commons.math3.util.Pair;
+
+import java.util.Collection;
+import java.util.Date;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+
+/**
+ * Writes entry to the database with the given measurement time.
+ *
+ * @param <E> Entry type.
+ */
+public class QuestDbEntityWritingTemplate<E> extends QuestDbWritingTemplate<Pair<Date, E>> {
+    private final BiConsumer<E, TableWriter.Row> fillRow;
+
+    /**
+     * @param tableName Name of the target table.
+     * @param fillRow Responsible for filling a row based on the entry.
+     */
+    public QuestDbEntityWritingTemplate(final String tableName, final BiConsumer<E, TableWriter.Row> fillRow) {
+        super(tableName);
+        this.fillRow = fillRow;
+    }
+
+    @Override
+    protected void addRows(final TableWriter tableWriter, final Collection<Pair<Date, E>> entries) {
+        entries.forEach(statusEntry -> {
+            final long measuredAt = TimeUnit.MILLISECONDS.toMicros(statusEntry.getFirst().getTime());

Review comment:
       Both `measuredAt` and `capturedAt` are used throughout the code. I am not sure they have the same meaning everywhere, but if they do, the same term should be used to make that clear.







[GitHub] [nifi] markap14 commented on a change in pull request #4821: NIFI-8113 Adding persistent status history repository backed by embed…

Posted by GitBox <gi...@apache.org>.
markap14 commented on a change in pull request #4821:
URL: https://github.com/apache/nifi/pull/4821#discussion_r579446476



##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/EmbeddedQuestDbRolloverHandler.java
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.history;
+
+import io.questdb.cairo.sql.Record;
+import io.questdb.cairo.sql.RecordCursor;
+import io.questdb.cairo.sql.RecordCursorFactory;
+import io.questdb.griffin.SqlCompiler;
+import io.questdb.griffin.SqlExecutionContext;
+import org.apache.commons.lang3.time.FastDateFormat;
+import org.apache.nifi.controller.status.history.questdb.QuestDbContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * QuestDB does not support deleting individual rows. Instead, there is the option to drop older
+ * partitions. In order to clean up older status information, the partitions that fall outside the scope of data we intend
+ * to keep will be deleted.
+ */
+public class EmbeddedQuestDbRolloverHandler implements Runnable {
+    private static final Logger LOGGER = LoggerFactory.getLogger(EmbeddedQuestDbRolloverHandler.class);
+
+    // Drop keyword is intentionally not uppercase as the query parser only recognizes it in this way
+    private static final String DELETION_QUERY = "ALTER TABLE %s drop PARTITION '%s'";
+    // Distinct keyword is not recognized if the date mapping is not within an inner query
+    static final String SELECTION_QUERY = "SELECT DISTINCT * FROM (SELECT (to_str(capturedAt, 'yyyy-MM-dd')) AS partitionName FROM %s)";
+
+    static final FastDateFormat DATE_FORMAT = FastDateFormat.getInstance("yyyy-MM-dd");
+
+    private final QuestDbContext dbContext;
+    private final List<String> tables = new ArrayList<>();
+    private final int daysToKeepData;
+
+    public EmbeddedQuestDbRolloverHandler(final Collection<String> tables, final int daysToKeepData, final QuestDbContext dbContext) {
+        this.tables.addAll(tables);
+        this.dbContext = dbContext;
+        this.daysToKeepData = daysToKeepData;
+    }
+
+    @Override
+    public void run() {
+        LOGGER.debug("Starting rollover");
+        tables.forEach(tableName -> rollOverTable(tableName));
+        LOGGER.debug("Finishing rollover");
+    }
+
+    private void rollOverTable(final CharSequence tableName) {
+        try {
+            final Set<String> partitions = getPartitions(tableName);
+            final Set<String> partitionToKeep = getPartitionsToKeep();
+
+            for (final String partition : partitions) {
+                if (!partitionToKeep.contains(partition)) {
+                    deletePartition(tableName, partition);
+                }
+            }
+        } catch (final Exception e) {
+            LOGGER.error("Could not rollover table " + tableName, e);
+        }
+    }
+
+    private void deletePartition(final CharSequence tableName, final String partition) {
+        try (final SqlCompiler compiler = dbContext.getCompiler()) {
+            compiler.compile(String.format(DELETION_QUERY, new Object[]{tableName, partition}), dbContext.getSqlExecutionContext());
+        } catch (final Exception e) {
+            LOGGER.error("Dropping partition " + partition + " of table " + tableName + " failed", e);
+        }
+    }
+
+    private Set<String> getPartitions(final CharSequence tableName) throws Exception {
+        final SqlExecutionContext executionContext = dbContext.getSqlExecutionContext();
+        final Set<String> result = new HashSet<>();
+
+        try (
+            final SqlCompiler compiler = dbContext.getCompiler();
+            final RecordCursorFactory recordCursorFactory = compiler.compile(String.format(SELECTION_QUERY, new Object[]{tableName}), executionContext).getRecordCursorFactory();
+            final RecordCursor cursor = recordCursorFactory.getCursor(executionContext);
+        ) {
+            while (cursor.hasNext()) {
+                final Record record = cursor.getRecord();
+                result.add(new StringBuilder(record.getStr(0)).toString());

Review comment:
       Is there a reason not to just use `record.getStr(0).toString()`, i.e. to call the `toString` method of `CharSequence` directly? With that, if the returned object happens to be a `String` (which implements `CharSequence`), then `toString()` simply returns `this`.
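
The two approaches can be compared in isolation with plain JDK types. This is a sketch only: it uses `String` and `StringBuilder` as example `CharSequence` implementations and says nothing about the concrete type QuestDB returns from `Record#getStr`, which is not verified here.

```java
public class CharSequenceSketch {
    /** Copies the content of a CharSequence by calling toString() directly. */
    public static String viaToString(CharSequence value) {
        return value.toString();
    }

    /** Copies the content of a CharSequence through an intermediate StringBuilder. */
    public static String viaBuilder(CharSequence value) {
        return new StringBuilder(value).toString();
    }

    public static void main(String[] args) {
        String text = "2021-02-11";
        // String#toString returns the same instance, so no copy is made.
        System.out.println(viaToString(text) == text);

        // A mutable, reusable CharSequence: both approaches take an independent snapshot.
        StringBuilder reusable = new StringBuilder("2021-02-10");
        String first = viaToString(reusable);
        String second = viaBuilder(reusable);
        reusable.setLength(0);
        reusable.append("2021-02-11");

        System.out.println(first);
        System.out.println(first.equals(second));
    }
}
```

The `CharSequence` contract specifies that `toString()` returns the characters of the sequence in order, so the direct call is sufficient for any implementation that honors that contract. The `StringBuilder` detour only makes a difference for an implementation that does not.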







[GitHub] [nifi] simonbence commented on a change in pull request #4821: NIFI-8113 Adding persistent status history repository backed by embed…

Posted by GitBox <gi...@apache.org>.
simonbence commented on a change in pull request #4821:
URL: https://github.com/apache/nifi/pull/4821#discussion_r578221925



##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java
##########
@@ -16,412 +16,86 @@
  */
 package org.apache.nifi.controller.status.history;
 
-import org.apache.nifi.controller.status.ConnectionStatus;
 import org.apache.nifi.controller.status.NodeStatus;
 import org.apache.nifi.controller.status.ProcessGroupStatus;
-import org.apache.nifi.controller.status.ProcessorStatus;
-import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
-import org.apache.nifi.util.ComponentMetrics;
 import org.apache.nifi.util.NiFiProperties;
-import org.apache.nifi.util.RingBuffer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
 
-public class VolatileComponentStatusRepository implements ComponentStatusRepository {
-    private static final Logger logger = LoggerFactory.getLogger(VolatileComponentStatusRepository.class);
-
-    private static final Set<MetricDescriptor<?>> DEFAULT_PROCESSOR_METRICS = Arrays.stream(ProcessorStatusDescriptor.values())
-        .map(ProcessorStatusDescriptor::getDescriptor)
-        .collect(Collectors.toSet());
-    private static final Set<MetricDescriptor<?>> DEFAULT_CONNECTION_METRICS = Arrays.stream(ConnectionStatusDescriptor.values())
-        .map(ConnectionStatusDescriptor::getDescriptor)
-        .collect(Collectors.toSet());
-    private static final Set<MetricDescriptor<?>> DEFAULT_GROUP_METRICS = Arrays.stream(ProcessGroupStatusDescriptor.values())
-        .map(ProcessGroupStatusDescriptor::getDescriptor)
-        .collect(Collectors.toSet());
-    private static final Set<MetricDescriptor<?>> DEFAULT_RPG_METRICS = Arrays.stream(RemoteProcessGroupStatusDescriptor.values())
-        .map(RemoteProcessGroupStatusDescriptor::getDescriptor)
-        .collect(Collectors.toSet());
-    private static final Set<MetricDescriptor<NodeStatus>> DEFAULT_NODE_METRICS = Arrays.stream(NodeStatusDescriptor.values())
-        .map(NodeStatusDescriptor::getDescriptor)
-        .collect(Collectors.toSet());
-
-    private static final String STORAGE_FREE_DESCRIPTION = "The usable space available for use by the underlying storage mechanism.";
-    private static final String STORAGE_USED_DESCRIPTION = "The space in use on the underlying storage mechanism";
-
-    private static final String GC_TIME_DESCRIPTION = "The sum time the garbage collection has run since the start of the Java virtual machine.";
-    private static final String GC_TIME_DIFF_DESCRIPTION = "The sum time the garbage collection has run since the last measurement.";
-    private static final String GC_COUNT_DESCRIPTION = "The sum amount of occasions the garbage collection has run since the start of the Java virtual machine.";
-    private static final String GC_COUNT_DIFF_DESCRIPTION = "The sum amount of occasions the garbage collection has run since the last measurement.";
-
-    public static final String NUM_DATA_POINTS_PROPERTY = "nifi.components.status.repository.buffer.size";
-    public static final int DEFAULT_NUM_DATA_POINTS = 288;   // 1 day worth of 5-minute snapshots
-
-    private final Map<String, ComponentStatusHistory> componentStatusHistories = new HashMap<>();
+/**
+ * Consists of in memory repositories for node and component status history.
+ * Necessary in order to support existing way of NiFi configuration.
+ */
+@Deprecated
+public class VolatileComponentStatusRepository implements StatusRepository {
 
-    // Changed to protected to allow unit testing
-    protected final RingBuffer<Date> timestamps;
-    private final RingBuffer<List<GarbageCollectionStatus>> gcStatuses;
-    private final RingBuffer<NodeStatus> nodeStatuses;
-    private final int numDataPoints;
-    private volatile long lastCaptureTime = 0L;
+    private final NodeStatusRepository nodeStatusRepository;
+    private final ComponentStatusRepository componentStatusRepository;

Review comment:
       You are right about that; however, instance creation is managed by `InMemoryStatusRepositoryBuilder`, which is an implementation of `StatusRepositoryBuilder`. As a consequence, it does not expose the implementation class it returns, and I would prefer not to expose it. (Also, adding another way to create the instances would bring in code duplication and unnecessary complexity, in my opinion.)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] simonbence commented on pull request #4821: NIFI-8113 Adding persistent status history repository backed by embed…

Posted by GitBox <gi...@apache.org>.
simonbence commented on pull request #4821:
URL: https://github.com/apache/nifi/pull/4821#issuecomment-784292570


   I am abandoning this review. Based on @markap14's great comments, I simplified the configuration, which also resulted in a cleaner codebase. This involved reverting some changes, so in order to keep the review readable I decided to create a new PR. All comments in this review have been answered or addressed here.





[GitHub] [nifi] simonbence commented on a change in pull request #4821: NIFI-8113 Adding persistent status history repository backed by embed…

Posted by GitBox <gi...@apache.org>.
simonbence commented on a change in pull request #4821:
URL: https://github.com/apache/nifi/pull/4821#discussion_r581143770



##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/EmbeddedQuestDbRolloverHandler.java
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.history;
+
+import io.questdb.cairo.sql.Record;
+import io.questdb.cairo.sql.RecordCursor;
+import io.questdb.cairo.sql.RecordCursorFactory;
+import io.questdb.griffin.SqlCompiler;
+import io.questdb.griffin.SqlExecutionContext;
+import org.apache.commons.lang3.time.FastDateFormat;
+import org.apache.nifi.controller.status.history.questdb.QuestDbContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * QuestDB does not provide the possibility for deleting individual lines. Instead there is the option to drop older
+ * partitions. In order to clean up older status information, the partitions are outside of the scope of data we intend
+ * to keep will be deleted.
+ */
+public class EmbeddedQuestDbRolloverHandler implements Runnable {
+    private static final Logger LOGGER = LoggerFactory.getLogger(EmbeddedQuestDbRolloverHandler.class);
+
+    // Drop keyword is intentionally not uppercase as the query parser only recognizes it in this way
+    private static final String DELETION_QUERY = "ALTER TABLE %s drop PARTITION '%s'";
+    // Distinct keyword is not recognized if the date mapping is not within an inner query
+    static final String SELECTION_QUERY = "SELECT DISTINCT * FROM (SELECT (to_str(capturedAt, 'yyyy-MM-dd')) AS partitionName FROM %s)";
+
+    static final FastDateFormat DATE_FORMAT = FastDateFormat.getInstance("yyyy-MM-dd");
+
+    private final QuestDbContext dbContext;
+    private final List<String> tables = new ArrayList<>();
+    private final int daysToKeepData;
+
+    public EmbeddedQuestDbRolloverHandler(final Collection<String> tables, final int daysToKeepData, final QuestDbContext dbContext) {
+        this.tables.addAll(tables);
+        this.dbContext = dbContext;
+        this.daysToKeepData = daysToKeepData;
+    }
+
+    @Override
+    public void run() {
+        LOGGER.debug("Starting rollover");
+        tables.forEach(tableName -> rollOverTable(tableName));
+        LOGGER.debug("Finishing rollover");
+    }
+
+    private void rollOverTable(final CharSequence tableName) {
+        try {
+            final Set<String> partitions = getPartitions(tableName);
+            final Set<String> partitionToKeep = getPartitionsToKeep();
+
+            for (final String partition : partitions) {
+                if (!partitionToKeep.contains(partition)) {
+                    deletePartition(tableName, partition);
+                }
+            }
+        } catch (final Exception e) {
+            LOGGER.error("Could not rollover table " + tableName, e);
+        }
+    }
+
+    private void deletePartition(final CharSequence tableName, final String partition) {
+        try (final SqlCompiler compiler = dbContext.getCompiler()) {
+            compiler.compile(String.format(DELETION_QUERY, new Object[]{tableName, partition}), dbContext.getSqlExecutionContext());
+        } catch (final Exception e) {
+            LOGGER.error("Dropping partition " + partition + " of table " + tableName + " failed", e);
+        }
+    }
+
+    private Set<String> getPartitions(final CharSequence tableName) throws Exception {
+        final SqlExecutionContext executionContext = dbContext.getSqlExecutionContext();
+        final Set<String> result = new HashSet<>();
+
+        try (
+            final SqlCompiler compiler = dbContext.getCompiler();
+            final RecordCursorFactory recordCursorFactory = compiler.compile(String.format(SELECTION_QUERY, new Object[]{tableName}), executionContext).getRecordCursorFactory();
+            final RecordCursor cursor = recordCursorFactory.getCursor(executionContext);
+        ) {
+            while (cursor.hasNext()) {
+                final Record record = cursor.getRecord();
+                result.add(new StringBuilder(record.getStr(0)).toString());

Review comment:
       The record returns an unspecified implementation of `CharSequence` (actually it is `CharSequenceView`, which is an internal QuestDB implementation), for which it is not guaranteed that `toString` is implemented, or implemented properly. I was trying to stay on the safe side.
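
A minimal sketch of the concern, using only the JDK. `ReusableView` is a hypothetical stand-in for a flyweight `CharSequence` and is not the QuestDB class; it assumes that the view object may be reused for later rows and that it does not override `toString`. Copying through `new StringBuilder(...)` reads the characters via `length()` and `charAt()`, so the copy does not depend on either assumption.

```java
// Hypothetical stand-in for a flyweight CharSequence; not the QuestDB implementation.
final class ReusableView implements CharSequence {
    private char[] buffer = new char[0];

    // Points the same view object at new content, as a cursor might when it advances.
    void reset(final String content) {
        buffer = content.toCharArray();
    }

    @Override
    public int length() {
        return buffer.length;
    }

    @Override
    public char charAt(final int index) {
        return buffer[index];
    }

    @Override
    public CharSequence subSequence(final int start, final int end) {
        return new String(buffer, start, end - start);
    }
    // toString() is deliberately not overridden, so Object#toString applies.
}

final class CharSequenceCopyExample {
    // Copies the characters through length()/charAt(); never calls the source's toString().
    static String copyOf(final CharSequence source) {
        return new StringBuilder(source).toString();
    }

    public static void main(final String[] args) {
        final ReusableView view = new ReusableView();
        view.reset("2021-02-10");
        final String partition = copyOf(view);

        // The view now shows another row, the copy keeps the earlier content.
        view.reset("2021-02-11");
        System.out.println(partition);
        System.out.println(view.toString());
    }
}
```

The copy costs one extra allocation per row, which seems acceptable for a rollover task that runs rarely.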







[GitHub] [nifi] markap14 commented on a change in pull request #4821: NIFI-8113 Adding persistent status history repository backed by embed…

Posted by GitBox <gi...@apache.org>.
markap14 commented on a change in pull request #4821:
URL: https://github.com/apache/nifi/pull/4821#discussion_r579444439



##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessGroupStatusDescriptor.java
##########
@@ -113,16 +113,16 @@ public String getField() {
 
 
     private static long calculateTaskMillis(final ProcessGroupStatus status) {
-        long nanos = 0L;

Review comment:
       Ah, I see. Yes, I think this is a good approach: calculate recursively using nanos, and convert to millis only after the recursive call.







[GitHub] [nifi] simonbence closed pull request #4821: NIFI-8113 Adding persistent status history repository backed by embed…

Posted by GitBox <gi...@apache.org>.
simonbence closed pull request #4821:
URL: https://github.com/apache/nifi/pull/4821


   





[GitHub] [nifi] simonbence commented on a change in pull request #4821: NIFI-8113 Adding persistent status history repository backed by embed…

Posted by GitBox <gi...@apache.org>.
simonbence commented on a change in pull request #4821:
URL: https://github.com/apache/nifi/pull/4821#discussion_r579133945



##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessGroupStatusDescriptor.java
##########
@@ -113,16 +113,16 @@ public String getField() {
 
 
     private static long calculateTaskMillis(final ProcessGroupStatus status) {
-        long nanos = 0L;

Review comment:
       I think I mentioned this in the PR description, but the point is to avoid loss of information: with the original code, every level of the recursion did a nano -> millis conversion, but the caller (one level up in the recursion) treated the result as nanos. Thus, the deeper we are in the group structure, the more times the value is converted, which looks incorrect.
   
   If you still think that this comes with rounding errors, I would suggest introducing a `calculateTaskNanos`, which would handle the recursion and work without converting, and `calculateTaskMillis` would call it and convert the end result only once.
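
The suggestion could look roughly like the sketch below. `GroupStatus` is a hypothetical, simplified stand-in for `ProcessGroupStatus` (own processing time plus child groups), and `convertAtEveryLevel` is only my reading of the behaviour described above, not a copy of the original method.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified stand-in for ProcessGroupStatus.
final class GroupStatus {
    final long ownProcessingNanos;
    final List<GroupStatus> children = new ArrayList<>();

    GroupStatus(final long ownProcessingNanos) {
        this.ownProcessingNanos = ownProcessingNanos;
    }
}

final class TaskTimeExample {
    // The recursion stays in nanoseconds, so nothing is converted twice.
    static long calculateTaskNanos(final GroupStatus status) {
        long nanos = status.ownProcessingNanos;
        for (final GroupStatus child : status.children) {
            nanos += calculateTaskNanos(child);
        }
        return nanos;
    }

    // Single conversion, applied to the final sum only.
    static long calculateTaskMillis(final GroupStatus status) {
        return TimeUnit.MILLISECONDS.convert(calculateTaskNanos(status), TimeUnit.NANOSECONDS);
    }

    // The problematic pattern as described: every level converts to millis,
    // and the caller adds that result to its own nanosecond sum.
    static long convertAtEveryLevel(final GroupStatus status) {
        long nanos = status.ownProcessingNanos;
        for (final GroupStatus child : status.children) {
            nanos += convertAtEveryLevel(child);
        }
        return TimeUnit.MILLISECONDS.convert(nanos, TimeUnit.NANOSECONDS);
    }

    public static void main(final String[] args) {
        // A root group with no own work and one child that spent 5 ms (5,000,000 ns).
        final GroupStatus root = new GroupStatus(0L);
        root.children.add(new GroupStatus(5_000_000L));

        System.out.println("single conversion: " + calculateTaskMillis(root));
        System.out.println("per-level conversion: " + convertAtEveryLevel(root));
    }
}
```

With one level of nesting the per-level variant already loses the child's 5 ms, because the child's result (5) is added to a nanosecond sum and then divided again.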







[GitHub] [nifi] simonbence commented on pull request #4821: NIFI-8113 Adding persistent status history repository backed by embed…

Posted by GitBox <gi...@apache.org>.
simonbence commented on pull request #4821:
URL: https://github.com/apache/nifi/pull/4821#issuecomment-784309459


   Please find the successor PR here: [PR 4839](https://github.com/apache/nifi/pull/4839)





[GitHub] [nifi] turcsanyip commented on a change in pull request #4821: NIFI-8113 Adding persistent status history repository backed by embed…

Posted by GitBox <gi...@apache.org>.
turcsanyip commented on a change in pull request #4821:
URL: https://github.com/apache/nifi/pull/4821#discussion_r577129664



##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework-nar/src/main/resources/META-INF/NOTICE
##########
@@ -206,6 +206,10 @@ The following binary components are provided under the Apache Software License v
         Bytes Utility Library 1.3.0
         Copyright 2017 Patrick Favre-Bulle
 
+  (ASLv2) QuestDB (questdb-5.0.5.jar - https://github.com/questdb/questdb)

Review comment:
       Version does not match the one in the POM. Also in the other NOTICE file.

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/questdb/QuestDbWritingTemplate.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.history.questdb;
+
+import io.questdb.cairo.CairoEngine;
+import io.questdb.cairo.TableWriter;
+import io.questdb.griffin.SqlExecutionContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+
+/**
+ * Template for writing entries into QuestDb.
+ *
+ * @param <T> The type of the entry.
+ */
+public abstract class QuestDbWritingTemplate<T> {
+    private static final Logger LOGGER = LoggerFactory.getLogger(QuestDbWritingTemplate.class);
+
+    private final String tableName;
+
+    /**
+     * @param tableName Name of the target table.
+     */
+    protected QuestDbWritingTemplate(final String tableName) {
+        this.tableName = tableName;
+    }
+
+    /**
+     * Inserts the entries into the database.
+     *
+     * @param engine QuestDB engine.
+     * @param context Execution context.
+     * @param entries Entries to insert.
+     */
+    public void insert(final CairoEngine engine, final SqlExecutionContext context, final Collection<T> entries) {
+        if (entries.isEmpty()) {
+            return;
+        }
+
+        try (
+            final TableWriter tableWriter = engine.getWriter(context.getCairoSecurityContext(), tableName);
+        ) {
+            addRows(tableWriter, entries);
+            tableWriter.commit();
+        } catch (final Exception e) {
+            LOGGER.error("Error happened during writing into table " + tableName, e);
+            e.printStackTrace();

Review comment:
       Please remove `printStackTrace()`.

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java
##########
@@ -16,412 +16,86 @@
  */
 package org.apache.nifi.controller.status.history;
 
-import org.apache.nifi.controller.status.ConnectionStatus;
 import org.apache.nifi.controller.status.NodeStatus;
 import org.apache.nifi.controller.status.ProcessGroupStatus;
-import org.apache.nifi.controller.status.ProcessorStatus;
-import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
-import org.apache.nifi.util.ComponentMetrics;
 import org.apache.nifi.util.NiFiProperties;
-import org.apache.nifi.util.RingBuffer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
 
-public class VolatileComponentStatusRepository implements ComponentStatusRepository {
-    private static final Logger logger = LoggerFactory.getLogger(VolatileComponentStatusRepository.class);
-
-    private static final Set<MetricDescriptor<?>> DEFAULT_PROCESSOR_METRICS = Arrays.stream(ProcessorStatusDescriptor.values())
-        .map(ProcessorStatusDescriptor::getDescriptor)
-        .collect(Collectors.toSet());
-    private static final Set<MetricDescriptor<?>> DEFAULT_CONNECTION_METRICS = Arrays.stream(ConnectionStatusDescriptor.values())
-        .map(ConnectionStatusDescriptor::getDescriptor)
-        .collect(Collectors.toSet());
-    private static final Set<MetricDescriptor<?>> DEFAULT_GROUP_METRICS = Arrays.stream(ProcessGroupStatusDescriptor.values())
-        .map(ProcessGroupStatusDescriptor::getDescriptor)
-        .collect(Collectors.toSet());
-    private static final Set<MetricDescriptor<?>> DEFAULT_RPG_METRICS = Arrays.stream(RemoteProcessGroupStatusDescriptor.values())
-        .map(RemoteProcessGroupStatusDescriptor::getDescriptor)
-        .collect(Collectors.toSet());
-    private static final Set<MetricDescriptor<NodeStatus>> DEFAULT_NODE_METRICS = Arrays.stream(NodeStatusDescriptor.values())
-        .map(NodeStatusDescriptor::getDescriptor)
-        .collect(Collectors.toSet());
-
-    private static final String STORAGE_FREE_DESCRIPTION = "The usable space available for use by the underlying storage mechanism.";
-    private static final String STORAGE_USED_DESCRIPTION = "The space in use on the underlying storage mechanism";
-
-    private static final String GC_TIME_DESCRIPTION = "The sum time the garbage collection has run since the start of the Java virtual machine.";
-    private static final String GC_TIME_DIFF_DESCRIPTION = "The sum time the garbage collection has run since the last measurement.";
-    private static final String GC_COUNT_DESCRIPTION = "The sum amount of occasions the garbage collection has run since the start of the Java virtual machine.";
-    private static final String GC_COUNT_DIFF_DESCRIPTION = "The sum amount of occasions the garbage collection has run since the last measurement.";
-
-    public static final String NUM_DATA_POINTS_PROPERTY = "nifi.components.status.repository.buffer.size";
-    public static final int DEFAULT_NUM_DATA_POINTS = 288;   // 1 day worth of 5-minute snapshots
-
-    private final Map<String, ComponentStatusHistory> componentStatusHistories = new HashMap<>();
+/**
+ * Consists of in memory repositories for node and component status history.
+ * Necessary in order to support existing way of NiFi configuration.
+ */
+@Deprecated
+public class VolatileComponentStatusRepository implements StatusRepository {
 
-    // Changed to protected to allow unit testing
-    protected final RingBuffer<Date> timestamps;
-    private final RingBuffer<List<GarbageCollectionStatus>> gcStatuses;
-    private final RingBuffer<NodeStatus> nodeStatuses;
-    private final int numDataPoints;
-    private volatile long lastCaptureTime = 0L;
+    private final NodeStatusRepository nodeStatusRepository;
+    private final ComponentStatusRepository componentStatusRepository;

Review comment:
       As `VolatileComponentStatusRepository` provides backward compatibility for the in-memory storage and in the background just delegates requests to the new `InMemory*StatusRepositories`, using those classes instead of the interface types would fit the role of this class better.

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
##########
@@ -120,11 +120,40 @@ nifi.provenance.repository.concurrent.merge.threads=${nifi.provenance.repository
 # Volatile Provenance Respository Properties
 nifi.provenance.repository.buffer.size=${nifi.provenance.repository.buffer.size}
 
-# Component Status Repository
+# Component and Node Status Repository
+
+# Implementation based specification. Must be the classname of org.apache.nifi.controller.status.history.StatusRepository implementation.
+# This property specifies both the Component and the Node Status Repositories.
+# This approach takes precedence over the builder based specification if presents.
 nifi.components.status.repository.implementation=${nifi.components.status.repository.implementation}
+
+# Builder based specification. Gives the possibility to store Node and Component Status History information in different storage solutions.
+# nifi.status.repository.builder.inmemory=org.apache.nifi.controller.status.history.InMemoryStatusRepositoryBuilder
+# nifi.status.repository.builder.persistent=org.apache.nifi.controller.status.history.EmbeddedQuestDbStatusRepositoryBuilder
+# nifi.status.repository.roles.component=persistent
+# nifi.status.repository.roles.node=inmemory

Review comment:
       I believe the new InMemory implementation should be the default, instead of adding these lines commented out.
   As far as I understand, the backward compatibility point here is to support old `nifi.properties` files where only the `nifi.components.status.repository.implementation` property exists. That support is provided, and works, even if the default is not the old property.
   New installations could (should) go with the new configuration / implementation classes.

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ComponentStatusRepositoryFacade.java
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.history;
+
+import org.apache.nifi.controller.status.NodeStatus;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+
+import java.util.Date;
+import java.util.List;
+
+/**
+ * Facade implementation of the StatusRepository which dispatches request to the underlying node and component status
+ * repositories. The contained repositories might use different storage implementation.
+ */
+public class ComponentStatusRepositoryFacade implements StatusRepository {

Review comment:
       This class should be called `StatusRepositoryFacade` instead because it is not component status repository specific.







[GitHub] [nifi] markap14 commented on a change in pull request #4821: NIFI-8113 Adding persistent status history repository backed by embed…

Posted by GitBox <gi...@apache.org>.
markap14 commented on a change in pull request #4821:
URL: https://github.com/apache/nifi/pull/4821#discussion_r578557060



##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
##########
@@ -120,11 +120,40 @@ nifi.provenance.repository.concurrent.merge.threads=${nifi.provenance.repository
 # Volatile Provenance Respository Properties
 nifi.provenance.repository.buffer.size=${nifi.provenance.repository.buffer.size}
 
-# Component Status Repository
+# Component and Node Status Repository
+
+# Implementation based specification. Must be the classname of org.apache.nifi.controller.status.history.StatusRepository implementation.
+# This property specifies both the Component and the Node Status Repositories.
+# This approach takes precedence over the builder based specification if presents.
 nifi.components.status.repository.implementation=${nifi.components.status.repository.implementation}
+
+# Builder based specification. Gives the possibility to store Node and Component Status History information in different storage solutions.

Review comment:
       All properties that get added to this file need to be fully documented in the `administration-guide.adoc` in `nifi-docs`.

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
##########
@@ -120,11 +120,40 @@ nifi.provenance.repository.concurrent.merge.threads=${nifi.provenance.repository
 # Volatile Provenance Respository Properties
 nifi.provenance.repository.buffer.size=${nifi.provenance.repository.buffer.size}
 
-# Component Status Repository
+# Component and Node Status Repository
+
+# Implementation based specification. Must be the classname of org.apache.nifi.controller.status.history.StatusRepository implementation.
+# This property specifies both the Component and the Node Status Repositories.
+# This approach takes precedence over the builder based specification if presents.
 nifi.components.status.repository.implementation=${nifi.components.status.repository.implementation}
+
+# Builder based specification. Gives the possibility to store Node and Component Status History information in different storage solutions.
+# nifi.status.repository.builder.inmemory=org.apache.nifi.controller.status.history.InMemoryStatusRepositoryBuilder
+# nifi.status.repository.builder.persistent=org.apache.nifi.controller.status.history.EmbeddedQuestDbStatusRepositoryBuilder
+# nifi.status.repository.roles.component=persistent
+# nifi.status.repository.roles.node=inmemory

Review comment:
       I'm not sure that I see the benefit to adding these properties at all. If the user wants to persist the data, it should be persisted. If they want to keep it in memory, it should be kept in memory. These properties become confusing and add dubious value. We should lean more toward simple configuration vs. more raw power when we're able to.
   
   Recommend removing all 4 of these properties. Instead, just allow the QuestDB Status Repository to be configured via the `nifi.components.status.repository.implementation` property, in which case all stats are persistent. If Volatile repo is used, store everything in memory.

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
##########
@@ -1124,20 +1129,77 @@ private ProvenanceRepository createProvenanceRepository(final NiFiProperties pro
         }
     }
 
-    private ComponentStatusRepository createComponentStatusRepository() {
-        final String implementationClassName = nifiProperties.getProperty(NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION, DEFAULT_COMPONENT_STATUS_REPO_IMPLEMENTATION);
-        if (implementationClassName == null) {
-            throw new RuntimeException("Cannot create Component Status Repository because the NiFi Properties is missing the following property: "
-                    + NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION);
+    private StatusRepository createStatusRepositories() {
+        // Creating status repository based on implementation class takes precedence over creation based on builder
+        final String implementationClassName = nifiProperties.getProperty(NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION);
+
+        if (implementationClassName != null) {

Review comment:
       Any time that we fetch a property value from nifi properties, we need to treat `null` the same as an empty string or a string containing only whitespace. If the property name exists but has no value, you'll get back an empty string here instead of `null`.
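
   A minimal sketch of the check described above. It uses plain `java.util.Properties` as a stand-in for `NiFiProperties`, and the `getOrDefault` helper and the fallback string are hypothetical names introduced only for illustration:

```java
import java.util.Properties;

public class BlankPropertyCheck {

    // Returns true when the value is null, empty, or contains only whitespace.
    static boolean isBlank(final String value) {
        return value == null || value.trim().isEmpty();
    }

    // Hypothetical helper: returns the trimmed value, or the fallback when the
    // property is missing or blank.
    static String getOrDefault(final Properties properties, final String key, final String fallback) {
        final String value = properties.getProperty(key);
        return isBlank(value) ? fallback : value.trim();
    }

    public static void main(final String[] args) {
        final Properties properties = new Properties();
        // The key is present but has no value: getProperty returns "" rather than null.
        properties.setProperty("nifi.components.status.repository.implementation", "");

        final String implementation = getOrDefault(properties,
                "nifi.components.status.repository.implementation", "<use builder based creation>");
        System.out.println(implementation);
    }
}
```

   With a check like this, a bare `nifi.components.status.repository.implementation=` line is handled the same way as a missing property. `StringUtils.isBlank` from commons-lang3 offers a comparable test.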

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
##########
@@ -120,11 +120,40 @@ nifi.provenance.repository.concurrent.merge.threads=${nifi.provenance.repository
 # Volatile Provenance Respository Properties
 nifi.provenance.repository.buffer.size=${nifi.provenance.repository.buffer.size}
 
-# Component Status Repository
+# Component and Node Status Repository
+
+# Implementation based specification. Must be the classname of org.apache.nifi.controller.status.history.StatusRepository implementation.
+# This property specifies both the Component and the Node Status Repositories.
+# This approach takes precedence over the builder based specification if presents.
 nifi.components.status.repository.implementation=${nifi.components.status.repository.implementation}
+
+# Builder based specification. Gives the possibility to store Node and Component Status History information in different storage solutions.
+# nifi.status.repository.builder.inmemory=org.apache.nifi.controller.status.history.InMemoryStatusRepositoryBuilder
+# nifi.status.repository.builder.persistent=org.apache.nifi.controller.status.history.EmbeddedQuestDbStatusRepositoryBuilder
+# nifi.status.repository.roles.component=persistent
+# nifi.status.repository.roles.node=inmemory
+
+# Volatile Status Repository Properties
 nifi.components.status.repository.buffer.size=${nifi.components.status.repository.buffer.size}
 nifi.components.status.snapshot.frequency=${nifi.components.status.snapshot.frequency}
 
+# QuestDB Status Repository Properties
+# nifi.status.repository.questdb.persist.frequency=${nifi.status.repository.questdb.persist.frequency}
+# nifi.status.repository.questdb.persist.roll.frequency=${nifi.status.repository.questdb.persist.roll.frequency}
+# nifi.status.repository.questdb.persist.batch.size=${nifi.status.repository.questdb.persist.batch.size}
+# nifi.status.repository.questdb.persist.node.days=${nifi.status.repository.questdb.persist.node.days}
+# nifi.status.repository.questdb.persist.component.days=${nifi.status.repository.questdb.persist.component.days}
+# nifi.status.repository.questdb.persist.location=${nifi.status.repository.questdb.persist.location}
+
+# The properties below are used for optimize QuesDB performance. For further details please see https://questdb.io/docs/reference/sql/create-table
+# An estimation of the number of components per component type, serves as hint for QuestDB performance optimization
+# nifi.status.repository.questdb.component.id.distinctvalues=${nifi.status.repository.questdb.component.id.distinctvalues}
+# If true, it turns on Java heap based caching for quicker lookup. This increases selection speed but consumes heap memory.
+# nifi.status.repository.questdb.component.id.cached=${nifi.status.repository.questdb.component.id.cached}
+# Turns on indexing of the component id field. For further details please see https://questdb.io/docs/concept/indexes/

Review comment:
       While referencing the QuestDB docs may provide some additional insights, we should not expect users to understand how QuestDB works. That is simply an implementation detail. We need to ensure that we fully document exactly how this property will affect the user, given the context of NiFi. We should do this in the administration guide, though, rather than add too much to the nifi.properties.

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/EmbeddedQuestDbRolloverHandler.java
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.history;
+
+import io.questdb.cairo.sql.Record;
+import io.questdb.cairo.sql.RecordCursor;
+import io.questdb.cairo.sql.RecordCursorFactory;
+import io.questdb.griffin.SqlCompiler;
+import io.questdb.griffin.SqlExecutionContext;
+import org.apache.commons.lang3.time.FastDateFormat;
+import org.apache.nifi.controller.status.history.questdb.QuestDbContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * QuestDB does not provide the possibility for deleting individual lines. Instead there is the option to drop older
+ * partitions. In order to clean up older status information, the partitions are outside of the scope of data we intend
+ * to keep will be deleted.
+ */
+public class EmbeddedQuestDbRolloverHandler implements Runnable {
+    private static final Logger LOGGER = LoggerFactory.getLogger(EmbeddedQuestDbRolloverHandler.class);
+
+    // Drop keyword is intentionally not uppercase as the query parser only recognizes it in this way
+    private static final String DELETION_QUERY = "ALTER TABLE %s drop PARTITION '%s'";
+    // Distinct keyword is not recognized if the date mapping is not within an inner query
+    static final String SELECTION_QUERY = "SELECT DISTINCT * FROM (SELECT (to_str(capturedAt, 'yyyy-MM-dd')) AS partitionName FROM %s)";
+
+    static final FastDateFormat DATE_FORMAT = FastDateFormat.getInstance("yyyy-MM-dd");
+
+    private final QuestDbContext dbContext;
+    private final List<String> tables = new ArrayList<>();
+    private final int daysToKeepData;
+
+    public EmbeddedQuestDbRolloverHandler(final Collection<String> tables, final int daysToKeepData, final QuestDbContext dbContext) {
+        this.tables.addAll(tables);
+        this.dbContext = dbContext;
+        this.daysToKeepData = daysToKeepData;
+    }
+
+    @Override
+    public void run() {
+        LOGGER.debug("Starting rollover");
+        tables.forEach(tableName -> rollOverTable(tableName));
+        LOGGER.debug("Finishing rollover");
+    }
+
+    private void rollOverTable(final CharSequence tableName) {
+        try {
+            final Set<String> partitions = getPartitions(tableName);
+            final Set<String> partitionToKeep = getPartitionsToKeep();
+
+            for (final String partition : partitions) {
+                if (!partitionToKeep.contains(partition)) {
+                    deletePartition(tableName, partition);
+                }
+            }
+        } catch (final Exception e) {
+            LOGGER.error("Could not rollover table " + tableName, e);
+        }
+    }
+
+    private void deletePartition(final CharSequence tableName, final String partition) {
+        try (final SqlCompiler compiler = dbContext.getCompiler()) {
+            compiler.compile(String.format(DELETION_QUERY, new Object[]{tableName, partition}), dbContext.getSqlExecutionContext());
+        } catch (final Exception e) {
+            LOGGER.error("Dropping partition " + partition + " of table " + tableName + " failed", e);
+        }
+    }
+
+    private Set<String> getPartitions(final CharSequence tableName) throws Exception {
+        final SqlExecutionContext executionContext = dbContext.getSqlExecutionContext();
+        final Set<String> result = new HashSet<>();
+
+        try (
+            final SqlCompiler compiler = dbContext.getCompiler();
+            final RecordCursorFactory recordCursorFactory = compiler.compile(String.format(SELECTION_QUERY, new Object[]{tableName}), executionContext).getRecordCursorFactory();
+            final RecordCursor cursor = recordCursorFactory.getCursor(executionContext);
+        ) {
+            while (cursor.hasNext()) {
+                final Record record = cursor.getRecord();
+                result.add(new StringBuilder(record.getStr(0)).toString());

Review comment:
       Why are we creating a StringBuilder here?
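
   One possible motivation for the copy, stated here as an assumption rather than something confirmed in this thread: if `getStr` hands back a `CharSequence` whose backing object is reused as the cursor advances, the value has to be copied before it is stored in the set. The sketch below uses a reused `StringBuilder` as a stand-in for such a buffer (it does not use QuestDB classes) and shows that calling `toString()` on the `CharSequence` already produces an independent copy, so no intermediate `StringBuilder` is needed:

```java
import java.util.HashSet;
import java.util.Set;

public class CharSequenceCopy {

    // Copies each value out of a reused buffer into an immutable String before storing it.
    static Set<String> collect(final String[] rows) {
        // Stand-in for a CharSequence that is overwritten on every cursor step.
        final StringBuilder reusedBuffer = new StringBuilder();
        final Set<String> result = new HashSet<>();

        for (final String row : rows) {
            reusedBuffer.setLength(0);
            reusedBuffer.append(row);

            final CharSequence value = reusedBuffer;
            // toString() creates a new String; later changes to the buffer do not affect it.
            result.add(value.toString());
        }
        return result;
    }

    public static void main(final String[] args) {
        final Set<String> partitions = collect(new String[]{"2021-02-10", "2021-02-11", "2021-02-10"});
        System.out.println(partitions);
    }
}
```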

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
##########
@@ -120,11 +120,40 @@ nifi.provenance.repository.concurrent.merge.threads=${nifi.provenance.repository
 # Volatile Provenance Respository Properties
 nifi.provenance.repository.buffer.size=${nifi.provenance.repository.buffer.size}
 
-# Component Status Repository
+# Component and Node Status Repository
+
+# Implementation based specification. Must be the classname of org.apache.nifi.controller.status.history.StatusRepository implementation.
+# This property specifies both the Component and the Node Status Repositories.
+# This approach takes precedence over the builder based specification if presents.
 nifi.components.status.repository.implementation=${nifi.components.status.repository.implementation}
+
+# Builder based specification. Gives the possibility to store Node and Component Status History information in different storage solutions.
+# nifi.status.repository.builder.inmemory=org.apache.nifi.controller.status.history.InMemoryStatusRepositoryBuilder
+# nifi.status.repository.builder.persistent=org.apache.nifi.controller.status.history.EmbeddedQuestDbStatusRepositoryBuilder
+# nifi.status.repository.roles.component=persistent
+# nifi.status.repository.roles.node=inmemory
+
+# Volatile Status Repository Properties
 nifi.components.status.repository.buffer.size=${nifi.components.status.repository.buffer.size}
 nifi.components.status.snapshot.frequency=${nifi.components.status.snapshot.frequency}
 
+# QuestDB Status Repository Properties
+# nifi.status.repository.questdb.persist.frequency=${nifi.status.repository.questdb.persist.frequency}
+# nifi.status.repository.questdb.persist.roll.frequency=${nifi.status.repository.questdb.persist.roll.frequency}
+# nifi.status.repository.questdb.persist.batch.size=${nifi.status.repository.questdb.persist.batch.size}
+# nifi.status.repository.questdb.persist.node.days=${nifi.status.repository.questdb.persist.node.days}
+# nifi.status.repository.questdb.persist.component.days=${nifi.status.repository.questdb.persist.component.days}
+# nifi.status.repository.questdb.persist.location=${nifi.status.repository.questdb.persist.location}
+
+# The properties below are used for optimize QuesDB performance. For further details please see https://questdb.io/docs/reference/sql/create-table

Review comment:
       Typo: "QuesDB" should be "QuestDB" (missing the T). The sentence should also read "properties are used to optimize" rather than "used for optimize".

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessGroupStatusDescriptor.java
##########
@@ -113,16 +113,16 @@ public String getField() {
 
 
     private static long calculateTaskMillis(final ProcessGroupStatus status) {
-        long nanos = 0L;

Review comment:
       Why are these nanos being changed to millis? This leads to a lot of rounding errors, resulting in the data being both less precise and less accurate. By holding onto nanos and converting once at the end, it's also more efficient.
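
   A small sketch of the rounding effect described above. The sample values and the class and method names are made up for illustration and are not taken from `ProcessGroupStatusDescriptor`:

```java
import java.util.concurrent.TimeUnit;

public class TaskTimeRounding {

    // Sums nanoseconds first and converts to milliseconds once at the end.
    static long convertOnce(final long[] taskNanos) {
        long nanos = 0L;
        for (final long value : taskNanos) {
            nanos += value;
        }
        return TimeUnit.NANOSECONDS.toMillis(nanos);
    }

    // Converts every value to milliseconds before summing; each conversion truncates.
    static long convertEach(final long[] taskNanos) {
        long millis = 0L;
        for (final long value : taskNanos) {
            millis += TimeUnit.NANOSECONDS.toMillis(value);
        }
        return millis;
    }

    public static void main(final String[] args) {
        // Three sub-millisecond task times that add up to 2,000,000 ns.
        final long[] taskNanos = {400_000L, 700_000L, 900_000L};

        System.out.println("convert once: " + convertOnce(taskNanos) + " ms");
        System.out.println("convert each: " + convertEach(taskNanos) + " ms");
    }
}
```

   Because `TimeUnit.NANOSECONDS.toMillis` truncates, each sub-millisecond value becomes 0 when converted individually, so the per-value total is 0 ms while the single conversion yields 2 ms.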

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
##########
@@ -120,11 +120,40 @@ nifi.provenance.repository.concurrent.merge.threads=${nifi.provenance.repository
 # Volatile Provenance Respository Properties
 nifi.provenance.repository.buffer.size=${nifi.provenance.repository.buffer.size}
 
-# Component Status Repository
+# Component and Node Status Repository
+
+# Implementation based specification. Must be the classname of org.apache.nifi.controller.status.history.StatusRepository implementation.
+# This property specifies both the Component and the Node Status Repositories.
+# This approach takes precedence over the builder based specification if presents.
 nifi.components.status.repository.implementation=${nifi.components.status.repository.implementation}
+
+# Builder based specification. Gives the possibility to store Node and Component Status History information in different storage solutions.
+# nifi.status.repository.builder.inmemory=org.apache.nifi.controller.status.history.InMemoryStatusRepositoryBuilder
+# nifi.status.repository.builder.persistent=org.apache.nifi.controller.status.history.EmbeddedQuestDbStatusRepositoryBuilder
+# nifi.status.repository.roles.component=persistent
+# nifi.status.repository.roles.node=inmemory
+
+# Volatile Status Repository Properties
 nifi.components.status.repository.buffer.size=${nifi.components.status.repository.buffer.size}
 nifi.components.status.snapshot.frequency=${nifi.components.status.snapshot.frequency}
 
+# QuestDB Status Repository Properties
+# nifi.status.repository.questdb.persist.frequency=${nifi.status.repository.questdb.persist.frequency}
+# nifi.status.repository.questdb.persist.roll.frequency=${nifi.status.repository.questdb.persist.roll.frequency}
+# nifi.status.repository.questdb.persist.batch.size=${nifi.status.repository.questdb.persist.batch.size}

Review comment:
       These properties also seem too complex to me. Admins shouldn't need to guess what an appropriate "batch size" is for storing metrics. We should try to keep this as simple as possible and just configure how frequently we capture a snapshot. Can always add in additional properties later, if necessary, for tuning. Just don't want to overwhelm users with 15 additional properties when all the user really cares about is "I want this persisted for longer and across restarts."



