You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mt...@apache.org on 2018/04/23 19:40:22 UTC
[1/2] nifi git commit: NIFI-3576 Support for QueryInfo relationship,
can be used to track no-hits
Repository: nifi
Updated Branches:
refs/heads/master 0e736f59f -> 5ca6261de
NIFI-3576 Support for QueryInfo relationship, can be used to track no-hits
Squashed commit includes related commit from GitHub user wietze.
This closes #2601
Signed-off-by: Mike Thomsen <mi...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/45bc1f1b
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/45bc1f1b
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/45bc1f1b
Branch: refs/heads/master
Commit: 45bc1f1b4f590db0e4a3cb76fcd4cd89ca1a06bf
Parents: 2799211
Author: Otto Fowler <ot...@gmail.com>
Authored: Mon Apr 2 13:16:39 2018 -0400
Committer: Mike Thomsen <mi...@gmail.com>
Committed: Mon Apr 23 15:38:27 2018 -0400
----------------------------------------------------------------------
.../elasticsearch/QueryElasticsearchHttp.java | 80 +++-
.../TestQueryElasticsearchHttp.java | 20 +
.../TestQueryElasticsearchHttpNoHits.java | 363 +++++++++++++++++++
3 files changed, 452 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/45bc1f1b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java
index 0f6ec46..15ac65d 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java
@@ -16,7 +16,10 @@
*/
package org.apache.nifi.processors.elasticsearch;
+import static org.apache.nifi.flowfile.attributes.CoreAttributes.MIME_TYPE;
+
import com.fasterxml.jackson.databind.JsonNode;
+import java.util.Arrays;
import okhttp3.HttpUrl;
import okhttp3.OkHttpClient;
import okhttp3.Response;
@@ -31,6 +34,7 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
@@ -69,8 +73,10 @@ import java.util.stream.Stream;
+ "To retrieve more records, use the ScrollElasticsearchHttp processor.")
@WritesAttributes({
@WritesAttribute(attribute = "filename", description = "The filename attribute is set to the document identifier"),
+ @WritesAttribute(attribute = "es.query.hitcount", description = "The number of hits for a query"),
@WritesAttribute(attribute = "es.id", description = "The Elasticsearch document identifier"),
@WritesAttribute(attribute = "es.index", description = "The Elasticsearch index containing the document"),
+ @WritesAttribute(attribute = "es.query.url", description = "The Elasticsearch query that was built"),
@WritesAttribute(attribute = "es.type", description = "The Elasticsearch document type"),
@WritesAttribute(attribute = "es.result.*", description = "If Target is 'Flow file attributes', the JSON attributes of "
+ "each result will be placed into corresponding attributes with this prefix.") })
@@ -81,12 +87,21 @@ import java.util.stream.Stream;
description = "Adds the specified property name/value as a query parameter in the Elasticsearch URL used for processing")
public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
+ public enum QueryInfoRouteStrategy {
+ NEVER,
+ ALWAYS,
+ NOHIT
+ }
+
private static final String FROM_QUERY_PARAM = "from";
public static final String TARGET_FLOW_FILE_CONTENT = "Flow file content";
public static final String TARGET_FLOW_FILE_ATTRIBUTES = "Flow file attributes";
private static final String ATTRIBUTE_PREFIX = "es.result.";
+ static final AllowableValue ALWAYS = new AllowableValue(QueryInfoRouteStrategy.ALWAYS.name(), "Always", "Always route Query Info");
+ static final AllowableValue NEVER = new AllowableValue(QueryInfoRouteStrategy.NEVER.name(), "Never", "Never route Query Info");
+ static final AllowableValue NO_HITS = new AllowableValue(QueryInfoRouteStrategy.NOHIT.name(), "No Hits", "Route Query Info if the Query returns no hits");
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description(
@@ -107,6 +122,13 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
+ "based on the processor properties and the results of the fetch operation.")
.build();
+ public static final Relationship REL_QUERY_INFO = new Relationship.Builder()
+ .name("query-info")
+ .description(
+ "Depending on the setting of the Routing Strategy for Query Info property, a FlowFile is routed to this relationship with " +
+ "the incoming FlowFile's attributes (if present), the number of hits, and the Elasticsearch query")
+ .build();
+
public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
.name("query-es-query")
.displayName("Query")
@@ -193,16 +215,21 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
- private static final Set<Relationship> relationships;
+ public static final PropertyDescriptor ROUTING_QUERY_INFO_STRATEGY = new PropertyDescriptor.Builder()
+ .name("routing-query-info-strategy")
+ .displayName("Routing Strategy for Query Info")
+ .description("Specifies when to generate and route Query Info after a successful query")
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .allowableValues(ALWAYS, NEVER, NO_HITS)
+ .defaultValue(NEVER.getValue())
+ .required(false)
+ .build();
+
+ private volatile Set<Relationship> relationships = new HashSet<>(Arrays.asList(new Relationship[] {REL_SUCCESS, REL_FAILURE, REL_RETRY}));
private static final List<PropertyDescriptor> propertyDescriptors;
+ private QueryInfoRouteStrategy queryInfoRouteStrategy = QueryInfoRouteStrategy.NEVER;
static {
- final Set<Relationship> _rels = new HashSet<>();
- _rels.add(REL_SUCCESS);
- _rels.add(REL_FAILURE);
- _rels.add(REL_RETRY);
- relationships = Collections.unmodifiableSet(_rels);
-
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(ES_URL);
descriptors.add(PROP_SSL_CONTEXT_SERVICE);
@@ -218,6 +245,7 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
descriptors.add(SORT);
descriptors.add(LIMIT);
descriptors.add(TARGET);
+ descriptors.add(ROUTING_QUERY_INFO_STRATEGY);
propertyDescriptors = Collections.unmodifiableList(descriptors);
}
@@ -238,6 +266,23 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
}
@Override
+ public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
+
+ if (ROUTING_QUERY_INFO_STRATEGY.equals(descriptor)) {
+ final Set<Relationship> relationshipSet = new HashSet<>();
+ relationshipSet.add(REL_SUCCESS);
+ relationshipSet.add(REL_FAILURE);
+ relationshipSet.add(REL_RETRY);
+
+ if (ALWAYS.getValue().equalsIgnoreCase(newValue) || NO_HITS.getValue().equalsIgnoreCase(newValue)) {
+ relationshipSet.add(REL_QUERY_INFO);
+ }
+ this.queryInfoRouteStrategy = QueryInfoRouteStrategy.valueOf(newValue);
+ this.relationships = relationshipSet;
+ }
+ }
+
+ @Override
public void onTrigger(final ProcessContext context, final ProcessSession session)
throws ProcessException {
@@ -281,7 +326,7 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
final ComponentLog logger = getLogger();
int fromIndex = 0;
- int numResults;
+ int numResults = 0;
try {
logger.debug("Querying {}/{} from Elasticsearch: {}", new Object[] { index, docType,
@@ -305,10 +350,11 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
final Response getResponse = sendRequestToElasticsearch(okHttpClient, queryUrl,
username, password, "GET", null);
numResults = this.getPage(getResponse, queryUrl, context, session, flowFile,
- logger, startNanos, targetIsContent);
+ logger, startNanos, targetIsContent, numResults);
fromIndex += pageSize;
getResponse.close();
- } while (numResults > 0 && !hitLimit);
+ }
+ while (numResults > 0 && !hitLimit);
if (flowFile != null) {
session.remove(flowFile);
@@ -341,7 +387,7 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
private int getPage(final Response getResponse, final URL url, final ProcessContext context,
final ProcessSession session, FlowFile flowFile, final ComponentLog logger,
- final long startNanos, boolean targetIsContent)
+ final long startNanos, boolean targetIsContent, int priorResultCount)
throws IOException {
List<FlowFile> page = new ArrayList<>();
final int statusCode = getResponse.code();
@@ -352,6 +398,17 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
JsonNode responseJson = parseJsonResponse(new ByteArrayInputStream(bodyBytes));
JsonNode hits = responseJson.get("hits").get("hits");
+ // send the query info for every page when the strategy is ALWAYS; when it is NOHIT, send it only if
+ // this page has no hits and the prior result count for this run ( priorResultCount ) is also zero
+ if ( (hits.size() == 0 && priorResultCount == 0 && queryInfoRouteStrategy == QueryInfoRouteStrategy.NOHIT)
+ || queryInfoRouteStrategy == QueryInfoRouteStrategy.ALWAYS) {
+ FlowFile queryInfo = flowFile == null ? session.create() : session.create(flowFile);
+ session.putAttribute(queryInfo, "es.query.url", url.toExternalForm());
+ session.putAttribute(queryInfo, "es.query.hitcount", String.valueOf(hits.size()));
+ session.putAttribute(queryInfo, MIME_TYPE.key(), "application/json");
+ session.transfer(queryInfo,REL_QUERY_INFO);
+ }
+
for(int i = 0; i < hits.size(); i++) {
JsonNode hit = hits.get(i);
String retrievedId = hit.get("_id").asText();
@@ -369,6 +426,7 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
documentFlowFile = session.putAttribute(documentFlowFile, "es.id", retrievedId);
documentFlowFile = session.putAttribute(documentFlowFile, "es.index", retrievedIndex);
documentFlowFile = session.putAttribute(documentFlowFile, "es.type", retrievedType);
+ documentFlowFile = session.putAttribute(documentFlowFile, "es.query.url", url.toExternalForm());
if (targetIsContent) {
documentFlowFile = session.putAttribute(documentFlowFile, "filename", retrievedId);
http://git-wip-us.apache.org/repos/asf/nifi/blob/45bc1f1b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java
index 1b07d22..2863264 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java
@@ -78,6 +78,25 @@ public class TestQueryElasticsearchHttp {
}
@Test
+ public void testQueryElasticsearchOnTrigger_withInput_withQueryInAttrs() throws IOException {
+ runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor());
+ runner.setValidateExpressionUsage(true);
+ runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+
+ runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
+ runner.assertNotValid();
+ runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
+ runner.assertNotValid();
+ runner.setProperty(QueryElasticsearchHttp.QUERY,
+ "source:Twitter AND identifier:\"${identifier}\"");
+ runner.assertValid();
+ runner.setProperty(QueryElasticsearchHttp.PAGE_SIZE, "2");
+ runner.assertValid();
+
+ runAndVerifySuccess(true);
+ }
+
+ @Test
public void testQueryElasticsearchOnTrigger_withInput_EL() throws IOException {
runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor());
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "${es.url}");
@@ -161,6 +180,7 @@ public class TestQueryElasticsearchHttp {
out.assertAttributeEquals("filename", "abc-97b-ASVsZu_"
+ "vShwtGCJpGOObmuSqUJRUC3L_-SEND-S3");
}
+ out.assertAttributeExists("es.query.url");
}
// By default, 3 files should go to Success
http://git-wip-us.apache.org/repos/asf/nifi/blob/45bc1f1b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttpNoHits.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttpNoHits.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttpNoHits.java
new file mode 100644
index 0000000..862aead
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttpNoHits.java
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import okhttp3.Call;
+import okhttp3.MediaType;
+import okhttp3.OkHttpClient;
+import okhttp3.Protocol;
+import okhttp3.Request;
+import okhttp3.Response;
+import okhttp3.ResponseBody;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.mockito.stubbing.OngoingStubbing;
+
+public class TestQueryElasticsearchHttpNoHits {
+
+ private TestRunner runner;
+
+ @After
+ public void teardown() {
+ runner = null;
+ }
+
+
+ @Test
+ public void testQueryElasticsearchOnTrigger_NoHits_NoHits() throws IOException {
+ runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor());
+ runner.setValidateExpressionUsage(true);
+ runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+
+ runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
+ runner.assertNotValid();
+ runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
+ runner.assertNotValid();
+ runner.setProperty(QueryElasticsearchHttp.QUERY,
+ "source:Twitter AND identifier:\"${identifier}\"");
+ runner.assertValid();
+ runner.setProperty(QueryElasticsearchHttp.PAGE_SIZE, "2");
+ runner.assertValid();
+ runner.setProperty(QueryElasticsearchHttp.ROUTING_QUERY_INFO_STRATEGY, QueryElasticsearchHttp.QueryInfoRouteStrategy.NOHIT.name());
+ runner.assertValid();
+
+ runner.setIncomingConnection(false);
+ runAndVerify(0,1,0,true);
+ }
+
+ @Test
+ public void testQueryElasticsearchOnTrigger_NoHits_Never() throws IOException {
+ runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor());
+ runner.setValidateExpressionUsage(true);
+ runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+
+ runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
+ runner.assertNotValid();
+ runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
+ runner.assertNotValid();
+ runner.setProperty(QueryElasticsearchHttp.QUERY,
+ "source:Twitter AND identifier:\"${identifier}\"");
+ runner.assertValid();
+ runner.setProperty(QueryElasticsearchHttp.PAGE_SIZE, "2");
+ runner.assertValid();
+ runner.setProperty(QueryElasticsearchHttp.ROUTING_QUERY_INFO_STRATEGY, QueryElasticsearchHttp.QueryInfoRouteStrategy.NEVER.name());
+ runner.assertValid();
+
+ runner.setIncomingConnection(false);
+ runAndVerify(0,0,0,true);
+ }
+
+ @Test
+ public void testQueryElasticsearchOnTrigger_NoHits_Always() throws IOException {
+ runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor());
+ runner.setValidateExpressionUsage(true);
+ runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+
+ runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
+ runner.assertNotValid();
+ runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
+ runner.assertNotValid();
+ runner.setProperty(QueryElasticsearchHttp.QUERY,
+ "source:Twitter AND identifier:\"${identifier}\"");
+ runner.assertValid();
+ runner.setProperty(QueryElasticsearchHttp.PAGE_SIZE, "2");
+ runner.assertValid();
+ runner.setProperty(QueryElasticsearchHttp.ROUTING_QUERY_INFO_STRATEGY, QueryElasticsearchHttp.QueryInfoRouteStrategy.ALWAYS.name());
+ runner.assertValid();
+
+ runner.setIncomingConnection(false);
+ runAndVerify(0,1,0,true);
+ }
+
+ @Test
+ public void testQueryElasticsearchOnTrigger_Hits_NoHits() throws IOException {
+ runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor(true));
+ runner.setValidateExpressionUsage(true);
+ runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+
+ runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
+ runner.assertNotValid();
+ runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
+ runner.assertNotValid();
+ runner.setProperty(QueryElasticsearchHttp.QUERY,
+ "source:Twitter AND identifier:\"${identifier}\"");
+ runner.assertValid();
+ runner.setProperty(QueryElasticsearchHttp.PAGE_SIZE, "2");
+ runner.assertValid();
+ runner.setProperty(QueryElasticsearchHttp.ROUTING_QUERY_INFO_STRATEGY, QueryElasticsearchHttp.QueryInfoRouteStrategy.NOHIT.name());
+ runner.assertValid();
+
+ runner.setIncomingConnection(false);
+ runAndVerify(3,0,0,true);
+ }
+
+ @Test
+ public void testQueryElasticsearchOnTrigger_Hits_Never() throws IOException {
+ runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor(true));
+ runner.setValidateExpressionUsage(true);
+ runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+
+ runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
+ runner.assertNotValid();
+ runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
+ runner.assertNotValid();
+ runner.setProperty(QueryElasticsearchHttp.QUERY,
+ "source:Twitter AND identifier:\"${identifier}\"");
+ runner.assertValid();
+ runner.setProperty(QueryElasticsearchHttp.PAGE_SIZE, "2");
+ runner.assertValid();
+ runner.setProperty(QueryElasticsearchHttp.ROUTING_QUERY_INFO_STRATEGY, QueryElasticsearchHttp.QueryInfoRouteStrategy.NEVER.name());
+ runner.assertValid();
+
+ runner.setIncomingConnection(false);
+ runAndVerify(3,0,0,true);
+ }
+
+ @Test
+ public void testQueryElasticsearchOnTrigger_Hits_Always() throws IOException {
+ runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor(true));
+ runner.setValidateExpressionUsage(true);
+ runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+
+ runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
+ runner.assertNotValid();
+ runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
+ runner.assertNotValid();
+ runner.setProperty(QueryElasticsearchHttp.QUERY,
+ "source:Twitter AND identifier:\"${identifier}\"");
+ runner.assertValid();
+ runner.setProperty(QueryElasticsearchHttp.PAGE_SIZE, "2");
+ runner.assertValid();
+ runner.setProperty(QueryElasticsearchHttp.ROUTING_QUERY_INFO_STRATEGY, QueryElasticsearchHttp.QueryInfoRouteStrategy.ALWAYS.name());
+ runner.assertValid();
+
+ runner.setIncomingConnection(false);
+ runAndVerify(3,3,2,true);
+ }
+
+
+
+ private void runAndVerify(int expectedResults,int expectedQueryInfoResults,int expectedHits, boolean targetIsContent) {
+ runner.enqueue("blah".getBytes(), new HashMap<String, String>() {
+ {
+ put("identifier", "28039652140");
+ }
+ });
+
+ // Running once should page through all of the mocked pages (only the no-hit doc unless hit pages are enabled)
+ runner.run(1, true, true);
+
+ runner.assertTransferCount(QueryElasticsearchHttp.REL_QUERY_INFO, expectedQueryInfoResults);
+ if (expectedQueryInfoResults > 0) {
+ final MockFlowFile out = runner.getFlowFilesForRelationship(QueryElasticsearchHttp.REL_QUERY_INFO).get(0);
+ assertNotNull(out);
+ if (targetIsContent) {
+ out.assertAttributeEquals("es.query.hitcount", String.valueOf(expectedHits));
+ Assert.assertTrue(out.getAttribute("es.query.url").startsWith("http://127.0.0.1:9200/doc/status/_search?q=source:Twitter%20AND%20identifier:%22%22&size=2"));
+ }
+ }
+
+ runner.assertTransferCount(QueryElasticsearchHttp.REL_SUCCESS, expectedResults);
+ if (expectedResults > 0) {
+ final MockFlowFile out = runner.getFlowFilesForRelationship(QueryElasticsearchHttp.REL_SUCCESS).get(0);
+ assertNotNull(out);
+ if (targetIsContent) {
+ out.assertAttributeEquals("filename", "abc-97b-ASVsZu_" + "vShwtGCJpGOObmuSqUJRUC3L_-SEND-S3");
+ }
+ }
+ }
+
+ // By default, expect the no-hit outcome: 0 files to Success and 1 Query Info file reporting 0 hits
+ private void runAndVerify(boolean targetIsContent) {
+ runAndVerify(0,1,0, targetIsContent);
+ }
+
+
+
+ /**
+ * A Test class that extends the processor in order to inject/mock behavior
+ */
+ private static class QueryElasticsearchHttpTestProcessor extends QueryElasticsearchHttp {
+ Exception exceptionToThrow = null;
+ OkHttpClient client;
+ int goodStatusCode = 200;
+ String goodStatusMessage = "OK";
+
+ int badStatusCode;
+ String badStatusMessage;
+ int runNumber;
+
+ boolean useHitPages;
+
+ // query-page3 has no hits
+ List<String> noHitPages = Arrays.asList(getDoc("query-page3.json"));
+ List<String> hitPages = Arrays.asList(getDoc("query-page1.json"), getDoc("query-page2.json"),
+ getDoc("query-page3.json"));
+
+ String expectedParam = null;
+
+ public QueryElasticsearchHttpTestProcessor() {
+ this(false);
+ }
+ public QueryElasticsearchHttpTestProcessor(boolean useHitPages) {
+ this.useHitPages = useHitPages;
+ }
+
+ public void setExceptionToThrow(Exception exceptionToThrow) {
+ this.exceptionToThrow = exceptionToThrow;
+ }
+
+ /**
+ * Sets the status code and message for the 1st query
+ *
+ * @param code
+ * The status code to return
+ * @param message
+ * The status message
+ */
+ void setStatus(int code, String message) {
+ this.setStatus(code, message, 1);
+ }
+
+ /**
+ * Sets a query parameter (name=value) expected to be at the end of the URL for the query operation
+ *
+ * @param param
+ * The parameter to expect
+ */
+ void setExpectedParam(String param) {
+ expectedParam = param;
+ }
+
+ /**
+ * Sets the status code and message for the runNumber-th query
+ *
+ * @param code
+ * The status code to return
+ * @param message
+ * The status message
+ * @param runNumber
+ * The run number for which to set this status
+ */
+ void setStatus(int code, String message, int runNumber) {
+ badStatusCode = code;
+ badStatusMessage = message;
+ this.runNumber = runNumber;
+ }
+
+ @Override
+ protected void createElasticsearchClient(ProcessContext context) throws ProcessException {
+ client = mock(OkHttpClient.class);
+
+ OngoingStubbing<Call> stub = when(client.newCall(any(Request.class)));
+ List<String> pages;
+ if(useHitPages) {
+ pages = hitPages;
+ } else {
+ pages = noHitPages;
+ }
+
+ for (int i = 0; i < pages.size(); i++) {
+ String page = pages.get(i);
+ if (runNumber == i + 1) {
+ stub = mockReturnDocument(stub, page, badStatusCode, badStatusMessage);
+ } else {
+ stub = mockReturnDocument(stub, page, goodStatusCode, goodStatusMessage);
+ }
+ }
+ }
+
+ private OngoingStubbing<Call> mockReturnDocument(OngoingStubbing<Call> stub,
+ final String document, int statusCode, String statusMessage) {
+ return stub.thenAnswer(new Answer<Call>() {
+
+ @Override
+ public Call answer(InvocationOnMock invocationOnMock) throws Throwable {
+ Request realRequest = (Request) invocationOnMock.getArguments()[0];
+ assertTrue((expectedParam == null) || (realRequest.url().toString().endsWith(expectedParam)));
+ Response mockResponse = new Response.Builder()
+ .request(realRequest)
+ .protocol(Protocol.HTTP_1_1)
+ .code(statusCode)
+ .message(statusMessage)
+ .body(ResponseBody.create(MediaType.parse("application/json"), document))
+ .build();
+ final Call call = mock(Call.class);
+ if (exceptionToThrow != null) {
+ when(call.execute()).thenThrow(exceptionToThrow);
+ } else {
+ when(call.execute()).thenReturn(mockResponse);
+ }
+ return call;
+ }
+ });
+ }
+
+ protected OkHttpClient getClient() {
+ return client;
+ }
+ }
+
+ private static String getDoc(String filename) {
+ try {
+ return IOUtils.toString(QueryElasticsearchHttp.class.getClassLoader().getResourceAsStream(filename), StandardCharsets.UTF_8);
+ } catch (IOException e) {
+ System.out.println("Error reading document " + filename);
+ return "";
+ }
+ }
+}
[2/2] nifi git commit: Merge branch 'master' of
https://git-wip-us.apache.org/repos/asf/nifi
Posted by mt...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/nifi
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/5ca6261d
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/5ca6261d
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/5ca6261d
Branch: refs/heads/master
Commit: 5ca6261de009f57ed238bc30191e2f7404762f5c
Parents: 45bc1f1 0e736f5
Author: Mike Thomsen <mi...@gmail.com>
Authored: Mon Apr 23 15:39:20 2018 -0400
Committer: Mike Thomsen <mi...@gmail.com>
Committed: Mon Apr 23 15:39:20 2018 -0400
----------------------------------------------------------------------
.../nifi/controller/tasks/ConnectableTask.java | 46 +-
.../controller/tasks/TestConnectableTask.java | 93 +-
.../nifi-record-serialization-services/pom.xml | 20 +
.../java/org/apache/nifi/xml/XMLReader.java | 140 ++
.../org/apache/nifi/xml/XMLRecordReader.java | 568 +++++++
...org.apache.nifi.controller.ControllerService | 4 +-
.../additionalDetails.html | 433 ++++++
.../java/org/apache/nifi/xml/TestXMLReader.java | 160 ++
.../apache/nifi/xml/TestXMLReaderProcessor.java | 79 +
.../apache/nifi/xml/TestXMLRecordReader.java | 1436 ++++++++++++++++++
.../src/test/resources/xml/people.xml | 22 +
.../src/test/resources/xml/people2.xml | 12 +
.../src/test/resources/xml/people3.xml | 12 +
.../src/test/resources/xml/people_array.xml | 37 +
.../test/resources/xml/people_array_simple.xml | 28 +
.../src/test/resources/xml/people_cdata.xml | 22 +
.../src/test/resources/xml/people_complex1.xml | 33 +
.../src/test/resources/xml/people_complex2.xml | 73 +
.../src/test/resources/xml/people_empty.xml | 12 +
.../src/test/resources/xml/people_invalid.xml | 21 +
.../src/test/resources/xml/people_map.xml | 18 +
.../src/test/resources/xml/people_map2.xml | 32 +
.../src/test/resources/xml/people_namespace.xml | 22 +
.../src/test/resources/xml/people_nested.xml | 38 +
.../test/resources/xml/people_no_attributes.xml | 22 +
.../resources/xml/people_tag_in_characters.xml | 23 +
.../xml/people_with_header_and_comments.xml | 29 +
.../src/test/resources/xml/person.xml | 5 +
.../src/test/resources/xml/testschema | 11 +
.../src/test/resources/xml/testschema2 | 19 +
30 files changed, 3450 insertions(+), 20 deletions(-)
----------------------------------------------------------------------