You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jp...@apache.org on 2016/04/12 23:20:06 UTC

nifi-minifi git commit: MINIFI-13 created a provenance reporting task to send provenance information via S2S

Repository: nifi-minifi
Updated Branches:
  refs/heads/master 59f2d4418 -> 61e3a925d


MINIFI-13 created a provenance reporting task to send provenance information via S2S

This closes #6


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi/commit/61e3a925
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi/tree/61e3a925
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi/diff/61e3a925

Branch: refs/heads/master
Commit: 61e3a925d3af52da04a2866a298bb88264b20c5b
Parents: 59f2d44
Author: Joseph Percivall <jo...@yahoo.com>
Authored: Fri Apr 8 13:17:33 2016 -0400
Committer: Joseph Percivall <jo...@yahoo.com>
Committed: Tue Apr 12 17:03:59 2016 -0400

----------------------------------------------------------------------
 minifi-assembly/pom.xml                         |   6 +
 .../minifi-framework-bundle/pom.xml             |   2 +-
 .../minifi-provenance-reporting-nar/pom.xml     |  41 ++
 .../src/main/resources/META-INF/NOTICE          |  15 +
 .../minifi-provenance-reporting-task/pom.xml    |  82 ++++
 .../reporting/ProvenanceReportingTask.java      | 457 +++++++++++++++++++
 .../org.apache.nifi.reporting.ReportingTask     |  16 +
 .../reporting/TestProvenanceReportingTask.java  | 186 ++++++++
 .../minifi-provenance-reporting-bundle/pom.xml  |  41 ++
 minifi-nar-bundles/pom.xml                      |   3 +-
 pom.xml                                         |  13 +-
 11 files changed, 854 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/61e3a925/minifi-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/minifi-assembly/pom.xml b/minifi-assembly/pom.xml
index b076459..8dd223a 100644
--- a/minifi-assembly/pom.xml
+++ b/minifi-assembly/pom.xml
@@ -105,6 +105,12 @@ limitations under the License.
             <artifactId>minifi-runtime</artifactId>
             <version>0.0.1-SNAPSHOT</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi.minifi</groupId>
+            <artifactId>minifi-provenance-reporting-nar</artifactId>
+            <version>0.0.1-SNAPSHOT</version>
+            <type>nar</type>
+        </dependency>
 
         <!-- MiNiFi NiFi Dependencies -->
         <dependency>

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/61e3a925/minifi-nar-bundles/minifi-framework-bundle/pom.xml
----------------------------------------------------------------------
diff --git a/minifi-nar-bundles/minifi-framework-bundle/pom.xml b/minifi-nar-bundles/minifi-framework-bundle/pom.xml
index 7fbd6b7..dca3a5c 100644
--- a/minifi-nar-bundles/minifi-framework-bundle/pom.xml
+++ b/minifi-nar-bundles/minifi-framework-bundle/pom.xml
@@ -20,7 +20,7 @@ limitations under the License.
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
         <artifactId>minifi-nar-bundles</artifactId>
-        <groupId>org.apache.nifi</groupId>
+        <groupId>org.apache.nifi.minifi</groupId>
         <version>0.0.1-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/61e3a925/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-nar/pom.xml
