You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jp...@apache.org on 2016/04/12 23:20:06 UTC
nifi-minifi git commit: MINIFI-13 created a provenance reporting task
to send provenance information via S2S
Repository: nifi-minifi
Updated Branches:
refs/heads/master 59f2d4418 -> 61e3a925d
MINIFI-13 created a provenance reporting task to send provenance information via S2S
This closes #6
Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi/commit/61e3a925
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi/tree/61e3a925
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi/diff/61e3a925
Branch: refs/heads/master
Commit: 61e3a925d3af52da04a2866a298bb88264b20c5b
Parents: 59f2d44
Author: Joseph Percivall <jo...@yahoo.com>
Authored: Fri Apr 8 13:17:33 2016 -0400
Committer: Joseph Percivall <jo...@yahoo.com>
Committed: Tue Apr 12 17:03:59 2016 -0400
----------------------------------------------------------------------
minifi-assembly/pom.xml | 6 +
.../minifi-framework-bundle/pom.xml | 2 +-
.../minifi-provenance-reporting-nar/pom.xml | 41 ++
.../src/main/resources/META-INF/NOTICE | 15 +
.../minifi-provenance-reporting-task/pom.xml | 82 ++++
.../reporting/ProvenanceReportingTask.java | 457 +++++++++++++++++++
.../org.apache.nifi.reporting.ReportingTask | 16 +
.../reporting/TestProvenanceReportingTask.java | 186 ++++++++
.../minifi-provenance-reporting-bundle/pom.xml | 41 ++
minifi-nar-bundles/pom.xml | 3 +-
pom.xml | 13 +-
11 files changed, 854 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/61e3a925/minifi-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/minifi-assembly/pom.xml b/minifi-assembly/pom.xml
index b076459..8dd223a 100644
--- a/minifi-assembly/pom.xml
+++ b/minifi-assembly/pom.xml
@@ -105,6 +105,12 @@ limitations under the License.
<artifactId>minifi-runtime</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi.minifi</groupId>
+ <artifactId>minifi-provenance-reporting-nar</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ <type>nar</type>
+ </dependency>
<!-- MiNiFi NiFi Dependencies -->
<dependency>
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/61e3a925/minifi-nar-bundles/minifi-framework-bundle/pom.xml
----------------------------------------------------------------------
diff --git a/minifi-nar-bundles/minifi-framework-bundle/pom.xml b/minifi-nar-bundles/minifi-framework-bundle/pom.xml
index 7fbd6b7..dca3a5c 100644
--- a/minifi-nar-bundles/minifi-framework-bundle/pom.xml
+++ b/minifi-nar-bundles/minifi-framework-bundle/pom.xml
@@ -20,7 +20,7 @@ limitations under the License.
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>minifi-nar-bundles</artifactId>
- <groupId>org.apache.nifi</groupId>
+ <groupId>org.apache.nifi.minifi</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/61e3a925/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-nar/pom.xml
----------------------------------------------------------------------
diff --git a/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-nar/pom.xml b/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-nar/pom.xml
new file mode 100644
index 0000000..be74790
--- /dev/null
+++ b/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-nar/pom.xml
@@ -0,0 +1,41 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements. See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License. You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi.minifi</groupId>
+        <artifactId>minifi-provenance-reporting-bundle</artifactId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+
+    <!-- Packages the provenance reporting task as a NAR; this module holds no sources of its own. -->
+    <artifactId>minifi-provenance-reporting-nar</artifactId>
+    <packaging>nar</packaging>
+    <properties>
+        <maven.javadoc.skip>true</maven.javadoc.skip>
+        <source.skip>true</source.skip>
+    </properties>
+    <dependencies>
+        <!-- The reporting task implementation bundled into this NAR. -->
+        <dependency>
+            <groupId>org.apache.nifi.minifi</groupId>
+            <artifactId>minifi-provenance-reporting-task</artifactId>
+            <version>0.0.1-SNAPSHOT</version>
+        </dependency>
+        <!-- No version is given here; presumably it is managed by an ancestor POM (confirm). -->
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-standard-services-api-nar</artifactId>
+            <type>nar</type>
+        </dependency>
+    </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/61e3a925/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-nar/src/main/resources/META-INF/NOTICE
----------------------------------------------------------------------
diff --git a/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-nar/src/main/resources/META-INF/NOTICE b/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-nar/src/main/resources/META-INF/NOTICE
new file mode 100644
index 0000000..be55e59
--- /dev/null
+++ b/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-nar/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,15 @@
+minifi-provenance-reporting-nar
+Copyright 2015-2016 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+************************
+Common Development and Distribution License 1.1
+************************
+
+The following binary components are provided under the Common Development and Distribution License 1.1. See project link for details.
+
+ (CDDL 1.1) (GPL2 w/ CPE) JSON Processing API (javax.json:javax.json-api:jar:1.0 - http://json-processing-spec.java.net)
+ (CDDL 1.1) (GPL2 w/ CPE) JSON Processing Default Provider (org.glassfish:javax.json:jar:1.0.4 - https://jsonp.java.net)
+
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/61e3a925/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/pom.xml
----------------------------------------------------------------------
diff --git a/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/pom.xml b/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/pom.xml
new file mode 100644
index 0000000..2eb157e
--- /dev/null
+++ b/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/pom.xml
@@ -0,0 +1,82 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements. See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License. You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi.minifi</groupId>
+        <artifactId>minifi-provenance-reporting-bundle</artifactId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>minifi-provenance-reporting-task</artifactId>
+    <description>Publishes MiNiFi provenance events to NiFi via S2S</description>
+    <version>0.0.1-SNAPSHOT</version>
+
+    <dependencies>
+        <!-- NiFi APIs and utilities the reporting task is written against. -->
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-processor-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-ssl-context-service-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-site-to-site-client</artifactId>
+        </dependency>
+        <!-- JSON Processing API and its default provider, used to serialize the events. -->
+        <dependency>
+            <groupId>org.glassfish</groupId>
+            <artifactId>javax.json</artifactId>
+            <version>1.0.4</version>
+        </dependency>
+        <dependency>
+            <groupId>javax.json</groupId>
+            <artifactId>javax.json-api</artifactId>
+            <version>1.0</version>
+        </dependency>
+
+        <!-- Test-only dependencies. -->
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-data-provenance-utils</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.12</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/61e3a925/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/src/main/java/org/apache/nifi/minifi/provenance/reporting/ProvenanceReportingTask.java
----------------------------------------------------------------------
diff --git a/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/src/main/java/org/apache/nifi/minifi/provenance/reporting/ProvenanceReportingTask.java b/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/src/main/java/org/apache/nifi/minifi/provenance/reporting/ProvenanceReportingTask.java
new file mode 100644
index 0000000..8ed5dee
--- /dev/null
+++ b/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/src/main/java/org/apache/nifi/minifi/provenance/reporting/ProvenanceReportingTask.java
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.provenance.reporting;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TimeZone;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import javax.json.Json;
+import javax.json.JsonArray;
+import javax.json.JsonArrayBuilder;
+import javax.json.JsonBuilderFactory;
+import javax.json.JsonObject;
+import javax.json.JsonObjectBuilder;
+import javax.net.ssl.SSLContext;
+
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.status.PortStatus;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.reporting.AbstractReportingTask;
+import org.apache.nifi.reporting.ReportingContext;
+import org.apache.nifi.reporting.Severity;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.ssl.SSLContextService.ClientAuth;
+
+@Tags({"provenance", "lineage", "tracking", "site", "site to site"})
+@CapabilityDescription("Publishes Provenance events using the Site To Site protocol.")
+@Stateful(scopes = Scope.LOCAL, description = "Stores the Reporting Task's last event Id so that on restart of MiNiFi the task knows where it left off.")
+public class ProvenanceReportingTask extends AbstractReportingTask {
+    /** Pattern used to render each event's "timestamp" field in the JSON payload. */
+    private static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
+    /** Key under which the id of the last event sent is kept in the task's local state. */
+    private static final String LAST_EVENT_ID_KEY = "last_event_id";
+
+    // Base URL of the receiving instance; setup() appends "/nifi" to it.
+    static final PropertyDescriptor DESTINATION_URL = new PropertyDescriptor.Builder()
+        .name("Destination URL")
+        .description("The URL to post the Provenance Events to.")
+        .required(true)
+        .expressionLanguageSupported(true)
+        .addValidator(StandardValidators.URL_VALIDATOR)
+        .build();
+    // Name of the remote input port that receives the events.
+    static final PropertyDescriptor PORT_NAME = new PropertyDescriptor.Builder()
+        .name("Input Port Name")
+        .description("The name of the Input Port to deliver Provenance Events to.")
+        .required(true)
+        .expressionLanguageSupported(true)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .build();
+    // Optional; when unset, setup() builds the client without an SSLContext.
+    static final PropertyDescriptor SSL_CONTEXT = new PropertyDescriptor.Builder()
+        .name("SSL Context Service")
+        .description("The SSL Context Service to use when communicating with the destination. If not specified, communications will not be secure.")
+        .required(false)
+        .identifiesControllerService(SSLContextService.class)
+        .build();
+    // URL of this instance; its host and prefix are written into every serialized event.
+    static final PropertyDescriptor MINIFI_URL = new PropertyDescriptor.Builder()
+        .name("MiNiFi URL")
+        .description("The URL of this MiNiFi instance. This is used to include the Content URI to send to the destination.")
+        .required(true)
+        .expressionLanguageSupported(true)
+        .defaultValue("http://${hostname(true)}:8080/nifi")
+        .addValidator(new NiFiUrlValidator())
+        .build();
+    static final PropertyDescriptor COMPRESS = new PropertyDescriptor.Builder()
+        .name("Compress Events")
+        .description("Indicates whether or not to compress the events when being sent.")
+        .required(true)
+        .allowableValues("true", "false")
+        .defaultValue("true")
+        .build();
+    static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
+        .name("Communications Timeout")
+        .description("Specifies how long to wait for a response from the destination before deciding that an error has occurred and canceling the transaction")
+        .required(true)
+        .defaultValue("30 secs")
+        .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+        .build();
+    // Upper bound on the number of events requested from the repository per trigger.
+    static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+        .name("Batch Size")
+        .description("Specifies how many records to send in a single batch, at most.")
+        .required(true)
+        .defaultValue("1000")
+        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+        .build();
+
+    // Id of the first event to request on the next trigger; negative until it has been initialised.
+    private volatile long firstEventId = -1L;
+    // Client created in setup() and closed in shutdown().
+    private volatile SiteToSiteClient siteToSiteClient;
+
+    /**
+     * Lists the properties this reporting task can be configured with.
+     *
+     * @return a new list holding the seven supported property descriptors
+     */
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        Collections.addAll(descriptors,
+            DESTINATION_URL,
+            PORT_NAME,
+            SSL_CONTEXT,
+            MINIFI_URL,
+            COMPRESS,
+            TIMEOUT,
+            BATCH_SIZE);
+        return descriptors;
+    }
+
+    /**
+     * Creates the Site-to-Site client from the configured properties when the task is scheduled.
+     *
+     * @param context supplies the configured property values
+     * @throws IOException declared for the lifecycle callback
+     */
+    @OnScheduled
+    public void setup(final ConfigurationContext context) throws IOException {
+        final SSLContextService sslService = context.getProperty(SSL_CONTEXT).asControllerService(SSLContextService.class);
+        final SSLContext sslContext;
+        if (sslService == null) {
+            // No SSL Context Service configured: the client is built without an SSLContext.
+            sslContext = null;
+        } else {
+            sslContext = sslService.createSSLContext(ClientAuth.REQUIRED);
+        }
+
+        // Routes warnings and errors reported by the client to this task's logger; other severities are dropped.
+        final EventReporter loggingReporter = new EventReporter() {
+            @Override
+            public void reportEvent(final Severity severity, final String category, final String message) {
+                switch (severity) {
+                    case ERROR:
+                        getLogger().error(message);
+                        break;
+                    case WARNING:
+                        getLogger().warn(message);
+                        break;
+                    default:
+                        break;
+                }
+            }
+        };
+
+        // Append "nifi" to the configured URL, adding a separating slash only when one is missing.
+        final String configuredUrl = context.getProperty(DESTINATION_URL).evaluateAttributeExpressions().getValue();
+        final String suffix = configuredUrl.endsWith("/") ? "nifi" : "/nifi";
+        final String destinationUrl = configuredUrl + suffix;
+
+        final SiteToSiteClient.Builder clientBuilder = new SiteToSiteClient.Builder();
+        clientBuilder.url(destinationUrl);
+        clientBuilder.portName(context.getProperty(PORT_NAME).getValue());
+        clientBuilder.useCompression(context.getProperty(COMPRESS).asBoolean());
+        clientBuilder.eventReporter(loggingReporter);
+        clientBuilder.sslContext(sslContext);
+        clientBuilder.timeout(context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
+        siteToSiteClient = clientBuilder.build();
+    }
+
+    /**
+     * Closes the Site-to-Site client, if one exists, when the task is stopped.
+     *
+     * @throws IOException if closing the client fails
+     */
+    @OnStopped
+    public void shutdown() throws IOException {
+        final SiteToSiteClient activeClient = getClient();
+        if (activeClient == null) {
+            return;
+        }
+        activeClient.close();
+    }
+
+    /**
+     * Returns the current Site-to-Site client. This getter is intended explicitly for testing purposes.
+     *
+     * @return the client held by this task; may be null
+     */
+    protected SiteToSiteClient getClient() {
+        return siteToSiteClient;
+    }
+
+    /**
+     * Finds the name of the component that produced an event by searching a process group's status.
+     * The group itself is checked first, then its processors, input ports, output ports and
+     * remote process groups, and finally each child group recursively.
+     *
+     * @param status the process group status to search; may be null
+     * @param event the event whose component id is looked up
+     * @return the matching component's name, or null if status is null or nothing matches
+     */
+    private String getComponentName(final ProcessGroupStatus status, final ProvenanceEventRecord event) {
+        if (status == null) {
+            return null;
+        }
+
+        final String targetId = event.getComponentId();
+        if (status.getId().equals(targetId)) {
+            return status.getName();
+        }
+
+        for (final ProcessorStatus processor : status.getProcessorStatus()) {
+            if (processor.getId().equals(targetId)) {
+                return processor.getName();
+            }
+        }
+
+        for (final PortStatus inputPort : status.getInputPortStatus()) {
+            if (inputPort.getId().equals(targetId)) {
+                return inputPort.getName();
+            }
+        }
+
+        for (final PortStatus outputPort : status.getOutputPortStatus()) {
+            if (outputPort.getId().equals(targetId)) {
+                return outputPort.getName();
+            }
+        }
+
+        for (final RemoteProcessGroupStatus remoteGroup : status.getRemoteProcessGroupStatus()) {
+            if (remoteGroup.getId().equals(targetId)) {
+                return remoteGroup.getName();
+            }
+        }
+
+        // Nothing matched at this level, so descend into each child group in turn.
+        for (final ProcessGroupStatus child : status.getProcessGroupStatus()) {
+            final String nameFromChild = getComponentName(child, event);
+            if (nameFromChild != null) {
+                return nameFromChild;
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * Sends the next batch of provenance events to the destination over Site-to-Site.
+     * <p>
+     * While the starting event id is still negative it is restored from local state. At most
+     * "Batch Size" events are then read from the repository, serialized into a single JSON array
+     * and sent in one transaction. After the transaction completes, the id of the last event sent
+     * is written back to local state and the in-memory starting id is advanced.
+     *
+     * @param context gives access to the provenance events, the component state and the properties
+     * @throws ProcessException if an IOException occurs while sending the events
+     */
+    @Override
+    public void onTrigger(final ReportingContext context) {
+        final ProcessGroupStatus procGroupStatus = context.getEventAccess().getControllerStatus();
+        final String rootGroupName = procGroupStatus == null ? null : procGroupStatus.getName();
+
+        final Long currMaxId = context.getEventAccess().getProvenanceRepository().getMaxEventId();
+        if (currMaxId == null) {
+            // The id is a boxed Long; the comparisons below unbox it and would throw a NullPointerException.
+            getLogger().debug("No events to send because the provenance repository did not report a max event id.");
+            return;
+        }
+
+        if (firstEventId < 0) {
+            Map<String, String> state;
+            try {
+                state = context.getStateManager().getState(Scope.LOCAL).toMap();
+            } catch (IOException e) {
+                getLogger().error("Failed to get state at start up due to {}", e);
+                return;
+            }
+            if (state.containsKey(LAST_EVENT_ID_KEY)) {
+                firstEventId = Long.parseLong(state.get(LAST_EVENT_ID_KEY)) + 1;
+            }
+
+            if (currMaxId < firstEventId) {
+                getLogger().debug("Current provenance max id is {} which is less than what was stored in state as the last queried event, which was {}. This means the provenance restarted its " +
+                    "ids. Restarting querying from the beginning.", new Object[]{currMaxId, firstEventId});
+                firstEventId = -1;
+            }
+        }
+
+        if (currMaxId == (firstEventId - 1)) {
+            getLogger().debug("No events to send due to the current max id being equal to the last id that was queried.");
+            return;
+        }
+
+        final List<ProvenanceEventRecord> events;
+        try {
+            events = context.getEventAccess().getProvenanceEvents(firstEventId, context.getProperty(BATCH_SIZE).asInteger());
+        } catch (final IOException ioe) {
+            getLogger().error("Failed to retrieve Provenance Events from repository due to {}", ioe);
+            return;
+        }
+
+        if (events == null || events.isEmpty()) {
+            getLogger().debug("No events to send due to 'events' being null or empty.");
+            return;
+        }
+
+        final long start = System.nanoTime();
+        final Map<String, ?> config = Collections.emptyMap();
+        final JsonBuilderFactory factory = Json.createBuilderFactory(config);
+        final JsonObjectBuilder builder = factory.createObjectBuilder();
+
+        final String nifiUrl = context.getProperty(MINIFI_URL).evaluateAttributeExpressions().getValue();
+        URL url;
+        try {
+            url = new URL(nifiUrl);
+        } catch (final MalformedURLException e1) {
+            // already validated; keep the cause so an unexpected failure can still be diagnosed
+            throw new AssertionError(e1);
+        }
+
+        final String hostname = url.getHost();
+
+        final JsonArrayBuilder arrayBuilder = factory.createArrayBuilder();
+        for (final ProvenanceEventRecord event : events) {
+            arrayBuilder.add(serialize(factory, builder, event, getComponentName(procGroupStatus, event), hostname, url, rootGroupName));
+        }
+        final JsonArray jsonArray = arrayBuilder.build();
+
+        try {
+            final Transaction transaction = getClient().createTransaction(TransferDirection.SEND);
+            if (transaction == null) {
+                getLogger().debug("All destination nodes are penalized; will attempt to send data later");
+                return;
+            }
+
+            final Map<String, String> attributes = new HashMap<>();
+            final String transactionId = UUID.randomUUID().toString();
+            attributes.put("reporting.task.transaction.id", transactionId);
+
+            final byte[] data = jsonArray.toString().getBytes(StandardCharsets.UTF_8);
+            transaction.send(data, attributes);
+            transaction.confirm();
+            transaction.complete();
+
+            final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+            getLogger().info("Successfully sent {} Provenance Events to destination in {} ms; Transaction ID = {}; First Event ID = {}",
+                new Object[] {events.size(), transferMillis, transactionId, events.get(0).getEventId()});
+        } catch (final IOException e) {
+            throw new ProcessException("Failed to send Provenance Events to destination due to IOException", e);
+        }
+
+        final ProvenanceEventRecord lastEvent = events.get(events.size() - 1);
+        final String lastEventId = String.valueOf(lastEvent.getEventId());
+        try {
+            StateManager stateManager = context.getStateManager();
+            StateMap stateMap = stateManager.getState(Scope.LOCAL);
+            Map<String, String> newMapOfState = new HashMap<>();
+            newMapOfState.put(LAST_EVENT_ID_KEY, lastEventId);
+            // replace() reports through its return value whether the update was applied; surface a rejected update.
+            if (!stateManager.replace(stateMap, newMapOfState, Scope.LOCAL)) {
+                getLogger().warn("State was not updated to {}; this could result in events being re-sent after a restart of MiNiFi",
+                    new Object[] {lastEventId});
+            }
+        } catch (final IOException ioe) {
+            getLogger().error("Failed to update state to {} due to {}; this could result in events being re-sent after a restart of MiNiFi",
+                new Object[] {lastEventId, ioe}, ioe);
+        }
+
+        firstEventId = lastEvent.getEventId() + 1;
+    }
+
+    /**
+     * Converts one provenance event into the JSON object sent to the destination.
+     * Fields whose value is null are left out of the object.
+     *
+     * @param factory used to create the nested JSON arrays and objects
+     * @param builder the object builder the fields are added to and the result is built from
+     * @param event the provenance event to serialize
+     * @param componentName name of the component that produced the event; may be null
+     * @param hostname value written to the "actorHostname" field
+     * @param nifiUrl URL of this instance, used to derive the content URIs; may be null
+     * @param applicationName value written to the "application" field; may be null
+     * @return the JSON representation of the event
+     */
+    static JsonObject serialize(final JsonBuilderFactory factory, final JsonObjectBuilder builder, final ProvenanceEventRecord event,
+        final String componentName, final String hostname, final URL nifiUrl, final String applicationName) {
+        addField(builder, "eventId", UUID.randomUUID().toString());
+        addField(builder, "eventOrdinal", event.getEventId());
+        addField(builder, "eventType", event.getEventType().name());
+        addField(builder, "timestampMillis", event.getEventTime());
+
+        // TIMESTAMP_FORMAT ends in a literal 'Z', so the formatter has to render UTC. "UTC" is a
+        // recognised zone id, whereas "Z" only yields GMT through TimeZone's unknown-id fallback.
+        final DateFormat df = new SimpleDateFormat(TIMESTAMP_FORMAT);
+        df.setTimeZone(TimeZone.getTimeZone("UTC"));
+        addField(builder, "timestamp", df.format(event.getEventTime()));
+
+        addField(builder, "durationMillis", event.getEventDuration());
+        addField(builder, "lineageStart", event.getLineageStartDate());
+
+        // The event's own FlowFile UUID is always part of the lineage identifiers.
+        final Set<String> lineageIdentifiers = new HashSet<>();
+        if (event.getLineageIdentifiers() != null) {
+            lineageIdentifiers.addAll(event.getLineageIdentifiers());
+        }
+        lineageIdentifiers.add(event.getFlowFileUuid());
+        addField(builder, factory, "lineageIdentifiers", lineageIdentifiers);
+        addField(builder, "details", event.getDetails());
+        addField(builder, "componentId", event.getComponentId());
+        addField(builder, "componentType", event.getComponentType());
+        addField(builder, "componentName", componentName);
+        addField(builder, "entityId", event.getFlowFileUuid());
+        addField(builder, "entityType", "org.apache.nifi.flowfile.FlowFile");
+        addField(builder, "entitySize", event.getFileSize());
+        addField(builder, "previousEntitySize", event.getPreviousFileSize());
+        addField(builder, factory, "updatedAttributes", event.getUpdatedAttributes());
+        addField(builder, factory, "previousAttributes", event.getPreviousAttributes());
+
+        addField(builder, "actorHostname", hostname);
+        if (nifiUrl != null) {
+            // Build the prefix from scheme and authority. Deleting the path text from the full URL
+            // string removes every occurrence of it, which corrupts URLs such as "http://host/"
+            // (path "/") or "http://nifi-host/nifi" (path "/nifi").
+            final String urlPrefix = nifiUrl.getProtocol() + "://" + nifiUrl.getAuthority();
+            final String contentUriBase = urlPrefix + "/nifi-api/controller/provenance/events/" + event.getEventId() + "/content/";
+            addField(builder, "contentURI", contentUriBase + "output");
+            addField(builder, "previousContentURI", contentUriBase + "input");
+        }
+
+        addField(builder, factory, "parentIds", event.getParentUuids());
+        addField(builder, factory, "childIds", event.getChildUuids());
+        addField(builder, "transitUri", event.getTransitUri());
+        addField(builder, "remoteIdentifier", event.getSourceSystemFlowFileIdentifier());
+        addField(builder, "alternateIdentifier", event.getAlternateIdentifierUri());
+        addField(builder, "platform", "minifi");
+        addField(builder, "application", applicationName);
+
+        return builder.build();
+    }
+
+    /**
+     * Adds a string map to the builder as a nested JSON object under the given key.
+     * Nothing is added when the map is null; entries with a null key or null value are skipped.
+     */
+    private static void addField(final JsonObjectBuilder builder, final JsonBuilderFactory factory, final String key, final Map<String, String> values) {
+        if (values != null) {
+            final JsonObjectBuilder nested = factory.createObjectBuilder();
+            for (final Map.Entry<String, String> attribute : values.entrySet()) {
+                final String name = attribute.getKey();
+                final String text = attribute.getValue();
+                if (name != null && text != null) {
+                    nested.add(name, text);
+                }
+            }
+
+            builder.add(key, nested);
+        }
+    }
+
+    /**
+     * Adds a numeric field to the builder; a null value is left out.
+     */
+    private static void addField(final JsonObjectBuilder builder, final String key, final Long value) {
+        if (value == null) {
+            return;
+        }
+
+        builder.add(key, value.longValue());
+    }
+
+ private static void addField(final JsonObjectBuilder builder, final JsonBuilderFactory factory, final String key, final Collection<String> values) {
+ if (values == null) {
+ return;
+ }
+
+ builder.add(key, createJsonArray(factory, values));
+ }
+
+    /**
+     * Adds a string field to the builder; a null value is left out.
+     */
+    private static void addField(final JsonObjectBuilder builder, final String key, final String value) {
+        if (value != null) {
+            builder.add(key, value);
+        }
+    }
+
+    /**
+     * Creates an array builder holding the non-null strings of the given collection, in iteration order.
+     *
+     * @param factory used to create the array builder
+     * @param values the strings to copy; null elements are skipped
+     * @return the populated array builder
+     */
+    private static JsonArrayBuilder createJsonArray(JsonBuilderFactory factory, final Collection<String> values) {
+        final JsonArrayBuilder items = factory.createArrayBuilder();
+        for (final String item : values) {
+            if (item == null) {
+                continue;
+            }
+            items.add(item);
+        }
+        return items;
+    }
+
+
+    /**
+     * Validator that accepts a property value when, after expression evaluation, it can be
+     * parsed by {@link URL}.
+     */
+    private static class NiFiUrlValidator implements Validator {
+        @Override
+        public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
+            final String evaluated = context.newPropertyValue(input).evaluateAttributeExpressions().getValue();
+            final ValidationResult.Builder result = new ValidationResult.Builder().input(input).subject(subject);
+            try {
+                // Only the constructor's success matters; the parsed URL itself is discarded.
+                new URL(evaluated);
+                result.valid(true);
+            } catch (final Exception e) {
+                result.valid(false).explanation("Not a valid URL");
+            }
+
+            return result.build();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/61e3a925/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask
----------------------------------------------------------------------
diff --git a/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask b/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask
new file mode 100644
index 0000000..331d759
--- /dev/null
+++ b/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.nifi.minifi.provenance.reporting.ProvenanceReportingTask
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/61e3a925/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/src/test/java/org/apache/nifi/minifi/provenance/reporting/TestProvenanceReportingTask.java
----------------------------------------------------------------------
diff --git a/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/src/test/java/org/apache/nifi/minifi/provenance/reporting/TestProvenanceReportingTask.java b/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/src/test/java/org/apache/nifi/minifi/provenance/reporting/TestProvenanceReportingTask.java
new file mode 100644
index 0000000..97291c0
--- /dev/null
+++ b/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/src/test/java/org/apache/nifi/minifi/provenance/reporting/TestProvenanceReportingTask.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.provenance.reporting;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.provenance.ProvenanceEventBuilder;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventRepository;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.provenance.StandardProvenanceEventRecord;
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.reporting.EventAccess;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.reporting.ReportingContext;
+import org.apache.nifi.reporting.ReportingInitializationContext;
+import org.apache.nifi.state.MockStateManager;
+import org.apache.nifi.stream.io.ByteArrayInputStream;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.MockPropertyValue;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import javax.json.Json;
+import javax.json.JsonObject;
+import javax.json.JsonReader;
+
+public class TestProvenanceReportingTask {
+    /** Checks that one provenance event is serialized to JSON and passed to the site-to-site transaction. */
+    @Test
+    public void testSerializedForm() throws IOException, InitializationException {
+        final String uuid = "10000000-0000-0000-0000-000000000000";
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("abc", "xyz");
+        attributes.put("xyz", "abc");
+        attributes.put("filename", "file-" + uuid);
+        // The earlier filename goes into prevAttrs so the current attributes are left untouched.
+        final Map<String, String> prevAttrs = new HashMap<>();
+        prevAttrs.put("filename", "1234.xyz");
+
+        final Set<String> lineageIdentifiers = new HashSet<>();
+        lineageIdentifiers.add("123");
+        lineageIdentifiers.add("321");
+
+        final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
+        builder.setEventTime(System.currentTimeMillis());
+        builder.setEventType(ProvenanceEventType.RECEIVE);
+        builder.setTransitUri("nifi://unit-test");
+        attributes.put("uuid", uuid);
+        builder.fromFlowFile(createFlowFile(3L, attributes));
+        builder.setAttributes(prevAttrs, attributes);
+        builder.setComponentId("1234");
+        builder.setComponentType("dummy processor");
+        builder.setLineageIdentifiers(lineageIdentifiers);
+        final ProvenanceEventRecord event = builder.build();
+        // Record every payload given to Transaction.send() rather than contacting a remote instance.
+        final List<byte[]> dataSent = new ArrayList<>();
+        final ProvenanceReportingTask task = new ProvenanceReportingTask() {
+            @SuppressWarnings("unchecked")
+            @Override
+            protected SiteToSiteClient getClient() {
+                final SiteToSiteClient client = Mockito.mock(SiteToSiteClient.class);
+                final Transaction transaction = Mockito.mock(Transaction.class);
+
+                try {
+                    Mockito.doAnswer(new Answer<Object>() {
+                        @Override
+                        public Object answer(final InvocationOnMock invocation) throws Throwable {
+                            final byte[] data = invocation.getArgumentAt(0, byte[].class);
+                            dataSent.add(data);
+                            return null;
+                        }
+                    }).when(transaction).send(Mockito.any(byte[].class), Mockito.any(Map.class));
+
+                    Mockito.when(client.createTransaction(Mockito.any(TransferDirection.class))).thenReturn(transaction);
+                } catch (final Exception e) {
+                    e.printStackTrace();
+                    Assert.fail(e.toString());
+                }
+
+                return client;
+            }
+        };
+
+        final List<ProvenanceEventRecord> events = new ArrayList<>();
+        events.add(event);
+
+        final Map<PropertyDescriptor, String> properties = new HashMap<>();
+        for (final PropertyDescriptor descriptor : task.getSupportedPropertyDescriptors()) {
+            properties.put(descriptor, descriptor.getDefaultValue());
+        }
+        properties.put(ProvenanceReportingTask.BATCH_SIZE, "1000");
+
+        final ReportingContext context = Mockito.mock(ReportingContext.class);
+        Mockito.when(context.getStateManager())
+                .thenReturn(new MockStateManager(task));
+        Mockito.doAnswer(new Answer<PropertyValue>() {
+            @Override
+            public PropertyValue answer(final InvocationOnMock invocation) throws Throwable {
+                final PropertyDescriptor descriptor = invocation.getArgumentAt(0, PropertyDescriptor.class);
+                return new MockPropertyValue(properties.get(descriptor), null);
+            }
+        }).when(context).getProperty(Mockito.any(PropertyDescriptor.class));
+        // Serve events from the in-memory list, bounded by the requested start id and record count.
+        final EventAccess eventAccess = Mockito.mock(EventAccess.class);
+        Mockito.doAnswer(new Answer<List<ProvenanceEventRecord>>() {
+            @Override
+            public List<ProvenanceEventRecord> answer(final InvocationOnMock invocation) throws Throwable {
+                final long startId = invocation.getArgumentAt(0, long.class);
+                final int maxRecords = invocation.getArgumentAt(1, int.class);
+
+                final List<ProvenanceEventRecord> eventsToReturn = new ArrayList<>();
+                for (int i = (int) Math.max(0, startId); i < (int) (startId + maxRecords) && i < events.size(); i++) {
+                    eventsToReturn.add(events.get(i));
+                }
+                return eventsToReturn;
+            }
+        }).when(eventAccess).getProvenanceEvents(Mockito.anyLong(), Mockito.anyInt());
+
+        final ProvenanceEventRepository provenanceRepository = Mockito.mock(ProvenanceEventRepository.class);
+        Mockito.doAnswer(new Answer<Long>() {
+            @Override
+            public Long answer(final InvocationOnMock invocation) throws Throwable {
+                return 1L;
+            }
+        }).when(provenanceRepository).getMaxEventId();
+
+        Mockito.when(context.getEventAccess()).thenReturn(eventAccess);
+        Mockito.when(eventAccess.getProvenanceRepository()).thenReturn(provenanceRepository);
+
+        final ComponentLog logger = Mockito.mock(ComponentLog.class);
+        final ReportingInitializationContext initContext = Mockito.mock(ReportingInitializationContext.class);
+        Mockito.when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString());
+        Mockito.when(initContext.getLogger()).thenReturn(logger);
+
+        // Run a single reporting cycle; the one event should be sent as exactly one batch.
+        task.initialize(initContext);
+        task.onTrigger(context);
+
+        assertEquals(1, dataSent.size());
+        final String msg = new String(dataSent.get(0), StandardCharsets.UTF_8);
+        final JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes(StandardCharsets.UTF_8)));
+        final JsonObject updatedAttributes = jsonReader.readArray().getJsonObject(0).getJsonObject("updatedAttributes");
+        assertEquals(events.get(0).getAttributes().get("abc"), updatedAttributes.getString("abc"));
+    }
+    /** Builds a MockFlowFile with the given id and attributes. */
+    public static FlowFile createFlowFile(final long id, final Map<String, String> attributes) {
+        MockFlowFile mockFlowFile = new MockFlowFile(id);
+        mockFlowFile.putAttributes(attributes);
+        return mockFlowFile;
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/61e3a925/minifi-nar-bundles/minifi-provenance-reporting-bundle/pom.xml
----------------------------------------------------------------------
diff --git a/minifi-nar-bundles/minifi-provenance-reporting-bundle/pom.xml b/minifi-nar-bundles/minifi-provenance-reporting-bundle/pom.xml
new file mode 100644
index 0000000..0d34004
--- /dev/null
+++ b/minifi-nar-bundles/minifi-provenance-reporting-bundle/pom.xml
@@ -0,0 +1,41 @@
+<?xml version="1.0"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.nifi.minifi</groupId>
+ <artifactId>minifi-nar-bundles</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>minifi-provenance-reporting-bundle</artifactId>
+ <packaging>pom</packaging>
+ <!-- Aggregator POM: the modules below are built as part of this bundle. -->
+ <modules>
+ <module>minifi-provenance-reporting-task</module>
+ <module>minifi-provenance-reporting-nar</module>
+ </modules>
+ <!-- Version applied when a module of this bundle declares jersey-client without its own version. -->
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>org.glassfish.jersey.core</groupId>
+ <artifactId>jersey-client</artifactId>
+ <version>2.19</version>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+</project>
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/61e3a925/minifi-nar-bundles/pom.xml
----------------------------------------------------------------------
diff --git a/minifi-nar-bundles/pom.xml b/minifi-nar-bundles/pom.xml
index aeaad02..79a8414 100644
--- a/minifi-nar-bundles/pom.xml
+++ b/minifi-nar-bundles/pom.xml
@@ -20,10 +20,11 @@
<artifactId>minifi</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
- <groupId>org.apache.nifi</groupId>
+ <groupId>org.apache.nifi.minifi</groupId>
<artifactId>minifi-nar-bundles</artifactId>
<packaging>pom</packaging>
<modules>
<module>minifi-framework-bundle</module>
+ <module>minifi-provenance-reporting-bundle</module>
</modules>
</project>
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/61e3a925/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 2c8a881..cdd8357 100644
--- a/pom.xml
+++ b/pom.xml
@@ -104,7 +104,7 @@ limitations under the License.
</dependency>
<dependency>
<groupId>org.mockito</groupId>
- <artifactId>mockito-core</artifactId>
+ <artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
<dependency>
@@ -345,6 +345,11 @@ limitations under the License.
<artifactId>nifi-write-ahead-log</artifactId>
<version>${org.apache.nifi.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-ssl-context-service-api</artifactId>
+ <version>${org.apache.nifi.version}</version>
+ </dependency>
<!-- Test Dependencies -->
<dependency>
@@ -354,11 +359,6 @@ limitations under the License.
</dependency>
<dependency>
<groupId>org.mockito</groupId>
- <artifactId>mockito-core</artifactId>
- <version>1.10.19</version>
- </dependency>
- <dependency>
- <groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>1.10.19</version>
<scope>test</scope>
@@ -368,6 +368,7 @@ limitations under the License.
<artifactId>slf4j-simple</artifactId>
<version>${org.slf4j.version}</version>
</dependency>
+
</dependencies>
</dependencyManagement>