You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2020/01/09 10:26:55 UTC

[nifi] branch master updated: NIFI-6960: Add Provenance Table to MetricsSqlQueryService

This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d214f9  NIFI-6960: Add Provenance Table to MetricsSqlQueryService
8d214f9 is described below

commit 8d214f99a4c2f58210279b918ca0d974ef6eb733
Author: Matthew Burgess <ma...@apache.org>
AuthorDate: Thu Dec 19 14:40:06 2019 -0500

    NIFI-6960: Add Provenance Table to MetricsSqlQueryService
    
    Signed-off-by: Pierre Villard <pi...@gmail.com>
    
    This closes #3945.
---
 .../nifi/reporting/sql/MetricsSqlQueryService.java |   3 +
 .../nifi/reporting/sql/QueryNiFiReportingTask.java |   4 +-
 .../sql/provenance/ProvenanceEnumerator.java       | 191 ++++++++++++++++++
 .../provenance/ProvenanceProjectTableScanRule.java |  76 ++++++++
 .../reporting/sql/provenance/ProvenanceTable.java  | 214 +++++++++++++++++++++
 .../sql/provenance/ProvenanceTableScan.java        |  91 +++++++++
 .../nifi/reporting/sql/util/QueryMetricsUtil.java  |   4 +-
 .../additionalDetails.html                         |  35 +++-
 .../reporting/sql/TestQueryNiFiReportingTask.java  | 117 ++++++++++-
 9 files changed, 719 insertions(+), 16 deletions(-)

diff --git a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/MetricsSqlQueryService.java b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/MetricsSqlQueryService.java
index a267998..cbcf4b2 100644
--- a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/MetricsSqlQueryService.java
+++ b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/MetricsSqlQueryService.java
@@ -33,6 +33,7 @@ import org.apache.nifi.reporting.sql.connectionstatuspredictions.ConnectionStatu
 import org.apache.nifi.reporting.sql.metrics.JvmMetricsTable;
 import org.apache.nifi.reporting.sql.processgroupstatus.ProcessGroupStatusTable;
 import org.apache.nifi.reporting.sql.processorstatus.ProcessorStatusTable;
+import org.apache.nifi.reporting.sql.provenance.ProvenanceTable;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.ResultSetRecordSet;
 import org.apache.nifi.util.db.JdbcCommon;