----------------------------------------------------------------------
diff --git a/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-nar/pom.xml b/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-nar/pom.xml
new file mode 100644
index 0000000..be74790
--- /dev/null
+++ b/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-nar/pom.xml
@@ -0,0 +1,41 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi.minifi</groupId>
+        <artifactId>minifi-provenance-reporting-bundle</artifactId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>minifi-provenance-reporting-nar</artifactId>
+    <packaging>nar</packaging>
+    <properties>
+        <!-- This module only packages other artifacts, so javadoc and source jars are skipped. -->
+        <maven.javadoc.skip>true</maven.javadoc.skip>
+        <source.skip>true</source.skip>
+    </properties>
+    <dependencies>
+        <!-- The reporting task implementation bundled into this NAR. -->
+        <dependency>
+            <groupId>org.apache.nifi.minifi</groupId>
+            <artifactId>minifi-provenance-reporting-task</artifactId>
+            <version>0.0.1-SNAPSHOT</version>
+        </dependency>
+        <!-- NAR-typed dependency; no version is given here, so it is expected to come from inherited dependency management. -->
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-standard-services-api-nar</artifactId>
+            <type>nar</type>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/61e3a925/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-nar/src/main/resources/META-INF/NOTICE
----------------------------------------------------------------------
diff --git a/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-nar/src/main/resources/META-INF/NOTICE b/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-nar/src/main/resources/META-INF/NOTICE
new file mode 100644
index 0000000..be55e59
--- /dev/null
+++ b/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-nar/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,15 @@
+minifi-provenance-reporting-nar
+Copyright 2015-2016 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+************************
+Common Development and Distribution License 1.1
+************************
+
+The following binary components are provided under the Common Development and Distribution License 1.1. See project link for details.
+
+    (CDDL 1.1) (GPL2 w/ CPE) JSON Processing API (javax.json:javax.json-api:jar:1.0 - http://json-processing-spec.java.net)
+    (CDDL 1.1) (GPL2 w/ CPE) JSON Processing Default Provider (org.glassfish:javax.json:jar:1.0.4 - https://jsonp.java.net)
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/61e3a925/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/pom.xml
----------------------------------------------------------------------
diff --git a/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/pom.xml b/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/pom.xml
new file mode 100644
index 0000000..2eb157e
--- /dev/null
+++ b/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/pom.xml
@@ -0,0 +1,82 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi.minifi</groupId>
+        <artifactId>minifi-provenance-reporting-bundle</artifactId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>minifi-provenance-reporting-task</artifactId>
+    <description>Publishes MiNiFi provenance events to NiFi via S2S</description>
+    <version>0.0.1-SNAPSHOT</version>
+
+    <dependencies>
+        <!-- Compile-scope NiFi dependencies; no versions are given here, so they are expected to come from inherited dependency management. -->
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-processor-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-ssl-context-service-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-site-to-site-client</artifactId>
+        </dependency>
+        <!-- JSON Processing API and its default provider, used to serialize the events. -->
+        <dependency>
+            <groupId>org.glassfish</groupId>
+            <artifactId>javax.json</artifactId>
+            <version>1.0.4</version>
+        </dependency>
+        <dependency>
+            <groupId>javax.json</groupId>
+            <artifactId>javax.json-api</artifactId>
+            <version>1.0</version>
+        </dependency>
+
+        <!-- Test-scope dependencies. -->
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-data-provenance-utils</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.12</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/61e3a925/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/src/main/java/org/apache/nifi/minifi/provenance/reporting/ProvenanceReportingTask.java
----------------------------------------------------------------------
diff --git a/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/src/main/java/org/apache/nifi/minifi/provenance/reporting/ProvenanceReportingTask.java b/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/src/main/java/org/apache/nifi/minifi/provenance/reporting/ProvenanceReportingTask.java
new file mode 100644
index 0000000..8ed5dee
--- /dev/null
+++ b/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/src/main/java/org/apache/nifi/minifi/provenance/reporting/ProvenanceReportingTask.java
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.provenance.reporting;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TimeZone;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import javax.json.Json;
+import javax.json.JsonArray;
+import javax.json.JsonArrayBuilder;
+import javax.json.JsonBuilderFactory;
+import javax.json.JsonObject;
+import javax.json.JsonObjectBuilder;
+import javax.net.ssl.SSLContext;
+
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.status.PortStatus;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.reporting.AbstractReportingTask;
+import org.apache.nifi.reporting.ReportingContext;
+import org.apache.nifi.reporting.Severity;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.ssl.SSLContextService.ClientAuth;
+
+@Tags({"provenance", "lineage", "tracking", "site", "site to site"})
+@CapabilityDescription("Publishes Provenance events using the Site To Site protocol.")
+@Stateful(scopes = Scope.LOCAL, description = "Stores the Reporting Task's last event Id so that on restart of MiNiFi the task knows where it left off.")
+public class ProvenanceReportingTask extends AbstractReportingTask {
+    // Pattern used for the human-readable "timestamp" field of each serialized event.
+    private static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
+    // Key under which the id of the last event sent is kept in the task's local state.
+    private static final String LAST_EVENT_ID_KEY = "last_event_id";
+
+    // Base URL of the destination; setup() appends "/nifi" to it.
+    static final PropertyDescriptor DESTINATION_URL = new PropertyDescriptor.Builder()
+        .name("Destination URL")
+        .description("The URL to post the Provenance Events to.")
+        .required(true)
+        .expressionLanguageSupported(true)
+        .addValidator(StandardValidators.URL_VALIDATOR)
+        .build();
+    // Name of the input port handed to the Site-to-Site client.
+    static final PropertyDescriptor PORT_NAME = new PropertyDescriptor.Builder()
+        .name("Input Port Name")
+        .description("The name of the Input Port to deliver Provenance Events to.")
+        .required(true)
+        .expressionLanguageSupported(true)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .build();
+    // Optional; when unset, setup() passes a null SSLContext to the client.
+    static final PropertyDescriptor SSL_CONTEXT = new PropertyDescriptor.Builder()
+        .name("SSL Context Service")
+        .description("The SSL Context Service to use when communicating with the destination. If not specified, communications will not be secure.")
+        .required(false)
+        .identifiesControllerService(SSLContextService.class)
+        .build();
+    // URL of this instance; its host and authority feed the actorHostname and content URI fields.
+    static final PropertyDescriptor MINIFI_URL = new PropertyDescriptor.Builder()
+        .name("MiNiFi URL")
+        .description("The URL of this MiNiFi instance. This is used to include the Content URI to send to the destination.")
+        .required(true)
+        .expressionLanguageSupported(true)
+        .defaultValue("http://${hostname(true)}:8080/nifi")
+        .addValidator(new NiFiUrlValidator())
+        .build();
+    static final PropertyDescriptor COMPRESS = new PropertyDescriptor.Builder()
+        .name("Compress Events")
+        .description("Indicates whether or not to compress the events when being sent.")
+        .required(true)
+        .allowableValues("true", "false")
+        .defaultValue("true")
+        .build();
+    static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
+        .name("Communications Timeout")
+        .description("Specifies how long to wait for a response from the destination before deciding that an error has occurred and canceling the transaction")
+        .required(true)
+        .defaultValue("30 secs")
+        .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+        .build();
+    // Upper bound on the number of events requested from the repository per trigger.
+    static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+        .name("Batch Size")
+        .description("Specifies how many records to send in a single batch, at most.")
+        .required(true)
+        .defaultValue("1000")
+        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+        .build();
+
+    // Id of the next event to query; a negative value means the stored state has not been consulted yet.
+    private volatile long firstEventId = -1L;
+    // Created in setup() and closed in shutdown().
+    private volatile SiteToSiteClient siteToSiteClient;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(DESTINATION_URL);
+        properties.add(PORT_NAME);
+        properties.add(SSL_CONTEXT);
+        properties.add(MINIFI_URL);
+        properties.add(COMPRESS);
+        properties.add(TIMEOUT);
+        properties.add(BATCH_SIZE);
+        return properties;
+    }
+
+    /**
+     * Builds the Site-to-Site client from the configured properties when the task is scheduled.
+     *
+     * @param context the configuration holding the property values
+     * @throws IOException declared for callers; see the Site-to-Site client builder for failure modes
+     */
+    @OnScheduled
+    public void setup(final ConfigurationContext context) throws IOException {
+        final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT).asControllerService(SSLContextService.class);
+        // No SSL Context Service configured means the client is built with a null SSLContext.
+        final SSLContext sslContext = sslContextService == null ? null : sslContextService.createSSLContext(ClientAuth.REQUIRED);
+
+        // Forward warnings and errors raised by the client to this task's logger; other severities are dropped.
+        final EventReporter eventReporter = new EventReporter() {
+            @Override
+            public void reportEvent(final Severity severity, final String category, final String message) {
+                switch (severity) {
+                    case WARNING:
+                        getLogger().warn(message);
+                        break;
+                    case ERROR:
+                        getLogger().error(message);
+                        break;
+                    default:
+                        break;
+                }
+            }
+        };
+
+        final String destinationUrlPrefix = context.getProperty(DESTINATION_URL).evaluateAttributeExpressions().getValue();
+        final String destinationUrl = destinationUrlPrefix + (destinationUrlPrefix.endsWith("/") ? "nifi" : "/nifi");
+
+        // PORT_NAME declares expression language support, so evaluate it the same way as DESTINATION_URL
+        // instead of handing the raw, unevaluated expression to the client.
+        final String portName = context.getProperty(PORT_NAME).evaluateAttributeExpressions().getValue();
+
+        siteToSiteClient = new SiteToSiteClient.Builder()
+            .url(destinationUrl)
+            .portName(portName)
+            .useCompression(context.getProperty(COMPRESS).asBoolean())
+            .eventReporter(eventReporter)
+            .sslContext(sslContext)
+            .timeout(context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS)
+            .build();
+    }
+
+    /**
+     * Closes the Site-to-Site client, if one was created, when the task is stopped.
+     *
+     * @throws IOException if closing the client fails
+     */
+    @OnStopped
+    public void shutdown() throws IOException {
+        final SiteToSiteClient activeClient = getClient();
+        if (activeClient == null) {
+            return;
+        }
+        activeClient.close();
+    }
+
+    /**
+     * Accessor for the Site-to-Site client; exists explicitly so tests can substitute their own client.
+     *
+     * @return the current client, or null if none has been assigned
+     */
+    protected SiteToSiteClient getClient() {
+        return siteToSiteClient;
+    }
+
+    /**
+     * Looks up the name of the component that produced an event by searching a process group's status.
+     * The group itself is checked first, then its processors, input ports, output ports and remote
+     * process groups, and finally each child group recursively.
+     *
+     * @param status the process group status to search; may be null
+     * @param event the event whose component id is being resolved
+     * @return the matching component's name, or null if status is null or nothing matches
+     */
+    private String getComponentName(final ProcessGroupStatus status, final ProvenanceEventRecord event) {
+        if (status == null) {
+            return null;
+        }
+
+        final String wantedId = event.getComponentId();
+
+        if (status.getId().equals(wantedId)) {
+            return status.getName();
+        }
+
+        for (final ProcessorStatus processor : status.getProcessorStatus()) {
+            if (processor.getId().equals(wantedId)) {
+                return processor.getName();
+            }
+        }
+
+        for (final PortStatus inputPort : status.getInputPortStatus()) {
+            if (inputPort.getId().equals(wantedId)) {
+                return inputPort.getName();
+            }
+        }
+
+        for (final PortStatus outputPort : status.getOutputPortStatus()) {
+            if (outputPort.getId().equals(wantedId)) {
+                return outputPort.getName();
+            }
+        }
+
+        for (final RemoteProcessGroupStatus remoteGroup : status.getRemoteProcessGroupStatus()) {
+            if (remoteGroup.getId().equals(wantedId)) {
+                return remoteGroup.getName();
+            }
+        }
+
+        // Not found at this level: descend into each child group and stop at the first hit.
+        for (final ProcessGroupStatus nestedGroup : status.getProcessGroupStatus()) {
+            final String nestedMatch = getComponentName(nestedGroup, event);
+            if (nestedMatch == null) {
+                continue;
+            }
+            return nestedMatch;
+        }
+
+        return null;
+    }
+
+    /**
+     * Sends the next batch of provenance events to the destination and records the id of the last
+     * event sent in local state.
+     *
+     * On the first run (firstEventId is negative) the starting point is read from local state. If
+     * the repository's current max id is lower than the stored position, querying restarts from
+     * the beginning.
+     *
+     * @param context the reporting context giving access to events, state and properties
+     * @throws ProcessException if sending the events fails with an IOException
+     */
+    @Override
+    public void onTrigger(final ReportingContext context) {
+        final ProcessGroupStatus procGroupStatus = context.getEventAccess().getControllerStatus();
+        final String rootGroupName = procGroupStatus == null ? null : procGroupStatus.getName();
+
+        final Long currMaxId = context.getEventAccess().getProvenanceRepository().getMaxEventId();
+        if (currMaxId == null) {
+            // The max id is a boxed Long that is unboxed in the comparisons below; a null value
+            // (presumably an empty repository) would otherwise throw a NullPointerException.
+            getLogger().debug("No events to send because no events have been created yet.");
+            return;
+        }
+
+        if (firstEventId < 0) {
+            Map<String, String> state;
+            try {
+                state = context.getStateManager().getState(Scope.LOCAL).toMap();
+            } catch (IOException e) {
+                getLogger().error("Failed to get state at start up due to {}", e);
+                return;
+            }
+            if (state.containsKey(LAST_EVENT_ID_KEY)) {
+                // Resume with the event after the last one that was recorded as sent.
+                firstEventId = Long.parseLong(state.get(LAST_EVENT_ID_KEY)) + 1;
+            }
+
+            if(currMaxId < firstEventId){
+                getLogger().debug("Current provenance max id is {} which is less than what was stored in state as the last queried event, which was {}. This means the provenance restarted its " +
+                        "ids. Restarting querying from the beginning.", new Object[]{currMaxId, firstEventId});
+                firstEventId = -1;
+            }
+        }
+
+        if (currMaxId == (firstEventId - 1)) {
+            getLogger().debug("No events to send due to the current max id being equal to the last id that was queried.");
+            return;
+        }
+
+        final List<ProvenanceEventRecord> events;
+        try {
+            events = context.getEventAccess().getProvenanceEvents(firstEventId, context.getProperty(BATCH_SIZE).asInteger());
+        } catch (final IOException ioe) {
+            getLogger().error("Failed to retrieve Provenance Events from repository due to {}", ioe);
+            return;
+        }
+
+        if (events == null || events.isEmpty()) {
+            getLogger().debug("No events to send due to 'events' being null or empty.");
+            return;
+        }
+
+        final long start = System.nanoTime();
+        final Map<String, ?> config = Collections.emptyMap();
+        final JsonBuilderFactory factory = Json.createBuilderFactory(config);
+        final JsonObjectBuilder builder = factory.createObjectBuilder();
+
+        final String nifiUrl = context.getProperty(MINIFI_URL).evaluateAttributeExpressions().getValue();
+        URL url;
+        try {
+            url = new URL(nifiUrl);
+        } catch (final MalformedURLException e1) {
+            // already validated
+            throw new AssertionError();
+        }
+
+        final String hostname = url.getHost();
+
+        // Serialize every event into one JSON array that is sent as a single payload.
+        final JsonArrayBuilder arrayBuilder = factory.createArrayBuilder();
+        for (final ProvenanceEventRecord event : events) {
+            arrayBuilder.add(serialize(factory, builder, event, getComponentName(procGroupStatus, event), hostname, url, rootGroupName));
+        }
+        final JsonArray jsonArray = arrayBuilder.build();
+
+        try {
+            final Transaction transaction = getClient().createTransaction(TransferDirection.SEND);
+            if (transaction == null) {
+                getLogger().debug("All destination nodes are penalized; will attempt to send data later");
+                return;
+            }
+
+            final Map<String, String> attributes = new HashMap<>();
+            final String transactionId = UUID.randomUUID().toString();
+            attributes.put("reporting.task.transaction.id", transactionId);
+
+            final byte[] data = jsonArray.toString().getBytes(StandardCharsets.UTF_8);
+            transaction.send(data, attributes);
+            transaction.confirm();
+            transaction.complete();
+
+            final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+            getLogger().info("Successfully sent {} Provenance Events to destination in {} ms; Transaction ID = {}; First Event ID = {}",
+                new Object[] {events.size(), transferMillis, transactionId, events.get(0).getEventId()});
+        } catch (final IOException e) {
+            throw new ProcessException("Failed to send Provenance Events to destination due to IOException", e);
+        }
+
+        // Persist the id of the last event sent; a failure here is logged but does not stop the task.
+        final ProvenanceEventRecord lastEvent = events.get(events.size() - 1);
+        final String lastEventId = String.valueOf(lastEvent.getEventId());
+        try {
+            StateManager stateManager = context.getStateManager();
+            StateMap stateMap = stateManager.getState(Scope.LOCAL);
+            Map<String, String> newMapOfState = new HashMap<>();
+            newMapOfState.put(LAST_EVENT_ID_KEY, lastEventId);
+            stateManager.replace(stateMap, newMapOfState, Scope.LOCAL);
+        } catch (final IOException ioe) {
+            getLogger().error("Failed to update state to {} due to {}; this could result in events being re-sent after a restart of MiNiFi",
+                new Object[] {lastEventId, ioe}, ioe);
+        }
+
+        firstEventId = lastEvent.getEventId() + 1;
+    }
+
+    /**
+     * Converts one provenance event into a JSON object. Null values are skipped by the addField helpers.
+     *
+     * @param factory factory used to create nested JSON builders
+     * @param builder object builder that receives the fields and is built at the end
+     * @param event the provenance event to serialize
+     * @param componentName name of the component that produced the event; may be null
+     * @param hostname value written as "actorHostname"; may be null
+     * @param nifiUrl URL of this instance used to build the content URIs; may be null, in which case no content URIs are written
+     * @param applicationName value written as "application"; may be null
+     * @return the built JSON object
+     */
+    static JsonObject serialize(final JsonBuilderFactory factory, final JsonObjectBuilder builder, final ProvenanceEventRecord event,
+        final String componentName, final String hostname, final URL nifiUrl, final String applicationName) {
+        // "eventId" is a fresh random UUID; the repository's own id is reported as "eventOrdinal".
+        addField(builder, "eventId", UUID.randomUUID().toString());
+        addField(builder, "eventOrdinal", event.getEventId());
+        addField(builder, "eventType", event.getEventType().name());
+        addField(builder, "timestampMillis", event.getEventTime());
+
+        // TIMESTAMP_FORMAT ends in a literal 'Z', so the formatter must run in UTC; use the explicit zone id.
+        final DateFormat df = new SimpleDateFormat(TIMESTAMP_FORMAT);
+        df.setTimeZone(TimeZone.getTimeZone("UTC"));
+        addField(builder, "timestamp", df.format(event.getEventTime()));
+
+        addField(builder, "durationMillis", event.getEventDuration());
+        addField(builder, "lineageStart", event.getLineageStartDate());
+
+        // The event's own FlowFile UUID is always included among the lineage identifiers.
+        final Set<String> lineageIdentifiers = new HashSet<>();
+        if (event.getLineageIdentifiers() != null) {
+            lineageIdentifiers.addAll(event.getLineageIdentifiers());
+        }
+        lineageIdentifiers.add(event.getFlowFileUuid());
+        addField(builder, factory, "lineageIdentifiers", lineageIdentifiers);
+        addField(builder, "details", event.getDetails());
+        addField(builder, "componentId", event.getComponentId());
+        addField(builder, "componentType", event.getComponentType());
+        addField(builder, "componentName", componentName);
+        addField(builder, "entityId", event.getFlowFileUuid());
+        addField(builder, "entityType", "org.apache.nifi.flowfile.FlowFile");
+        addField(builder, "entitySize", event.getFileSize());
+        addField(builder, "previousEntitySize", event.getPreviousFileSize());
+        addField(builder, factory, "updatedAttributes", event.getUpdatedAttributes());
+        addField(builder, factory, "previousAttributes", event.getPreviousAttributes());
+
+        addField(builder, "actorHostname", hostname);
+        if (nifiUrl != null) {
+            // Build the prefix from scheme and authority. Removing the path with String.replace
+            // would replace every occurrence: a path of "/" strips the slashes of "http://", and
+            // "/nifi" corrupts a host such as "nifi.example.com".
+            final String authority = nifiUrl.getAuthority();
+            final String urlPrefix = nifiUrl.getProtocol() + ":" + (authority == null ? "" : "//" + authority);
+            final String contentUriBase = urlPrefix + "/nifi-api/controller/provenance/events/" + event.getEventId() + "/content/";
+            addField(builder, "contentURI", contentUriBase + "output");
+            addField(builder, "previousContentURI", contentUriBase + "input");
+        }
+
+        addField(builder, factory, "parentIds", event.getParentUuids());
+        addField(builder, factory, "childIds", event.getChildUuids());
+        addField(builder, "transitUri", event.getTransitUri());
+        addField(builder, "remoteIdentifier", event.getSourceSystemFlowFileIdentifier());
+        addField(builder, "alternateIdentifier", event.getAlternateIdentifierUri());
+        addField(builder, "platform", "minifi");
+        addField(builder, "application", applicationName);
+
+        return builder.build();
+    }
+
+    /**
+     * Adds a map as a nested JSON object under the given key. Nothing is added when the map is
+     * null; entries with a null key or null value are left out.
+     */
+    private static void addField(final JsonObjectBuilder builder, final JsonBuilderFactory factory, final String key, final Map<String, String> values) {
+        if (values != null) {
+            final JsonObjectBuilder nested = factory.createObjectBuilder();
+            for (final Map.Entry<String, String> pair : values.entrySet()) {
+                final String name = pair.getKey();
+                final String text = pair.getValue();
+                if (name != null && text != null) {
+                    nested.add(name, text);
+                }
+            }
+
+            builder.add(key, nested);
+        }
+    }
+
+    /**
+     * Adds a numeric field under the given key; a null value is skipped.
+     */
+    private static void addField(final JsonObjectBuilder builder, final String key, final Long value) {
+        if (value == null) {
+            return;
+        }
+
+        builder.add(key, value.longValue());
+    }
+
+    private static void addField(final JsonObjectBuilder builder, final JsonBuilderFactory factory, final String key, final Collection<String> values) {
+        if (values == null) {
+            return;
+        }
+
+        builder.add(key, createJsonArray(factory, values));
+    }
+
+    /**
+     * Adds a string field under the given key; a null value is skipped.
+     */
+    private static void addField(final JsonObjectBuilder builder, final String key, final String value) {
+        if (value != null) {
+            builder.add(key, value);
+        }
+    }
+
+    /**
+     * Creates an array builder holding the non-null entries of the given collection, in iteration order.
+     *
+     * @return the populated, not yet built, array builder
+     */
+    private static JsonArrayBuilder createJsonArray(JsonBuilderFactory factory, final Collection<String> values) {
+        final JsonArrayBuilder items = factory.createArrayBuilder();
+        for (final String item : values) {
+            if (item == null) {
+                continue;
+            }
+            items.add(item);
+        }
+        return items;
+    }
+
+
+    /**
+     * Validator that evaluates any expression language in the input and accepts the result only if
+     * java.net.URL can parse it.
+     */
+    private static class NiFiUrlValidator implements Validator {
+        @Override
+        public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
+            final String evaluated = context.newPropertyValue(input).evaluateAttributeExpressions().getValue();
+            final ValidationResult.Builder outcome = new ValidationResult.Builder().input(input).subject(subject);
+
+            try {
+                new URL(evaluated);
+            } catch (final Exception e) {
+                // Any failure to construct the URL is reported as an invalid value.
+                return outcome.valid(false).explanation("Not a valid URL").build();
+            }
+
+            return outcome.valid(true).build();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/61e3a925/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask
----------------------------------------------------------------------
diff --git a/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask b/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask
new file mode 100644
index 0000000..331d759
--- /dev/null
+++ b/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.nifi.minifi.provenance.reporting.ProvenanceReportingTask
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/61e3a925/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/src/test/java/org/apache/nifi/minifi/provenance/reporting/TestProvenanceReportingTask.java
----------------------------------------------------------------------
diff --git a/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/src/test/java/org/apache/nifi/minifi/provenance/reporting/TestProvenanceReportingTask.java b/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/src/test/java/org/apache/nifi/minifi/provenance/reporting/TestProvenanceReportingTask.java
new file mode 100644
index 0000000..97291c0
--- /dev/null
+++ b/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/src/test/java/org/apache/nifi/minifi/provenance/reporting/TestProvenanceReportingTask.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.provenance.reporting;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.provenance.ProvenanceEventBuilder;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventRepository;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.provenance.StandardProvenanceEventRecord;
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.reporting.EventAccess;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.reporting.ReportingContext;
+import org.apache.nifi.reporting.ReportingInitializationContext;
+import org.apache.nifi.state.MockStateManager;
+import org.apache.nifi.stream.io.ByteArrayInputStream;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.MockPropertyValue;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import javax.json.Json;
+import javax.json.JsonObject;
+import javax.json.JsonReader;
+
+/**
+ * Exercises {@link ProvenanceReportingTask} against mocked NiFi collaborators and inspects the
+ * JSON payload that the task hands to the Site-to-Site transaction.
+ */
+public class TestProvenanceReportingTask {
+
+    /**
+     * Builds one RECEIVE provenance event, triggers the task once with a mocked Site-to-Site
+     * client, and checks that a single payload is sent whose first element exposes the event's
+     * "abc" attribute under "updatedAttributes".
+     */
+    @Test
+    public void testSerializedForm() throws IOException, InitializationException {
+        final String uuid = "10000000-0000-0000-0000-000000000000";
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("abc", "xyz");
+        attributes.put("xyz", "abc");
+        attributes.put("filename", "file-" + uuid);
+
+        // The earlier filename belongs in the previous-attribute map; writing it into 'attributes'
+        // would overwrite the value set above and leave prevAttrs empty.
+        final Map<String, String> prevAttrs = new HashMap<>();
+        prevAttrs.put("filename", "1234.xyz");
+
+        final Set<String> lineageIdentifiers = new HashSet<>();
+        lineageIdentifiers.add("123");
+        lineageIdentifiers.add("321");
+
+        final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
+        builder.setEventTime(System.currentTimeMillis());
+        builder.setEventType(ProvenanceEventType.RECEIVE);
+        builder.setTransitUri("nifi://unit-test");
+        attributes.put("uuid", uuid);
+        builder.fromFlowFile(createFlowFile(3L, attributes));
+        builder.setAttributes(prevAttrs, attributes);
+        builder.setComponentId("1234");
+        builder.setComponentType("dummy processor");
+        builder.setLineageIdentifiers(lineageIdentifiers);
+        final ProvenanceEventRecord event = builder.build();
+
+        // Every byte[] passed to Transaction.send(...) is captured here for later inspection.
+        final List<byte[]> dataSent = new ArrayList<>();
+        final ProvenanceReportingTask task = new ProvenanceReportingTask() {
+            @SuppressWarnings("unchecked")
+            @Override
+            protected SiteToSiteClient getClient() {
+                // Swap the real client for a mock whose transaction records what is sent.
+                final SiteToSiteClient client = Mockito.mock(SiteToSiteClient.class);
+                final Transaction transaction = Mockito.mock(Transaction.class);
+
+                try {
+                    Mockito.doAnswer(new Answer<Object>() {
+                        @Override
+                        public Object answer(final InvocationOnMock invocation) throws Throwable {
+                            final byte[] data = invocation.getArgumentAt(0, byte[].class);
+                            dataSent.add(data);
+                            return null;
+                        }
+                    }).when(transaction).send(Mockito.any(byte[].class), Mockito.any(Map.class));
+
+                    Mockito.when(client.createTransaction(Mockito.any(TransferDirection.class))).thenReturn(transaction);
+                } catch (final Exception e) {
+                    e.printStackTrace();
+                    Assert.fail(e.toString());
+                }
+
+                return client;
+            }
+        };
+
+        final List<ProvenanceEventRecord> events = new ArrayList<>();
+        events.add(event);
+
+        // Start from each supported property's default value, then override the batch size.
+        final Map<PropertyDescriptor, String> properties = new HashMap<>();
+        for (final PropertyDescriptor descriptor : task.getSupportedPropertyDescriptors()) {
+            properties.put(descriptor, descriptor.getDefaultValue());
+        }
+        properties.put(ProvenanceReportingTask.BATCH_SIZE, "1000");
+
+        final ReportingContext context = Mockito.mock(ReportingContext.class);
+        Mockito.when(context.getStateManager())
+                .thenReturn(new MockStateManager(task));
+        Mockito.doAnswer(new Answer<PropertyValue>() {
+            @Override
+            public PropertyValue answer(final InvocationOnMock invocation) throws Throwable {
+                final PropertyDescriptor descriptor = invocation.getArgumentAt(0, PropertyDescriptor.class);
+                return new MockPropertyValue(properties.get(descriptor), null);
+            }
+        }).when(context).getProperty(Mockito.any(PropertyDescriptor.class));
+
+        // Serve events from the in-memory list, using the requested start id as a list index.
+        final EventAccess eventAccess = Mockito.mock(EventAccess.class);
+        Mockito.doAnswer(new Answer<List<ProvenanceEventRecord>>() {
+            @Override
+            public List<ProvenanceEventRecord> answer(final InvocationOnMock invocation) throws Throwable {
+                final long startId = invocation.getArgumentAt(0, long.class);
+                final int maxRecords = invocation.getArgumentAt(1, int.class);
+
+                final List<ProvenanceEventRecord> eventsToReturn = new ArrayList<>();
+                for (int i = (int) Math.max(0, startId); i < (int) (startId + maxRecords) && i < events.size(); i++) {
+                    eventsToReturn.add(events.get(i));
+                }
+                return eventsToReturn;
+            }
+        }).when(eventAccess).getProvenanceEvents(Mockito.anyLong(), Mockito.anyInt());
+
+        final ProvenanceEventRepository provenanceRepository = Mockito.mock(ProvenanceEventRepository.class);
+        Mockito.doAnswer(new Answer<Long>() {
+            @Override
+            public Long answer(final InvocationOnMock invocation) throws Throwable {
+                return 1L;
+            }
+        }).when(provenanceRepository).getMaxEventId();
+
+        Mockito.when(context.getEventAccess()).thenReturn(eventAccess);
+        Mockito.when(eventAccess.getProvenanceRepository()).thenReturn(provenanceRepository);
+
+        final ComponentLog logger = Mockito.mock(ComponentLog.class);
+        final ReportingInitializationContext initContext = Mockito.mock(ReportingInitializationContext.class);
+        Mockito.when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString());
+        Mockito.when(initContext.getLogger()).thenReturn(logger);
+
+
+        task.initialize(initContext);
+        task.onTrigger(context);
+
+        assertEquals(1, dataSent.size());
+        final String msg = new String(dataSent.get(0), StandardCharsets.UTF_8);
+        // Re-encode with the charset used for decoding so the check does not depend on the
+        // platform default, and close the reader once the payload has been parsed.
+        final JsonObject updatedAttributes;
+        try (JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes(StandardCharsets.UTF_8)))) {
+            updatedAttributes = jsonReader.readArray().getJsonObject(0).getJsonObject("updatedAttributes");
+        }
+        // JUnit's assertEquals takes (expected, actual).
+        assertEquals(events.get(0).getAttributes().get("abc"), updatedAttributes.getString("abc"));
+    }
+
+    /**
+     * Creates a {@link MockFlowFile} with the given id and attributes.
+     *
+     * @param id identifier passed to the mock flow file's constructor
+     * @param attributes attributes to put on the flow file
+     * @return the populated mock flow file
+     */
+    public static FlowFile createFlowFile(final long id, final Map<String, String> attributes) {
+        MockFlowFile mockFlowFile = new MockFlowFile(id);
+        mockFlowFile.putAttributes(attributes);
+        return mockFlowFile;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/61e3a925/minifi-nar-bundles/minifi-provenance-reporting-bundle/pom.xml
----------------------------------------------------------------------
diff --git a/minifi-nar-bundles/minifi-provenance-reporting-bundle/pom.xml b/minifi-nar-bundles/minifi-provenance-reporting-bundle/pom.xml
new file mode 100644
index 0000000..0d34004
--- /dev/null
+++ b/minifi-nar-bundles/minifi-provenance-reporting-bundle/pom.xml
@@ -0,0 +1,41 @@
+<?xml version="1.0"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi.minifi</groupId>
+        <artifactId>minifi-nar-bundles</artifactId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+
+    <!-- Aggregator POM (pom packaging): it only groups the modules listed below. -->
+    <artifactId>minifi-provenance-reporting-bundle</artifactId>
+    <packaging>pom</packaging>
+
+    <modules>
+        <module>minifi-provenance-reporting-task</module>
+        <module>minifi-provenance-reporting-nar</module>
+    </modules>
+    
+    <!-- Sets the jersey-client version for modules inheriting from this POM that declare the dependency without a version. -->
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>org.glassfish.jersey.core</groupId>
+                <artifactId>jersey-client</artifactId>
+                <version>2.19</version>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+</project>

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/61e3a925/minifi-nar-bundles/pom.xml
----------------------------------------------------------------------
diff --git a/minifi-nar-bundles/pom.xml b/minifi-nar-bundles/pom.xml
index aeaad02..79a8414 100644
--- a/minifi-nar-bundles/pom.xml
+++ b/minifi-nar-bundles/pom.xml
@@ -20,10 +20,11 @@
         <artifactId>minifi</artifactId>
         <version>0.0.1-SNAPSHOT</version>
     </parent>
-    <groupId>org.apache.nifi</groupId>
+    <groupId>org.apache.nifi.minifi</groupId>
     <artifactId>minifi-nar-bundles</artifactId>
     <packaging>pom</packaging>
     <modules>
         <module>minifi-framework-bundle</module>
+        <module>minifi-provenance-reporting-bundle</module>
     </modules>
 </project>

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/61e3a925/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 2c8a881..cdd8357 100644
--- a/pom.xml
+++ b/pom.xml
@@ -104,7 +104,7 @@ limitations under the License.
         </dependency>
         <dependency>
             <groupId>org.mockito</groupId>
-            <artifactId>mockito-core</artifactId>
+            <artifactId>mockito-all</artifactId>
             <scope>test</scope>
         </dependency>
         <dependency>
@@ -345,6 +345,11 @@ limitations under the License.
                 <artifactId>nifi-write-ahead-log</artifactId>
                 <version>${org.apache.nifi.version}</version>
             </dependency>
+            <dependency>
+                <groupId>org.apache.nifi</groupId>
+                <artifactId>nifi-ssl-context-service-api</artifactId>
+                <version>${org.apache.nifi.version}</version>
+            </dependency>
 
             <!-- Test Dependencies -->
             <dependency>
@@ -354,11 +359,6 @@ limitations under the License.
             </dependency>
             <dependency>
                 <groupId>org.mockito</groupId>
-                <artifactId>mockito-core</artifactId>
-                <version>1.10.19</version>
-            </dependency>
-            <dependency>
-                <groupId>org.mockito</groupId>
                 <artifactId>mockito-all</artifactId>
                 <version>1.10.19</version>
                 <scope>test</scope>
@@ -368,6 +368,7 @@ limitations under the License.
                 <artifactId>slf4j-simple</artifactId>
                 <version>${org.slf4j.version}</version>
             </dependency>
+
         </dependencies>
     </dependencyManagement>