You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mt...@apache.org on 2018/04/23 19:40:22 UTC

[1/2] nifi git commit: NIFI-3576 Support for QueryInfo relationship, can be used to track no-hits

Repository: nifi
Updated Branches:
  refs/heads/master 0e736f59f -> 5ca6261de


NIFI-3576 Support for QueryInfo relationship, can be used to track no-hits

Squashed commit includes related commit from GitHub user wietze.

This closes #2601

Signed-off-by: Mike Thomsen <mi...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/45bc1f1b
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/45bc1f1b
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/45bc1f1b

Branch: refs/heads/master
Commit: 45bc1f1b4f590db0e4a3cb76fcd4cd89ca1a06bf
Parents: 2799211
Author: Otto Fowler <ot...@gmail.com>
Authored: Mon Apr 2 13:16:39 2018 -0400
Committer: Mike Thomsen <mi...@gmail.com>
Committed: Mon Apr 23 15:38:27 2018 -0400

----------------------------------------------------------------------
 .../elasticsearch/QueryElasticsearchHttp.java   |  80 +++-
 .../TestQueryElasticsearchHttp.java             |  20 +
 .../TestQueryElasticsearchHttpNoHits.java       | 363 +++++++++++++++++++
 3 files changed, 452 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/45bc1f1b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java
index 0f6ec46..15ac65d 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java
@@ -16,7 +16,10 @@
  */
 package org.apache.nifi.processors.elasticsearch;
 
+import static org.apache.nifi.flowfile.attributes.CoreAttributes.MIME_TYPE;
+
 import com.fasterxml.jackson.databind.JsonNode;
+import java.util.Arrays;
 import okhttp3.HttpUrl;
 import okhttp3.OkHttpClient;
 import okhttp3.Response;
@@ -31,6 +34,7 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
@@ -69,8 +73,10 @@ import java.util.stream.Stream;
         + "To retrieve more records, use the ScrollElasticsearchHttp processor.")
 @WritesAttributes({
         @WritesAttribute(attribute = "filename", description = "The filename attribute is set to the document identifier"),
+        @WritesAttribute(attribute = "es.query.hitcount", description = "The number of hits for a query"),
         @WritesAttribute(attribute = "es.id", description = "The Elasticsearch document identifier"),
         @WritesAttribute(attribute = "es.index", description = "The Elasticsearch index containing the document"),
+        @WritesAttribute(attribute = "es.query.url", description = "The Elasticsearch query that was built"),
         @WritesAttribute(attribute = "es.type", description = "The Elasticsearch document type"),
         @WritesAttribute(attribute = "es.result.*", description = "If Target is 'Flow file attributes', the JSON attributes of "
                 + "each result will be placed into corresponding attributes with this prefix.") })
@@ -81,12 +87,21 @@ import java.util.stream.Stream;
         description = "Adds the specified property name/value as a query parameter in the Elasticsearch URL used for processing")
 public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
 
+    public enum QueryInfoRouteStrategy {
+        NEVER,
+        ALWAYS,
+        NOHIT
+    }
+
     private static final String FROM_QUERY_PARAM = "from";
 
     public static final String TARGET_FLOW_FILE_CONTENT = "Flow file content";
     public static final String TARGET_FLOW_FILE_ATTRIBUTES = "Flow file attributes";
     private static final String ATTRIBUTE_PREFIX = "es.result.";
 
+    static final AllowableValue ALWAYS = new AllowableValue(QueryInfoRouteStrategy.ALWAYS.name(), "Always", "Always route Query Info");
+    static final AllowableValue NEVER = new AllowableValue(QueryInfoRouteStrategy.NEVER.name(), "Never", "Never route Query Info");
+    static final AllowableValue NO_HITS = new AllowableValue(QueryInfoRouteStrategy.NOHIT.name(), "No Hits", "Route Query Info if the Query returns no hits");
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
             .description(
@@ -107,6 +122,13 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
                             + "based on the processor properties and the results of the fetch operation.")
             .build();
 