@@ -163,6 +164,8 @@ public class MetricsSqlQueryService implements MetricsQueryService {
         rootSchema.add("JVM_METRICS", jvmMetricsTable);
         final BulletinTable bulletinTable = new BulletinTable(context, getLogger());
         rootSchema.add("BULLETINS", bulletinTable);
+        final ProvenanceTable provenanceTable = new ProvenanceTable(context, getLogger());
+        rootSchema.add("PROVENANCE", provenanceTable);
 
         rootSchema.setCacheEnabled(false);
 
diff --git a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/QueryNiFiReportingTask.java b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/QueryNiFiReportingTask.java
index 6f3aa9e..a97baa8 100644
--- a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/QueryNiFiReportingTask.java
+++ b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/QueryNiFiReportingTask.java
@@ -38,9 +38,9 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
-@Tags({"status", "connection", "processor", "jvm", "metrics", "history", "bulletin", "prediction", "process", "group", "record", "sql"})
+@Tags({"status", "connection", "processor", "jvm", "metrics", "history", "bulletin", "prediction", "process", "group", "provenance", "record", "sql"})
 @CapabilityDescription("Publishes NiFi status information based on the results of a user-specified SQL query. The query may make use of the CONNECTION_STATUS, PROCESSOR_STATUS, "
-        + "BULLETINS, PROCESS_GROUP_STATUS, JVM_METRICS, or CONNECTION_STATUS_PREDICTIONS tables, and can use any functions or capabilities provided by Apache Calcite. Note that the "
+        + "BULLETINS, PROCESS_GROUP_STATUS, JVM_METRICS, CONNECTION_STATUS_PREDICTIONS, or PROVENANCE tables, and can use any functions or capabilities provided by Apache Calcite. Note that the "
         + "CONNECTION_STATUS_PREDICTIONS table is not available for querying if analytics are not enabled (see the nifi.analytics.predict.enabled property in nifi.properties). Attempting a "
         + "query on the table when the capability is disabled will cause an error.")
 public class QueryNiFiReportingTask extends AbstractReportingTask {
diff --git a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/provenance/ProvenanceEnumerator.java b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/provenance/ProvenanceEnumerator.java
new file mode 100644
index 0000000..6bc2e58
--- /dev/null
+++ b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/provenance/ProvenanceEnumerator.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.reporting.sql.provenance;
+
+import org.apache.calcite.linq4j.Enumerator;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventRepository;
+import org.apache.nifi.reporting.EventAccess;
+import org.apache.nifi.reporting.ReportingContext;
+import org.apache.nifi.reporting.util.provenance.ComponentMapHolder;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+
+public class ProvenanceEnumerator implements Enumerator<Object> {
+    private static final int FETCH_SIZE = 1000;
+    private final ComponentLog logger;
+    private final int[] fields;
+    private final ProvenanceEventRepository provenanceEventRepository;
+    private List<ProvenanceEventRecord> provenanceEvents;
+    private final ComponentMapHolder componentMapHolder;
+    private final String nodeIdentifier;
+
+    private Object currentRow;
+    private long currentId = 0;
+    private int currentIndex = 0; // Index into the current fetch
+
+    public ProvenanceEnumerator(final ReportingContext context, final ComponentLog logger, final int[] fields) {
+        this.logger = logger;
+        this.fields = fields;
+        final EventAccess eventAccess = context.getEventAccess();
+        this.provenanceEventRepository = eventAccess.getProvenanceRepository();
+        final ProcessGroupStatus procGroupStatus = eventAccess.getControllerStatus();
+        this.componentMapHolder = ComponentMapHolder.createComponentMap(procGroupStatus);
+
+        final boolean isClustered = context.isClustered();
+        nodeIdentifier = context.getClusterNodeIdentifier();
+        if (nodeIdentifier == null && isClustered) {
+            logger.warn("This instance of NiFi is configured for clustering, but the Cluster Node Identifier is not yet available. "
+                    + "The contentPath and previousContentPath fields will be null for all rows in this query");
+        }
+
+        try {
+            this.provenanceEvents = provenanceEventRepository.getEvents(0, FETCH_SIZE);
+        } catch (IOException ioe) {
+            logger.error("Error retrieving provenance events, queries will return no rows");
+        }
+        reset();
+    }
+
+    @Override
+    public Object current() {
+        return currentRow;
+    }
+
+    @Override
+    public boolean moveNext() {
+        if (provenanceEvents == null) {
+            return false;
+        }
+        currentRow = null;
+        if (currentIndex == provenanceEvents.size()) {
+            // Need to fetch a new set of rows, starting with the last ID + 1
+            try {
+                provenanceEvents = provenanceEventRepository.getEvents(currentId + 1, FETCH_SIZE);
+                currentIndex = 0;
+            } catch (IOException ioe) {
+                logger.error("Error retrieving provenance events, queries will return no further rows");
+                return false;
+            }
+        }
+
+        if (provenanceEvents.isEmpty()) {
+            // If we are out of data, close the InputStream. We do this because
+            // Calcite does not necessarily call our close() method.
+            close();
+            try {
+                onFinish();
+            } catch (final Exception e) {
+                logger.error("Failed to perform tasks when enumerator was finished", e);
+            }
+
+            return false;
+        }
+
+        ProvenanceEventRecord provenanceEvent = provenanceEvents.get(currentIndex);
+        currentRow = filterColumns(provenanceEvent);
+        currentId = provenanceEvent.getEventId();
+        currentIndex++;
+        return true;
+    }
+
+    protected int getRecordsRead() {
+        return 1;
+    }
+
+    protected void onFinish() {
+    }
+
+    private Object filterColumns(final ProvenanceEventRecord provenanceEvent) {
+        if (provenanceEvent == null) {
+            return null;
+        }
+
+        final ArrayList<Object> rowList = new ArrayList<>();
+        rowList.add(provenanceEvent.getEventId());
+        rowList.add(provenanceEvent.getEventType().name());
+        rowList.add(provenanceEvent.getEventTime());
+        rowList.add(provenanceEvent.getEventDuration());
+        rowList.add(provenanceEvent.getLineageStartDate());
+        rowList.add(provenanceEvent.getDetails());
+        rowList.add(provenanceEvent.getComponentId());
+        rowList.add(componentMapHolder.getComponentName(provenanceEvent.getComponentId()));
+        rowList.add(provenanceEvent.getComponentType());
+        final String processGroupId = componentMapHolder.getProcessGroupId(provenanceEvent.getComponentId(), provenanceEvent.getComponentType());
+        rowList.add(processGroupId);
+        rowList.add(componentMapHolder.getComponentName(processGroupId));
+        rowList.add(provenanceEvent.getFlowFileUuid());
+        rowList.add("org.apache.nifi.flowfile.FlowFile"); // entityType
+        rowList.add(provenanceEvent.getFileSize());
+        rowList.add(provenanceEvent.getPreviousFileSize());
+        rowList.add(provenanceEvent.getUpdatedAttributes());
+        rowList.add(provenanceEvent.getPreviousAttributes());
+        if (nodeIdentifier != null) {
+            final String contentPathBase = "/nifi-api/provenance-events/" + provenanceEvent.getEventId() + "/content/";
+            final String nodeIdSuffix = "?clusterNodeId=" + nodeIdentifier;
+            rowList.add(contentPathBase + "output" + nodeIdSuffix);
+            rowList.add(contentPathBase + "input" + nodeIdSuffix);
+        } else {
+            rowList.add(null);
+            rowList.add(null);
+        }
+        rowList.add(provenanceEvent.getParentUuids());
+        rowList.add(provenanceEvent.getChildUuids());
+        rowList.add(provenanceEvent.getTransitUri());
+        rowList.add(provenanceEvent.getSourceSystemFlowFileIdentifier());
+        rowList.add(provenanceEvent.getAlternateIdentifierUri());
+
+        final Object[] row = rowList.toArray();
+
+        // If we want no fields just return null
+        if (fields == null) {
+            return row;
+        }
+
+        // If we want only a single field, then Calcite is going to expect us to return
+        // the actual value, NOT a 1-element array of values.
+        if (fields.length == 1) {
+            final int desiredCellIndex = fields[0];
+            return row[desiredCellIndex];
+        }
+
+        // Create a new Object array that contains only the desired fields.
+        final Object[] filtered = new Object[fields.length];
+        for (int i = 0; i < fields.length; i++) {
+            final int indexToKeep = fields[i];
+            filtered[i] = row[indexToKeep];
+        }
+
+        return filtered;
+    }
+
+    @Override
+    public void reset() {
+        currentId = 0;
+        currentIndex = 0;
+    }
+
+    @Override
+    public void close() {
+    }
+}
diff --git a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/provenance/ProvenanceProjectTableScanRule.java b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/provenance/ProvenanceProjectTableScanRule.java
new file mode 100644
index 0000000..8062448
--- /dev/null
+++ b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/provenance/ProvenanceProjectTableScanRule.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.sql.provenance;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+
+import java.util.List;
+
+/**
+ * Planner rule that projects from a {@link ProvenanceTableScan} scan just the columns
+ * needed to satisfy a projection. If the projection's expressions are trivial,
+ * the projection is removed.
+ */
+public class ProvenanceProjectTableScanRule extends RelOptRule {
+    public static final ProvenanceProjectTableScanRule INSTANCE = new ProvenanceProjectTableScanRule();
+
+    private ProvenanceProjectTableScanRule() {
+        super(
+            operand(LogicalProject.class,
+                operand(ProvenanceTableScan.class, none())),
+            "ProvenanceProjectTableScanRule");
+    }
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+        final LogicalProject project = call.rel(0);
+        final ProvenanceTableScan scan = call.rel(1);
+        final int[] fields = getProjectFields(project.getProjects());
+
+        if (fields == null) {
+            // Project contains expressions more complex than just field references.
+            return;
+        }
+
+        call.transformTo(
+            new ProvenanceTableScan(
+                scan.getCluster(),
+                scan.getTable(),
+                scan.provenanceTable,
+                fields));
+    }
+
+    private int[] getProjectFields(List<RexNode> exps) {
+        final int[] fields = new int[exps.size()];
+
+        for (int i = 0; i < exps.size(); i++) {
+            final RexNode exp = exps.get(i);
+
+            if (exp instanceof RexInputRef) {
+                fields[i] = ((RexInputRef) exp).getIndex();
+            } else {
+                return null; // not a simple projection
+            }
+        }
+
+        return fields;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/provenance/ProvenanceTable.java b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/provenance/ProvenanceTable.java
new file mode 100644
index 0000000..1c8263a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/provenance/ProvenanceTable.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.sql.provenance;
+
+import org.apache.calcite.linq4j.AbstractEnumerable;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.linq4j.Enumerator;
+import org.apache.calcite.linq4j.QueryProvider;
+import org.apache.calcite.linq4j.Queryable;
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.schema.QueryableTable;
+import org.apache.calcite.schema.Schema.TableType;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Schemas;
+import org.apache.calcite.schema.TranslatableTable;
+import org.apache.calcite.schema.impl.AbstractTable;
+import org.apache.calcite.util.Pair;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.reporting.ReportingContext;
+
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+
+public class ProvenanceTable extends AbstractTable implements QueryableTable, TranslatableTable {
+
+    private final ComponentLog logger;
+
+    private RelDataType relDataType = null;
+
+    private volatile ReportingContext context;
+    private volatile int maxRecordsRead;
+
+    private final Set<ProvenanceEnumerator> enumerators = new HashSet<>();
+
+    /**
+     * Creates a Provenance events table.
+     */
+    public ProvenanceTable(final ReportingContext context, final ComponentLog logger) {
+        this.context = context;
+        this.logger = logger;
+    }
+
+    @Override
+    public String toString() {
+        return "ProvenanceTable";
+    }
+
+    public void close() {
+        synchronized (enumerators) {
+            for (final ProvenanceEnumerator enumerator : enumerators) {
+                enumerator.close();
+            }
+        }
+    }
+
+    /**
+     * Returns an enumerable over a given projection of the fields.
+     *
+     * <p>
+     * Called from generated code.
+     */
+    public Enumerable<Object> project(final int[] fields) {
+        return new AbstractEnumerable<Object>() {
+            @Override
+            public Enumerator<Object> enumerator() {
+                final ProvenanceEnumerator provenanceEnumerator = new ProvenanceEnumerator(context, logger, fields) {
+                    @Override
+                    protected void onFinish() {
+                        final int recordCount = getRecordsRead();
+                        if (recordCount > maxRecordsRead) {
+                            maxRecordsRead = recordCount;
+                        }
+                    }
+
+                    @Override
+                    public void close() {
+                        synchronized (enumerators) {
+                            enumerators.remove(this);
+                        }
+                        super.close();
+                    }
+                };
+
+                synchronized (enumerators) {
+                    enumerators.add(provenanceEnumerator);
+                }
+
+                return provenanceEnumerator;
+            }
+        };
+    }
+
+    public int getRecordsRead() {
+        return maxRecordsRead;
+    }
+
+    @Override
+    public Expression getExpression(final SchemaPlus schema, final String tableName, final Class clazz) {
+        return Schemas.tableExpression(schema, getElementType(), tableName, clazz);
+    }
+
+    @Override
+    public Type getElementType() {
+        return Object[].class;
+    }
+
+    @Override
+    public <T> Queryable<T> asQueryable(final QueryProvider queryProvider, final SchemaPlus schema, final String tableName) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public RelNode toRel(final RelOptTable.ToRelContext context, final RelOptTable relOptTable) {
+        // Request all fields.
+        final int fieldCount = relOptTable.getRowType().getFieldCount();
+        final int[] fields = new int[fieldCount];
+        for (int i = 0; i < fieldCount; i++) {
+            fields[i] = i;
+        }
+
+        return new ProvenanceTableScan(context.getCluster(), relOptTable, this, fields);
+    }
+
+    @Override
+    public RelDataType getRowType(final RelDataTypeFactory typeFactory) {
+        if (relDataType != null) {
+            return relDataType;
+        }
+
+        final List<String> names = Arrays.asList(
+                "eventId",
+                "eventType",
+                "timestampMillis",
+                "durationMillis",
+                "lineageStart",
+                "details",
+                "componentId",
+                "componentName",
+                "componentType",
+                "processGroupId",
+                "processGroupName",
+                "entityId",
+                "entityType",
+                "entitySize",
+                "previousEntitySize",
+                "updatedAttributes",
+                "previousAttributes",
+                "contentPath",
+                "previousContentPath",
+                "parentIds",
+                "childIds",
+                "transitUri",
+                "remoteIdentifier",
+                "alternateIdentifier"
+        );
+
+        final List<RelDataType> types = new ArrayList<>();
+        types.add(typeFactory.createJavaType(long.class));
+        types.add(typeFactory.createJavaType(String.class));
+        types.add(typeFactory.createJavaType(long.class));
+        types.add(typeFactory.createJavaType(long.class));
+        types.add(typeFactory.createJavaType(long.class));
+        types.add(typeFactory.createJavaType(String.class));
+        types.add(typeFactory.createJavaType(String.class));
+        types.add(typeFactory.createJavaType(String.class));
+        types.add(typeFactory.createJavaType(String.class));
+        types.add(typeFactory.createJavaType(String.class));
+        types.add(typeFactory.createJavaType(String.class));
+        types.add(typeFactory.createJavaType(String.class));
+        types.add(typeFactory.createJavaType(String.class));
+        types.add(typeFactory.createJavaType(long.class));
+        types.add(typeFactory.createJavaType(long.class));
+        types.add(typeFactory.createMapType(typeFactory.createJavaType(String.class), typeFactory.createJavaType(String.class)));
+        types.add(typeFactory.createMapType(typeFactory.createJavaType(String.class), typeFactory.createJavaType(String.class)));
+        types.add(typeFactory.createJavaType(String.class));
+        types.add(typeFactory.createJavaType(String.class));
+        types.add(typeFactory.createArrayType(typeFactory.createJavaType(String.class), -1));
+        types.add(typeFactory.createArrayType(typeFactory.createJavaType(String.class), -1));
+        types.add(typeFactory.createJavaType(String.class));
+        types.add(typeFactory.createJavaType(String.class));
+        types.add(typeFactory.createJavaType(String.class));
+
+        relDataType = typeFactory.createStructType(Pair.zip(names, types));
+        return relDataType;
+    }
+
+    @Override
+    public TableType getJdbcTableType() {
+        return TableType.TEMPORARY_TABLE;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/provenance/ProvenanceTableScan.java b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/provenance/ProvenanceTableScan.java
new file mode 100644
index 0000000..f8420f0
--- /dev/null
+++ b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/provenance/ProvenanceTableScan.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.sql.provenance;
+
+import org.apache.calcite.adapter.enumerable.EnumerableConvention;
+import org.apache.calcite.adapter.enumerable.EnumerableRel;
+import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor;
+import org.apache.calcite.adapter.enumerable.PhysType;
+import org.apache.calcite.adapter.enumerable.PhysTypeImpl;
+import org.apache.calcite.linq4j.tree.Blocks;
+import org.apache.calcite.linq4j.tree.Expressions;
+import org.apache.calcite.linq4j.tree.Primitive;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+
+import java.util.List;
+
+/**
+ * Relational expression representing a query over the provenance repository.
+ *
+ * <p>
+ * Like any table scan, it serves as a leaf node of a query tree.
+ * </p>
+ */
+public class ProvenanceTableScan extends TableScan implements EnumerableRel {
+    final ProvenanceTable provenanceTable;
+    final int[] fields;
+
+    protected ProvenanceTableScan(final RelOptCluster cluster, final RelOptTable table, final ProvenanceTable provenanceTable, final int[] fields) {
+        super(cluster, cluster.traitSetOf(EnumerableConvention.INSTANCE), table);
+
+        this.provenanceTable = provenanceTable;
+        this.fields = fields;
+    }
+
+    @Override
+    public RelNode copy(final RelTraitSet traitSet, final List<RelNode> inputs) {
+        return new ProvenanceTableScan(getCluster(), table, provenanceTable, fields);
+    }
+
+    @Override
+    public RelWriter explainTerms(final RelWriter pw) {
+        return super.explainTerms(pw).item("fields", Primitive.asList(fields));
+    }
+
+    @Override
+    public RelDataType deriveRowType() {
+        final List<RelDataTypeField> fieldList = table.getRowType().getFieldList();
+        final RelDataTypeFactory.Builder builder = getCluster().getTypeFactory().builder();
+        for (int field : fields) {
+            builder.add(fieldList.get(field));
+        }
+        return builder.build();
+    }
+
+    @Override
+    public void register(RelOptPlanner planner) {
+        planner.addRule(ProvenanceProjectTableScanRule.INSTANCE);
+    }
+
+    @Override
+    public Result implement(EnumerableRelImplementor implementor, Prefer pref) {
+        PhysType physType = PhysTypeImpl.of(implementor.getTypeFactory(), getRowType(), pref.preferArray());
+
+        return implementor.result(physType, Blocks.toBlock(
+            Expressions.call(table.getExpression(ProvenanceTable.class), "project", Expressions.constant(fields))));
+    }
+
+}
diff --git a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/util/QueryMetricsUtil.java b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/util/QueryMetricsUtil.java
index 159daec..18b6998 100644
--- a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/util/QueryMetricsUtil.java
+++ b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/util/QueryMetricsUtil.java
@@ -41,8 +41,8 @@ public class QueryMetricsUtil {
             .name("sql-reporting-query")
             .displayName("SQL Query")
             .description("SQL SELECT statement specifies which tables to query and how data should be filtered/transformed. "
-                    + "SQL SELECT can select from the CONNECTION_STATUS, PROCESSOR_STATUS, BULLETINS, PROCESS_GROUP_STATUS, JVM_METRICS, or CONNECTION_STATUS_PREDICTIONS tables. Note that the "
-                    + "CONNECTION_STATUS_PREDICTIONS table is not available for querying if analytics are not enabled).")
+                    + "SQL SELECT can select from the CONNECTION_STATUS, PROCESSOR_STATUS, BULLETINS, PROCESS_GROUP_STATUS, JVM_METRICS, CONNECTION_STATUS_PREDICTIONS, or PROVENANCE tables. "
+                    + "Note that the CONNECTION_STATUS_PREDICTIONS table is not available for querying if analytics are not enabled).")
             .required(true)
             .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .addValidator(new SqlValidator())
diff --git a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/resources/docs/org.apache.nifi.reporting.sql.QueryNiFiReportingTask/additionalDetails.html b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/resources/docs/org.apache.nifi.reporting.sql.QueryNiFiReportingTask/additionalDetails.html
index 1ce7583..7b55eef 100644
--- a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/resources/docs/org.apache.nifi.reporting.sql.QueryNiFiReportingTask/additionalDetails.html
+++ b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/resources/docs/org.apache.nifi.reporting.sql.QueryNiFiReportingTask/additionalDetails.html
@@ -26,9 +26,9 @@
 <p>
     This reporting task can be used to issue SQL queries against various NiFi metrics information, modeled as tables,
     and transmit the query results to some specified destination. The query may make use of the CONNECTION_STATUS,
-    PROCESSOR_STATUS, BULLETINS, PROCESS_GROUP_STATUS, JVM_METRICS, or CONNECTION_STATUS_PREDICTIONS tables, and can
-    use any functions or capabilities provided by <a href="https://calcite.apache.org/">Apache Calcite</a>, including
-    JOINs, aggregate functions, etc.
+    PROCESSOR_STATUS, BULLETINS, PROCESS_GROUP_STATUS, JVM_METRICS, CONNECTION_STATUS_PREDICTIONS, or PROVENANCE tables,
+    and can use any functions or capabilities provided by <a href="https://calcite.apache.org/">Apache Calcite</a>,
+    including JOINs, aggregate functions, etc.
 </p>
 <p>
     The results are transmitted to the destination using the configured Record Sink service, such as
@@ -174,6 +174,35 @@
     <tr><td>predictedTimeToCountBackpressureMillis</td><td>long</td></tr>
     <tr><td>predictionIntervalMillis</td><td>long</td></tr>
 </table>
+<br/>
+<h3>PROVENANCE</h3>
+<table title="PROVENANCE" border="1" width="500">
+    <tr><th>Column</th><th>Data Type</th></tr>
+    <tr><td>eventId</td><td>long</td></tr>
+    <tr><td>eventType</td><td>String</td></tr>
+    <tr><td>timestampMillis</td><td>long</td></tr>
+    <tr><td>durationMillis</td><td>long</td></tr>
+    <tr><td>lineageStart</td><td>long</td></tr>
+    <tr><td>details</td><td>String</td></tr>
+    <tr><td>componentId</td><td>String</td></tr>
+    <tr><td>componentName</td><td>String</td></tr>
+    <tr><td>componentType</td><td>String</td></tr>
+    <tr><td>processGroupId</td><td>String</td></tr>
+    <tr><td>processGroupName</td><td>String</td></tr>
+    <tr><td>entityId</td><td>String</td></tr>
+    <tr><td>entityType</td><td>String</td></tr>
+    <tr><td>entitySize</td><td>long</td></tr>
+    <tr><td>previousEntitySize</td><td>long</td></tr>
+    <tr><td>updatedAttributes</td><td>Map&lt;String,String&gt;</td></tr>
+    <tr><td>previousAttributes</td><td>Map&lt;String,String&gt;</td></tr>
+    <tr><td>contentPath</td><td>String</td></tr>
+    <tr><td>previousContentPath</td><td>String</td></tr>
+    <tr><td>parentIds</td><td>Array&lt;String&gt;</td></tr>
+    <tr><td>childIds</td><td>Array&lt;String&gt;</td></tr>
+    <tr><td>transitUri</td><td>String</td></tr>
+    <tr><td>remoteIdentifier</td><td>String</td></tr>
+    <tr><td>alternateIdentifier</td><td>String</td></tr>
+</table>
 <br/><br/>
 <h2>SQL Query Examples</h2>
 <p>
diff --git a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java
index eae9e91..c86fb1c 100644
--- a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java
+++ b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java
@@ -25,6 +25,10 @@ import org.apache.nifi.controller.status.ConnectionStatus;
 import org.apache.nifi.controller.status.ProcessGroupStatus;
 import org.apache.nifi.controller.status.ProcessorStatus;
 import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.provenance.MockProvenanceRepository;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.record.sink.MockRecordSinkService;
 import org.apache.nifi.record.sink.RecordSinkService;
 import org.apache.nifi.reporting.EventAccess;
@@ -34,7 +38,10 @@ import org.apache.nifi.reporting.ReportingInitializationContext;
 import org.apache.nifi.reporting.sql.util.QueryMetricsUtil;
 import org.apache.nifi.reporting.util.metrics.MetricNames;
 import org.apache.nifi.state.MockStateManager;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.MockProcessSession;
 import org.apache.nifi.util.MockPropertyValue;
+import org.apache.nifi.util.SharedSessionState;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -47,11 +54,14 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
 
 public class TestQueryNiFiReportingTask {
 
@@ -194,10 +204,10 @@ public class TestQueryNiFiReportingTask {
 
         List<Map<String, Object>> rows = mockRecordSinkService.getRows();
         assertEquals(1, rows.size());
-        Map<String,Object> row = rows.get(0);
+        Map<String, Object> row = rows.get(0);
         assertEquals(11, row.size());
-        assertTrue(row.get(MetricNames.JVM_DAEMON_THREAD_COUNT.replace(".","_")) instanceof Integer);
-        assertTrue(row.get(MetricNames.JVM_HEAP_USAGE.replace(".","_")) instanceof Double);
+        assertTrue(row.get(MetricNames.JVM_DAEMON_THREAD_COUNT.replace(".", "_")) instanceof Integer);
+        assertTrue(row.get(MetricNames.JVM_HEAP_USAGE.replace(".", "_")) instanceof Double);
     }
 
     @Test
@@ -237,11 +247,55 @@ public class TestQueryNiFiReportingTask {
         assertEquals(0, rows.size());
     }
 
+    /**
+     * Verifies that the PROVENANCE table returns every event registered by {@link #initTask(Map)}:
+     * one CREATE event (eventId 0) followed by 1000 DROP events (eventIds 1-1000), each row
+     * exposing all 24 columns of the table.
+     */
+    @Test
+    public void testProvenanceTable() throws IOException, InitializationException {
+        // initTask registers 1 CREATE event plus 1000 DROP events
+        final int expectedEventCount = 1001;
+        // Column count of the PROVENANCE table as documented in additionalDetails.html
+        final int expectedColumnCount = 24;
+
+        final Map<PropertyDescriptor, String> properties = new HashMap<>();
+        properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
+        properties.put(QueryMetricsUtil.QUERY, "select * from PROVENANCE order by eventId asc");
+        reportingTask = initTask(properties);
+        reportingTask.onTrigger(context);
+
+        final List<Map<String, Object>> rows = mockRecordSinkService.getRows();
+        assertEquals(expectedEventCount, rows.size());
+
+        // First row: the CREATE event built from the "Test content" (12 byte) FlowFile
+        Map<String, Object> row = rows.get(0);
+        assertEquals(expectedColumnCount, row.size());
+        assertEquals(0L, row.get("eventId"));
+        assertEquals("CREATE", row.get("eventType"));
+        assertEquals(12L, row.get("entitySize"));
+        assertNull(row.get("contentPath"));
+        assertNull(row.get("previousContentPath"));
+
+        // Attribute columns are Maps; a wildcard cast avoids an unchecked conversion since
+        // the values are only read back through get(...)
+        final Object previousAttributesValue = row.get("previousAttributes");
+        assertTrue(previousAttributesValue instanceof Map);
+        final Map<?, ?> previousAttributes = (Map<?, ?>) previousAttributesValue;
+        assertEquals("A", previousAttributes.get("test.value"));
+
+        final Object updatedAttributesValue = row.get("updatedAttributes");
+        assertTrue(updatedAttributesValue instanceof Map);
+        final Map<?, ?> updatedAttributes = (Map<?, ?>) updatedAttributesValue;
+        assertEquals("B", updatedAttributes.get("test.value"));
+
+        // Second row: the first of the generated DROP events
+        row = rows.get(1);
+        assertEquals(expectedColumnCount, row.size());
+        assertEquals(1L, row.get("eventId"));
+        assertEquals("DROP", row.get("eventType"));
+
+        // Last row: the final generated DROP event
+        row = rows.get(expectedEventCount - 1);
+        assertEquals(expectedColumnCount, row.size());
+        assertEquals(1000L, row.get("eventId"));
+        assertEquals("DROP", row.get("eventType"));
+    }
+
     private MockQueryNiFiReportingTask initTask(Map<PropertyDescriptor, String> customProperties) throws InitializationException, IOException {
 
-        final ComponentLog logger = Mockito.mock(ComponentLog.class);
+        final ComponentLog logger = mock(ComponentLog.class);
         reportingTask = new MockQueryNiFiReportingTask();
-        final ReportingInitializationContext initContext = Mockito.mock(ReportingInitializationContext.class);
+        final ReportingInitializationContext initContext = mock(ReportingInitializationContext.class);
         Mockito.when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString());
         Mockito.when(initContext.getLogger()).thenReturn(logger);
         reportingTask.initialize(initContext);
@@ -251,26 +305,71 @@ public class TestQueryNiFiReportingTask {
         }
         properties.putAll(customProperties);
 
-        context = Mockito.mock(ReportingContext.class);
+        context = mock(ReportingContext.class);
         Mockito.when(context.getStateManager()).thenReturn(new MockStateManager(reportingTask));
         Mockito.doAnswer((Answer<PropertyValue>) invocation -> {
             final PropertyDescriptor descriptor = invocation.getArgument(0, PropertyDescriptor.class);
             return new MockPropertyValue(properties.get(descriptor));
         }).when(context).getProperty(Mockito.any(PropertyDescriptor.class));
 
-        final EventAccess eventAccess = Mockito.mock(EventAccess.class);
+        final EventAccess eventAccess = mock(EventAccess.class);
         Mockito.when(context.getEventAccess()).thenReturn(eventAccess);
         Mockito.when(eventAccess.getControllerStatus()).thenReturn(status);
 
-        final PropertyValue pValue = Mockito.mock(StandardPropertyValue.class);
+        final PropertyValue pValue = mock(StandardPropertyValue.class);
         mockRecordSinkService = new MockRecordSinkService();
         Mockito.when(context.getProperty(QueryMetricsUtil.RECORD_SINK)).thenReturn(pValue);
         Mockito.when(pValue.asControllerService(RecordSinkService.class)).thenReturn(mockRecordSinkService);
 
-        ConfigurationContext configContext = Mockito.mock(ConfigurationContext.class);
+        ConfigurationContext configContext = mock(ConfigurationContext.class);
         Mockito.when(configContext.getProperty(QueryMetricsUtil.RECORD_SINK)).thenReturn(pValue);
         reportingTask.setup(configContext);
 
+        MockProvenanceRepository provenanceRepository = new MockProvenanceRepository();
+        long currentTimeMillis = System.currentTimeMillis();
+        Map<String, String> previousAttributes = new HashMap<>();
+        previousAttributes.put("mime.type", "application/json");
+        previousAttributes.put("test.value", "A");
+        Map<String, String> updatedAttributes = new HashMap<>(previousAttributes);
+        updatedAttributes.put("test.value", "B");
+
+        // Generate provenance events and put them in a repository
+        Processor processor = mock(Processor.class);
+        SharedSessionState sharedState = new SharedSessionState(processor, new AtomicLong(0));
+        MockProcessSession processSession = new MockProcessSession(sharedState, processor);
+        MockFlowFile mockFlowFile = processSession.createFlowFile("Test content".getBytes());
+
+        ProvenanceEventRecord prov1 = provenanceRepository.eventBuilder()
+                .setEventType(ProvenanceEventType.CREATE)
+                .fromFlowFile(mockFlowFile)
+                .setComponentId("12345")
+                .setComponentType("ReportingTask")
+                .setFlowFileUUID("I am FlowFile 1")
+                .setEventTime(currentTimeMillis)
+                .setEventDuration(100)
+                .setTransitUri("test://")
+                .setSourceSystemFlowFileIdentifier("I am FlowFile 1")
+                .setAlternateIdentifierUri("remote://test")
+                .setAttributes(previousAttributes, updatedAttributes)
+                .build();
+
+        provenanceRepository.registerEvent(prov1);
+
+        for (int i = 1; i < 1001; i++) {
+            String indexString = Integer.toString(i);
+            mockFlowFile = processSession.createFlowFile(("Test content " + indexString).getBytes());
+            ProvenanceEventRecord prov = provenanceRepository.eventBuilder()
+                    .fromFlowFile(mockFlowFile)
+                    .setEventType(ProvenanceEventType.DROP)
+                    .setComponentId(indexString)
+                    .setComponentType("Processor")
+                    .setFlowFileUUID("I am FlowFile " + indexString)
+                    .setEventTime(currentTimeMillis - i)
+                    .build();
+            provenanceRepository.registerEvent(prov);
+        }
+
+        Mockito.when(eventAccess.getProvenanceRepository()).thenReturn(provenanceRepository);
         return reportingTask;
     }