You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2016/02/06 04:36:41 UTC

[45/50] nifi git commit: NIFI-259 Corrected GetHttp state management and added a new unit test

NIFI-259 Corrected GetHttp state management and added a new unit test


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/88d4d2ce
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/88d4d2ce
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/88d4d2ce

Branch: refs/heads/master
Commit: 88d4d2ce5fe421b6e8955a7b875ad3e01195bd13
Parents: 55b77fe
Author: jpercivall <jo...@yahoo.com>
Authored: Wed Feb 3 20:04:39 2016 -0500
Committer: jpercivall <jo...@yahoo.com>
Committed: Wed Feb 3 20:04:39 2016 -0500

----------------------------------------------------------------------
 .../nifi/processors/standard/GetHTTP.java       | 98 ++++++++++++++++----
 .../nifi/processors/standard/TestGetHTTP.java   | 84 +++++++++++++++--
 2 files changed, 156 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/88d4d2ce/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java
index 48f22c5..818cd3b 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java
@@ -75,6 +75,7 @@ import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.components.state.StateMap;
 import org.apache.nifi.expression.AttributeExpression;
 import org.apache.nifi.flowfile.FlowFile;
@@ -91,11 +92,14 @@ import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.ssl.SSLContextService;
 import org.apache.nifi.ssl.SSLContextService.ClientAuth;
 import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.Tuple;
 
 @Tags({"get", "fetch", "poll", "http", "https", "ingest", "source", "input"})
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
 @CapabilityDescription("Fetches a file via HTTP. If the HTTP server supports it, the Processor then stores the Last Modified time and the ETag "
-    + "so that data will not be pulled again until the remote data changes or until the state is cleared.")
+    + "so that data will not be pulled again until the remote data changes or until the state is cleared. Note that due to limitations on state "
+    + "management, stored \"last modified\" and etag fields never expire. If the URL in GetHttp uses Expression Language that is unbounded, there "
+    + "is the potential for Out of Memory Errors to occur.")
 @WritesAttributes({
     @WritesAttribute(attribute = "filename", description = "The filename is set to the name of the file on the remote server"),
     @WritesAttribute(attribute = "mime.type", description = "The MIME Type of the FlowFile, as reported by the HTTP Content-Type header")
@@ -400,16 +404,18 @@ public class GetHTTP extends AbstractSessionFactoryProcessor {
             final HttpGet get = new HttpGet(url);
             get.setConfig(requestConfigBuilder.build());
 
+            final StateMap beforeStateMap;
+
             try {
-                final StateMap stateMap = context.getStateManager().getState(Scope.LOCAL);
-                final String lastModified = stateMap.get(LAST_MODIFIED);
+                beforeStateMap = context.getStateManager().getState(Scope.LOCAL);
+                final String lastModified = beforeStateMap.get(LAST_MODIFIED+":" + url);
                 if (lastModified != null) {
-                    get.addHeader(HEADER_IF_MODIFIED_SINCE, lastModified);
+                    get.addHeader(HEADER_IF_MODIFIED_SINCE, parseStateValue(lastModified).getValue());
                 }
 
-                final String etag = stateMap.get(ETAG);
+                final String etag = beforeStateMap.get(ETAG+":" + url);
                 if (etag != null) {
-                    get.addHeader(HEADER_IF_NONE_MATCH, etag);
+                    get.addHeader(HEADER_IF_NONE_MATCH, parseStateValue(etag).getValue());
                 }
             } catch (final IOException ioe) {
                 throw new ProcessException(ioe);
@@ -461,20 +467,8 @@ public class GetHTTP extends AbstractSessionFactoryProcessor {
                 logger.info("Successfully received {} from {} at a rate of {}; transferred to success", new Object[]{flowFile, url, dataRate});
                 session.commit();
 
-                final Map<String, String> updatedState = new HashMap<>(2);
-                final Header lastModified = response.getFirstHeader(HEADER_LAST_MODIFIED);
-                if (lastModified != null) {
-                    updatedState.put(LAST_MODIFIED, lastModified.getValue());
-                }
+                updateStateMap(context,response,beforeStateMap,url);
 
-                final Header etag = response.getFirstHeader(HEADER_ETAG);
-                if (etag != null) {
-                    updatedState.put(ETAG, etag.getValue());
-                }
-
-                if (!updatedState.isEmpty()) {
-                    context.getStateManager().setState(updatedState, Scope.LOCAL);
-                }
             } catch (final IOException e) {
                 context.yield();
                 session.rollback();
@@ -490,4 +484,70 @@ public class GetHTTP extends AbstractSessionFactoryProcessor {
             conMan.shutdown();
         }
     }
+
+    private void updateStateMap(ProcessContext context, HttpResponse response, StateMap beforeStateMap, String url){
+        try {
+            Map<String,String> workingMap = new HashMap<>();
+            workingMap.putAll(beforeStateMap.toMap());
+            final StateManager stateManager = context.getStateManager();
+            StateMap oldValue = beforeStateMap;
+
+            long currentTime = System.currentTimeMillis();
+
+            final Header receivedLastModified = response.getFirstHeader(HEADER_LAST_MODIFIED);
+            if (receivedLastModified != null) {
+                workingMap.put(LAST_MODIFIED + ":" + url, currentTime+":"+receivedLastModified.getValue());
+            }
+
+            final Header receivedEtag = response.getFirstHeader(HEADER_ETAG);
+            if (receivedEtag != null) {
+                workingMap.put(ETAG + ":" + url, currentTime+":"+receivedEtag.getValue());
+            }
+
+            boolean replaceSucceeded = stateManager.replace(oldValue, workingMap, Scope.LOCAL);
+            boolean changed;
+
+            while(!replaceSucceeded){
+                oldValue = stateManager.getState(Scope.LOCAL);
+                workingMap.clear();
+                workingMap.putAll(oldValue.toMap());
+
+                changed = false;
+
+                if(receivedLastModified != null){
+                    Tuple<String,String> storedLastModifiedTuple = parseStateValue(workingMap.get(LAST_MODIFIED+":"+url));
+
+                    if(Long.parseLong(storedLastModifiedTuple.getKey()) < currentTime){
+                        workingMap.put(LAST_MODIFIED + ":" + url, currentTime+":"+receivedLastModified.getValue());
+                        changed = true;
+                    }
+                }
+
+                if(receivedEtag != null){
+                    Tuple<String,String> storedLastModifiedTuple = parseStateValue(workingMap.get(ETAG+":"+url));
+
+                    if(Long.parseLong(storedLastModifiedTuple.getKey()) < currentTime){
+                        workingMap.put(ETAG + ":" + url, currentTime+":"+receivedEtag.getValue());
+                        changed = true;
+                    }
+                }
+
+                if(changed) {
+                    replaceSucceeded = stateManager.replace(oldValue, workingMap, Scope.LOCAL);
+                } else {
+                    break;
+                }
+            }
+        } catch (final IOException ioe) {
+            throw new ProcessException(ioe);
+        }
+    }
+
+    protected static Tuple<String, String> parseStateValue(String mapValue){
+        int indexOfColon = mapValue.indexOf(":");
+
+        String timestamp = mapValue.substring(0,indexOfColon);
+        String value = mapValue.substring(indexOfColon+1);
+        return new Tuple<>(timestamp,value);
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/88d4d2ce/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetHTTP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetHTTP.java
index 428c811..f8e4122 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetHTTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetHTTP.java
@@ -32,6 +32,7 @@ import org.junit.Test;
 import java.util.HashMap;
 import java.util.Map;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -77,21 +78,21 @@ public class TestGetHTTP {
             controller.run(2);
 
             // verify the lastModified and entityTag are updated
-            controller.getStateManager().assertStateNotEquals(GetHTTP.ETAG, "", Scope.LOCAL);
-            controller.getStateManager().assertStateNotEquals(GetHTTP.LAST_MODIFIED, "Thu, 01 Jan 1970 00:00:00 GMT", Scope.LOCAL);
+            controller.getStateManager().assertStateNotEquals(GetHTTP.ETAG+":"+destination, "", Scope.LOCAL);
+            controller.getStateManager().assertStateNotEquals(GetHTTP.LAST_MODIFIED+":"+destination, "Thu, 01 Jan 1970 00:00:00 GMT", Scope.LOCAL);
 
             // ran twice, but got one...which is good
             controller.assertTransferCount(GetHTTP.REL_SUCCESS, 1);
 
             // verify remote.source flowfile attribute
             controller.getFlowFilesForRelationship(GetHTTP.REL_SUCCESS).get(0).assertAttributeEquals("gethttp.remote.source", "localhost");
-
             controller.clearTransferState();
 
             // turn off checking for etag and lastModified
             RESTServiceContentModified.IGNORE_ETAG = true;
             RESTServiceContentModified.IGNORE_LAST_MODIFIED = true;
             controller.run(2);
+
             // ran twice, got two...which is good
             controller.assertTransferCount(GetHTTP.REL_SUCCESS, 2);
             controller.clearTransferState();
@@ -114,30 +115,99 @@ public class TestGetHTTP {
             RESTServiceContentModified.IGNORE_ETAG = false;
             RESTServiceContentModified.ETAG = 1;
             controller.run(2);
+
             // ran twice, got 1...but should have new cached etag
             controller.assertTransferCount(GetHTTP.REL_SUCCESS, 1);
-            controller.getStateManager().assertStateEquals(GetHTTP.ETAG, "1", Scope.LOCAL);
+            String eTagStateValue = controller.getStateManager().getState(Scope.LOCAL).get(GetHTTP.ETAG+":"+destination);
+            assertEquals("1",GetHTTP.parseStateValue(eTagStateValue).getValue());
             controller.clearTransferState();
 
             // turn off checking for Etag, turn on checking for lastModified, but change value
             RESTServiceContentModified.IGNORE_LAST_MODIFIED = false;
             RESTServiceContentModified.IGNORE_ETAG = true;
             RESTServiceContentModified.modificationDate = System.currentTimeMillis() / 1000 * 1000 + 5000;
-            String lastMod = controller.getStateManager().getState(Scope.LOCAL).get(GetHTTP.LAST_MODIFIED);
+            String lastMod = controller.getStateManager().getState(Scope.LOCAL).get(GetHTTP.LAST_MODIFIED+":"+destination);
             controller.run(2);
+
             // ran twice, got 1...but should have new cached lastModified
             controller.assertTransferCount(GetHTTP.REL_SUCCESS, 1);
-            controller.getStateManager().assertStateNotEquals(GetHTTP.LAST_MODIFIED, lastMod, Scope.LOCAL);
+            controller.getStateManager().assertStateNotEquals(GetHTTP.LAST_MODIFIED+":"+destination, lastMod, Scope.LOCAL);
             controller.clearTransferState();
 
-            // shutdown web service
         } finally {
+            // shutdown web service
             server.shutdownServer();
         }
     }
 
 
     @Test
+    public final void testContentModifiedTwoServers() throws Exception {
+        // set up web services
+        ServletHandler handler1 = new ServletHandler();
+        handler1.addServletWithMapping(RESTServiceContentModified.class, "/*");
+
+        ServletHandler handler2 = new ServletHandler();
+        handler2.addServletWithMapping(RESTServiceContentModified.class, "/*");
+
+        // create the services
+        TestServer server1 = new TestServer();
+        server1.addHandler(handler1);
+
+        TestServer server2 = new TestServer();
+        server2.addHandler(handler2);
+
+        try {
+            server1.startServer();
+            server2.startServer();
+
+            // these are the base URLs with the random ports
+            String destination1 = server1.getUrl();
+            String destination2 = server2.getUrl();
+
+            // set up NiFi mock controller
+            controller = TestRunners.newTestRunner(GetHTTP.class);
+            controller.setProperty(GetHTTP.CONNECTION_TIMEOUT, "5 secs");
+            controller.setProperty(GetHTTP.URL, destination1);
+            controller.setProperty(GetHTTP.FILENAME, "testFile");
+            controller.setProperty(GetHTTP.ACCEPT_CONTENT_TYPE, "application/json");
+
+            controller.getStateManager().assertStateNotSet(GetHTTP.ETAG+":"+destination1, Scope.LOCAL);
+            controller.getStateManager().assertStateNotSet(GetHTTP.LAST_MODIFIED+":"+destination1, Scope.LOCAL);
+            controller.run(2);
+
+            // verify the lastModified and entityTag are updated
+            controller.getStateManager().assertStateNotEquals(GetHTTP.ETAG+":"+destination1, "", Scope.LOCAL);
+            controller.getStateManager().assertStateNotEquals(GetHTTP.LAST_MODIFIED+":"+destination1, "Thu, 01 Jan 1970 00:00:00 GMT", Scope.LOCAL);
+
+            // ran twice, but got one...which is good
+            controller.assertTransferCount(GetHTTP.REL_SUCCESS, 1);
+
+            controller.clearTransferState();
+
+            controller.setProperty(GetHTTP.URL, destination2);
+            controller.getStateManager().assertStateNotSet(GetHTTP.ETAG+":"+destination2, Scope.LOCAL);
+            controller.getStateManager().assertStateNotSet(GetHTTP.LAST_MODIFIED+":"+destination2, Scope.LOCAL);
+
+            controller.run(2);
+
+            // ran twice, but got one...which is good
+            controller.assertTransferCount(GetHTTP.REL_SUCCESS, 1);
+
+            // verify the lastModified's and entityTags are updated
+            controller.getStateManager().assertStateNotEquals(GetHTTP.ETAG+":"+destination1, "", Scope.LOCAL);
+            controller.getStateManager().assertStateNotEquals(GetHTTP.LAST_MODIFIED+":"+destination1, "Thu, 01 Jan 1970 00:00:00 GMT", Scope.LOCAL);
+            controller.getStateManager().assertStateNotEquals(GetHTTP.ETAG+":"+destination2, "", Scope.LOCAL);
+            controller.getStateManager().assertStateNotEquals(GetHTTP.LAST_MODIFIED+":"+destination2, "Thu, 01 Jan 1970 00:00:00 GMT", Scope.LOCAL);
+
+        } finally {
+            // shutdown web services
+            server1.shutdownServer();
+            server2.shutdownServer();
+        }
+    }
+
+    @Test
     public final void testUserAgent() throws Exception {
         // set up web service
         ServletHandler handler = new ServletHandler();