You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2017/05/27 02:06:14 UTC

nifi git commit: NIFI-3985: This closes #1864. Added 'Starting Position' property to SiteToSiteReportingTask; also added additionalDetails.html that explains the schema and updated the reporting task to stop publishing when the user clicks 'stops' instea

Repository: nifi
Updated Branches:
  refs/heads/master dd50745a9 -> 2b435cdfc


NIFI-3985: This closes #1864. Added 'Start Position' property to SiteToSiteProvenanceReportingTask; also added additionalDetails.html that explains the schema and updated the reporting task to stop publishing when the user clicks 'stop' instead of running indefinitely until the reporting task has caught up

Signed-off-by: joewitt <jo...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/2b435cdf
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/2b435cdf
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/2b435cdf

Branch: refs/heads/master
Commit: 2b435cdfc6fd0824d9eb5f2cf140a330c9f258ed
Parents: dd50745
Author: Mark Payne <ma...@hotmail.com>
Authored: Fri May 26 10:59:47 2017 -0400
Committer: joewitt <jo...@apache.org>
Committed: Fri May 26 21:05:45 2017 -0500

----------------------------------------------------------------------
 .../SiteToSiteProvenanceReportingTask.java      | 54 ++++++++++++--
 .../additionalDetails.html                      | 77 ++++++++++++++++++++
 .../TestSiteToSiteProvenanceReportingTask.java  | 23 +++---
 3 files changed, 135 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/2b435cdf/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
index cae4d17..37b5070 100644
--- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
@@ -23,6 +23,8 @@ import org.apache.nifi.annotation.behavior.Stateful;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.components.state.StateManager;
@@ -71,6 +73,11 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
     static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
     static final String LAST_EVENT_ID_KEY = "last_event_id";
 
+    static final AllowableValue BEGINNING_OF_STREAM = new AllowableValue("beginning-of-stream", "Beginning of Stream",
+        "Start reading provenance Events from the beginning of the stream (the oldest event first)");
+    static final AllowableValue END_OF_STREAM = new AllowableValue("end-of-stream", "End of Stream",
+        "Start reading provenance Events from the end of the stream, ignoring old events");
+
     static final PropertyDescriptor PLATFORM = new PropertyDescriptor.Builder()
         .name("Platform")
         .displayName("Platform")
@@ -83,7 +90,7 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
 
     static final PropertyDescriptor FILTER_EVENT_TYPE = new PropertyDescriptor.Builder()
         .name("s2s-prov-task-event-filter")
-        .displayName("Event type")
+        .displayName("Event Type")
         .description("Comma-separated list of event types that will be used to filter the provenance events sent by the reporting task. "
                 + "Available event types are " + ProvenanceEventType.values() + ". If no filter is set, all the events are sent. If "
                         + "multiple filters are set, the filters are cumulative.")
@@ -93,7 +100,7 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
 
     static final PropertyDescriptor FILTER_COMPONENT_TYPE = new PropertyDescriptor.Builder()
         .name("s2s-prov-task-type-filter")
-        .displayName("Component type")
+        .displayName("Component Type")
         .description("Regular expression to filter the provenance events based on the component type. Only the events matching the regular "
                 + "expression will be sent. If no filter is set, all the events are sent. If multiple filters are set, the filters are cumulative.")
         .required(false)
@@ -109,11 +116,21 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
         .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
         .build();
 
+    static final PropertyDescriptor START_POSITION = new PropertyDescriptor.Builder()
+        .name("start-position")
+        .displayName("Start Position")
+        .description("If the Reporting Task has never been run, or if its state has been reset by a user, specifies where in the stream of Provenance Events the Reporting Task should start")
+        .allowableValues(BEGINNING_OF_STREAM, END_OF_STREAM)
+        .defaultValue(BEGINNING_OF_STREAM.getValue())
+        .required(true)
+        .build();
+
     private volatile long firstEventId = -1L;
     private volatile boolean isFilteringEnabled = false;
     private volatile Pattern componentTypeRegex;
     private volatile List<ProvenanceEventType> eventTypes = new ArrayList<ProvenanceEventType>();
     private volatile List<String> componentIds = new ArrayList<String>();
