You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2018/10/11 14:05:39 UTC

nifi git commit: NIFI-5645: Auto reconnect ConsumeWindowsEventLog

Repository: nifi
Updated Branches:
  refs/heads/master 97afa4e7b -> a6f722222


NIFI-5645: Auto reconnect ConsumeWindowsEventLog

This commit also contains the following refactoring:
- Catch URISyntaxException inside subscribe when constructing the provenance
URI, as it does not affect the core responsibility of this processor.
Even if the result is not a proper URI, the processor should proceed as
long as the query works for consuming logs.

Upgrade JNA version.

Do not update lastActivityTimestamp when subscribe fails.

This closes #3037


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a6f72222
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a6f72222
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a6f72222

Branch: refs/heads/master
Commit: a6f722222a7fa31fef06a465efe506fd0a773eec
Parents: 97afa4e
Author: Koji Kawamura <ij...@apache.org>
Authored: Fri Sep 28 17:37:34 2018 +0900
Committer: Matt Gilman <ma...@gmail.com>
Committed: Thu Oct 11 10:05:26 2018 -0400

----------------------------------------------------------------------
 .../nifi-windows-event-log-processors/pom.xml   |  4 +-
 .../event/log/ConsumeWindowsEventLog.java       | 95 +++++++++++++++-----
 2 files changed, 76 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/a6f72222/nifi-nar-bundles/nifi-windows-event-log-bundle/nifi-windows-event-log-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-windows-event-log-bundle/nifi-windows-event-log-processors/pom.xml b/nifi-nar-bundles/nifi-windows-event-log-bundle/nifi-windows-event-log-processors/pom.xml
index a504a3b..4b7d063 100644
--- a/nifi-nar-bundles/nifi-windows-event-log-bundle/nifi-windows-event-log-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-windows-event-log-bundle/nifi-windows-event-log-processors/pom.xml
@@ -21,8 +21,8 @@ language governing permissions and limitations under the License. -->
     <packaging>jar</packaging>
 
     <properties>
-        <jna.version>4.2.2</jna.version>
-        <javassist.version>3.20.0-GA</javassist.version>
+        <jna.version>4.5.2</jna.version>
+        <javassist.version>3.23.1-GA</javassist.version>
     </properties>
 
     <dependencies>

http://git-wip-us.apache.org/repos/asf/nifi/blob/a6f72222/nifi-nar-bundles/nifi-windows-event-log-bundle/nifi-windows-event-log-processors/src/main/java/org/apache/nifi/processors/windows/event/log/ConsumeWindowsEventLog.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-windows-event-log-bundle/nifi-windows-event-log-processors/src/main/java/org/apache/nifi/processors/windows/event/log/ConsumeWindowsEventLog.java b/nifi-nar-bundles/nifi-windows-event-log-bundle/nifi-windows-event-log-processors/src/main/java/org/apache/nifi/processors/windows/event/log/ConsumeWindowsEventLog.java
index 87aa07f..7dc4236 100644
--- a/nifi-nar-bundles/nifi-windows-event-log-bundle/nifi-windows-event-log-processors/src/main/java/org/apache/nifi/processors/windows/event/log/ConsumeWindowsEventLog.java
+++ b/nifi-nar-bundles/nifi-windows-event-log-bundle/nifi-windows-event-log-processors/src/main/java/org/apache/nifi/processors/windows/event/log/ConsumeWindowsEventLog.java
@@ -20,6 +20,7 @@ package org.apache.nifi.processors.windows.event.log;
 import com.sun.jna.platform.win32.Kernel32;
 import com.sun.jna.platform.win32.Kernel32Util;
 import com.sun.jna.platform.win32.WinNT;
