Posted to issues@nifi.apache.org by gresockj <gi...@git.apache.org> on 2016/07/28 11:52:40 UTC

[GitHub] nifi pull request #733: NIFI-2417: Implementing QueryElasticsearchHttp and S...

GitHub user gresockj opened a pull request:

    https://github.com/apache/nifi/pull/733

    NIFI-2417: Implementing QueryElasticsearchHttp and ScrollElasticsearchHttp

    I have implemented these processors for my own project, and thought it might be useful to submit them to NiFi.  They are based on FetchElasticsearchHttp, and have the following execution designs:
    
    - QueryElasticsearchHttp - submits an ES query and pages through the results in a single execution, emitting one flow file per document.  Allows both flow file input (in case the flow file has an attribute with the query to run) and non-input execution.
    - ScrollElasticsearchHttp - submits an ES query and uses the scroll API to scroll through the results.  The scroll_id for each page is kept in the processor's state management, and each subsequent execution of the processor emits a single page of documents as one flow file.  We found this to be the most efficient way to scroll through a huge result set, as in the case of reindexing Elasticsearch, without losing our place if NiFi goes down.  The one quirk is that the processor state must be cleared before another query can be run, but this is documented in the processor and fits the use case of only being needed for rare events like a reindex.  (A rough sketch of the underlying scroll interaction follows below.)
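
    For context, a minimal sketch of the scroll-over-HTTP interaction the processor follows (using OkHttp, as these processors do; the host, index, and query are hypothetical placeholders, and extractScrollId stands in for real JSON parsing):

        import java.io.IOException;

        import okhttp3.OkHttpClient;
        import okhttp3.Request;
        import okhttp3.Response;

        public class ScrollSketch {
            private final OkHttpClient client = new OkHttpClient();

            public void scrollOnce() throws IOException {
                // Initial search: ask Elasticsearch to keep the search context open for one minute.
                String searchUrl = "http://localhost:9200/my-index/_search?q=*:*&size=20&scroll=1m";
                String scrollId;
                try (Response response = client.newCall(new Request.Builder().url(searchUrl).build()).execute()) {
                    // A real implementation would parse _scroll_id with a JSON library and persist it;
                    // the processor keeps it in NiFi state management so a restart can resume.
                    scrollId = extractScrollId(response.body().string());
                }

                // Each subsequent call returns the next page plus a (possibly updated) scroll_id.
                String scrollUrl = "http://localhost:9200/_search/scroll?scroll=1m&scroll_id=" + scrollId;
                try (Response page = client.newCall(new Request.Builder().url(scrollUrl).build()).execute()) {
                    System.out.println(page.body().string()); // one page of hits
                }
            }

            private String extractScrollId(String json) {
                // Placeholder only; use a proper JSON parser in practice.
                return json.replaceAll("(?s).*\"_scroll_id\"\\s*:\\s*\"([^\"]+)\".*", "$1");
            }
        }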
    
    Since the processors already work correctly in our system, I am no longer authorized to put time into major modifications to the code.  As a result, if any redesign of this code is desired, I will be unable to work on it.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/gresockj/nifi NIFI-2417

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/733.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #733
    
----
commit 5bbe09e2a7c4689bfa01588260ea89d2375e8356
Author: Joe Gresock <jo...@lmco.com>
Date:   2016-07-28T11:44:29Z

    NIFI-2417: Implementing QueryElasticsearchHttp and ScrollElasticsearchHttp

----



[GitHub] nifi pull request #733: NIFI-2417: Implementing QueryElasticsearchHttp and S...

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/733#discussion_r79832704
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java ---
    @@ -0,0 +1,410 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.elasticsearch;
    +
    +import java.io.IOException;
    +import java.net.MalformedURLException;
    +import java.net.URL;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +import java.util.stream.Collectors;
    +import java.util.stream.Stream;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.stream.io.ByteArrayInputStream;
    +import org.codehaus.jackson.JsonNode;
    +
    +import okhttp3.HttpUrl;
    +import okhttp3.OkHttpClient;
    +import okhttp3.Response;
    +import okhttp3.ResponseBody;
    +
    +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
    +@EventDriven
    +@SupportsBatching
    +@Tags({ "elasticsearch", "query", "read", "get", "http" })
    +@CapabilityDescription("Queries Elasticsearch using the specified connection properties. "
    +        + "Note that the full body of each page of documents will be read into memory before being "
    +        + "written to Flow Files for transfer.  Also note that the Elasticsearch max_result_window index "
    +        + "setting is the upper bound on the number of records that can be retrieved using this query.  "
    +        + "To retrieve more records, use the ScrollElasticsearchHttp processor.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "filename", description = "The filename attribute is set to the document identifier"),
    +        @WritesAttribute(attribute = "es.index", description = "The Elasticsearch index containing the document"),
    +        @WritesAttribute(attribute = "es.type", description = "The Elasticsearch document type"),
    +        @WritesAttribute(attribute = "es.result.*", description = "If Target is 'Flow file attributes', the JSON attributes of "
    +                + "each result will be placed into corresponding attributes with this prefix.") })
    +public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
    +
    +    private static final String FIELD_INCLUDE_QUERY_PARAM = "_source_include";
    +    private static final String QUERY_QUERY_PARAM = "q";
    +    private static final String SORT_QUERY_PARAM = "sort";
    +    private static final String FROM_QUERY_PARAM = "from";
    +    private static final String SIZE_QUERY_PARAM = "size";
    +
    +    public static final String TARGET_FLOW_FILE_CONTENT = "Flow file content";
    +    public static final String TARGET_FLOW_FILE_ATTRIBUTES = "Flow file attributes";
    +    private static final String ATTRIBUTE_PREFIX = "es.result.";
    +
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description(
    +                    "All FlowFiles that are read from Elasticsearch are routed to this relationship.")
    +            .build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description(
    +                    "All FlowFiles that cannot be read from Elasticsearch are routed to this relationship. Note that only incoming "
    +                            + "flow files will be routed to failure.").build();
    +
    +    public static final Relationship REL_RETRY = new Relationship.Builder()
    +            .name("retry")
    +            .description(
    +                    "A FlowFile is routed to this relationship if the document cannot be fetched but attempting the operation again may "
    +                            + "succeed. Note that if the processor has no incoming connections, flow files may still be sent to this relationship "
    +                            + "based on the processor properties and the results of the fetch operation.")
    +            .build();
    +
    +    public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
    +            .name("query-es-query").displayName("Query")
    +            .description("The Lucene-style query to run against ElasticSearch").required(true)
    --- End diff --
    
    Might be good to have a trivial example of a query here (see my comment on the other processor below)
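
    For reference, a trivial example of the Lucene-style query-string syntax this property expects might look like (hypothetical field names):

        title:nifi AND year:[2015 TO 2016]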



[GitHub] nifi issue #733: NIFI-2417: Implementing QueryElasticsearchHttp and ScrollEl...

Posted by gresockj <gi...@git.apache.org>.
Github user gresockj commented on the issue:

    https://github.com/apache/nifi/pull/733
  
    Thanks for the help!



[GitHub] nifi pull request #733: NIFI-2417: Implementing QueryElasticsearchHttp and S...

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/733#discussion_r79827737
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java ---
    @@ -0,0 +1,410 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.elasticsearch;
    +
    +import java.io.IOException;
    +import java.net.MalformedURLException;
    +import java.net.URL;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +import java.util.stream.Collectors;
    +import java.util.stream.Stream;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.stream.io.ByteArrayInputStream;
    +import org.codehaus.jackson.JsonNode;
    +
    +import okhttp3.HttpUrl;
    +import okhttp3.OkHttpClient;
    +import okhttp3.Response;
    +import okhttp3.ResponseBody;
    +
    +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
    +@EventDriven
    +@SupportsBatching
    +@Tags({ "elasticsearch", "query", "read", "get", "http" })
    +@CapabilityDescription("Queries Elasticsearch using the specified connection properties. "
    +        + "Note that the full body of each page of documents will be read into memory before being "
    +        + "written to Flow Files for transfer.  Also note that the Elasticsearch max_result_window index "
    +        + "setting is the upper bound on the number of records that can be retrieved using this query.  "
    +        + "To retrieve more records, use the ScrollElasticsearchHttp processor.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "filename", description = "The filename attribute is set to the document identifier"),
    +        @WritesAttribute(attribute = "es.index", description = "The Elasticsearch index containing the document"),
    +        @WritesAttribute(attribute = "es.type", description = "The Elasticsearch document type"),
    +        @WritesAttribute(attribute = "es.result.*", description = "If Target is 'Flow file attributes', the JSON attributes of "
    +                + "each result will be placed into corresponding attributes with this prefix.") })
    +public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
    +
    +    private static final String FIELD_INCLUDE_QUERY_PARAM = "_source_include";
    +    private static final String QUERY_QUERY_PARAM = "q";
    +    private static final String SORT_QUERY_PARAM = "sort";
    +    private static final String FROM_QUERY_PARAM = "from";
    +    private static final String SIZE_QUERY_PARAM = "size";
    +
    +    public static final String TARGET_FLOW_FILE_CONTENT = "Flow file content";
    +    public static final String TARGET_FLOW_FILE_ATTRIBUTES = "Flow file attributes";
    +    private static final String ATTRIBUTE_PREFIX = "es.result.";
    +
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description(
    +                    "All FlowFiles that are read from Elasticsearch are routed to this relationship.")
    +            .build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description(
    +                    "All FlowFiles that cannot be read from Elasticsearch are routed to this relationship. Note that only incoming "
    +                            + "flow files will be routed to failure.").build();
    +
    +    public static final Relationship REL_RETRY = new Relationship.Builder()
    +            .name("retry")
    +            .description(
    +                    "A FlowFile is routed to this relationship if the document cannot be fetched but attempting the operation again may "
    +                            + "succeed. Note that if the processor has no incoming connections, flow files may still be sent to this relationship "
    +                            + "based on the processor properties and the results of the fetch operation.")
    +            .build();
    +
    +    public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
    +            .name("query-es-query").displayName("Query")
    +            .description("The Lucene-style query to run against ElasticSearch").required(true)
    +            .expressionLanguageSupported(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
    +            .name("query-es-index").displayName("Index")
    +            .description("The name of the index to read from").required(true)
    +            .expressionLanguageSupported(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
    +            .name("query-es-type")
    +            .displayName("Type")
    +            .description(
    +                    "The (optional) type of this document, used by Elasticsearch for indexing and searching. If the property is empty or set "
    +                            + "to _all, the first document matching the identifier across all types will be retrieved.")
    +            .required(false).expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    +
    +    public static final PropertyDescriptor FIELDS = new PropertyDescriptor.Builder()
    +            .name("query-es-fields")
    +            .displayName("Fields")
    +            .description(
    +                    "A comma-separated list of fields to retrieve from the document. If the Fields property is left blank, "
    +                            + "then the entire document's source will be retrieved.")
    +            .required(false).expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    +
    +    public static final PropertyDescriptor SORT = new PropertyDescriptor.Builder()
    +            .name("query-es-sort")
    +            .displayName("Sort")
    +            .description(
    +                    "A sort parameter (e.g., timestamp:asc). If the Sort property is left blank, "
    +                            + "then the results will be retrieved in document order.")
    +            .required(false).expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    +
    +    public static final PropertyDescriptor PAGE_SIZE = new PropertyDescriptor.Builder()
    +            .name("query-es-size").displayName("Page Size").defaultValue("20")
    +            .description("Determines how many documents to return per page during scrolling.")
    +            .required(true).expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).build();
    +
    +    public static final PropertyDescriptor LIMIT = new PropertyDescriptor.Builder()
    +            .name("query-es-limit").displayName("Limit")
    +            .description("If set, limits the number of results that will be returned.")
    +            .required(false).expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).build();
    +
    +    public static final PropertyDescriptor TARGET = new PropertyDescriptor.Builder()
    +            .name("query-es-target")
    +            .displayName("Target")
    +            .description(
    +                    "Indicates where the results should be placed.  In the case of 'Flow file content', the JSON "
    +                            + "response will be written as the content of the flow file.  In the case of 'Flow file attributes', "
    +                            + "the original flow file (if applicable) will be cloned for each result, and all return fields will be placed "
    +                            + "in a flow file attribute of the same name, but prefixed by 'es.result.'")
    +            .required(true).expressionLanguageSupported(false)
    +            .defaultValue(TARGET_FLOW_FILE_CONTENT)
    +            .allowableValues(TARGET_FLOW_FILE_CONTENT, TARGET_FLOW_FILE_ATTRIBUTES)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        relationships.add(REL_FAILURE);
    +        relationships.add(REL_RETRY);
    +        return Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        final List<PropertyDescriptor> descriptors = new ArrayList<>();
    +        descriptors.add(ES_URL);
    +        descriptors.add(PROP_SSL_CONTEXT_SERVICE);
    +        descriptors.add(USERNAME);
    +        descriptors.add(PASSWORD);
    +        descriptors.add(CONNECT_TIMEOUT);
    +        descriptors.add(RESPONSE_TIMEOUT);
    +        descriptors.add(QUERY);
    +        descriptors.add(PAGE_SIZE);
    +        descriptors.add(INDEX);
    +        descriptors.add(TYPE);
    +        descriptors.add(FIELDS);
    +        descriptors.add(SORT);
    +        descriptors.add(LIMIT);
    +        descriptors.add(TARGET);
    +
    +        return Collections.unmodifiableList(descriptors);
    +    }
    +
    +    @OnScheduled
    +    public void setup(ProcessContext context) {
    +        super.setup(context);
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session)
    +            throws ProcessException {
    +
    +        FlowFile flowFile = null;
    +        if (context.hasIncomingConnection()) {
    +            flowFile = session.get();
    +
    +            // If we have no FlowFile, and all incoming connections are self-loops, then we
    +            // can continue on. However, if we have no FlowFile and we have connections
    +            // coming from other Processors, then we know that we should run only if we
    +            // have a FlowFile.
    +            if (flowFile == null && context.hasNonLoopConnection()) {
    +                return;
    +            }
    +        }
    +
    +        OkHttpClient okHttpClient = getClient();
    +
    +        final String index = context.getProperty(INDEX).evaluateAttributeExpressions(flowFile)
    +                .getValue();
    +        final String query = context.getProperty(QUERY).evaluateAttributeExpressions(flowFile)
    +                .getValue();
    +        final String docType = context.getProperty(TYPE).evaluateAttributeExpressions(flowFile)
    +                .getValue();
    +        final int pageSize = context.getProperty(PAGE_SIZE).evaluateAttributeExpressions(flowFile)
    +                .asInteger().intValue();
    +        final Integer limit = context.getProperty(LIMIT).isSet() ? context.getProperty(LIMIT)
    +                .evaluateAttributeExpressions(flowFile).asInteger().intValue() : null;
    +        final String fields = context.getProperty(FIELDS).isSet() ? context.getProperty(FIELDS)
    +                .evaluateAttributeExpressions(flowFile).getValue() : null;
    +        final String sort = context.getProperty(SORT).isSet() ? context.getProperty(SORT)
    +                .evaluateAttributeExpressions(flowFile).getValue() : null;
    +        final boolean targetIsContent = context.getProperty(TARGET).getValue()
    +                .equals(TARGET_FLOW_FILE_CONTENT);
    +
    +        // Authentication
    +        final String username = context.getProperty(USERNAME).getValue();
    +        final String password = context.getProperty(PASSWORD).getValue();
    +
    +        final ComponentLog logger = getLogger();
    +
    +        int fromIndex = 0;
    +        int numResults;
    +
    +        try {
    +            logger.debug("Querying {}/{} from Elasticsearch: {}", new Object[] { index, docType,
    +                    query });
    +
    +            final long startNanos = System.nanoTime();
    +            // read the url property from the context
    +            final String urlstr = StringUtils.trimToEmpty(context.getProperty(ES_URL).getValue());
    +
    +            boolean hitLimit = false;
    +            do {
    +                int mPageSize = pageSize;
    +                if (limit != null && limit <= (fromIndex + pageSize)) {
    +                    mPageSize = limit - fromIndex;
    +                    hitLimit = true;
    +                }
    +
    +                final URL queryUrl = buildRequestURL(urlstr, query, index, docType, fields, sort,
    +                        mPageSize, fromIndex);
    +
    +                final Response getResponse = sendRequestToElasticsearch(okHttpClient, queryUrl,
    +                        username, password, "GET", null);
    +                numResults = this.getPage(getResponse, queryUrl, context, session, flowFile,
    +                        logger, startNanos, targetIsContent);
    +                fromIndex += pageSize;
    +            } while (numResults > 0 && !hitLimit);
    +
    +            if (flowFile != null) {
    +                session.remove(flowFile);
    +            }
    +        } catch (IOException ioe) {
    +            logger.error(
    +                    "Failed to read from Elasticsearch due to {}, this may indicate an error in configuration "
    +                            + "(hosts, username/password, etc.). Routing to retry",
    +                    new Object[] { ioe.getLocalizedMessage() }, ioe);
    +            if (flowFile != null) {
    +                session.transfer(flowFile, REL_RETRY);
    +            }
    +            context.yield();
    +
    +        } catch (RetryableException e) {
    +            logger.error(e.getMessage(), new Object[] { e.getLocalizedMessage() }, e);
    +            if (flowFile != null) {
    +                session.transfer(flowFile, REL_RETRY);
    +            }
    +            context.yield();
    +        } catch (Exception e) {
    +            logger.error("Failed to read {} from Elasticsearch due to {}", new Object[] { flowFile,
    +                    e.getLocalizedMessage() }, e);
    +            if (flowFile != null) {
    +                session.transfer(flowFile, REL_FAILURE);
    +            }
    +            context.yield();
    +        }
    +    }
    +
    +    private int getPage(final Response getResponse, final URL url, final ProcessContext context,
    +            final ProcessSession session, FlowFile flowFile, final ComponentLog logger,
    +            final long startNanos, boolean targetIsContent)
    +            throws IOException {
    +        List<FlowFile> page = new ArrayList<>();
    +        final int statusCode = getResponse.code();
    +
    +        if (isSuccess(statusCode)) {
    +            ResponseBody body = getResponse.body();
    +            final byte[] bodyBytes = body.bytes();
    +            JsonNode responseJson = parseJsonResponse(new ByteArrayInputStream(bodyBytes));
    +            JsonNode hits = responseJson.get("hits").get("hits");
    +
    +            for(int i = 0; i < hits.size(); i++) {
    +                JsonNode hit = hits.get(i);
    +                String retrievedId = hit.get("_id").asText();
    +                String retrievedIndex = hit.get("_index").asText();
    +                String retrievedType = hit.get("_type").asText();
    +
    +                FlowFile documentFlowFile = null;
    +                if (flowFile != null) {
    +                    documentFlowFile = targetIsContent ? session.create(flowFile) : session.clone(flowFile);
    +                } else {
    +                    documentFlowFile = session.create();
    +                }
    +
    +                JsonNode source = hit.get("_source");
    +                documentFlowFile = session.putAttribute(documentFlowFile, "es.index", retrievedIndex);
    +                documentFlowFile = session.putAttribute(documentFlowFile, "es.type", retrievedType);
    +
    +                if (targetIsContent) {
    +                    documentFlowFile = session.putAttribute(documentFlowFile, "filename", retrievedId);
    +                    documentFlowFile = session.write(documentFlowFile, out -> {
    --- End diff --
    
    Since _source is a JSON object, I recommend setting the mime.type attribute to application/json when the target is content.
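
    A minimal sketch of that suggestion (assuming the processor's existing documentFlowFile variable and NiFi's org.apache.nifi.flowfile.attributes.CoreAttributes enum):

        // Hypothetical addition: mark the content as JSON so downstream processors can detect it.
        documentFlowFile = session.putAttribute(documentFlowFile,
                CoreAttributes.MIME_TYPE.key(), "application/json");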



[GitHub] nifi pull request #733: NIFI-2417: Implementing QueryElasticsearchHttp and S...

Posted by gresockj <gi...@git.apache.org>.
Github user gresockj commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/733#discussion_r80111550
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java ---
    @@ -0,0 +1,415 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.elasticsearch;
    +
    +import java.io.IOException;
    +import java.net.MalformedURLException;
    +import java.net.URL;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +import java.util.regex.Pattern;
    +import java.util.stream.Collectors;
    +import java.util.stream.Stream;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.Stateful;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.state.Scope;
    +import org.apache.nifi.components.state.StateManager;
    +import org.apache.nifi.components.state.StateMap;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.stream.io.ByteArrayInputStream;
    +import org.codehaus.jackson.JsonNode;
    +
    +import okhttp3.HttpUrl;
    +import okhttp3.OkHttpClient;
    +import okhttp3.Response;
    +import okhttp3.ResponseBody;
    +
    +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
    +@EventDriven
    +@SupportsBatching
    +@Tags({ "elasticsearch", "query", "scroll", "read", "get", "http" })
    +@CapabilityDescription("Scrolls through an Elasticsearch query using the specified connection properties. "
    +        + "This processor is intended to be run on the primary node, and is designed for scrolling through "
    +        + "huge result sets, as in the case of a reindex.  The state must be cleared before another query "
    +        + "can be run.  Each page of results is returned, wrapped in a JSON object like so: { \"hits\" : [ <doc1>, <doc2>, <docn> ] }.  "
    +        + "Note that the full body of each page of documents will be read into memory before being "
    +        + "written to a Flow File for transfer.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "es.index", description = "The Elasticsearch index containing the document"),
    +        @WritesAttribute(attribute = "es.type", description = "The Elasticsearch document type") })
    +@Stateful(description = "After each successful scroll page, the latest scroll_id is persisted in scrollId as input for the next scroll call.  "
    +        + "Once the entire query is complete, finishedQuery state will be set to true, and the processor will not execute unless this is cleared.", scopes = { Scope.LOCAL })
    +public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
    +
    +    private static final String FINISHED_QUERY_STATE = "finishedQuery";
    +    private static final String SCROLL_ID_STATE = "scrollId";
    +    private static final String FIELD_INCLUDE_QUERY_PARAM = "_source_include";
    +    private static final String QUERY_QUERY_PARAM = "q";
    +    private static final String SORT_QUERY_PARAM = "sort";
    +    private static final String SCROLL_QUERY_PARAM = "scroll";
    +    private static final String SCROLL_ID_QUERY_PARAM = "scroll_id";
    +    private static final String SIZE_QUERY_PARAM = "size";
    +
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description(
    +                    "All FlowFiles that are read from Elasticsearch are routed to this relationship.")
    +            .build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description(
    +                    "All FlowFiles that cannot be read from Elasticsearch are routed to this relationship. Note that only incoming "
    +                            + "flow files will be routed to failure.").build();
    +
    +    public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
    +            .name("scroll-es-query").displayName("Query")
    +            .description("The Lucene-style query to run against ElasticSearch").required(true)
    +            .expressionLanguageSupported(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor SCROLL_DURATION = new PropertyDescriptor.Builder()
    +            .name("scroll-es-scroll")
    +            .displayName("Scroll Duration")
    +            .description("The scroll duration is how long each search context is kept in memory.")
    +            .defaultValue("1m")
    +            .required(true)
    +            .expressionLanguageSupported(true)
    +            .addValidator(
    +                    StandardValidators.createRegexMatchingValidator(Pattern.compile("[0-9]+(m|h)")))
    +            .build();
    +
    +    public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
    +            .name("scroll-es-index").displayName("Index")
    +            .description("The name of the index to read from").required(true)
    +            .expressionLanguageSupported(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
    +            .name("scroll-es-type")
    +            .displayName("Type")
    +            .description(
    +                    "The (optional) type of this document, used by Elasticsearch for indexing and searching. If the property is empty or set "
    +                            + "to _all, the first document matching the identifier across all types will be retrieved.")
    +            .required(false).expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    +
    +    public static final PropertyDescriptor FIELDS = new PropertyDescriptor.Builder()
    +            .name("scroll-es-fields")
    +            .displayName("Fields")
    +            .description(
    +                    "A comma-separated list of fields to retrieve from the document. If the Fields property is left blank, "
    +                            + "then the entire document's source will be retrieved.")
    +            .required(false).expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    +
    +    public static final PropertyDescriptor SORT = new PropertyDescriptor.Builder()
    +            .name("scroll-es-sort")
    +            .displayName("Sort")
    +            .description(
    +                    "A sort parameter (e.g., timestamp:asc). If the Sort property is left blank, "
    +                            + "then the results will be retrieved in document order.")
    +            .required(false).expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    +
    +    public static final PropertyDescriptor PAGE_SIZE = new PropertyDescriptor.Builder()
    +            .name("scroll-es-size").displayName("Page Size").defaultValue("20")
    +            .description("Determines how many documents to return per page during scrolling.")
    +            .required(true).expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).build();
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        relationships.add(REL_FAILURE);
    +        return Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        final List<PropertyDescriptor> descriptors = new ArrayList<>();
    +        descriptors.add(ES_URL);
    +        descriptors.add(PROP_SSL_CONTEXT_SERVICE);
    +        descriptors.add(USERNAME);
    +        descriptors.add(PASSWORD);
    +        descriptors.add(CONNECT_TIMEOUT);
    +        descriptors.add(RESPONSE_TIMEOUT);
    +        descriptors.add(QUERY);
    +        descriptors.add(SCROLL_DURATION);
    +        descriptors.add(PAGE_SIZE);
    +        descriptors.add(INDEX);
    +        descriptors.add(TYPE);
    +        descriptors.add(FIELDS);
    +        descriptors.add(SORT);
    +
    +        return Collections.unmodifiableList(descriptors);
    +    }
    +
    +    @OnScheduled
    +    public void setup(ProcessContext context) {
    +        super.setup(context);
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session)
    +            throws ProcessException {
    +
    +        try {
    +            if (isQueryFinished(context.getStateManager())) {
    +                getLogger().trace(
    +                        "Query has been marked finished in the state manager.  "
    +                                + "To run another query, clear the state.");
    +                return;
    +            }
    +        } catch (IOException e) {
    +            throw new ProcessException("Could not retrieve state", e);
    +        }
    +
    +        OkHttpClient okHttpClient = getClient();
    +
    +        FlowFile flowFile = session.create();
    +
    +        final String index = context.getProperty(INDEX).evaluateAttributeExpressions(flowFile)
    +                .getValue();
    +        final String query = context.getProperty(QUERY).evaluateAttributeExpressions(flowFile)
    +                .getValue();
    +        final String docType = context.getProperty(TYPE).evaluateAttributeExpressions(flowFile)
    +                .getValue();
    +        final int pageSize = context.getProperty(PAGE_SIZE).evaluateAttributeExpressions(flowFile)
    +                .asInteger().intValue();
    +        final String fields = context.getProperty(FIELDS).isSet() ? context.getProperty(FIELDS)
    +                .evaluateAttributeExpressions(flowFile).getValue() : null;
    +        final String sort = context.getProperty(SORT).isSet() ? context.getProperty(SORT)
    +                .evaluateAttributeExpressions(flowFile).getValue() : null;
    +        final String scroll = context.getProperty(SCROLL_DURATION).isSet() ? context
    +                .getProperty(SCROLL_DURATION).evaluateAttributeExpressions(flowFile).getValue() : null;
    +
    +        // Authentication
    +        final String username = context.getProperty(USERNAME).getValue();
    +        final String password = context.getProperty(PASSWORD).getValue();
    +
    +        final ComponentLog logger = getLogger();
    +
    +        try {
    +            String scrollId = loadScrollId(context.getStateManager());
    +
    +            if (scrollId != null) {
    +                // read the url property from the context
    +                final String urlstr = StringUtils.trimToEmpty(context.getProperty(ES_URL)
    +                        .getValue());
    +                final URL scrollurl = buildRequestURL(urlstr, query, index, docType, fields, sort,
    +                        scrollId, pageSize, scroll);
    +                final long startNanos = System.nanoTime();
    +
    +                final Response getResponse = sendRequestToElasticsearch(okHttpClient, scrollurl,
    +                        username, password, "GET", null);
    +                this.getPage(getResponse, scrollurl, context, session, flowFile, logger, startNanos);
    +            } else {
    +                logger.debug("Querying {}/{} from Elasticsearch: {}", new Object[] { index,
    +                        docType, query });
    +
    +                // read the url property from the context
    +                final String urlstr = StringUtils.trimToEmpty(context.getProperty(ES_URL)
    +                        .getValue());
    +                final URL queryUrl = buildRequestURL(urlstr, query, index, docType, fields, sort,
    +                        scrollId, pageSize, scroll);
    +                final long startNanos = System.nanoTime();
    +
    +                final Response getResponse = sendRequestToElasticsearch(okHttpClient, queryUrl,
    +                        username, password, "GET", null);
    +                this.getPage(getResponse, queryUrl, context, session, flowFile, logger, startNanos);
    +            }
    +
    +        } catch (IOException ioe) {
    +            logger.error(
    +                    "Failed to read from Elasticsearch due to {}, this may indicate an error in configuration "
    +                            + "(hosts, username/password, etc.).",
    +                    new Object[] { ioe.getLocalizedMessage() }, ioe);
    +            session.remove(flowFile);
    +            context.yield();
    +
    +        } catch (Exception e) {
    +            logger.error("Failed to read {} from Elasticsearch due to {}", new Object[] { flowFile,
    +                    e.getLocalizedMessage() }, e);
    +            session.transfer(flowFile, REL_FAILURE);
    +            context.yield();
    +        }
    +    }
    +
    +    private void getPage(final Response getResponse, final URL url, final ProcessContext context,
    +            final ProcessSession session, FlowFile flowFile, final ComponentLog logger, final long startNanos)
    +            throws IOException {
    +        final int statusCode = getResponse.code();
    +
    +        if (isSuccess(statusCode)) {
    +            ResponseBody body = getResponse.body();
    +            final byte[] bodyBytes = body.bytes();
    +            JsonNode responseJson = parseJsonResponse(new ByteArrayInputStream(bodyBytes));
    +            String scrollId = responseJson.get("_scroll_id").asText();
    +
    +            StringBuilder builder = new StringBuilder();
    +
    +            builder.append("{ \"hits\" : [");
    +
    +            JsonNode hits = responseJson.get("hits").get("hits");
    +            if (hits.size() == 0) {
    +                finishQuery(context.getStateManager());
    +                session.remove(flowFile);
    +                return;
    +            }
    +
    +            for(int i = 0; i < hits.size(); i++) {
    +                JsonNode hit = hits.get(i);
    +                String retrievedIndex = hit.get("_index").asText();
    +                String retrievedType = hit.get("_type").asText();
    +
    +                JsonNode source = hit.get("_source");
    +                flowFile = session.putAttribute(flowFile, "es.index", retrievedIndex);
    --- End diff --
    
    In the Scroll processor, it always returns a full page of JSON documents per flow file, so there isn't really an easy way to include the id in an attribute (though I suppose we could do something like es.result.id.1=[id], es.result.id.2=[id2], etc.)
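
    A rough sketch of that hypothetical attribute scheme, reusing the Jackson hits node from the code above:

        // Hypothetical: number each hit's document id into an attribute on the page's flow file.
        for (int i = 0; i < hits.size(); i++) {
            String id = hits.get(i).get("_id").asText();
            flowFile = session.putAttribute(flowFile, "es.result.id." + (i + 1), id);
        }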



[GitHub] nifi pull request #733: NIFI-2417: Implementing QueryElasticsearchHttp and S...

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/733#discussion_r80101565
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java ---
    @@ -0,0 +1,415 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.elasticsearch;
    +
    +import java.io.IOException;
    +import java.net.MalformedURLException;
    +import java.net.URL;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +import java.util.regex.Pattern;
    +import java.util.stream.Collectors;
    +import java.util.stream.Stream;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.Stateful;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.state.Scope;
    +import org.apache.nifi.components.state.StateManager;
    +import org.apache.nifi.components.state.StateMap;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.stream.io.ByteArrayInputStream;
    +import org.codehaus.jackson.JsonNode;
    +
    +import okhttp3.HttpUrl;
    +import okhttp3.OkHttpClient;
    +import okhttp3.Response;
    +import okhttp3.ResponseBody;
    +
    +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
    +@EventDriven
    +@SupportsBatching
    +@Tags({ "elasticsearch", "query", "scroll", "read", "get", "http" })
    +@CapabilityDescription("Scrolls through an Elasticsearch query using the specified connection properties. "
    +        + "This processor is intended to be run on the primary node, and is designed for scrolling through "
    +        + "huge result sets, as in the case of a reindex.  The state must be cleared before another query "
    +        + "can be run.  Each page of results is returned, wrapped in a JSON object like so: { \"hits\" : [ <doc1>, <doc2>, <docn> ] }.  "
    +        + "Note that the full body of each page of documents will be read into memory before being "
    +        + "written to a Flow File for transfer.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "es.index", description = "The Elasticsearch index containing the document"),
    +        @WritesAttribute(attribute = "es.type", description = "The Elasticsearch document type") })
    +@Stateful(description = "After each successful scroll page, the latest scroll_id is persisted in scrollId as input for the next scroll call.  "
    +        + "Once the entire query is complete, finishedQuery state will be set to true, and the processor will not execute unless this is cleared.", scopes = { Scope.LOCAL })
    +public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
    +
    +    private static final String FINISHED_QUERY_STATE = "finishedQuery";
    +    private static final String SCROLL_ID_STATE = "scrollId";
    +    private static final String FIELD_INCLUDE_QUERY_PARAM = "_source_include";
    +    private static final String QUERY_QUERY_PARAM = "q";
    +    private static final String SORT_QUERY_PARAM = "sort";
    +    private static final String SCROLL_QUERY_PARAM = "scroll";
    +    private static final String SCROLL_ID_QUERY_PARAM = "scroll_id";
    +    private static final String SIZE_QUERY_PARAM = "size";
    +
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description(
    +                    "All FlowFiles that are read from Elasticsearch are routed to this relationship.")
    +            .build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description(
    +                    "All FlowFiles that cannot be read from Elasticsearch are routed to this relationship. Note that only incoming "
    +                            + "flow files will be routed to failure.").build();
    +
    +    public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
    +            .name("scroll-es-query").displayName("Query")
    +            .description("The Lucene-style query to run against ElasticSearch").required(true)
    +            .expressionLanguageSupported(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor SCROLL_DURATION = new PropertyDescriptor.Builder()
    +            .name("scroll-es-scroll")
    +            .displayName("Scroll Duration")
    +            .description("The scroll duration is how long each search context is kept in memory.")
    +            .defaultValue("1m")
    +            .required(true)
    +            .expressionLanguageSupported(true)
    +            .addValidator(
    +                    StandardValidators.createRegexMatchingValidator(Pattern.compile("[0-9]+(m|h)")))
    +            .build();
    +
    +    public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
    +            .name("scroll-es-index").displayName("Index")
    +            .description("The name of the index to read from").required(true)
    +            .expressionLanguageSupported(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
    +            .name("scroll-es-type")
    +            .displayName("Type")
    +            .description(
    +                    "The (optional) type of this document, used by Elasticsearch for indexing and searching. If the property is empty or set "
    +                            + "to _all, the first document matching the identifier across all types will be retrieved.")
    +            .required(false).expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    +
    +    public static final PropertyDescriptor FIELDS = new PropertyDescriptor.Builder()
    +            .name("scroll-es-fields")
    +            .displayName("Fields")
    +            .description(
    +                    "A comma-separated list of fields to retrieve from the document. If the Fields property is left blank, "
    +                            + "then the entire document's source will be retrieved.")
    +            .required(false).expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    +
    +    public static final PropertyDescriptor SORT = new PropertyDescriptor.Builder()
    +            .name("scroll-es-sort")
    +            .displayName("Sort")
    +            .description(
    +                    "A sort parameter (e.g., timestamp:asc). If the Sort property is left blank, "
    +                            + "then the results will be retrieved in document order.")
    +            .required(false).expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    +
    +    public static final PropertyDescriptor PAGE_SIZE = new PropertyDescriptor.Builder()
    +            .name("scroll-es-size").displayName("Page Size").defaultValue("20")
    +            .description("Determines how many documents to return per page during scrolling.")
    +            .required(true).expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).build();
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        relationships.add(REL_FAILURE);
    +        return Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        final List<PropertyDescriptor> descriptors = new ArrayList<>();
    +        descriptors.add(ES_URL);
    +        descriptors.add(PROP_SSL_CONTEXT_SERVICE);
    +        descriptors.add(USERNAME);
    +        descriptors.add(PASSWORD);
    +        descriptors.add(CONNECT_TIMEOUT);
    +        descriptors.add(RESPONSE_TIMEOUT);
    +        descriptors.add(QUERY);
    +        descriptors.add(SCROLL_DURATION);
    +        descriptors.add(PAGE_SIZE);
    +        descriptors.add(INDEX);
    +        descriptors.add(TYPE);
    +        descriptors.add(FIELDS);
    +        descriptors.add(SORT);
    +
    +        return Collections.unmodifiableList(descriptors);
    +    }
    +
    +    @OnScheduled
    +    public void setup(ProcessContext context) {
    +        super.setup(context);
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session)
    +            throws ProcessException {
    +
    +        try {
    +            if (isQueryFinished(context.getStateManager())) {
    +                getLogger().trace(
    +                        "Query has been marked finished in the state manager.  "
    +                                + "To run another query, clear the state.");
    +                return;
    +            }
    +        } catch (IOException e) {
    +            throw new ProcessException("Could not retrieve state", e);
    +        }
    +
    +        OkHttpClient okHttpClient = getClient();
    +
    +        FlowFile flowFile = session.create();
    +
    +        final String index = context.getProperty(INDEX).evaluateAttributeExpressions(flowFile)
    +                .getValue();
    +        final String query = context.getProperty(QUERY).evaluateAttributeExpressions(flowFile)
    +                .getValue();
    +        final String docType = context.getProperty(TYPE).evaluateAttributeExpressions(flowFile)
    +                .getValue();
    +        final int pageSize = context.getProperty(PAGE_SIZE).evaluateAttributeExpressions(flowFile)
    +                .asInteger().intValue();
    +        final String fields = context.getProperty(FIELDS).isSet() ? context.getProperty(FIELDS)
    +                .evaluateAttributeExpressions(flowFile).getValue() : null;
    +        final String sort = context.getProperty(SORT).isSet() ? context.getProperty(SORT)
    +                .evaluateAttributeExpressions(flowFile).getValue() : null;
    +        final String scroll = context.getProperty(SCROLL_DURATION).isSet() ? context
    +                .getProperty(SCROLL_DURATION).evaluateAttributeExpressions(flowFile).getValue() : null;
    +
    +        // Authentication
    +        final String username = context.getProperty(USERNAME).getValue();
    +        final String password = context.getProperty(PASSWORD).getValue();
    +
    +        final ComponentLog logger = getLogger();
    +
    +        try {
    +            String scrollId = loadScrollId(context.getStateManager());
    +
    +            if (scrollId != null) {
    +                // read the url property from the context
    +                final String urlstr = StringUtils.trimToEmpty(context.getProperty(ES_URL)
    +                        .getValue());
    +                final URL scrollurl = buildRequestURL(urlstr, query, index, docType, fields, sort,
    +                        scrollId, pageSize, scroll);
    +                final long startNanos = System.nanoTime();
    +
    +                final Response getResponse = sendRequestToElasticsearch(okHttpClient, scrollurl,
    +                        username, password, "GET", null);
    +                this.getPage(getResponse, scrollurl, context, session, flowFile, logger, startNanos);
    +            } else {
    +                logger.debug("Querying {}/{} from Elasticsearch: {}", new Object[] { index,
    +                        docType, query });
    +
    +                // read the url property from the context
    +                final String urlstr = StringUtils.trimToEmpty(context.getProperty(ES_URL)
    +                        .getValue());
    +                final URL queryUrl = buildRequestURL(urlstr, query, index, docType, fields, sort,
    +                        scrollId, pageSize, scroll);
    +                final long startNanos = System.nanoTime();
    +
    +                final Response getResponse = sendRequestToElasticsearch(okHttpClient, queryUrl,
    +                        username, password, "GET", null);
    +                this.getPage(getResponse, queryUrl, context, session, flowFile, logger, startNanos);
    +            }
    +
    +        } catch (IOException ioe) {
    +            logger.error(
    +                    "Failed to read from Elasticsearch due to {}, this may indicate an error in configuration "
    +                            + "(hosts, username/password, etc.).",
    +                    new Object[] { ioe.getLocalizedMessage() }, ioe);
    +            session.remove(flowFile);
    +            context.yield();
    +
    +        } catch (Exception e) {
    +            logger.error("Failed to read {} from Elasticsearch due to {}", new Object[] { flowFile,
    +                    e.getLocalizedMessage() }, e);
    +            session.transfer(flowFile, REL_FAILURE);
    +            context.yield();
    +        }
    +    }
    +
    +    private void getPage(final Response getResponse, final URL url, final ProcessContext context,
    +            final ProcessSession session, FlowFile flowFile, final ComponentLog logger, final long startNanos)
    +            throws IOException {
    +        final int statusCode = getResponse.code();
    +
    +        if (isSuccess(statusCode)) {
    +            ResponseBody body = getResponse.body();
    +            final byte[] bodyBytes = body.bytes();
    +            JsonNode responseJson = parseJsonResponse(new ByteArrayInputStream(bodyBytes));
    +            String scrollId = responseJson.get("_scroll_id").asText();
    +
    +            StringBuilder builder = new StringBuilder();
    +
    +            builder.append("{ \"hits\" : [");
    +
    +            JsonNode hits = responseJson.get("hits").get("hits");
    +            if (hits.size() == 0) {
    +                finishQuery(context.getStateManager());
    +                session.remove(flowFile);
    +                return;
    +            }
    +
    +            for(int i = 0; i < hits.size(); i++) {
    +                JsonNode hit = hits.get(i);
    +                String retrievedIndex = hit.get("_index").asText();
    +                String retrievedType = hit.get("_type").asText();
    +
    +                JsonNode source = hit.get("_source");
    +                flowFile = session.putAttribute(flowFile, "es.index", retrievedIndex);
    --- End diff --
    
    I see the id was added to the other processor; should we add it here too?
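
    For reference, a minimal sketch of what that could look like inside the loop over hits. The es.id attribute name is an assumption here, chosen to mirror the existing es.index/es.type attributes; it is not part of the PR as submitted:

        // Hypothetical addition inside the for loop over hits: expose the
        // document id the same way es.index and es.type are exposed.
        String retrievedId = hit.get("_id").asText();
        flowFile = session.putAttribute(flowFile, "es.id", retrievedId);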



[GitHub] nifi pull request #733: NIFI-2417: Implementing QueryElasticsearchHttp and S...

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/733#discussion_r79831856
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java ---
    @@ -0,0 +1,415 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.elasticsearch;
    +
    +import java.io.IOException;
    +import java.net.MalformedURLException;
    +import java.net.URL;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +import java.util.regex.Pattern;
    +import java.util.stream.Collectors;
    +import java.util.stream.Stream;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.Stateful;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.state.Scope;
    +import org.apache.nifi.components.state.StateManager;
    +import org.apache.nifi.components.state.StateMap;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.stream.io.ByteArrayInputStream;
    +import org.codehaus.jackson.JsonNode;
    +
    +import okhttp3.HttpUrl;
    +import okhttp3.OkHttpClient;
    +import okhttp3.Response;
    +import okhttp3.ResponseBody;
    +
    +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
    +@EventDriven
    +@SupportsBatching
    +@Tags({ "elasticsearch", "query", "scroll", "read", "get", "http" })
    +@CapabilityDescription("Scrolls through an Elasticsearch query using the specified connection properties. "
    +        + "This processor is intended to be run on the primary node, and is designed for scrolling through "
    +        + "huge result sets, as in the case of a reindex.  The state must be cleared before another query "
    +        + "can be run.  Each page of results is returned, wrapped in a JSON object like so: { \"hits\" : [ <doc1>, <doc2>, <docn> ] }.  "
    +        + "Note that the full body of each page of documents will be read into memory before being "
    +        + "written to a Flow File for transfer.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "es.index", description = "The Elasticsearch index containing the document"),
    +        @WritesAttribute(attribute = "es.type", description = "The Elasticsearch document type") })
    +@Stateful(description = "After each successful scroll page, the latest scroll_id is persisted in scrollId as input for the next scroll call.  "
    +        + "Once the entire query is complete, finishedQuery state will be set to true, and the processor will not execute unless this is cleared.", scopes = { Scope.LOCAL })
    +public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
    +
    +    private static final String FINISHED_QUERY_STATE = "finishedQuery";
    +    private static final String SCROLL_ID_STATE = "scrollId";
    +    private static final String FIELD_INCLUDE_QUERY_PARAM = "_source_include";
    +    private static final String QUERY_QUERY_PARAM = "q";
    +    private static final String SORT_QUERY_PARAM = "sort";
    +    private static final String SCROLL_QUERY_PARAM = "scroll";
    +    private static final String SCROLL_ID_QUERY_PARAM = "scroll_id";
    +    private static final String SIZE_QUERY_PARAM = "size";
    +
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description(
    +                    "All FlowFiles that are read from Elasticsearch are routed to this relationship.")
    +            .build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description(
    +                    "All FlowFiles that cannot be read from Elasticsearch are routed to this relationship. Note that only incoming "
    +                            + "flow files will be routed to failure.").build();
    +
    +    public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
    +            .name("scroll-es-query").displayName("Query")
    +            .description("The Lucene-style query to run against ElasticSearch").required(true)
    +            .expressionLanguageSupported(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor SCROLL_DURATION = new PropertyDescriptor.Builder()
    +            .name("scroll-es-scroll")
    +            .displayName("Scroll Duration")
    +            .description("The scroll duration is how long each search context is kept in memory.")
    +            .defaultValue("1m")
    +            .required(true)
    +            .expressionLanguageSupported(true)
    +            .addValidator(
    +                    StandardValidators.createRegexMatchingValidator(Pattern.compile("[0-9]+(m|h)")))
    +            .build();
    +
    +    public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
    +            .name("scroll-es-index").displayName("Index")
    +            .description("The name of the index to read from").required(true)
    +            .expressionLanguageSupported(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
    +            .name("scroll-es-type")
    +            .displayName("Type")
    +            .description(
    +                    "The (optional) type of this document, used by Elasticsearch for indexing and searching. If the property is empty or set "
    +                            + "to _all, the first document matching the identifier across all types will be retrieved.")
    +            .required(false).expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    +
    +    public static final PropertyDescriptor FIELDS = new PropertyDescriptor.Builder()
    +            .name("scroll-es-fields")
    +            .displayName("Fields")
    +            .description(
    +                    "A comma-separated list of fields to retrieve from the document. If the Fields property is left blank, "
    +                            + "then the entire document's source will be retrieved.")
    +            .required(false).expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    +
    +    public static final PropertyDescriptor SORT = new PropertyDescriptor.Builder()
    +            .name("scroll-es-sort")
    +            .displayName("Sort")
    +            .description(
    +                    "A sort parameter (e.g., timestamp:asc). If the Sort property is left blank, "
    +                            + "then the results will be retrieved in document order.")
    +            .required(false).expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    +
    +    public static final PropertyDescriptor PAGE_SIZE = new PropertyDescriptor.Builder()
    +            .name("scroll-es-size").displayName("Page Size").defaultValue("20")
    +            .description("Determines how many documents to return per page during scrolling.")
    +            .required(true).expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).build();
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        relationships.add(REL_FAILURE);
    +        return Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        final List<PropertyDescriptor> descriptors = new ArrayList<>();
    +        descriptors.add(ES_URL);
    +        descriptors.add(PROP_SSL_CONTEXT_SERVICE);
    +        descriptors.add(USERNAME);
    +        descriptors.add(PASSWORD);
    +        descriptors.add(CONNECT_TIMEOUT);
    +        descriptors.add(RESPONSE_TIMEOUT);
    +        descriptors.add(QUERY);
    +        descriptors.add(SCROLL_DURATION);
    +        descriptors.add(PAGE_SIZE);
    +        descriptors.add(INDEX);
    +        descriptors.add(TYPE);
    +        descriptors.add(FIELDS);
    +        descriptors.add(SORT);
    +
    +        return Collections.unmodifiableList(descriptors);
    +    }
    +
    +    @OnScheduled
    +    public void setup(ProcessContext context) {
    +        super.setup(context);
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session)
    +            throws ProcessException {
    +
    +        try {
    +            if (isQueryFinished(context.getStateManager())) {
    +                getLogger().trace(
    +                        "Query has been marked finished in the state manager.  "
    +                                + "To run another query, clear the state.");
    +                return;
    +            }
    +        } catch (IOException e) {
    +            throw new ProcessException("Could not retrieve state", e);
    +        }
    +
    +        OkHttpClient okHttpClient = getClient();
    +
    +        FlowFile flowFile = session.create();
    +
    +        final String index = context.getProperty(INDEX).evaluateAttributeExpressions(flowFile)
    +                .getValue();
    +        final String query = context.getProperty(QUERY).evaluateAttributeExpressions(flowFile)
    +                .getValue();
    +        final String docType = context.getProperty(TYPE).evaluateAttributeExpressions(flowFile)
    +                .getValue();
    +        final int pageSize = context.getProperty(PAGE_SIZE).evaluateAttributeExpressions(flowFile)
    +                .asInteger().intValue();
    +        final String fields = context.getProperty(FIELDS).isSet() ? context.getProperty(FIELDS)
    +                .evaluateAttributeExpressions(flowFile).getValue() : null;
    +        final String sort = context.getProperty(SORT).isSet() ? context.getProperty(SORT)
    +                .evaluateAttributeExpressions(flowFile).getValue() : null;
    +        final String scroll = context.getProperty(SCROLL_DURATION).isSet() ? context
    +                .getProperty(SCROLL_DURATION).evaluateAttributeExpressions(flowFile).getValue() : null;
    +
    +        // Authentication
    +        final String username = context.getProperty(USERNAME).getValue();
    +        final String password = context.getProperty(PASSWORD).getValue();
    +
    +        final ComponentLog logger = getLogger();
    +
    +        try {
    +            String scrollId = loadScrollId(context.getStateManager());
    +
    +            if (scrollId != null) {
    +                // read the url property from the context
    +                final String urlstr = StringUtils.trimToEmpty(context.getProperty(ES_URL)
    +                        .getValue());
    +                final URL scrollurl = buildRequestURL(urlstr, query, index, docType, fields, sort,
    +                        scrollId, pageSize, scroll);
    +                final long startNanos = System.nanoTime();
    +
    +                final Response getResponse = sendRequestToElasticsearch(okHttpClient, scrollurl,
    +                        username, password, "GET", null);
    +                this.getPage(getResponse, scrollurl, context, session, flowFile, logger, startNanos);
    +            } else {
    +                logger.debug("Querying {}/{} from Elasticsearch: {}", new Object[] { index,
    +                        docType, query });
    +
    +                // read the url property from the context
    +                final String urlstr = StringUtils.trimToEmpty(context.getProperty(ES_URL)
    +                        .getValue());
    +                final URL queryUrl = buildRequestURL(urlstr, query, index, docType, fields, sort,
    +                        scrollId, pageSize, scroll);
    +                final long startNanos = System.nanoTime();
    +
    +                final Response getResponse = sendRequestToElasticsearch(okHttpClient, queryUrl,
    +                        username, password, "GET", null);
    +                this.getPage(getResponse, queryUrl, context, session, flowFile, logger, startNanos);
    +            }
    +
    +        } catch (IOException ioe) {
    +            logger.error(
    +                    "Failed to read from Elasticsearch due to {}, this may indicate an error in configuration "
    +                            + "(hosts, username/password, etc.).",
    +                    new Object[] { ioe.getLocalizedMessage() }, ioe);
    +            session.remove(flowFile);
    +            context.yield();
    +
    +        } catch (Exception e) {
    +            logger.error("Failed to read {} from Elasticsearch due to {}", new Object[] { flowFile,
    +                    e.getLocalizedMessage() }, e);
    +            session.transfer(flowFile, REL_FAILURE);
    +            context.yield();
    +        }
    +    }
    +
    +    private void getPage(final Response getResponse, final URL url, final ProcessContext context,
    +            final ProcessSession session, FlowFile flowFile, final ComponentLog logger, final long startNanos)
    +            throws IOException {
    +        final int statusCode = getResponse.code();
    +
    +        if (isSuccess(statusCode)) {
    +            ResponseBody body = getResponse.body();
    +            final byte[] bodyBytes = body.bytes();
    +            JsonNode responseJson = parseJsonResponse(new ByteArrayInputStream(bodyBytes));
    +            String scrollId = responseJson.get("_scroll_id").asText();
    +
    +            StringBuilder builder = new StringBuilder();
    +
    +            builder.append("{ \"hits\" : [");
    +
    +            JsonNode hits = responseJson.get("hits").get("hits");
    +            if (hits.size() == 0) {
    +                finishQuery(context.getStateManager());
    +                session.remove(flowFile);
    +                return;
    +            }
    +
    +            for(int i = 0; i < hits.size(); i++) {
    +                JsonNode hit = hits.get(i);
    +                String retrievedIndex = hit.get("_index").asText();
    +                String retrievedType = hit.get("_type").asText();
    +
    +                JsonNode source = hit.get("_source");
    +                flowFile = session.putAttribute(flowFile, "es.index", retrievedIndex);
    --- End diff --
    
    Perhaps include the document id as an attribute here as well.



[GitHub] nifi pull request #733: NIFI-2417: Implementing QueryElasticsearchHttp and S...

Posted by gresockj <gi...@git.apache.org>.
Github user gresockj commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/733#discussion_r79914319
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java ---
    @@ -0,0 +1,415 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.elasticsearch;
    +
    +import java.io.IOException;
    +import java.net.MalformedURLException;
    +import java.net.URL;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +import java.util.regex.Pattern;
    +import java.util.stream.Collectors;
    +import java.util.stream.Stream;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.Stateful;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.state.Scope;
    +import org.apache.nifi.components.state.StateManager;
    +import org.apache.nifi.components.state.StateMap;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.stream.io.ByteArrayInputStream;
    +import org.codehaus.jackson.JsonNode;
    +
    +import okhttp3.HttpUrl;
    +import okhttp3.OkHttpClient;
    +import okhttp3.Response;
    +import okhttp3.ResponseBody;
    +
    +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
    +@EventDriven
    +@SupportsBatching
    +@Tags({ "elasticsearch", "query", "scroll", "read", "get", "http" })
    +@CapabilityDescription("Scrolls through an Elasticsearch query using the specified connection properties. "
    +        + "This processor is intended to be run on the primary node, and is designed for scrolling through "
    +        + "huge result sets, as in the case of a reindex.  The state must be cleared before another query "
    +        + "can be run.  Each page of results is returned, wrapped in a JSON object like so: { \"hits\" : [ <doc1>, <doc2>, <docn> ] }.  "
    +        + "Note that the full body of each page of documents will be read into memory before being "
    +        + "written to a Flow File for transfer.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "es.index", description = "The Elasticsearch index containing the document"),
    +        @WritesAttribute(attribute = "es.type", description = "The Elasticsearch document type") })
    +@Stateful(description = "After each successful scroll page, the latest scroll_id is persisted in scrollId as input for the next scroll call.  "
    +        + "Once the entire query is complete, finishedQuery state will be set to true, and the processor will not execute unless this is cleared.", scopes = { Scope.LOCAL })
    +public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
    +
    +    private static final String FINISHED_QUERY_STATE = "finishedQuery";
    +    private static final String SCROLL_ID_STATE = "scrollId";
    +    private static final String FIELD_INCLUDE_QUERY_PARAM = "_source_include";
    +    private static final String QUERY_QUERY_PARAM = "q";
    +    private static final String SORT_QUERY_PARAM = "sort";
    +    private static final String SCROLL_QUERY_PARAM = "scroll";
    +    private static final String SCROLL_ID_QUERY_PARAM = "scroll_id";
    +    private static final String SIZE_QUERY_PARAM = "size";
    +
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description(
    +                    "All FlowFiles that are read from Elasticsearch are routed to this relationship.")
    +            .build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description(
    +                    "All FlowFiles that cannot be read from Elasticsearch are routed to this relationship. Note that only incoming "
    +                            + "flow files will be routed to failure.").build();
    +
    +    public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
    +            .name("scroll-es-query").displayName("Query")
    +            .description("The Lucene-style query to run against ElasticSearch").required(true)
    --- End diff --
    
    I'll add an example, but I feel like username:tiger should work.  Are you sure the username field is analyzed in your schema?  Does this query work in your browser against your ES instance?
    http://localhost:9200/<index>/_search?q=username:tiger
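
    For context, a rough sketch of how a q-style query would land on the search URL using the okhttp3.HttpUrl builder the processor already imports (the host and index name below are placeholders, not from the PR):

        HttpUrl searchUrl = HttpUrl.parse("http://localhost:9200")
                .newBuilder()
                .addPathSegment("myindex")    // placeholder index name
                .addPathSegment("_search")
                .addQueryParameter("q", "username:tiger")
                .build();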



[GitHub] nifi pull request #733: NIFI-2417: Implementing QueryElasticsearchHttp and S...

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/733#discussion_r80098194
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java ---
    @@ -0,0 +1,415 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.elasticsearch;
    +
    +import java.io.IOException;
    +import java.net.MalformedURLException;
    +import java.net.URL;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +import java.util.regex.Pattern;
    +import java.util.stream.Collectors;
    +import java.util.stream.Stream;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.Stateful;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.state.Scope;
    +import org.apache.nifi.components.state.StateManager;
    +import org.apache.nifi.components.state.StateMap;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.stream.io.ByteArrayInputStream;
    +import org.codehaus.jackson.JsonNode;
    +
    +import okhttp3.HttpUrl;
    +import okhttp3.OkHttpClient;
    +import okhttp3.Response;
    +import okhttp3.ResponseBody;
    +
    +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
    +@EventDriven
    +@SupportsBatching
    +@Tags({ "elasticsearch", "query", "scroll", "read", "get", "http" })
    +@CapabilityDescription("Scrolls through an Elasticsearch query using the specified connection properties. "
    +        + "This processor is intended to be run on the primary node, and is designed for scrolling through "
    +        + "huge result sets, as in the case of a reindex.  The state must be cleared before another query "
    +        + "can be run.  Each page of results is returned, wrapped in a JSON object like so: { \"hits\" : [ <doc1>, <doc2>, <docn> ] }.  "
    +        + "Note that the full body of each page of documents will be read into memory before being "
    +        + "written to a Flow File for transfer.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "es.index", description = "The Elasticsearch index containing the document"),
    +        @WritesAttribute(attribute = "es.type", description = "The Elasticsearch document type") })
    +@Stateful(description = "After each successful scroll page, the latest scroll_id is persisted in scrollId as input for the next scroll call.  "
    +        + "Once the entire query is complete, finishedQuery state will be set to true, and the processor will not execute unless this is cleared.", scopes = { Scope.LOCAL })
    +public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
    +
    +    private static final String FINISHED_QUERY_STATE = "finishedQuery";
    +    private static final String SCROLL_ID_STATE = "scrollId";
    +    private static final String FIELD_INCLUDE_QUERY_PARAM = "_source_include";
    +    private static final String QUERY_QUERY_PARAM = "q";
    +    private static final String SORT_QUERY_PARAM = "sort";
    +    private static final String SCROLL_QUERY_PARAM = "scroll";
    +    private static final String SCROLL_ID_QUERY_PARAM = "scroll_id";
    +    private static final String SIZE_QUERY_PARAM = "size";
    +
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description(
    +                    "All FlowFiles that are read from Elasticsearch are routed to this relationship.")
    +            .build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description(
    +                    "All FlowFiles that cannot be read from Elasticsearch are routed to this relationship. Note that only incoming "
    +                            + "flow files will be routed to failure.").build();
    +
    +    public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
    +            .name("scroll-es-query").displayName("Query")
    +            .description("The Lucene-style query to run against ElasticSearch").required(true)
    --- End diff --
    
    No, it doesn't. However, it's a string field, and by default those should be analyzed, right? Do I need to explicitly update the mapping to set index = analyzed?
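
    For reference, here is roughly how I'm checking the mapping (host and index below are placeholders; this uses okhttp3's OkHttpClient, Request, and ResponseBody, the same client API the processors use). If I understand ES 2.x defaults, an analyzed string field shows up as just "type": "string", while an opted-out field adds "index": "not_analyzed":

        OkHttpClient client = new OkHttpClient();
        Request request = new Request.Builder()
                .url("http://localhost:9200/myindex/_mapping")  // placeholder host/index
                .build();
        try (ResponseBody mapping = client.newCall(request).execute().body()) {
            System.out.println(mapping.string());
        }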



[GitHub] nifi pull request #733: NIFI-2417: Implementing QueryElasticsearchHttp and S...

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/733#discussion_r80112353
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java ---
    @@ -0,0 +1,415 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.elasticsearch;
    +
    +import java.io.IOException;
    +import java.net.MalformedURLException;
    +import java.net.URL;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +import java.util.regex.Pattern;
    +import java.util.stream.Collectors;
    +import java.util.stream.Stream;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.Stateful;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.state.Scope;
    +import org.apache.nifi.components.state.StateManager;
    +import org.apache.nifi.components.state.StateMap;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.stream.io.ByteArrayInputStream;
    +import org.codehaus.jackson.JsonNode;
    +
    +import okhttp3.HttpUrl;
    +import okhttp3.OkHttpClient;
    +import okhttp3.Response;
    +import okhttp3.ResponseBody;
    +
    +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
    +@EventDriven
    +@SupportsBatching
    +@Tags({ "elasticsearch", "query", "scroll", "read", "get", "http" })
    +@CapabilityDescription("Scrolls through an Elasticsearch query using the specified connection properties. "
    +        + "This processor is intended to be run on the primary node, and is designed for scrolling through "
    +        + "huge result sets, as in the case of a reindex.  The state must be cleared before another query "
    +        + "can be run.  Each page of results is returned, wrapped in a JSON object like so: { \"hits\" : [ <doc1>, <doc2>, <docn> ] }.  "
    +        + "Note that the full body of each page of documents will be read into memory before being "
    +        + "written to a Flow File for transfer.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "es.index", description = "The Elasticsearch index containing the document"),
    +        @WritesAttribute(attribute = "es.type", description = "The Elasticsearch document type") })
    +@Stateful(description = "After each successful scroll page, the latest scroll_id is persisted in scrollId as input for the next scroll call.  "
    +        + "Once the entire query is complete, finishedQuery state will be set to true, and the processor will not execute unless this is cleared.", scopes = { Scope.LOCAL })
    +public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
    +
    +    private static final String FINISHED_QUERY_STATE = "finishedQuery";
    +    private static final String SCROLL_ID_STATE = "scrollId";
    +    private static final String FIELD_INCLUDE_QUERY_PARAM = "_source_include";
    +    private static final String QUERY_QUERY_PARAM = "q";
    +    private static final String SORT_QUERY_PARAM = "sort";
    +    private static final String SCROLL_QUERY_PARAM = "scroll";
    +    private static final String SCROLL_ID_QUERY_PARAM = "scroll_id";
    +    private static final String SIZE_QUERY_PARAM = "size";
    +
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description(
    +                    "All FlowFiles that are read from Elasticsearch are routed to this relationship.")
    +            .build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description(
    +                    "All FlowFiles that cannot be read from Elasticsearch are routed to this relationship. Note that only incoming "
    +                            + "flow files will be routed to failure.").build();
    +
    +    public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
    +            .name("scroll-es-query").displayName("Query")
    +            .description("The Lucene-style query to run against ElasticSearch").required(true)
    +            .expressionLanguageSupported(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor SCROLL_DURATION = new PropertyDescriptor.Builder()
    +            .name("scroll-es-scroll")
    +            .displayName("Scroll Duration")
    +            .description("The scroll duration is how long each search context is kept in memory.")
    +            .defaultValue("1m")
    +            .required(true)
    +            .expressionLanguageSupported(true)
    +            .addValidator(
    +                    StandardValidators.createRegexMatchingValidator(Pattern.compile("[0-9]+(m|h)")))
    +            .build();
    +
    +    public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
    +            .name("scroll-es-index").displayName("Index")
    +            .description("The name of the index to read from").required(true)
    +            .expressionLanguageSupported(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
    +            .name("scroll-es-type")
    +            .displayName("Type")
    +            .description(
    +                    "The (optional) type of this document, used by Elasticsearch for indexing and searching. If the property is empty or set "
    +                            + "to _all, the first document matching the identifier across all types will be retrieved.")
    +            .required(false).expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    +
    +    public static final PropertyDescriptor FIELDS = new PropertyDescriptor.Builder()
    +            .name("scroll-es-fields")
    +            .displayName("Fields")
    +            .description(
    +                    "A comma-separated list of fields to retrieve from the document. If the Fields property is left blank, "
    +                            + "then the entire document's source will be retrieved.")
    +            .required(false).expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    +
    +    public static final PropertyDescriptor SORT = new PropertyDescriptor.Builder()
    +            .name("scroll-es-sort")
    +            .displayName("Sort")
    +            .description(
    +                    "A sort parameter (e.g., timestamp:asc). If the Sort property is left blank, "
    +                            + "then the results will be retrieved in document order.")
    +            .required(false).expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    +
    +    public static final PropertyDescriptor PAGE_SIZE = new PropertyDescriptor.Builder()
    +            .name("scroll-es-size").displayName("Page Size").defaultValue("20")
    +            .description("Determines how many documents to return per page during scrolling.")
    +            .required(true).expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).build();
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        relationships.add(REL_FAILURE);
    +        return Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        final List<PropertyDescriptor> descriptors = new ArrayList<>();
    +        descriptors.add(ES_URL);
    +        descriptors.add(PROP_SSL_CONTEXT_SERVICE);
    +        descriptors.add(USERNAME);
    +        descriptors.add(PASSWORD);
    +        descriptors.add(CONNECT_TIMEOUT);
    +        descriptors.add(RESPONSE_TIMEOUT);
    +        descriptors.add(QUERY);
    +        descriptors.add(SCROLL_DURATION);
    +        descriptors.add(PAGE_SIZE);
    +        descriptors.add(INDEX);
    +        descriptors.add(TYPE);
    +        descriptors.add(FIELDS);
    +        descriptors.add(SORT);
    +
    +        return Collections.unmodifiableList(descriptors);
    +    }
    +
    +    @OnScheduled
    +    public void setup(ProcessContext context) {
    +        super.setup(context);
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session)
    +            throws ProcessException {
    +
    +        try {
    +            if (isQueryFinished(context.getStateManager())) {
    +                getLogger().trace(
    +                        "Query has been marked finished in the state manager.  "
    +                                + "To run another query, clear the state.");
    +                return;
    +            }
    +        } catch (IOException e) {
    +            throw new ProcessException("Could not retrieve state", e);
    +        }
    +
    +        OkHttpClient okHttpClient = getClient();
    +
    +        FlowFile flowFile = session.create();
    +
    +        final String index = context.getProperty(INDEX).evaluateAttributeExpressions(flowFile)
    +                .getValue();
    +        final String query = context.getProperty(QUERY).evaluateAttributeExpressions(flowFile)
    +                .getValue();
    +        final String docType = context.getProperty(TYPE).evaluateAttributeExpressions(flowFile)
    +                .getValue();
    +        final int pageSize = context.getProperty(PAGE_SIZE).evaluateAttributeExpressions(flowFile)
    +                .asInteger().intValue();
    +        final String fields = context.getProperty(FIELDS).isSet() ? context.getProperty(FIELDS)
    +                .evaluateAttributeExpressions(flowFile).getValue() : null;
    +        final String sort = context.getProperty(SORT).isSet() ? context.getProperty(SORT)
    +                .evaluateAttributeExpressions(flowFile).getValue() : null;
    +        final String scroll = context.getProperty(SCROLL_DURATION).isSet() ? context
    +                .getProperty(SCROLL_DURATION).evaluateAttributeExpressions(flowFile).getValue() : null;
    +
    +        // Authentication
    +        final String username = context.getProperty(USERNAME).getValue();
    +        final String password = context.getProperty(PASSWORD).getValue();
    +
    +        final ComponentLog logger = getLogger();
    +
    +        try {
    +            String scrollId = loadScrollId(context.getStateManager());
    +
    +            if (scrollId != null) {
    +                // read the url property from the context
    +                final String urlstr = StringUtils.trimToEmpty(context.getProperty(ES_URL)
    +                        .getValue());
    +                final URL scrollurl = buildRequestURL(urlstr, query, index, docType, fields, sort,
    +                        scrollId, pageSize, scroll);
    +                final long startNanos = System.nanoTime();
    +
    +                final Response getResponse = sendRequestToElasticsearch(okHttpClient, scrollurl,
    +                        username, password, "GET", null);
    +                this.getPage(getResponse, scrollurl, context, session, flowFile, logger, startNanos);
    +            } else {
    +                logger.debug("Querying {}/{} from Elasticsearch: {}", new Object[] { index,
    +                        docType, query });
    +
    +                // read the url property from the context
    +                final String urlstr = StringUtils.trimToEmpty(context.getProperty(ES_URL)
    +                        .getValue());
    +                final URL queryUrl = buildRequestURL(urlstr, query, index, docType, fields, sort,
    +                        scrollId, pageSize, scroll);
    +                final long startNanos = System.nanoTime();
    +
    +                final Response getResponse = sendRequestToElasticsearch(okHttpClient, queryUrl,
    +                        username, password, "GET", null);
    +                this.getPage(getResponse, queryUrl, context, session, flowFile, logger, startNanos);
    +            }
    +
    +        } catch (IOException ioe) {
    +            logger.error(
    +                    "Failed to read from Elasticsearch due to {}, this may indicate an error in configuration "
    +                            + "(hosts, username/password, etc.).",
    +                    new Object[] { ioe.getLocalizedMessage() }, ioe);
    +            session.remove(flowFile);
    +            context.yield();
    +
    +        } catch (Exception e) {
    +            logger.error("Failed to read {} from Elasticsearch due to {}", new Object[] { flowFile,
    +                    e.getLocalizedMessage() }, e);
    +            session.transfer(flowFile, REL_FAILURE);
    +            context.yield();
    +        }
    +    }
    +
    +    private void getPage(final Response getResponse, final URL url, final ProcessContext context,
    +            final ProcessSession session, FlowFile flowFile, final ComponentLog logger, final long startNanos)
    +            throws IOException {
    +        final int statusCode = getResponse.code();
    +
    +        if (isSuccess(statusCode)) {
    +            ResponseBody body = getResponse.body();
    +            final byte[] bodyBytes = body.bytes();
    +            JsonNode responseJson = parseJsonResponse(new ByteArrayInputStream(bodyBytes));
    +            String scrollId = responseJson.get("_scroll_id").asText();
    +
    +            StringBuilder builder = new StringBuilder();
    +
    +            builder.append("{ \"hits\" : [");
    +
    +            JsonNode hits = responseJson.get("hits").get("hits");
    +            if (hits.size() == 0) {
    +                finishQuery(context.getStateManager());
    +                session.remove(flowFile);
    +                return;
    +            }
    +
    +            for(int i = 0; i < hits.size(); i++) {
    +                JsonNode hit = hits.get(i);
    +                String retrievedIndex = hit.get("_index").asText();
    +                String retrievedType = hit.get("_type").asText();
    +
    +                JsonNode source = hit.get("_source");
    +                flowFile = session.putAttribute(flowFile, "es.index", retrievedIndex);
    --- End diff --
    
    Ah, good point. OK, looks good; I'll comment on the whole PR.



[GitHub] nifi pull request #733: NIFI-2417: Implementing QueryElasticsearchHttp and S...

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/733#discussion_r79832571
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java ---
    @@ -0,0 +1,415 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.elasticsearch;
    +
    +import java.io.IOException;
    +import java.net.MalformedURLException;
    +import java.net.URL;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +import java.util.regex.Pattern;
    +import java.util.stream.Collectors;
    +import java.util.stream.Stream;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.Stateful;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.state.Scope;
    +import org.apache.nifi.components.state.StateManager;
    +import org.apache.nifi.components.state.StateMap;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.stream.io.ByteArrayInputStream;
    +import org.codehaus.jackson.JsonNode;
    +
    +import okhttp3.HttpUrl;
    +import okhttp3.OkHttpClient;
    +import okhttp3.Response;
    +import okhttp3.ResponseBody;
    +
    +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
    +@EventDriven
    +@SupportsBatching
    +@Tags({ "elasticsearch", "query", "scroll", "read", "get", "http" })
    +@CapabilityDescription("Scrolls through an Elasticsearch query using the specified connection properties. "
    +        + "This processor is intended to be run on the primary node, and is designed for scrolling through "
    +        + "huge result sets, as in the case of a reindex.  The state must be cleared before another query "
    +        + "can be run.  Each page of results is returned, wrapped in a JSON object like so: { \"hits\" : [ <doc1>, <doc2>, <docn> ] }.  "
    +        + "Note that the full body of each page of documents will be read into memory before being "
    +        + "written to a Flow File for transfer.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "es.index", description = "The Elasticsearch index containing the document"),
    +        @WritesAttribute(attribute = "es.type", description = "The Elasticsearch document type") })
    +@Stateful(description = "After each successful scroll page, the latest scroll_id is persisted in scrollId as input for the next scroll call.  "
    +        + "Once the entire query is complete, finishedQuery state will be set to true, and the processor will not execute unless this is cleared.", scopes = { Scope.LOCAL })
    +public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
    +
    +    private static final String FINISHED_QUERY_STATE = "finishedQuery";
    +    private static final String SCROLL_ID_STATE = "scrollId";
    +    private static final String FIELD_INCLUDE_QUERY_PARAM = "_source_include";
    +    private static final String QUERY_QUERY_PARAM = "q";
    +    private static final String SORT_QUERY_PARAM = "sort";
    +    private static final String SCROLL_QUERY_PARAM = "scroll";
    +    private static final String SCROLL_ID_QUERY_PARAM = "scroll_id";
    +    private static final String SIZE_QUERY_PARAM = "size";
    +
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description(
    +                    "All FlowFiles that are read from Elasticsearch are routed to this relationship.")
    +            .build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description(
    +                    "All FlowFiles that cannot be read from Elasticsearch are routed to this relationship. Note that only incoming "
    +                            + "flow files will be routed to failure.").build();
    +
    +    public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
    +            .name("scroll-es-query").displayName("Query")
    +            .description("The Lucene-style query to run against ElasticSearch").required(true)
    --- End diff --
    
    It might be helpful to add a trivial example here. Also, I couldn't use something like (username:tiger) to find usernames that contain the word tiger; I had to use (username:\*tiger\*). Again, though, that might be my ES setup; I just wanted to make you aware.
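
    For instance (just a sketch of standard Lucene query-string syntax; exact matching behavior depends on how the fields are analyzed), Query property values might look like:

        username:*tiger*
        source:twitter AND severity:high
        timestamp:[2016-01-01 TO 2016-07-28]

    Even a one-liner like the first of these in the property description would probably save users some trial and error.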



[GitHub] nifi pull request #733: NIFI-2417: Implementing QueryElasticsearchHttp and S...

Posted by gresockj <gi...@git.apache.org>.
Github user gresockj commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/733#discussion_r80111838
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java ---
    @@ -0,0 +1,415 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.elasticsearch;
    +
    +import java.io.IOException;
    +import java.net.MalformedURLException;
    +import java.net.URL;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +import java.util.regex.Pattern;
    +import java.util.stream.Collectors;
    +import java.util.stream.Stream;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.Stateful;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.state.Scope;
    +import org.apache.nifi.components.state.StateManager;
    +import org.apache.nifi.components.state.StateMap;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.stream.io.ByteArrayInputStream;
    +import org.codehaus.jackson.JsonNode;
    +
    +import okhttp3.HttpUrl;
    +import okhttp3.OkHttpClient;
    +import okhttp3.Response;
    +import okhttp3.ResponseBody;
    +
    +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
    +@EventDriven
    +@SupportsBatching
    +@Tags({ "elasticsearch", "query", "scroll", "read", "get", "http" })
    +@CapabilityDescription("Scrolls through an Elasticsearch query using the specified connection properties. "
    +        + "This processor is intended to be run on the primary node, and is designed for scrolling through "
    +        + "huge result sets, as in the case of a reindex.  The state must be cleared before another query "
    +        + "can be run.  Each page of results is returned, wrapped in a JSON object like so: { \"hits\" : [ <doc1>, <doc2>, <docn> ] }.  "
    +        + "Note that the full body of each page of documents will be read into memory before being "
    +        + "written to a Flow File for transfer.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "es.index", description = "The Elasticsearch index containing the document"),
    +        @WritesAttribute(attribute = "es.type", description = "The Elasticsearch document type") })
    +@Stateful(description = "After each successful scroll page, the latest scroll_id is persisted in scrollId as input for the next scroll call.  "
    +        + "Once the entire query is complete, finishedQuery state will be set to true, and the processor will not execute unless this is cleared.", scopes = { Scope.LOCAL })
    +public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
    +
    +    private static final String FINISHED_QUERY_STATE = "finishedQuery";
    +    private static final String SCROLL_ID_STATE = "scrollId";
    +    private static final String FIELD_INCLUDE_QUERY_PARAM = "_source_include";
    +    private static final String QUERY_QUERY_PARAM = "q";
    +    private static final String SORT_QUERY_PARAM = "sort";
    +    private static final String SCROLL_QUERY_PARAM = "scroll";
    +    private static final String SCROLL_ID_QUERY_PARAM = "scroll_id";
    +    private static final String SIZE_QUERY_PARAM = "size";
    +
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description(
    +                    "All FlowFiles that are read from Elasticsearch are routed to this relationship.")
    +            .build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description(
    +                    "All FlowFiles that cannot be read from Elasticsearch are routed to this relationship. Note that only incoming "
    +                            + "flow files will be routed to failure.").build();
    +
    +    public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
    +            .name("scroll-es-query").displayName("Query")
    +            .description("The Lucene-style query to run against ElasticSearch").required(true)
    --- End diff --
    
    Interesting; yeah, by default I believe string fields should be analyzed. I'm not an ES expert, so it could be something in the configuration. I think the important thing is that the NiFi processor behavior matches the ES REST API behavior, though.
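
    If it helps narrow it down: my understanding (unverified against your cluster, and specific to the ES 2.x mapping syntax) is that an analyzed string field is tokenized, so username:tiger matches any document whose username contains the token "tiger", while a not_analyzed mapping like the sketch below only matches the exact value, which would explain needing the wildcards:

        "username" : { "type" : "string", "index" : "not_analyzed" }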



[GitHub] nifi pull request #733: NIFI-2417: Implementing QueryElasticsearchHttp and S...

Posted by gresockj <gi...@git.apache.org>.
Github user gresockj commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/733#discussion_r79913709
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java ---
    @@ -0,0 +1,410 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.elasticsearch;
    +
    +import java.io.IOException;
    +import java.net.MalformedURLException;
    +import java.net.URL;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +import java.util.stream.Collectors;
    +import java.util.stream.Stream;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.stream.io.ByteArrayInputStream;
    +import org.codehaus.jackson.JsonNode;
    +
    +import okhttp3.HttpUrl;
    +import okhttp3.OkHttpClient;
    +import okhttp3.Response;
    +import okhttp3.ResponseBody;
    +
    +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
    +@EventDriven
    +@SupportsBatching
    +@Tags({ "elasticsearch", "query", "read", "get", "http" })
    +@CapabilityDescription("Queries Elasticsearch using the specified connection properties. "
    +        + "Note that the full body of each page of documents will be read into memory before being "
    +        + "written to Flow Files for transfer.  Also note that the Elasticsearch max_result_window index "
    +        + "setting is the upper bound on the number of records that can be retrieved using this query.  "
    +        + "To retrieve more records, use the ScrollElasticsearchHttp processor.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "filename", description = "The filename attribute is set to the document identifier"),
    +        @WritesAttribute(attribute = "es.index", description = "The Elasticsearch index containing the document"),
    +        @WritesAttribute(attribute = "es.type", description = "The Elasticsearch document type"),
    +        @WritesAttribute(attribute = "es.result.*", description = "If Target is 'Flow file attributes', the JSON attributes of "
    +                + "each result will be placed into corresponding attributes with this prefix.") })
    +public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
    +
    +    private static final String FIELD_INCLUDE_QUERY_PARAM = "_source_include";
    +    private static final String QUERY_QUERY_PARAM = "q";
    +    private static final String SORT_QUERY_PARAM = "sort";
    +    private static final String FROM_QUERY_PARAM = "from";
    +    private static final String SIZE_QUERY_PARAM = "size";
    +
    +    public static final String TARGET_FLOW_FILE_CONTENT = "Flow file content";
    +    public static final String TARGET_FLOW_FILE_ATTRIBUTES = "Flow file attributes";
    +    private static final String ATTRIBUTE_PREFIX = "es.result.";
    +
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description(
    +                    "All FlowFiles that are read from Elasticsearch are routed to this relationship.")
    +            .build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description(
    +                    "All FlowFiles that cannot be read from Elasticsearch are routed to this relationship. Note that only incoming "
    +                            + "flow files will be routed to failure.").build();
    +
    +    public static final Relationship REL_RETRY = new Relationship.Builder()
    +            .name("retry")
    +            .description(
    +                    "A FlowFile is routed to this relationship if the document cannot be fetched but attempting the operation again may "
    +                            + "succeed. Note that if the processor has no incoming connections, flow files may still be sent to this relationship "
    +                            + "based on the processor properties and the results of the fetch operation.")
    +            .build();
    +
    +    public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
    +            .name("query-es-query").displayName("Query")
    +            .description("The Lucene-style query to run against ElasticSearch").required(true)
    +            .expressionLanguageSupported(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
    +            .name("query-es-index").displayName("Index")
    +            .description("The name of the index to read from").required(true)
    +            .expressionLanguageSupported(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
    +            .name("query-es-type")
    +            .displayName("Type")
    +            .description(
    +                    "The (optional) type of this document, used by Elasticsearch for indexing and searching. If the property is empty or set "
    +                            + "to _all, the first document matching the identifier across all types will be retrieved.")
    --- End diff --
    
    You're right... I copied this from FetchElasticsearchHttp, and the bug exists there as well.  It looks like the original author had the logic backwards between index and type: https://www.elastic.co/guide/en/elasticsearch/guide/current/multi-index-multi-type.html
    
    I think we should change the "Index" property to use the _all syntax if empty or if "_all" is specified.  It looks like the ES syntax for searching all types is to simply leave out the "Type" path parameter.  I'll address this in all 3 processors when I get a chance.
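
    For reference, as I read the linked docs (worth double-checking before I change the processors), the search URL shapes would be roughly:

        /myindex/mytype/_search?q=...    single index, single type
        /myindex/_search?q=...           single index, all types
        /_all/mytype/_search?q=...       all indices, single type

    So "Index" maps to _all when unset, and "Type" is simply omitted from the path when unset.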



[GitHub] nifi issue #733: NIFI-2417: Implementing QueryElasticsearchHttp and ScrollEl...

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on the issue:

    https://github.com/apache/nifi/pull/733
  
    Oops, I forgot to rebase and add the comment that auto-closes this PR. Do you mind closing it? I'll resolve the JIRA. Thanks!



[GitHub] nifi issue #733: NIFI-2417: Implementing QueryElasticsearchHttp and ScrollEl...

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on the issue:

    https://github.com/apache/nifi/pull/733
  
    +1 LGTM, thanks for the great contribution! Lots of folks have asked about this capability; glad to have it in NiFi :) Merging to master.



[GitHub] nifi issue #733: NIFI-2417: Implementing QueryElasticsearchHttp and ScrollEl...

Posted by gresockj <gi...@git.apache.org>.
Github user gresockj commented on the issue:

    https://github.com/apache/nifi/pull/733
  
    Thanks, @mattyb149! Good comments; I'll see if I can work on these this week.



[GitHub] nifi pull request #733: NIFI-2417: Implementing QueryElasticsearchHttp and S...

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/733#discussion_r79826524
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java ---
    @@ -0,0 +1,410 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.elasticsearch;
    +
    +import java.io.IOException;
    +import java.net.MalformedURLException;
    +import java.net.URL;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +import java.util.stream.Collectors;
    +import java.util.stream.Stream;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.stream.io.ByteArrayInputStream;
    +import org.codehaus.jackson.JsonNode;
    +
    +import okhttp3.HttpUrl;
    +import okhttp3.OkHttpClient;
    +import okhttp3.Response;
    +import okhttp3.ResponseBody;
    +
    +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
    +@EventDriven
    +@SupportsBatching
    +@Tags({ "elasticsearch", "query", "read", "get", "http" })
    +@CapabilityDescription("Queries Elasticsearch using the specified connection properties. "
    +        + "Note that the full body of each page of documents will be read into memory before being "
    +        + "written to Flow Files for transfer.  Also note that the Elasticsearch max_result_window index "
    +        + "setting is the upper bound on the number of records that can be retrieved using this query.  "
    +        + "To retrieve more records, use the ScrollElasticsearchHttp processor.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "filename", description = "The filename attribute is set to the document identifier"),
    +        @WritesAttribute(attribute = "es.index", description = "The Elasticsearch index containing the document"),
    +        @WritesAttribute(attribute = "es.type", description = "The Elasticsearch document type"),
    +        @WritesAttribute(attribute = "es.result.*", description = "If Target is 'Flow file attributes', the JSON attributes of "
    +                + "each result will be placed into corresponding attributes with this prefix.") })
    +public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
    +
    +    private static final String FIELD_INCLUDE_QUERY_PARAM = "_source_include";
    +    private static final String QUERY_QUERY_PARAM = "q";
    +    private static final String SORT_QUERY_PARAM = "sort";
    +    private static final String FROM_QUERY_PARAM = "from";
    +    private static final String SIZE_QUERY_PARAM = "size";
    +
    +    public static final String TARGET_FLOW_FILE_CONTENT = "Flow file content";
    +    public static final String TARGET_FLOW_FILE_ATTRIBUTES = "Flow file attributes";
    +    private static final String ATTRIBUTE_PREFIX = "es.result.";
    +
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description(
    +                    "All FlowFiles that are read from Elasticsearch are routed to this relationship.")
    +            .build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description(
    +                    "All FlowFiles that cannot be read from Elasticsearch are routed to this relationship. Note that only incoming "
    +                            + "flow files will be routed to failure.").build();
    +
    +    public static final Relationship REL_RETRY = new Relationship.Builder()
    +            .name("retry")
    +            .description(
    +                    "A FlowFile is routed to this relationship if the document cannot be fetched but attempting the operation again may "
    +                            + "succeed. Note that if the processor has no incoming connections, flow files may still be sent to this relationship "
    +                            + "based on the processor properties and the results of the fetch operation.")
    +            .build();
    +
    +    public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
    +            .name("query-es-query").displayName("Query")
    +            .description("The Lucene-style query to run against ElasticSearch").required(true)
    +            .expressionLanguageSupported(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
    +            .name("query-es-index").displayName("Index")
    +            .description("The name of the index to read from").required(true)
    +            .expressionLanguageSupported(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
    +            .name("query-es-type")
    +            .displayName("Type")
    +            .description(
    +                    "The (optional) type of this document, used by Elasticsearch for indexing and searching. If the property is empty or set "
    +                            + "to _all, the first document matching the identifier across all types will be retrieved.")
    +            .required(false).expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    +
    +    public static final PropertyDescriptor FIELDS = new PropertyDescriptor.Builder()
    +            .name("query-es-fields")
    +            .displayName("Fields")
    +            .description(
    +                    "A comma-separated list of fields to retrieve from the document. If the Fields property is left blank, "
    +                            + "then the entire document's source will be retrieved.")
    +            .required(false).expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    +
    +    public static final PropertyDescriptor SORT = new PropertyDescriptor.Builder()
    +            .name("query-es-sort")
    +            .displayName("Sort")
    +            .description(
    +                    "A sort parameter (e.g., timestamp:asc). If the Sort property is left blank, "
    +                            + "then the results will be retrieved in document order.")
    +            .required(false).expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    +
    +    public static final PropertyDescriptor PAGE_SIZE = new PropertyDescriptor.Builder()
    +            .name("query-es-size").displayName("Page Size").defaultValue("20")
    +            .description("Determines how many documents to return per page during scrolling.")
    +            .required(true).expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).build();
    +
    +    public static final PropertyDescriptor LIMIT = new PropertyDescriptor.Builder()
    +            .name("query-es-limit").displayName("Limit")
    +            .description("If set, limits the number of results that will be returned.")
    +            .required(false).expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).build();
    +
    +    public static final PropertyDescriptor TARGET = new PropertyDescriptor.Builder()
    +            .name("query-es-target")
    +            .displayName("Target")
    +            .description(
    +                    "Indicates where the results should be placed.  In the case of 'Flow file content', the JSON "
    +                            + "response will be written as the content of the flow file.  In the case of 'Flow file attributes', "
    +                            + "the original flow file (if applicable) will be cloned for each result, and all return fields will be placed "
    +                            + "in a flow file attribute of the same name, but prefixed by 'es.result.'")
    +            .required(true).expressionLanguageSupported(false)
    +            .defaultValue(TARGET_FLOW_FILE_CONTENT)
    +            .allowableValues(TARGET_FLOW_FILE_CONTENT, TARGET_FLOW_FILE_ATTRIBUTES)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        relationships.add(REL_FAILURE);
    +        relationships.add(REL_RETRY);
    +        return Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        final List<PropertyDescriptor> descriptors = new ArrayList<>();
    +        descriptors.add(ES_URL);
    +        descriptors.add(PROP_SSL_CONTEXT_SERVICE);
    +        descriptors.add(USERNAME);
    +        descriptors.add(PASSWORD);
    +        descriptors.add(CONNECT_TIMEOUT);
    +        descriptors.add(RESPONSE_TIMEOUT);
    +        descriptors.add(QUERY);
    +        descriptors.add(PAGE_SIZE);
    +        descriptors.add(INDEX);
    +        descriptors.add(TYPE);
    +        descriptors.add(FIELDS);
    +        descriptors.add(SORT);
    +        descriptors.add(LIMIT);
    +        descriptors.add(TARGET);
    +
    +        return Collections.unmodifiableList(descriptors);
    +    }
    +
    +    @OnScheduled
    +    public void setup(ProcessContext context) {
    +        super.setup(context);
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session)
    +            throws ProcessException {
    +
    +        FlowFile flowFile = null;
    +        if (context.hasIncomingConnection()) {
    +            flowFile = session.get();
    +
    +            // If we have no FlowFile, and all incoming connections are self-loops then we can
    +            // continue on.
    +            // However, if we have no FlowFile and we have connections coming from other Processors,
    +            // then
    +            // we know that we should run only if we have a FlowFile.
    +            if (flowFile == null && context.hasNonLoopConnection()) {
    +                return;
    +            }
    +        }
    +
    +        OkHttpClient okHttpClient = getClient();
    +
    +        final String index = context.getProperty(INDEX).evaluateAttributeExpressions(flowFile)
    +                .getValue();
    +        final String query = context.getProperty(QUERY).evaluateAttributeExpressions(flowFile)
    +                .getValue();
    +        final String docType = context.getProperty(TYPE).evaluateAttributeExpressions(flowFile)
    +                .getValue();
    +        final int pageSize = context.getProperty(PAGE_SIZE).evaluateAttributeExpressions(flowFile)
    +                .asInteger().intValue();
    +        final Integer limit = context.getProperty(LIMIT).isSet() ? context.getProperty(LIMIT)
    +                .evaluateAttributeExpressions(flowFile).asInteger().intValue() : null;
    +        final String fields = context.getProperty(FIELDS).isSet() ? context.getProperty(FIELDS)
    +                .evaluateAttributeExpressions(flowFile).getValue() : null;
    +        final String sort = context.getProperty(SORT).isSet() ? context.getProperty(SORT)
    +                .evaluateAttributeExpressions(flowFile).getValue() : null;
    +        final boolean targetIsContent = context.getProperty(TARGET).getValue()
    +                .equals(TARGET_FLOW_FILE_CONTENT);
    +
    +        // Authentication
    +        final String username = context.getProperty(USERNAME).getValue();
    +        final String password = context.getProperty(PASSWORD).getValue();
    +
    +        final ComponentLog logger = getLogger();
    +
    +        int fromIndex = 0;
    +        int numResults;
    +
    +        try {
    +            logger.debug("Querying {}/{} from Elasticsearch: {}", new Object[] { index, docType,
    +                    query });
    +
    +            final long startNanos = System.nanoTime();
    +            // read the url property from the context
    +            final String urlstr = StringUtils.trimToEmpty(context.getProperty(ES_URL).getValue());
    +
    +            boolean hitLimit = false;
    +            do {
    +                int mPageSize = pageSize;
    +                if (limit != null && limit <= (fromIndex + pageSize)) {
    +                    mPageSize = limit - fromIndex;
    +                    hitLimit = true;
    +                }
    +
    +                final URL queryUrl = buildRequestURL(urlstr, query, index, docType, fields, sort,
    +                        mPageSize, fromIndex);
    +
    +                final Response getResponse = sendRequestToElasticsearch(okHttpClient, queryUrl,
    +                        username, password, "GET", null);
    +                numResults = this.getPage(getResponse, queryUrl, context, session, flowFile,
    +                        logger, startNanos, targetIsContent);
    +                fromIndex += pageSize;
    +            } while (numResults > 0 && !hitLimit);
    +
    +            if (flowFile != null) {
    +                session.remove(flowFile);
    +            }
    +        } catch (IOException ioe) {
    +            logger.error(
    +                    "Failed to read from Elasticsearch due to {}, this may indicate an error in configuration "
    +                            + "(hosts, username/password, etc.). Routing to retry",
    +                    new Object[] { ioe.getLocalizedMessage() }, ioe);
    +            if (flowFile != null) {
    +                session.transfer(flowFile, REL_RETRY);
    +            }
    +            context.yield();
    +
    +        } catch (RetryableException e) {
    +            logger.error(e.getMessage(), new Object[] { e.getLocalizedMessage() }, e);
    +            if (flowFile != null) {
    +                session.transfer(flowFile, REL_RETRY);
    +            }
    +            context.yield();
    +        } catch (Exception e) {
    +            logger.error("Failed to read {} from Elasticsearch due to {}", new Object[] { flowFile,
    +                    e.getLocalizedMessage() }, e);
    +            if (flowFile != null) {
    +                session.transfer(flowFile, REL_FAILURE);
    +            }
    +            context.yield();
    +        }
    +    }
    +
    +    private int getPage(final Response getResponse, final URL url, final ProcessContext context,
    +            final ProcessSession session, FlowFile flowFile, final ComponentLog logger,
    +            final long startNanos, boolean targetIsContent)
    +            throws IOException {
    +        List<FlowFile> page = new ArrayList<>();
    +        final int statusCode = getResponse.code();
    +
    +        if (isSuccess(statusCode)) {
    +            ResponseBody body = getResponse.body();
    +            final byte[] bodyBytes = body.bytes();
    +            JsonNode responseJson = parseJsonResponse(new ByteArrayInputStream(bodyBytes));
    +            JsonNode hits = responseJson.get("hits").get("hits");
    +
    +            for(int i = 0; i < hits.size(); i++) {
    +                JsonNode hit = hits.get(i);
    +                String retrievedId = hit.get("_id").asText();
    +                String retrievedIndex = hit.get("_index").asText();
    +                String retrievedType = hit.get("_type").asText();
    +
    +                FlowFile documentFlowFile = null;
    +                if (flowFile != null) {
    +                    documentFlowFile = targetIsContent ? session.create(flowFile) : session.clone(flowFile);
    +                } else {
    +                    documentFlowFile = session.create();
    +                }
    +
    +                JsonNode source = hit.get("_source");
    +                documentFlowFile = session.putAttribute(documentFlowFile, "es.index", retrievedIndex);
    +                documentFlowFile = session.putAttribute(documentFlowFile, "es.type", retrievedType);
    +
    +                if (targetIsContent) {
    +                    documentFlowFile = session.putAttribute(documentFlowFile, "filename", retrievedId);
    --- End diff --
    
    The retrieved ID is only used if the target is content; perhaps it should be added as an attribute when the target is attributes?
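
    Something like this in the else branch, say (just a sketch; "es.id" is a hypothetical attribute name, it isn't written by the processor today):

        } else {
            // Hypothetical: expose the document ID as an attribute, alongside es.index/es.type
            documentFlowFile = session.putAttribute(documentFlowFile, "es.id", retrievedId);
        }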



[GitHub] nifi pull request #733: NIFI-2417: Implementing QueryElasticsearchHttp and S...

Posted by gresockj <gi...@git.apache.org>.
Github user gresockj closed the pull request at:

    https://github.com/apache/nifi/pull/733



[GitHub] nifi pull request #733: NIFI-2417: Implementing QueryElasticsearchHttp and S...

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/733#discussion_r80096977
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java ---
    @@ -0,0 +1,410 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.elasticsearch;
    +
    +import java.io.IOException;
    +import java.net.MalformedURLException;
    +import java.net.URL;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +import java.util.stream.Collectors;
    +import java.util.stream.Stream;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.stream.io.ByteArrayInputStream;
    +import org.codehaus.jackson.JsonNode;
    +
    +import okhttp3.HttpUrl;
    +import okhttp3.OkHttpClient;
    +import okhttp3.Response;
    +import okhttp3.ResponseBody;
    +
    +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
    +@EventDriven
    +@SupportsBatching
    +@Tags({ "elasticsearch", "query", "read", "get", "http" })
    +@CapabilityDescription("Queries Elasticsearch using the specified connection properties. "
    +        + "Note that the full body of each page of documents will be read into memory before being "
    +        + "written to Flow Files for transfer.  Also note that the Elasticsearch max_result_window index "
    +        + "setting is the upper bound on the number of records that can be retrieved using this query.  "
    +        + "To retrieve more records, use the ScrollElasticsearchHttp processor.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "filename", description = "The filename attribute is set to the document identifier"),
    +        @WritesAttribute(attribute = "es.index", description = "The Elasticsearch index containing the document"),
    +        @WritesAttribute(attribute = "es.type", description = "The Elasticsearch document type"),
    +        @WritesAttribute(attribute = "es.result.*", description = "If Target is 'Flow file attributes', the JSON attributes of "
    +                + "each result will be placed into corresponding attributes with this prefix.") })
    +public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
    +
    +    private static final String FIELD_INCLUDE_QUERY_PARAM = "_source_include";
    +    private static final String QUERY_QUERY_PARAM = "q";
    +    private static final String SORT_QUERY_PARAM = "sort";
    +    private static final String FROM_QUERY_PARAM = "from";
    +    private static final String SIZE_QUERY_PARAM = "size";
    +
    +    public static final String TARGET_FLOW_FILE_CONTENT = "Flow file content";
    +    public static final String TARGET_FLOW_FILE_ATTRIBUTES = "Flow file attributes";
    +    private static final String ATTRIBUTE_PREFIX = "es.result.";
    +
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description(
    +                    "All FlowFiles that are read from Elasticsearch are routed to this relationship.")
    +            .build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description(
    +                    "All FlowFiles that cannot be read from Elasticsearch are routed to this relationship. Note that only incoming "
    +                            + "flow files will be routed to failure.").build();
    +
    +    public static final Relationship REL_RETRY = new Relationship.Builder()
    +            .name("retry")
    +            .description(
    +                    "A FlowFile is routed to this relationship if the document cannot be fetched but attempting the operation again may "
    +                            + "succeed. Note that if the processor has no incoming connections, flow files may still be sent to this relationship "
    +                            + "based on the processor properties and the results of the fetch operation.")
    +            .build();
    +
    +    public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
    +            .name("query-es-query").displayName("Query")
    +            .description("The Lucene-style query to run against ElasticSearch").required(true)
    +            .expressionLanguageSupported(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
    +            .name("query-es-index").displayName("Index")
    +            .description("The name of the index to read from").required(true)
    +            .expressionLanguageSupported(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
    +            .name("query-es-type")
    +            .displayName("Type")
    +            .description(
    +                    "The (optional) type of this document, used by Elasticsearch for indexing and searching. If the property is empty or set "
    +                            + "to _all, the first document matching the identifier across all types will be retrieved.")
    --- End diff --
    
    Whoops! I am the original author ;) Good catch!



[GitHub] nifi pull request #733: NIFI-2417: Implementing QueryElasticsearchHttp and S...

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/733#discussion_r79831289
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java ---
    @@ -0,0 +1,415 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.elasticsearch;
    +
    +import java.io.IOException;
    +import java.net.MalformedURLException;
    +import java.net.URL;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +import java.util.regex.Pattern;
    +import java.util.stream.Collectors;
    +import java.util.stream.Stream;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.Stateful;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.state.Scope;
    +import org.apache.nifi.components.state.StateManager;
    +import org.apache.nifi.components.state.StateMap;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.stream.io.ByteArrayInputStream;
    +import org.codehaus.jackson.JsonNode;
    +
    +import okhttp3.HttpUrl;
    +import okhttp3.OkHttpClient;
    +import okhttp3.Response;
    +import okhttp3.ResponseBody;
    +
    +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
    +@EventDriven
    +@SupportsBatching
    +@Tags({ "elasticsearch", "query", "scroll", "read", "get", "http" })
    +@CapabilityDescription("Scrolls through an Elasticsearch query using the specified connection properties. "
    +        + "This processor is intended to be run on the primary node, and is designed for scrolling through "
    +        + "huge result sets, as in the case of a reindex.  The state must be cleared before another query "
    +        + "can be run.  Each page of results is returned, wrapped in a JSON object like so: { \"hits\" : [ <doc1>, <doc2>, <docn> ] }.  "
    +        + "Note that the full body of each page of documents will be read into memory before being "
    +        + "written to a Flow File for transfer.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "es.index", description = "The Elasticsearch index containing the document"),
    +        @WritesAttribute(attribute = "es.type", description = "The Elasticsearch document type") })
    +@Stateful(description = "After each successful scroll page, the latest scroll_id is persisted in scrollId as input for the next scroll call.  "
    +        + "Once the entire query is complete, finishedQuery state will be set to true, and the processor will not execute unless this is cleared.", scopes = { Scope.LOCAL })
    +public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
    +
    +    private static final String FINISHED_QUERY_STATE = "finishedQuery";
    +    private static final String SCROLL_ID_STATE = "scrollId";
    +    private static final String FIELD_INCLUDE_QUERY_PARAM = "_source_include";
    +    private static final String QUERY_QUERY_PARAM = "q";
    +    private static final String SORT_QUERY_PARAM = "sort";
    +    private static final String SCROLL_QUERY_PARAM = "scroll";
    +    private static final String SCROLL_ID_QUERY_PARAM = "scroll_id";
    +    private static final String SIZE_QUERY_PARAM = "size";
    +
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description(
    +                    "All FlowFiles that are read from Elasticsearch are routed to this relationship.")
    +            .build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description(
    +                    "All FlowFiles that cannot be read from Elasticsearch are routed to this relationship. Note that only incoming "
    +                            + "flow files will be routed to failure.").build();
    +
    +    public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
    +            .name("scroll-es-query").displayName("Query")
    +            .description("The Lucene-style query to run against ElasticSearch").required(true)
    +            .expressionLanguageSupported(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor SCROLL_DURATION = new PropertyDescriptor.Builder()
    +            .name("scroll-es-scroll")
    +            .displayName("Scroll Duration")
    +            .description("The scroll duration is how long each search context is kept in memory.")
    +            .defaultValue("1m")
    +            .required(true)
    +            .expressionLanguageSupported(true)
    +            .addValidator(
    +                    StandardValidators.createRegexMatchingValidator(Pattern.compile("[0-9]+(m|h)")))
    +            .build();
    +
    +    public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
    +            .name("scroll-es-index").displayName("Index")
    +            .description("The name of the index to read from").required(true)
    +            .expressionLanguageSupported(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
    +            .name("scroll-es-type")
    +            .displayName("Type")
    +            .description(
    +                    "The (optional) type of this document, used by Elasticsearch for indexing and searching. If the property is empty or set "
    +                            + "to _all, the first document matching the identifier across all types will be retrieved.")
    +            .required(false).expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    +
    +    public static final PropertyDescriptor FIELDS = new PropertyDescriptor.Builder()
    +            .name("scroll-es-fields")
    +            .displayName("Fields")
    +            .description(
    +                    "A comma-separated list of fields to retrieve from the document. If the Fields property is left blank, "
    +                            + "then the entire document's source will be retrieved.")
    +            .required(false).expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    +
    +    public static final PropertyDescriptor SORT = new PropertyDescriptor.Builder()
    +            .name("scroll-es-sort")
    +            .displayName("Sort")
    +            .description(
    +                    "A sort parameter (e.g., timestamp:asc). If the Sort property is left blank, "
    +                            + "then the results will be retrieved in document order.")
    +            .required(false).expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    +
    +    public static final PropertyDescriptor PAGE_SIZE = new PropertyDescriptor.Builder()
    +            .name("scroll-es-size").displayName("Page Size").defaultValue("20")
    +            .description("Determines how many documents to return per page during scrolling.")
    +            .required(true).expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).build();
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        relationships.add(REL_FAILURE);
    +        return Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        final List<PropertyDescriptor> descriptors = new ArrayList<>();
    +        descriptors.add(ES_URL);
    +        descriptors.add(PROP_SSL_CONTEXT_SERVICE);
    +        descriptors.add(USERNAME);
    +        descriptors.add(PASSWORD);
    +        descriptors.add(CONNECT_TIMEOUT);
    +        descriptors.add(RESPONSE_TIMEOUT);
    +        descriptors.add(QUERY);
    +        descriptors.add(SCROLL_DURATION);
    +        descriptors.add(PAGE_SIZE);
    +        descriptors.add(INDEX);
    +        descriptors.add(TYPE);
    +        descriptors.add(FIELDS);
    +        descriptors.add(SORT);
    +
    +        return Collections.unmodifiableList(descriptors);
    +    }
    +
    +    @OnScheduled
    +    public void setup(ProcessContext context) {
    +        super.setup(context);
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session)
    +            throws ProcessException {
    +
    +        try {
    +            if (isQueryFinished(context.getStateManager())) {
    +                getLogger().trace(
    +                        "Query has been marked finished in the state manager.  "
    +                                + "To run another query, clear the state.");
    +                return;
    +            }
    +        } catch (IOException e) {
    +            throw new ProcessException("Could not retrieve state", e);
    +        }
    +
    +        OkHttpClient okHttpClient = getClient();
    +
    +        FlowFile flowFile = session.create();
    +
    +        final String index = context.getProperty(INDEX).evaluateAttributeExpressions(flowFile)
    +                .getValue();
    +        final String query = context.getProperty(QUERY).evaluateAttributeExpressions(flowFile)
    +                .getValue();
    +        final String docType = context.getProperty(TYPE).evaluateAttributeExpressions(flowFile)
    +                .getValue();
    +        final int pageSize = context.getProperty(PAGE_SIZE).evaluateAttributeExpressions(flowFile)
    +                .asInteger().intValue();
    +        final String fields = context.getProperty(FIELDS).isSet() ? context.getProperty(FIELDS)
    +                .evaluateAttributeExpressions(flowFile).getValue() : null;
    +        final String sort = context.getProperty(SORT).isSet() ? context.getProperty(SORT)
    +                .evaluateAttributeExpressions(flowFile).getValue() : null;
    +        final String scroll = context.getProperty(SCROLL_DURATION).isSet() ? context
    +                .getProperty(SCROLL_DURATION).evaluateAttributeExpressions(flowFile).getValue() : null;
    +
    +        // Authentication
    +        final String username = context.getProperty(USERNAME).getValue();
    +        final String password = context.getProperty(PASSWORD).getValue();
    +
    +        final ComponentLog logger = getLogger();
    +
    +        try {
    +            String scrollId = loadScrollId(context.getStateManager());
    +
    +            if (scrollId != null) {
    +                // read the url property from the context
    +                final String urlstr = StringUtils.trimToEmpty(context.getProperty(ES_URL)
    +                        .getValue());
    +                final URL scrollUrl = buildRequestURL(urlstr, query, index, docType, fields, sort,
    +                        scrollId, pageSize, scroll);
    +                final long startNanos = System.nanoTime();
    +
    +                final Response getResponse = sendRequestToElasticsearch(okHttpClient, scrollUrl,
    +                        username, password, "GET", null);
    +                this.getPage(getResponse, scrollUrl, context, session, flowFile, logger, startNanos);
    +            } else {
    +                logger.debug("Querying {}/{} from Elasticsearch: {}", new Object[] { index,
    +                        docType, query });
    +
    +                // read the url property from the context
    +                final String urlstr = StringUtils.trimToEmpty(context.getProperty(ES_URL)
    +                        .getValue());
    +                final URL queryUrl = buildRequestURL(urlstr, query, index, docType, fields, sort,
    +                        scrollId, pageSize, scroll);
    +                final long startNanos = System.nanoTime();
    +
    +                final Response getResponse = sendRequestToElasticsearch(okHttpClient, queryUrl,
    +                        username, password, "GET", null);
    +                this.getPage(getResponse, queryUrl, context, session, flowFile, logger, startNanos);
    +            }
    +
    +        } catch (IOException ioe) {
    +            logger.error(
    +                    "Failed to read from Elasticsearch due to {}, this may indicate an error in configuration "
    +                            + "(hosts, username/password, etc.).",
    +                    new Object[] { ioe.getLocalizedMessage() }, ioe);
    +            session.remove(flowFile);
    +            context.yield();
    +
    +        } catch (Exception e) {
    +            logger.error("Failed to read {} from Elasticsearch due to {}", new Object[] { flowFile,
    +                    e.getLocalizedMessage() }, e);
    +            session.transfer(flowFile, REL_FAILURE);
    +            context.yield();
    +        }
    +    }
    +
    +    private void getPage(final Response getResponse, final URL url, final ProcessContext context,
    +            final ProcessSession session, FlowFile flowFile, final ComponentLog logger, final long startNanos)
    +            throws IOException {
    +        final int statusCode = getResponse.code();
    +
    +        if (isSuccess(statusCode)) {
    +            ResponseBody body = getResponse.body();
    +            final byte[] bodyBytes = body.bytes();
    +            JsonNode responseJson = parseJsonResponse(new ByteArrayInputStream(bodyBytes));
    +            String scrollId = responseJson.get("_scroll_id").asText();
    +
    +            StringBuilder builder = new StringBuilder();
    +
    +            builder.append("{ \"hits\" : [");
    +
    +            JsonNode hits = responseJson.get("hits").get("hits");
    +            if (hits.size() == 0) {
    +                finishQuery(context.getStateManager());
    +                session.remove(flowFile);
    +                return;
    +            }
    +
    +            for(int i = 0; i < hits.size(); i++) {
    +                JsonNode hit = hits.get(i);
    +                String retrievedIndex = hit.get("_index").asText();
    +                String retrievedType = hit.get("_type").asText();
    +
    +                JsonNode source = hit.get("_source");
    +                flowFile = session.putAttribute(flowFile, "es.index", retrievedIndex);
    +                flowFile = session.putAttribute(flowFile, "es.type", retrievedType);
    +
    +                builder.append(source.toString());
    +                if (i < hits.size() - 1) {
    +                    builder.append(", ");
    +                }
    +            }
    +            builder.append("] }");
    +            logger.debug("Elasticsearch retrieved " + responseJson.size() + " documents, routing to success");
    +
    +            flowFile = session.write(flowFile, out -> {
    +                out.write(builder.toString().getBytes());
    +            });
    +            session.transfer(flowFile, REL_SUCCESS);
    --- End diff --
    
    Since the content of the flow file is a JSON object, I recommend setting the mime.type attribute to application/json.
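    
    For illustration, a minimal sketch of one way to apply that suggestion. The
    exact placement inside getPage() is my assumption, but CoreAttributes.MIME_TYPE
    is NiFi's standard attribute key:
    
        // requires: import org.apache.nifi.flowfile.attributes.CoreAttributes;
        // Mark the content type so downstream processors can treat the
        // payload as JSON without inspecting it.
        flowFile = session.putAttribute(flowFile,
                CoreAttributes.MIME_TYPE.key(), "application/json");
        session.transfer(flowFile, REL_SUCCESS);
    
    Downstream, a RouteOnAttribute or similar processor can then key off
    ${mime.type} instead of sniffing the content.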



[GitHub] nifi pull request #733: NIFI-2417: Implementing QueryElasticsearchHttp and S...

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/733#discussion_r79826318
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java ---
    @@ -0,0 +1,410 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.elasticsearch;
    +
    +import java.io.IOException;
    +import java.net.MalformedURLException;
    +import java.net.URL;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +import java.util.stream.Collectors;
    +import java.util.stream.Stream;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.stream.io.ByteArrayInputStream;
    +import org.codehaus.jackson.JsonNode;
    +
    +import okhttp3.HttpUrl;
    +import okhttp3.OkHttpClient;
    +import okhttp3.Response;
    +import okhttp3.ResponseBody;
    +
    +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
    +@EventDriven
    +@SupportsBatching
    +@Tags({ "elasticsearch", "query", "read", "get", "http" })
    +@CapabilityDescription("Queries Elasticsearch using the specified connection properties. "
    +        + "Note that the full body of each page of documents will be read into memory before being "
    +        + "written to Flow Files for transfer.  Also note that the Elasticsearch max_result_window index "
    +        + "setting is the upper bound on the number of records that can be retrieved using this query.  "
    +        + "To retrieve more records, use the ScrollElasticsearchHttp processor.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "filename", description = "The filename attribute is set to the document identifier"),
    +        @WritesAttribute(attribute = "es.index", description = "The Elasticsearch index containing the document"),
    +        @WritesAttribute(attribute = "es.type", description = "The Elasticsearch document type"),
    +        @WritesAttribute(attribute = "es.result.*", description = "If Target is 'Flow file attributes', the JSON attributes of "
    +                + "each result will be placed into corresponding attributes with this prefix.") })
    +public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
    +
    +    private static final String FIELD_INCLUDE_QUERY_PARAM = "_source_include";
    +    private static final String QUERY_QUERY_PARAM = "q";
    +    private static final String SORT_QUERY_PARAM = "sort";
    +    private static final String FROM_QUERY_PARAM = "from";
    +    private static final String SIZE_QUERY_PARAM = "size";
    +
    +    public static final String TARGET_FLOW_FILE_CONTENT = "Flow file content";
    +    public static final String TARGET_FLOW_FILE_ATTRIBUTES = "Flow file attributes";
    +    private static final String ATTRIBUTE_PREFIX = "es.result.";
    +
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description(
    +                    "All FlowFiles that are read from Elasticsearch are routed to this relationship.")
    +            .build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description(
    +                    "All FlowFiles that cannot be read from Elasticsearch are routed to this relationship. Note that only incoming "
    +                            + "flow files will be routed to failure.").build();
    +
    +    public static final Relationship REL_RETRY = new Relationship.Builder()
    +            .name("retry")
    +            .description(
    +                    "A FlowFile is routed to this relationship if the document cannot be fetched but attempting the operation again may "
    +                            + "succeed. Note that if the processor has no incoming connections, flow files may still be sent to this relationship "
    +                            + "based on the processor properties and the results of the fetch operation.")
    +            .build();
    +
    +    public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
    +            .name("query-es-query").displayName("Query")
    +            .description("The Lucene-style query to run against ElasticSearch").required(true)
    +            .expressionLanguageSupported(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
    +            .name("query-es-index").displayName("Index")
    +            .description("The name of the index to read from").required(true)
    +            .expressionLanguageSupported(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
    +            .name("query-es-type")
    +            .displayName("Type")
    +            .description(
    +                    "The (optional) type of this document, used by Elasticsearch for indexing and searching. If the property is empty or set "
    +                            + "to _all, the first document matching the identifier across all types will be retrieved.")
    --- End diff --
    
    I couldn't get this to work with a setting of _all, but that might be my Elasticsearch config. Any pointers?
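    
    If it helps, one possible explanation (a guess, not verified against this
    code): for the _search endpoint, omitting the type path segment is the usual
    way to search across all types; _all as a literal type segment is, as far as
    I know, only guaranteed for the single-document GET API. A sketch of the URL
    construction under that assumption, using the okhttp3.HttpUrl builder already
    imported above (host, index, and query values are made up):
    
        // Hypothetical values for illustration only.
        String docType = "_all"; // value of the Type property
        HttpUrl.Builder urlBuilder = HttpUrl.parse("http://localhost:9200")
                .newBuilder()
                .addPathSegment("my-index");
        // Skip the type segment when it is blank or "_all" so the search runs
        // across all types instead of looking for a type literally named "_all".
        if (docType != null && !"_all".equals(docType)) {
            urlBuilder.addPathSegment(docType);
        }
        HttpUrl searchUrl = urlBuilder.addPathSegment("_search")
                .addQueryParameter("q", "identifier:12345")
                .build();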