+    private volatile boolean scheduled = false;
 
     @OnScheduled
     public void onScheduled(final ConfigurationContext context) throws IOException {
@@ -139,6 +156,17 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
 
         // set a boolean whether filtering will be applied or not
         isFilteringEnabled = componentTypeRegex != null || !eventTypes.isEmpty() || !componentIds.isEmpty();
+
+        scheduled = true;
+    }
+
+    @OnUnscheduled
+    public void onUnscheduled() {
+        scheduled = false;
+    }
+
+    public boolean isScheduled() {
+        return scheduled;
     }
 
     @Override
@@ -148,6 +176,7 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
         properties.add(FILTER_EVENT_TYPE);
         properties.add(FILTER_COMPONENT_TYPE);
         properties.add(FILTER_COMPONENT_ID);
+        properties.add(START_POSITION);
         return properties;
     }
 
@@ -210,14 +239,27 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
                 getLogger().error("Failed to get state at start up due to:" + e.getMessage(), e);
                 return;
             }
+
+            final String startPositionValue = context.getProperty(START_POSITION).getValue();
+
             if (state.containsKey(LAST_EVENT_ID_KEY)) {
                 firstEventId = Long.parseLong(state.get(LAST_EVENT_ID_KEY)) + 1;
+            } else {
+                if (END_OF_STREAM.getValue().equals(startPositionValue)) {
+                    firstEventId = currMaxId;
+                }
             }
 