+
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.charset.StandardCharsets;
@@ -32,6 +33,8 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.TriggerSerially;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
@@ -43,6 +46,7 @@ import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
@@ -77,6 +81,7 @@ public class ConsumeWindowsEventLog extends AbstractSessionFactoryProcessor {
             .defaultValue(DEFAULT_CHANNEL)
             .description("The Windows Event Log Channel to listen to.")
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
 
     public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
@@ -86,6 +91,7 @@ public class ConsumeWindowsEventLog extends AbstractSessionFactoryProcessor {
             .defaultValue(DEFAULT_XPATH)
             .description("XPath Query to filter events. (See https://msdn.microsoft.com/en-us/library/windows/desktop/dd996910(v=vs.85).aspx for examples.)")
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
 
     public static final PropertyDescriptor MAX_BUFFER_SIZE = new PropertyDescriptor.Builder()
@@ -108,7 +114,21 @@ public class ConsumeWindowsEventLog extends AbstractSessionFactoryProcessor {
             .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
             .build();
 
-    public static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = Collections.unmodifiableList(Arrays.asList(CHANNEL, QUERY, MAX_BUFFER_SIZE, MAX_EVENT_QUEUE_SIZE));
+    public static final PropertyDescriptor INACTIVE_DURATION_TO_RECONNECT = new PropertyDescriptor.Builder()
+            .name("inactiveDurationToReconnect")
+            .displayName("Inactive duration to reconnect")
+            .description("If no new event logs are processed for the specified time period," +
+                    " this processor will try reconnecting to recover from a state where any further messages cannot be consumed." +
+                    " Such situation can happen if Windows Event Log service is restarted, or ERROR_EVT_QUERY_RESULT_STALE (15011) is returned." +
+                    " Setting no duration, e.g. '0 ms' disables auto-reconnection.")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .defaultValue("10 mins")
+            .addValidator(StandardValidators.createTimePeriodValidator(0, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.MILLISECONDS))
+            .build();
+
+    public static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = Collections.unmodifiableList(
+            Arrays.asList(CHANNEL, QUERY, MAX_BUFFER_SIZE, MAX_EVENT_QUEUE_SIZE, INACTIVE_DURATION_TO_RECONNECT));
 
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
@@ -134,6 +154,9 @@ public class ConsumeWindowsEventLog extends AbstractSessionFactoryProcessor {
     private ProcessSessionFactory sessionFactory;
     private String provenanceUri;
 
+    private long inactiveDurationToReconnect = 0;
+    private long lastActivityTimestamp = 0;
+
     /**
      * Framework constructor
      */
@@ -182,12 +205,20 @@ public class ConsumeWindowsEventLog extends AbstractSessionFactoryProcessor {
      *
      * @param context the process context
      */
-    private String subscribe(ProcessContext context) throws URISyntaxException {
-        String channel = context.getProperty(CHANNEL).getValue();
-        String query = context.getProperty(QUERY).getValue();
+    private String subscribe(ProcessContext context) {
+        final String channel = context.getProperty(CHANNEL).evaluateAttributeExpressions().getValue();
+        final String query = context.getProperty(QUERY).evaluateAttributeExpressions().getValue();
 
         renderedXMLs = new LinkedBlockingQueue<>(context.getProperty(MAX_EVENT_QUEUE_SIZE).asInteger());
-        provenanceUri = new URI("winlog", name, "/" + channel, query, null).toASCIIString();
+
+        try {
+            provenanceUri = new URI("winlog", name, "/" + channel, query, null).toASCIIString();
+        } catch (URISyntaxException e) {
+            getLogger().debug("Failed to construct detailed provenanceUri from channel={}, query={}, use simpler one.", new Object[]{channel, query});
+            provenanceUri = String.format("winlog://%s/%s", name, channel);
+        }
+
+        inactiveDurationToReconnect = context.getProperty(INACTIVE_DURATION_TO_RECONNECT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
 
         evtSubscribeCallback = new EventSubscribeXmlRenderingCallback(getLogger(), s -> {
             try {
@@ -199,9 +230,12 @@ public class ConsumeWindowsEventLog extends AbstractSessionFactoryProcessor {
 
         subscriptionHandle = wEvtApi.EvtSubscribe(null, null, channel, query, null, null,
                 evtSubscribeCallback, WEvtApi.EvtSubscribeFlags.SUBSCRIBE_TO_FUTURE | WEvtApi.EvtSubscribeFlags.EVT_SUBSCRIBE_STRICT);
+
         if (!isSubscribed()) {
             return UNABLE_TO_SUBSCRIBE + errorLookup.getLastError();
         }
+
+        lastActivityTimestamp = System.currentTimeMillis();
         return null;
     }
 
@@ -210,7 +244,7 @@ public class ConsumeWindowsEventLog extends AbstractSessionFactoryProcessor {
     }
 
     @OnScheduled
-    public void onScheduled(ProcessContext context) throws AlreadySubscribedException, URISyntaxException {
+    public void onScheduled(ProcessContext context) throws AlreadySubscribedException {
         if (isSubscribed()) {
             throw new AlreadySubscribedException(PROCESSOR_ALREADY_SUBSCRIBED);
         }
@@ -225,11 +259,8 @@ public class ConsumeWindowsEventLog extends AbstractSessionFactoryProcessor {
      */
     @OnStopped
     public void stop() {
-        if (isSubscribed()) {
-            wEvtApi.EvtClose(subscriptionHandle);
-        }
-        subscriptionHandle = null;
-        evtSubscribeCallback = null;
+        unsubscribe();
+
         if (!renderedXMLs.isEmpty()) {
             if (sessionFactory != null) {
                 getLogger().info("Finishing processing leftover events");
@@ -246,29 +277,49 @@ public class ConsumeWindowsEventLog extends AbstractSessionFactoryProcessor {
         renderedXMLs = null;
     }
 
+    private void unsubscribe() {
+        if (isSubscribed()) {
+            wEvtApi.EvtClose(subscriptionHandle);
+        }
+        subscriptionHandle = null;
+        evtSubscribeCallback = null;
+    }
+
     @Override
     public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
         this.sessionFactory = sessionFactory;
+
         if (!isSubscribed()) {
-            String errorMessage;
-            try {
-                errorMessage = subscribe(context);
-            } catch (URISyntaxException e) {
-                getLogger().error(e.getMessage(), e);
-                context.yield();
-                return;
-            }
+            String errorMessage = subscribe(context);
             if (errorMessage != null) {
                 context.yield();
                 getLogger().error(errorMessage);
                 return;
             }
         }
-        processQueue(sessionFactory.createSession());
+
+        final int flowFileCount = processQueue(sessionFactory.createSession());
+
+        final long now = System.currentTimeMillis();
+        if (flowFileCount > 0) {
+            lastActivityTimestamp = now;
+
+        } else if (inactiveDurationToReconnect > 0) {
+            if ((now - lastActivityTimestamp) > inactiveDurationToReconnect) {
+                getLogger().info("Exceeds configured 'inactive duration to reconnect' {} ms. Unsubscribe to reconnect..", new Object[]{inactiveDurationToReconnect});
+                unsubscribe();
+            }
+        }
     }
 
-    private void processQueue(ProcessSession session) {
+    /**
+     * Create FlowFiles from received logs.
+     * @return the number of created FlowFiles
+     */
+    private int processQueue(ProcessSession session) {
         String xml;
+        int flowFileCount = 0;
+
         while ((xml = renderedXMLs.peek()) != null) {
             FlowFile flowFile = session.create();
             byte[] xmlBytes = xml.getBytes(StandardCharsets.UTF_8);
@@ -277,6 +328,7 @@ public class ConsumeWindowsEventLog extends AbstractSessionFactoryProcessor {
             session.getProvenanceReporter().receive(flowFile, provenanceUri);
             session.transfer(flowFile, REL_SUCCESS);
             session.commit();
+            flowFileCount++;
             if (!renderedXMLs.remove(xml) && getLogger().isWarnEnabled()) {
                 getLogger().warn(new StringBuilder("Event ")
                         .append(xml)
@@ -286,6 +338,7 @@ public class ConsumeWindowsEventLog extends AbstractSessionFactoryProcessor {
                         .toString());
             }
         }
+        return flowFileCount;
     }
 
     @Override