+    public static final Relationship REL_QUERY_INFO = new Relationship.Builder()
+            .name("query-info")
+            .description(
+                    "Depending on the setting of the Routing Strategy for Query Info property, a FlowFile is routed to this relationship with " +
+                            "the incoming FlowFile's attributes (if present), the number of hits, and the Elasticsearch query")
+            .build();
+
     public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
             .name("query-es-query")
             .displayName("Query")
@@ -193,16 +215,21 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
 
-    private static final Set<Relationship> relationships;
+    public static final PropertyDescriptor ROUTING_QUERY_INFO_STRATEGY = new PropertyDescriptor.Builder()
+            .name("routing-query-info-strategy")
+            .displayName("Routing Strategy for Query Info")
+            .description("Specifies when to generate and route Query Info after a successful query")
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .allowableValues(ALWAYS, NEVER, NO_HITS)
+            .defaultValue(NEVER.getValue())
+            .required(false)
+            .build();
+
+    private volatile Set<Relationship> relationships = new HashSet<>(Arrays.asList(new Relationship[] {REL_SUCCESS, REL_FAILURE, REL_RETRY}));
     private static final List<PropertyDescriptor> propertyDescriptors;
+    private QueryInfoRouteStrategy queryInfoRouteStrategy = QueryInfoRouteStrategy.NEVER;
 
     static {
-        final Set<Relationship> _rels = new HashSet<>();
-        _rels.add(REL_SUCCESS);
-        _rels.add(REL_FAILURE);
-        _rels.add(REL_RETRY);
-        relationships = Collections.unmodifiableSet(_rels);
-
         final List<PropertyDescriptor> descriptors = new ArrayList<>();
         descriptors.add(ES_URL);
         descriptors.add(PROP_SSL_CONTEXT_SERVICE);
@@ -218,6 +245,7 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
         descriptors.add(SORT);
         descriptors.add(LIMIT);
         descriptors.add(TARGET);
+        descriptors.add(ROUTING_QUERY_INFO_STRATEGY);
 
         propertyDescriptors = Collections.unmodifiableList(descriptors);
     }
@@ -238,6 +266,23 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
     }
 
     @Override
