You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2016/02/06 04:36:41 UTC
[45/50] nifi git commit: NIFI-259 Corrected GetHttp state management
and added a new unit test
NIFI-259 Corrected GetHttp state management and added a new unit test
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/88d4d2ce
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/88d4d2ce
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/88d4d2ce
Branch: refs/heads/master
Commit: 88d4d2ce5fe421b6e8955a7b875ad3e01195bd13
Parents: 55b77fe
Author: jpercivall <jo...@yahoo.com>
Authored: Wed Feb 3 20:04:39 2016 -0500
Committer: jpercivall <jo...@yahoo.com>
Committed: Wed Feb 3 20:04:39 2016 -0500
----------------------------------------------------------------------
.../nifi/processors/standard/GetHTTP.java | 98 ++++++++++++++++----
.../nifi/processors/standard/TestGetHTTP.java | 84 +++++++++++++++--
2 files changed, 156 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/88d4d2ce/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java
index 48f22c5..818cd3b 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java
@@ -75,6 +75,7 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.flowfile.FlowFile;
@@ -91,11 +92,14 @@ import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.ssl.SSLContextService.ClientAuth;
import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.Tuple;
@Tags({"get", "fetch", "poll", "http", "https", "ingest", "source", "input"})
@InputRequirement(Requirement.INPUT_FORBIDDEN)
@CapabilityDescription("Fetches a file via HTTP. If the HTTP server supports it, the Processor then stores the Last Modified time and the ETag "
- + "so that data will not be pulled again until the remote data changes or until the state is cleared.")
+ + "so that data will not be pulled again until the remote data changes or until the state is cleared. Note that due to limitations on state "
+ + "management, stored \"last modified\" and etag fields never expire. If the URL in GetHttp uses Expression Language that is unbounded, there "
+ + "is the potential for Out of Memory Errors to occur.")
@WritesAttributes({
@WritesAttribute(attribute = "filename", description = "The filename is set to the name of the file on the remote server"),
@WritesAttribute(attribute = "mime.type", description = "The MIME Type of the FlowFile, as reported by the HTTP Content-Type header")
@@ -400,16 +404,18 @@ public class GetHTTP extends AbstractSessionFactoryProcessor {
final HttpGet get = new HttpGet(url);
get.setConfig(requestConfigBuilder.build());
+ final StateMap beforeStateMap;
+
try {
- final StateMap stateMap = context.getStateManager().getState(Scope.LOCAL);
- final String lastModified = stateMap.get(LAST_MODIFIED);
+ beforeStateMap = context.getStateManager().getState(Scope.LOCAL);
+ final String lastModified = beforeStateMap.get(LAST_MODIFIED+":" + url);
if (lastModified != null) {
- get.addHeader(HEADER_IF_MODIFIED_SINCE, lastModified);
+ get.addHeader(HEADER_IF_MODIFIED_SINCE, parseStateValue(lastModified).getValue());
}
- final String etag = stateMap.get(ETAG);
+ final String etag = beforeStateMap.get(ETAG+":" + url);
if (etag != null) {
- get.addHeader(HEADER_IF_NONE_MATCH, etag);
+ get.addHeader(HEADER_IF_NONE_MATCH, parseStateValue(etag).getValue());
}
} catch (final IOException ioe) {
throw new ProcessException(ioe);
@@ -461,20 +467,8 @@ public class GetHTTP extends AbstractSessionFactoryProcessor {
logger.info("Successfully received {} from {} at a rate of {}; transferred to success", new Object[]{flowFile, url, dataRate});
session.commit();
- final Map<String, String> updatedState = new HashMap<>(2);
- final Header lastModified = response.getFirstHeader(HEADER_LAST_MODIFIED);
- if (lastModified != null) {
- updatedState.put(LAST_MODIFIED, lastModified.getValue());
- }
+ updateStateMap(context,response,beforeStateMap,url);
- final Header etag = response.getFirstHeader(HEADER_ETAG);
- if (etag != null) {
- updatedState.put(ETAG, etag.getValue());
- }
-
- if (!updatedState.isEmpty()) {
- context.getStateManager().setState(updatedState, Scope.LOCAL);
- }
} catch (final IOException e) {
context.yield();
session.rollback();
@@ -490,4 +484,70 @@ public class GetHTTP extends AbstractSessionFactoryProcessor {
conMan.shutdown();
}
}
+
+    /**
+     * Stores the Last-Modified and ETag headers of {@code response} in LOCAL state under
+     * keys that are specific to {@code url}. Each stored value has the form
+     * {@code <currentTimeMillis>:<headerValue>}.
+     *
+     * The first attempt replaces {@code beforeStateMap} with a copy that carries the new
+     * header values. Whenever the state manager reports that the replace did not succeed,
+     * the current state is re-read and an entry is only overwritten if the stored entry is
+     * older than this invocation (or is missing/unparsable); if nothing needs to change the
+     * method gives up quietly.
+     *
+     * @param context        process context supplying the state manager
+     * @param response       HTTP response whose caching headers are recorded
+     * @param beforeStateMap state snapshot taken before the request was issued
+     * @param url            URL that was fetched; used as the key suffix
+     * @throws ProcessException if the state manager throws an IOException
+     */
+    private void updateStateMap(ProcessContext context, HttpResponse response, StateMap beforeStateMap, String url){
+        try {
+            final StateManager stateManager = context.getStateManager();
+            final long currentTime = System.currentTimeMillis();
+
+            final String lastModifiedKey = LAST_MODIFIED + ":" + url;
+            final String etagKey = ETAG + ":" + url;
+
+            final Header receivedLastModified = response.getFirstHeader(HEADER_LAST_MODIFIED);
+            final Header receivedEtag = response.getFirstHeader(HEADER_ETAG);
+
+            StateMap oldValue = beforeStateMap;
+            final Map<String,String> workingMap = new HashMap<>(oldValue.toMap());
+
+            if (receivedLastModified != null) {
+                workingMap.put(lastModifiedKey, currentTime + ":" + receivedLastModified.getValue());
+            }
+
+            if (receivedEtag != null) {
+                workingMap.put(etagKey, currentTime + ":" + receivedEtag.getValue());
+            }
+
+            boolean replaceSucceeded = stateManager.replace(oldValue, workingMap, Scope.LOCAL);
+
+            while (!replaceSucceeded) {
+                // Start over from whatever is stored now so that entries written in the meantime are kept.
+                oldValue = stateManager.getState(Scope.LOCAL);
+                workingMap.clear();
+                workingMap.putAll(oldValue.toMap());
+
+                boolean changed = false;
+
+                // The refreshed state may not contain an entry for this URL at all, so the stored
+                // value is checked for null/malformed content before its timestamp is compared.
+                if (receivedLastModified != null && isStoredEntryStale(workingMap.get(lastModifiedKey), currentTime)) {
+                    workingMap.put(lastModifiedKey, currentTime + ":" + receivedLastModified.getValue());
+                    changed = true;
+                }
+
+                if (receivedEtag != null && isStoredEntryStale(workingMap.get(etagKey), currentTime)) {
+                    workingMap.put(etagKey, currentTime + ":" + receivedEtag.getValue());
+                    changed = true;
+                }
+
+                if (!changed) {
+                    // The stored entries are at least as recent as ours; nothing to write.
+                    break;
+                }
+
+                replaceSucceeded = stateManager.replace(oldValue, workingMap, Scope.LOCAL);
+            }
+        } catch (final IOException ioe) {
+            throw new ProcessException(ioe);
+        }
+    }
+
+    /**
+     * Decides whether a stored {@code <timestamp>:<value>} entry should be overwritten by a
+     * value obtained at {@code currentTime}.
+     *
+     * @param storedValue the raw state value, possibly {@code null}
+     * @param currentTime timestamp (epoch millis) of the value that would replace it
+     * @return {@code true} if the entry is absent, has no parsable timestamp, or its
+     *         timestamp is strictly less than {@code currentTime}
+     */
+    private static boolean isStoredEntryStale(final String storedValue, final long currentTime) {
+        if (storedValue == null) {
+            return true;
+        }
+
+        final int indexOfColon = storedValue.indexOf(':');
+        if (indexOfColon < 0) {
+            return true;
+        }
+
+        try {
+            return Long.parseLong(storedValue.substring(0, indexOfColon)) < currentTime;
+        } catch (final NumberFormatException nfe) {
+            // An entry without a numeric timestamp cannot be compared; replace it with a well-formed one.
+            return true;
+        }
+    }
+
+ protected static Tuple<String, String> parseStateValue(String mapValue){
+ int indexOfColon = mapValue.indexOf(":");
+
+ String timestamp = mapValue.substring(0,indexOfColon);
+ String value = mapValue.substring(indexOfColon+1);
+ return new Tuple<>(timestamp,value);
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/88d4d2ce/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetHTTP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetHTTP.java
index 428c811..f8e4122 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetHTTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetHTTP.java
@@ -32,6 +32,7 @@ import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
@@ -77,21 +78,21 @@ public class TestGetHTTP {
controller.run(2);
// verify the lastModified and entityTag are updated
- controller.getStateManager().assertStateNotEquals(GetHTTP.ETAG, "", Scope.LOCAL);
- controller.getStateManager().assertStateNotEquals(GetHTTP.LAST_MODIFIED, "Thu, 01 Jan 1970 00:00:00 GMT", Scope.LOCAL);
+ controller.getStateManager().assertStateNotEquals(GetHTTP.ETAG+":"+destination, "", Scope.LOCAL);
+ controller.getStateManager().assertStateNotEquals(GetHTTP.LAST_MODIFIED+":"+destination, "Thu, 01 Jan 1970 00:00:00 GMT", Scope.LOCAL);
// ran twice, but got one...which is good
controller.assertTransferCount(GetHTTP.REL_SUCCESS, 1);
// verify remote.source flowfile attribute
controller.getFlowFilesForRelationship(GetHTTP.REL_SUCCESS).get(0).assertAttributeEquals("gethttp.remote.source", "localhost");
-
controller.clearTransferState();
// turn off checking for etag and lastModified
RESTServiceContentModified.IGNORE_ETAG = true;
RESTServiceContentModified.IGNORE_LAST_MODIFIED = true;
controller.run(2);
+
// ran twice, got two...which is good
controller.assertTransferCount(GetHTTP.REL_SUCCESS, 2);
controller.clearTransferState();
@@ -114,30 +115,99 @@ public class TestGetHTTP {
RESTServiceContentModified.IGNORE_ETAG = false;
RESTServiceContentModified.ETAG = 1;
controller.run(2);
+
// ran twice, got 1...but should have new cached etag
controller.assertTransferCount(GetHTTP.REL_SUCCESS, 1);
- controller.getStateManager().assertStateEquals(GetHTTP.ETAG, "1", Scope.LOCAL);
+ String eTagStateValue = controller.getStateManager().getState(Scope.LOCAL).get(GetHTTP.ETAG+":"+destination);
+ assertEquals("1",GetHTTP.parseStateValue(eTagStateValue).getValue());
controller.clearTransferState();
// turn off checking for Etag, turn on checking for lastModified, but change value
RESTServiceContentModified.IGNORE_LAST_MODIFIED = false;
RESTServiceContentModified.IGNORE_ETAG = true;
RESTServiceContentModified.modificationDate = System.currentTimeMillis() / 1000 * 1000 + 5000;
- String lastMod = controller.getStateManager().getState(Scope.LOCAL).get(GetHTTP.LAST_MODIFIED);
+ String lastMod = controller.getStateManager().getState(Scope.LOCAL).get(GetHTTP.LAST_MODIFIED+":"+destination);
controller.run(2);
+
// ran twice, got 1...but should have new cached etag
controller.assertTransferCount(GetHTTP.REL_SUCCESS, 1);
- controller.getStateManager().assertStateNotEquals(GetHTTP.LAST_MODIFIED, lastMod, Scope.LOCAL);
+ controller.getStateManager().assertStateNotEquals(GetHTTP.LAST_MODIFIED+":"+destination, lastMod, Scope.LOCAL);
controller.clearTransferState();
- // shutdown web service
} finally {
+ // shutdown web service
server.shutdownServer();
}
}
@Test
+    public final void testContentModifiedTwoServers() throws Exception {
+        // Points one GetHTTP processor at two servers in turn and checks that state is kept
+        // per URL: the entries for the first URL must still be present after the second is fetched.
+
+        // set up web services
+        ServletHandler handler1 = new ServletHandler();
+        handler1.addServletWithMapping(RESTServiceContentModified.class, "/*");
+
+        ServletHandler handler2 = new ServletHandler();
+        handler2.addServletWithMapping(RESTServiceContentModified.class, "/*");
+
+        // create the services
+        TestServer server1 = new TestServer();
+        server1.addHandler(handler1);
+
+        TestServer server2 = new TestServer();
+        server2.addHandler(handler2);
+
+        try {
+            server1.startServer();
+            server2.startServer();
+
+            // these are the base urls with the random ports
+            String destination1 = server1.getUrl();
+            String destination2 = server2.getUrl();
+
+            // set up NiFi mock controller
+            controller = TestRunners.newTestRunner(GetHTTP.class);
+            controller.setProperty(GetHTTP.CONNECTION_TIMEOUT, "5 secs");
+            controller.setProperty(GetHTTP.URL, destination1);
+            controller.setProperty(GetHTTP.FILENAME, "testFile");
+            controller.setProperty(GetHTTP.ACCEPT_CONTENT_TYPE, "application/json");
+
+            controller.getStateManager().assertStateNotSet(GetHTTP.ETAG+":"+destination1, Scope.LOCAL);
+            controller.getStateManager().assertStateNotSet(GetHTTP.LAST_MODIFIED+":"+destination1, Scope.LOCAL);
+            controller.run(2);
+
+            // verify the lastModified and entityTag are updated
+            controller.getStateManager().assertStateNotEquals(GetHTTP.ETAG+":"+destination1, "", Scope.LOCAL);
+            controller.getStateManager().assertStateNotEquals(GetHTTP.LAST_MODIFIED+":"+destination1, "Thu, 01 Jan 1970 00:00:00 GMT", Scope.LOCAL);
+
+            // ran twice, but got one...which is good
+            controller.assertTransferCount(GetHTTP.REL_SUCCESS, 1);
+
+            controller.clearTransferState();
+
+            // switch to the second server; nothing may be stored for its URL yet
+            controller.setProperty(GetHTTP.URL, destination2);
+            controller.getStateManager().assertStateNotSet(GetHTTP.ETAG+":"+destination2, Scope.LOCAL);
+            controller.getStateManager().assertStateNotSet(GetHTTP.LAST_MODIFIED+":"+destination2, Scope.LOCAL);
+
+            controller.run(2);
+
+            // ran twice, but got one...which is good
+            controller.assertTransferCount(GetHTTP.REL_SUCCESS, 1);
+
+            // verify the lastModified's and entityTags are updated
+            controller.getStateManager().assertStateNotEquals(GetHTTP.ETAG+":"+destination1, "", Scope.LOCAL);
+            controller.getStateManager().assertStateNotEquals(GetHTTP.LAST_MODIFIED+":"+destination1, "Thu, 01 Jan 1970 00:00:00 GMT", Scope.LOCAL);
+            controller.getStateManager().assertStateNotEquals(GetHTTP.ETAG+":"+destination2, "", Scope.LOCAL);
+            controller.getStateManager().assertStateNotEquals(GetHTTP.LAST_MODIFIED+":"+destination2, "Thu, 01 Jan 1970 00:00:00 GMT", Scope.LOCAL);
+
+        } finally {
+            // shutdown web services; nested so that a failure while stopping the first
+            // server cannot prevent the second one from being stopped
+            try {
+                server1.shutdownServer();
+            } finally {
+                server2.shutdownServer();
+            }
+        }
+    }
+
+ @Test
public final void testUserAgent() throws Exception {
// set up web service
ServletHandler handler = new ServletHandler();