-            if(currMaxId < (firstEventId - 1)){
-                getLogger().warn("Current provenance max id is {} which is less than what was stored in state as the last queried event, which was {}. This means the provenance restarted its " +
+            if (currMaxId < (firstEventId - 1)) {
+                if (BEGINNING_OF_STREAM.getValue().equals(startPositionValue)) {
+                    getLogger().warn("Current provenance max id is {} which is less than what was stored in state as the last queried event, which was {}. This means the provenance restarted its " +
                         "ids. Restarting querying from the beginning.", new Object[]{currMaxId, firstEventId});
-                firstEventId = -1;
+                    firstEventId = -1;
+                } else {
+                    getLogger().warn("Current provenance max id is {} which is less than what was stored in state as the last queried event, which was {}. This means the provenance restarted its " +
+                        "ids. Restarting querying from the latest event in the Provenance Repository.", new Object[] {currMaxId, firstEventId});
+                    firstEventId = currMaxId;
+                }
             }
         }
 
@@ -258,7 +300,7 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
         final DateFormat df = new SimpleDateFormat(TIMESTAMP_FORMAT);
         df.setTimeZone(TimeZone.getTimeZone("Z"));
 
-        while (events != null && !events.isEmpty()) {
+        while (events != null && !events.isEmpty() && isScheduled()) {
             final long start = System.nanoTime();
 
             // Create a JSON array of all the events in the current batch

http://git-wip-us.apache.org/repos/asf/nifi/blob/2b435cdf/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask/additionalDetails.html
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask/additionalDetails.html b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask/additionalDetails.html
new file mode 100644
index 0000000..7e8204c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask/additionalDetails.html
@@ -0,0 +1,77 @@
+<!DOCTYPE html>
+<html lang="en">
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <head>
+        <meta charset="utf-8" />
+        <title>SiteToSiteProvenanceReportingTask</title>
+
+        <link rel="stylesheet" href="/nifi-docs/css/component-usage.css" type="text/css" />
+    </head>
+
+    <body>
+    	<p>
+    		The Site-to-Site Provenance Reporting Task allows the user to publish all of the Provenance Events from a NiFi instance back to
+    		the same NiFi instance or another NiFi instance. This provides a great deal of power because it allows the user to make use of
+    		all of the different Processors that are available in NiFi in order to process or distribute that data. When possible, it is
+    		advisable to send the Provenance data to a different NiFi instance than the one that this Reporting Task is running on, because
+    		when the data is received over Site-to-Site and processed, that in and of itself will generate Provenance events. As a result, a
+    		cycle is created. However, the data is sent in batches (1,000 by default). This means that for each batch of Provenance events
+    		that are sent back to NiFi, the receiving NiFi will have to generate only a single event per component.
+    	</p>
+    	
+    	<p>
+    		When published to a NiFi instance, the Provenance data is sent as a JSON array. Quite often, it can be useful to work with this data using
+    		a schema. As such, the schema for this Provenance data can be defined as follows:
+    	</p>
+
+<pre>
+<code>
+{
+  "namespace": "nifi",
+  "name": "provenanceEvent",
+  "type": "record",
+  "fields": [
+    { "name": "eventId", "type": "string" },
+    { "name": "eventOrdinal", "type": "long" },
+    { "name": "eventType", "type": "string" },
+    { "name": "timestampMillis", "type": "long" },
+    { "name": "durationMillis", "type": "long" },
+    { "name": "lineageStart", "type": { "type": "long", "logicalType": "timestamp-millis" } },
+    { "name": "details", "type": "string" },
+    { "name": "componentId", "type": "string" },
+    { "name": "componentType", "type": "string" },
+    { "name": "entityId", "type": "string" },
+    { "name": "entityType", "type": "string" },
+    { "name": "entitySize", "type": ["null", "long"] },
+    { "name": "previousEntitySize", "type": ["null", "long"] },
+    { "name": "updatedAttributes", "type": { "type": "map", "values": "string" } },
+    { "name": "previousAttributes", "type": { "type": "map", "values": "string" } },
+    { "name": "actorHostname", "type": "string" },
+    { "name": "contentURI", "type": "string" },
+    { "name": "previousContentURI", "type": "string" },
+    { "name": "parentIds", "type": { "type": "array", "items": "string" } },
+    { "name": "childIds", "type": { "type": "array", "items": "string" } },
+    { "name": "platform", "type": "string" },
+    { "name": "application", "type": "string" },
+    { "name": "transitUri", "type": ["null", "string"] }
+  ]
+}
+</code>
+</pre>
+
+    	
+	</body>
+</html>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/2b435cdf/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java
index a396ac8..86cbb74 100644
--- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java
@@ -95,7 +95,10 @@ public class TestSiteToSiteProvenanceReportingTask {
 
                 final List<ProvenanceEventRecord> eventsToReturn = new ArrayList<>();
                 for (int i = (int) Math.max(0, startId); i < (int) (startId + maxRecords) && totalEvents.get() < maxEventId; i++) {
-                    eventsToReturn.add(event);
+                    if (event != null) {
+                        eventsToReturn.add(event);
+                    }
+
                     totalEvents.getAndIncrement();
                 }
                 return eventsToReturn;
@@ -304,7 +307,12 @@ public class TestSiteToSiteProvenanceReportingTask {
         final long maxEventId = 2500;
 
         // create the mock reporting task and mock state manager
-        final MockSiteToSiteProvenanceReportingTask task = new MockSiteToSiteProvenanceReportingTask();
+        final Map<PropertyDescriptor, String> properties = new HashMap<>();
+        for (final PropertyDescriptor descriptor : new MockSiteToSiteProvenanceReportingTask().getSupportedPropertyDescriptors()) {
+            properties.put(descriptor, descriptor.getDefaultValue());
+        }
+
+        final MockSiteToSiteProvenanceReportingTask task = setup(null, properties);
         final MockStateManager stateManager = new MockStateManager(task);
 
         // create the state map and set the last id to the same value as maxEventId
@@ -312,10 +320,6 @@ public class TestSiteToSiteProvenanceReportingTask {
         state.put(SiteToSiteProvenanceReportingTask.LAST_EVENT_ID_KEY, String.valueOf(maxEventId));
         stateManager.setState(state, Scope.LOCAL);
 
-        // setup the mock reporting context to return the mock state manager
-        final ReportingContext context = Mockito.mock(ReportingContext.class);
-        Mockito.when(context.getStateManager()).thenReturn(stateManager);
-
         // setup the mock provenance repository to return maxEventId
         final ProvenanceEventRepository provenanceRepository = Mockito.mock(ProvenanceEventRepository.class);
         Mockito.doAnswer(new Answer<Long>() {
@@ -327,15 +331,8 @@ public class TestSiteToSiteProvenanceReportingTask {
 
         // setup the mock EventAccess to return the mock provenance repository
         final EventAccess eventAccess = Mockito.mock(EventAccess.class);
-        Mockito.when(context.getEventAccess()).thenReturn(eventAccess);
         Mockito.when(eventAccess.getProvenanceRepository()).thenReturn(provenanceRepository);
 
-        // setup the mock initialization context
-        final ComponentLog logger = Mockito.mock(ComponentLog.class);
-        final ReportingInitializationContext initContext = Mockito.mock(ReportingInitializationContext.class);
-        Mockito.when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString());
-        Mockito.when(initContext.getLogger()).thenReturn(logger);
-
         task.initialize(initContext);
 
         // execute the reporting task and should not produce any data b/c max id same as previous id