+    public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
+
+        if (ROUTING_QUERY_INFO_STRATEGY.equals(descriptor)) {
+            final Set<Relationship> relationshipSet = new HashSet<>();
+            relationshipSet.add(REL_SUCCESS);
+            relationshipSet.add(REL_FAILURE);
+            relationshipSet.add(REL_RETRY);
+
+            if (ALWAYS.getValue().equalsIgnoreCase(newValue) || NO_HITS.getValue().equalsIgnoreCase(newValue)) {
+                relationshipSet.add(REL_QUERY_INFO);
+            }
+            this.queryInfoRouteStrategy = QueryInfoRouteStrategy.valueOf(newValue);
+            this.relationships = relationshipSet;
+        }
+    }
+
+    @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session)
             throws ProcessException {
 
@@ -281,7 +326,7 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
         final ComponentLog logger = getLogger();
 
         int fromIndex = 0;
-        int numResults;
+        int numResults = 0;
 
         try {
             logger.debug("Querying {}/{} from Elasticsearch: {}", new Object[] { index, docType,
@@ -305,10 +350,11 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
                 final Response getResponse = sendRequestToElasticsearch(okHttpClient, queryUrl,
                         username, password, "GET", null);
                 numResults = this.getPage(getResponse, queryUrl, context, session, flowFile,
-                        logger, startNanos, targetIsContent);
+                        logger, startNanos, targetIsContent, numResults);
                 fromIndex += pageSize;
                 getResponse.close();
-            } while (numResults > 0 && !hitLimit);
+            }
+            while (numResults > 0 && !hitLimit);
 
             if (flowFile != null) {
                 session.remove(flowFile);
@@ -341,7 +387,7 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
 
     private int getPage(final Response getResponse, final URL url, final ProcessContext context,
             final ProcessSession session, FlowFile flowFile, final ComponentLog logger,
-            final long startNanos, boolean targetIsContent)
+            final long startNanos, boolean targetIsContent, int priorResultCount)
             throws IOException {
         List<FlowFile> page = new ArrayList<>();
         final int statusCode = getResponse.code();
@@ -352,6 +398,17 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
             JsonNode responseJson = parseJsonResponse(new ByteArrayInputStream(bodyBytes));
             JsonNode hits = responseJson.get("hits").get("hits");
 
+            // Send the query info when the strategy is ALWAYS, or when the strategy is NOHIT and this
+            // response has no hits and priorResultCount (passed in by the caller) is also zero
+            if ( (hits.size() == 0 && priorResultCount == 0 && queryInfoRouteStrategy == QueryInfoRouteStrategy.NOHIT)
+                    || queryInfoRouteStrategy == QueryInfoRouteStrategy.ALWAYS) {
+                FlowFile queryInfo = flowFile == null ? session.create() : session.create(flowFile);
+                session.putAttribute(queryInfo, "es.query.url", url.toExternalForm());
+                session.putAttribute(queryInfo, "es.query.hitcount", String.valueOf(hits.size()));
+                session.putAttribute(queryInfo, MIME_TYPE.key(), "application/json");
+                session.transfer(queryInfo,REL_QUERY_INFO);
+            }
+
             for(int i = 0; i < hits.size(); i++) {
                 JsonNode hit = hits.get(i);
                 String retrievedId = hit.get("_id").asText();
@@ -369,6 +426,7 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
                 documentFlowFile = session.putAttribute(documentFlowFile, "es.id", retrievedId);
                 documentFlowFile = session.putAttribute(documentFlowFile, "es.index", retrievedIndex);
                 documentFlowFile = session.putAttribute(documentFlowFile, "es.type", retrievedType);
+                documentFlowFile = session.putAttribute(documentFlowFile, "es.query.url", url.toExternalForm());
 
                 if (targetIsContent) {
                     documentFlowFile = session.putAttribute(documentFlowFile, "filename", retrievedId);

http://git-wip-us.apache.org/repos/asf/nifi/blob/45bc1f1b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java
index 1b07d22..2863264 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java
@@ -78,6 +78,25 @@ public class TestQueryElasticsearchHttp {
     }
 
     @Test
+    public void testQueryElasticsearchOnTrigger_withInput_withQueryInAttrs() throws IOException {
+        runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor());
+        runner.setValidateExpressionUsage(true);
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+
+        runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
+        runner.assertNotValid();
+        runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
+        runner.assertNotValid();
+        runner.setProperty(QueryElasticsearchHttp.QUERY,
+                "source:Twitter AND identifier:\"${identifier}\"");
+        runner.assertValid();
+        runner.setProperty(QueryElasticsearchHttp.PAGE_SIZE, "2");
+        runner.assertValid();
+
+        runAndVerifySuccess(true);
+    }
+
+    @Test
     public void testQueryElasticsearchOnTrigger_withInput_EL() throws IOException {
         runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor());
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "${es.url}");
@@ -161,6 +180,7 @@ public class TestQueryElasticsearchHttp {
         out.assertAttributeEquals("filename", "abc-97b-ASVsZu_"
                 + "vShwtGCJpGOObmuSqUJRUC3L_-SEND-S3");
         }
+        out.assertAttributeExists("es.query.url");
     }
 
     // By default, 3 files should go to Success

http://git-wip-us.apache.org/repos/asf/nifi/blob/45bc1f1b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttpNoHits.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttpNoHits.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttpNoHits.java
new file mode 100644
index 0000000..862aead
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttpNoHits.java
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import okhttp3.Call;
+import okhttp3.MediaType;
+import okhttp3.OkHttpClient;
+import okhttp3.Protocol;
+import okhttp3.Request;
+import okhttp3.Response;
+import okhttp3.ResponseBody;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.mockito.stubbing.OngoingStubbing;
+
+public class TestQueryElasticsearchHttpNoHits {
+
+        private TestRunner runner;
+
+        @After
+        public void teardown() {
+                runner = null;
+        }
+
+
+        @Test
+        public void testQueryElasticsearchOnTrigger_NoHits_NoHits() throws IOException {
+                runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor());
+                runner.setValidateExpressionUsage(true);
+                runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+
+                runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
+                runner.assertNotValid();
+                runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
+                runner.assertNotValid();
+                runner.setProperty(QueryElasticsearchHttp.QUERY,
+                        "source:Twitter AND identifier:\"${identifier}\"");
+                runner.assertValid();
+                runner.setProperty(QueryElasticsearchHttp.PAGE_SIZE, "2");
+                runner.assertValid();
+                runner.setProperty(QueryElasticsearchHttp.ROUTING_QUERY_INFO_STRATEGY, QueryElasticsearchHttp.QueryInfoRouteStrategy.NOHIT.name());
+                runner.assertValid();
+
+                runner.setIncomingConnection(false);
+                runAndVerify(0,1,0,true);
+        }
+
+        @Test
+        public void testQueryElasticsearchOnTrigger_NoHits_Never() throws IOException {
+                runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor());
+                runner.setValidateExpressionUsage(true);
+                runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+
+                runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
+                runner.assertNotValid();
+                runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
+                runner.assertNotValid();
+                runner.setProperty(QueryElasticsearchHttp.QUERY,
+                        "source:Twitter AND identifier:\"${identifier}\"");
+                runner.assertValid();
+                runner.setProperty(QueryElasticsearchHttp.PAGE_SIZE, "2");
+                runner.assertValid();
+                runner.setProperty(QueryElasticsearchHttp.ROUTING_QUERY_INFO_STRATEGY, QueryElasticsearchHttp.QueryInfoRouteStrategy.NEVER.name());
+                runner.assertValid();
+
+                runner.setIncomingConnection(false);
+                runAndVerify(0,0,0,true);
+        }
+
+        @Test
+        public void testQueryElasticsearchOnTrigger_NoHits_Always() throws IOException {
+                runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor());
+                runner.setValidateExpressionUsage(true);
+                runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+
+                runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
+                runner.assertNotValid();
+                runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
+                runner.assertNotValid();
+                runner.setProperty(QueryElasticsearchHttp.QUERY,
+                        "source:Twitter AND identifier:\"${identifier}\"");
+                runner.assertValid();
+                runner.setProperty(QueryElasticsearchHttp.PAGE_SIZE, "2");
+                runner.assertValid();
+                runner.setProperty(QueryElasticsearchHttp.ROUTING_QUERY_INFO_STRATEGY, QueryElasticsearchHttp.QueryInfoRouteStrategy.ALWAYS.name());
+                runner.assertValid();
+
+                runner.setIncomingConnection(false);
+                runAndVerify(0,1,0,true);
+        }
+
+        @Test
+        public void testQueryElasticsearchOnTrigger_Hits_NoHits() throws IOException {
+                runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor(true));
+                runner.setValidateExpressionUsage(true);
+                runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+
+                runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
+                runner.assertNotValid();
+                runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
+                runner.assertNotValid();
+                runner.setProperty(QueryElasticsearchHttp.QUERY,
+                        "source:Twitter AND identifier:\"${identifier}\"");
+                runner.assertValid();
+                runner.setProperty(QueryElasticsearchHttp.PAGE_SIZE, "2");
+                runner.assertValid();
+                runner.setProperty(QueryElasticsearchHttp.ROUTING_QUERY_INFO_STRATEGY, QueryElasticsearchHttp.QueryInfoRouteStrategy.NOHIT.name());
+                runner.assertValid();
+
+                runner.setIncomingConnection(false);
+                runAndVerify(3,0,0,true);
+        }
+
+        @Test
+        public void testQueryElasticsearchOnTrigger_Hits_Never() throws IOException {
+                runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor(true));
+                runner.setValidateExpressionUsage(true);
+                runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+
+                runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
+                runner.assertNotValid();
+                runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
+                runner.assertNotValid();
+                runner.setProperty(QueryElasticsearchHttp.QUERY,
+                        "source:Twitter AND identifier:\"${identifier}\"");
+                runner.assertValid();
+                runner.setProperty(QueryElasticsearchHttp.PAGE_SIZE, "2");
+                runner.assertValid();
+                runner.setProperty(QueryElasticsearchHttp.ROUTING_QUERY_INFO_STRATEGY, QueryElasticsearchHttp.QueryInfoRouteStrategy.NEVER.name());
+                runner.assertValid();
+
+                runner.setIncomingConnection(false);
+                runAndVerify(3,0,0,true);
+        }
+
+        @Test
+        public void testQueryElasticsearchOnTrigger_Hits_Always() throws IOException {
+                runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor(true));
+                runner.setValidateExpressionUsage(true);
+                runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+
+                runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
+                runner.assertNotValid();
+                runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
+                runner.assertNotValid();
+                runner.setProperty(QueryElasticsearchHttp.QUERY,
+                        "source:Twitter AND identifier:\"${identifier}\"");
+                runner.assertValid();
+                runner.setProperty(QueryElasticsearchHttp.PAGE_SIZE, "2");
+                runner.assertValid();
+                runner.setProperty(QueryElasticsearchHttp.ROUTING_QUERY_INFO_STRATEGY, QueryElasticsearchHttp.QueryInfoRouteStrategy.ALWAYS.name());
+                runner.assertValid();
+
+                runner.setIncomingConnection(false);
+                runAndVerify(3,3,2,true);
+        }
+
+
+
+        private void runAndVerify(int expectedResults,int expectedQueryInfoResults,int expectedHits, boolean targetIsContent) {
+                runner.enqueue("blah".getBytes(), new HashMap<String, String>() {
+                        {
+                                put("identifier", "28039652140");
+                        }
+                });
+
+                // A single run should page through all of the mocked responses (the hit pages, if any, and then the no-hit page)
+                runner.run(1, true, true);
+
+                runner.assertTransferCount(QueryElasticsearchHttp.REL_QUERY_INFO, expectedQueryInfoResults);
+                if (expectedQueryInfoResults > 0) {
+                        final MockFlowFile out = runner.getFlowFilesForRelationship(QueryElasticsearchHttp.REL_QUERY_INFO).get(0);
+                        assertNotNull(out);
+                        if (targetIsContent) {
+                                out.assertAttributeEquals("es.query.hitcount", String.valueOf(expectedHits));
+                                Assert.assertTrue(out.getAttribute("es.query.url").startsWith("http://127.0.0.1:9200/doc/status/_search?q=source:Twitter%20AND%20identifier:%22%22&size=2"));
+                        }
+                }
+
+                runner.assertTransferCount(QueryElasticsearchHttp.REL_SUCCESS, expectedResults);
+                if (expectedResults > 0) {
+                        final MockFlowFile out = runner.getFlowFilesForRelationship(QueryElasticsearchHttp.REL_SUCCESS).get(0);
+                        assertNotNull(out);
+                        if (targetIsContent) {
+                                out.assertAttributeEquals("filename", "abc-97b-ASVsZu_" + "vShwtGCJpGOObmuSqUJRUC3L_-SEND-S3");
+                        }
+                }
+        }
+
+        // By default, no files should go to Success and 1 file should go to Query Info
+        private void runAndVerify(boolean targetIsContent) {
+                runAndVerify(0,1,0, targetIsContent);
+        }
+
+
+
+        /**
+         * A Test class that extends the processor in order to inject/mock behavior
+         */
+        private static class QueryElasticsearchHttpTestProcessor extends QueryElasticsearchHttp {
+                Exception exceptionToThrow = null;
+                OkHttpClient client;
+                int goodStatusCode = 200;
+                String goodStatusMessage = "OK";
+
+                int badStatusCode;
+                String badStatusMessage;
+                int runNumber;
+
+                boolean useHitPages;
+
+                // query-page3 has no hits
+                List<String> noHitPages = Arrays.asList(getDoc("query-page3.json"));
+                List<String> hitPages = Arrays.asList(getDoc("query-page1.json"), getDoc("query-page2.json"),
+                        getDoc("query-page3.json"));
+
+                String expectedParam = null;
+
+                public QueryElasticsearchHttpTestProcessor() {
+                        this(false);
+                }
+                public QueryElasticsearchHttpTestProcessor(boolean useHitPages) {
+                        this.useHitPages = useHitPages;
+                }
+
+                public void setExceptionToThrow(Exception exceptionToThrow) {
+                        this.exceptionToThrow = exceptionToThrow;
+                }
+
+                /**
+                 * Sets the status code and message for the 1st query
+                 *
+                 * @param code
+                 *            The status code to return
+                 * @param message
+                 *            The status message
+                 */
+                void setStatus(int code, String message) {
+                        this.setStatus(code, message, 1);
+                }
+
+                /**
+                 * Sets a query parameter (name=value) expected to be at the end of the URL for the query operation
+                 *
+                 * @param param
+                 *            The parameter to expect
+                 */
+                void setExpectedParam(String param) {
+                        expectedParam = param;
+                }
+
+                /**
+                 * Sets the status code and message for the runNumber-th query
+                 *
+                 * @param code
+                 *            The status code to return
+                 * @param message
+                 *            The status message
+                 * @param runNumber
+                 *            The run number for which to set this status
+                 */
+                void setStatus(int code, String message, int runNumber) {
+                        badStatusCode = code;
+                        badStatusMessage = message;
+                        this.runNumber = runNumber;
+                }
+
+                @Override
+                protected void createElasticsearchClient(ProcessContext context) throws ProcessException {
+                        client = mock(OkHttpClient.class);
+
+                        OngoingStubbing<Call> stub = when(client.newCall(any(Request.class)));
+                        List<String> pages;
+                        if(useHitPages) {
+                                pages = hitPages;
+                        } else {
+                                pages = noHitPages;
+                        }
+
+                        for (int i = 0; i < pages.size(); i++) {
+                                String page = pages.get(i);
+                                if (runNumber == i + 1) {
+                                        stub = mockReturnDocument(stub, page, badStatusCode, badStatusMessage);
+                                } else {
+                                        stub = mockReturnDocument(stub, page, goodStatusCode, goodStatusMessage);
+                                }
+                        }
+                }
+
+                /**
+                 * Appends one answer to the stubbing chain. When invoked, the answer checks the
+                 * request URL suffix against {@code expectedParam} (if set), builds an HTTP/1.1
+                 * JSON response from the given document and status, and returns a mocked
+                 * {@code Call} whose {@code execute()} either returns that response or throws
+                 * {@code exceptionToThrow} when one is configured.
+                 *
+                 * @param stub the stubbing chain to extend
+                 * @param document response body text
+                 * @param statusCode HTTP status code for the response
+                 * @param statusMessage HTTP status message for the response
+                 * @return the extended stubbing chain
+                 */
+                private OngoingStubbing<Call> mockReturnDocument(OngoingStubbing<Call> stub,
+                        final String document, int statusCode, String statusMessage) {
+                        return stub.thenAnswer(invocation -> {
+                                final Request request = (Request) invocation.getArguments()[0];
+                                // Only verify the URL suffix when an expectation has been configured.
+                                if (expectedParam != null) {
+                                        assertTrue(request.url().toString().endsWith(expectedParam));
+                                }
+                                final ResponseBody body = ResponseBody.create(MediaType.parse("application/json"), document);
+                                final Response response = new Response.Builder()
+                                        .request(request)
+                                        .protocol(Protocol.HTTP_1_1)
+                                        .code(statusCode)
+                                        .message(statusMessage)
+                                        .body(body)
+                                        .build();
+                                final Call mockedCall = mock(Call.class);
+                                if (exceptionToThrow == null) {
+                                        when(mockedCall.execute()).thenReturn(response);
+                                } else {
+                                        when(mockedCall.execute()).thenThrow(exceptionToThrow);
+                                }
+                                return mockedCall;
+                        });
+                }
+
+                /**
+                 * Returns the {@code client} field, which {@code createElasticsearchClient}
+                 * assigns a Mockito mock of {@code OkHttpClient}.
+                 *
+                 * @return the current client reference
+                 */
+                protected OkHttpClient getClient() {
+                        return client;
+                }
+        }
+
+        private static String getDoc(String filename) {
+                try {
+                        return IOUtils.toString(QueryElasticsearchHttp.class.getClassLoader().getResourceAsStream(filename), StandardCharsets.UTF_8);
+                } catch (IOException e) {
+                        System.out.println("Error reading document " + filename);
+                        return "";
+                }
+        }
+}


[2/2] nifi git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/nifi

Posted by mt...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/nifi


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/5ca6261d
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/5ca6261d
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/5ca6261d

Branch: refs/heads/master
Commit: 5ca6261de009f57ed238bc30191e2f7404762f5c
Parents: 45bc1f1 0e736f5
Author: Mike Thomsen <mi...@gmail.com>
Authored: Mon Apr 23 15:39:20 2018 -0400
Committer: Mike Thomsen <mi...@gmail.com>
Committed: Mon Apr 23 15:39:20 2018 -0400

----------------------------------------------------------------------
 .../nifi/controller/tasks/ConnectableTask.java  |   46 +-
 .../controller/tasks/TestConnectableTask.java   |   93 +-
 .../nifi-record-serialization-services/pom.xml  |   20 +
 .../java/org/apache/nifi/xml/XMLReader.java     |  140 ++
 .../org/apache/nifi/xml/XMLRecordReader.java    |  568 +++++++
 ...org.apache.nifi.controller.ControllerService |    4 +-
 .../additionalDetails.html                      |  433 ++++++
 .../java/org/apache/nifi/xml/TestXMLReader.java |  160 ++
 .../apache/nifi/xml/TestXMLReaderProcessor.java |   79 +
 .../apache/nifi/xml/TestXMLRecordReader.java    | 1436 ++++++++++++++++++
 .../src/test/resources/xml/people.xml           |   22 +
 .../src/test/resources/xml/people2.xml          |   12 +
 .../src/test/resources/xml/people3.xml          |   12 +
 .../src/test/resources/xml/people_array.xml     |   37 +
 .../test/resources/xml/people_array_simple.xml  |   28 +
 .../src/test/resources/xml/people_cdata.xml     |   22 +
 .../src/test/resources/xml/people_complex1.xml  |   33 +
 .../src/test/resources/xml/people_complex2.xml  |   73 +
 .../src/test/resources/xml/people_empty.xml     |   12 +
 .../src/test/resources/xml/people_invalid.xml   |   21 +
 .../src/test/resources/xml/people_map.xml       |   18 +
 .../src/test/resources/xml/people_map2.xml      |   32 +
 .../src/test/resources/xml/people_namespace.xml |   22 +
 .../src/test/resources/xml/people_nested.xml    |   38 +
 .../test/resources/xml/people_no_attributes.xml |   22 +
 .../resources/xml/people_tag_in_characters.xml  |   23 +
 .../xml/people_with_header_and_comments.xml     |   29 +
 .../src/test/resources/xml/person.xml           |    5 +
 .../src/test/resources/xml/testschema           |   11 +
 .../src/test/resources/xml/testschema2          |   19 +
 30 files changed, 3450 insertions(+), 20 deletions(-)
----------------------------------------------------------------------