You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by MikeThomsen <gi...@git.apache.org> on 2018/04/07 09:35:06 UTC

[GitHub] nifi pull request #2615: NIFI-5051 Created ElasticSearch lookup service.

GitHub user MikeThomsen opened a pull request:

    https://github.com/apache/nifi/pull/2615

    NIFI-5051 Created ElasticSearch lookup service.

    Thank you for submitting a contribution to Apache NiFi.
    
    In order to streamline the review of the contribution we ask you
    to ensure the following steps have been taken:
    
    ### For all changes:
    - [ ] Is there a JIRA ticket associated with this PR? Is it referenced 
         in the commit message?
    
    - [ ] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
    
    - [ ] Has your PR been rebased against the latest commit within the target branch (typically master)?
    
    - [ ] Is your initial contribution a single, squashed commit?
    
    ### For code changes:
    - [ ] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
    - [ ] Have you written or updated unit tests to verify your changes?
    - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? 
    - [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
    - [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
    - [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?
    
    ### For documentation related changes:
    - [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
    
    ### Note:
    Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/MikeThomsen/nifi NIFI-5051

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/2615.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2615
    
----
commit dd7176ded6c359b78ab1265789b6567b2d574c8f
Author: Mike Thomsen <mi...@...>
Date:   2018-04-07T01:38:07Z

    NIFI-5051 Created ElasticSearch lookup service.

----


---

[GitHub] nifi issue #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by alopresto <gi...@git.apache.org>.
Github user alopresto commented on the issue:

    https://github.com/apache/nifi/pull/2615
  
    @MikeThomsen I would maybe put a comment in the `src/test/java/.gitignore` file explaining why it's there so someone in the future doesn't see it as a superfluous tooling artifact and remove it, and then your test is silently no longer executed and we don't catch regressions. We've had similar occurrences in some of the other modules. 


---

[GitHub] nifi pull request #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2615#discussion_r192050258
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml ---
    @@ -120,6 +132,22 @@
                 <version>1.7.0-SNAPSHOT</version>
                 <scope>provided</scope>
             </dependency>
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-avro-record-utils</artifactId>
    +            <version>1.7.0-SNAPSHOT</version>
    +            <scope>compile</scope>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-avro-record-utils</artifactId>
    --- End diff --
    
    Done.


---

[GitHub] nifi pull request #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2615#discussion_r184746728
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java ---
    @@ -0,0 +1,290 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.elasticsearch;
    +
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.PropertyValue;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.controller.AbstractControllerService;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.lookup.LookupFailureException;
    +import org.apache.nifi.lookup.LookupService;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.reporting.InitializationException;
    +import org.apache.nifi.schemaregistry.services.SchemaRegistry;
    +import org.apache.nifi.serialization.SimpleRecordSchema;
    +import org.apache.nifi.serialization.record.MapRecord;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordField;
    +import org.apache.nifi.serialization.record.RecordFieldType;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.serialization.record.SchemaIdentifier;
    +import org.apache.nifi.serialization.record.type.RecordDataType;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +
    +public class ElasticSearchLookupService extends AbstractControllerService implements LookupService {
    +    public static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder()
    +        .name("el-rest-client-service")
    +        .displayName("Client Service")
    +        .description("An ElasticSearch client service to use for running queries.")
    +        .identifiesControllerService(ElasticSearchClientService.class)
    +        .required(true)
    +        .build();
    +    public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
    +        .name("el-lookup-index")
    +        .displayName("Index")
    +        .description("The name of the index to read from")
    +        .required(true)
    +        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
    +        .name("el-lookup-type")
    +        .displayName("Type")
    +        .description("The type of this document (used by Elasticsearch for indexing and searching)")
    +        .required(false)
    +        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor SCHEMA_REGISTRY = new PropertyDescriptor.Builder()
    +        .name("el-lookup-schema-registry")
    +        .displayName("Schema Registry")
    +        .description("If specified, this avro schema will be used for all objects loaded from MongoDB using this service. If left blank, " +
    +                "the service will attempt to determine the schema from the results.")
    +        .required(false)
    +        .identifiesControllerService(SchemaRegistry.class)
    +        .build();
    +
    +    public static final PropertyDescriptor RECORD_SCHEMA_NAME = new PropertyDescriptor.Builder()
    --- End diff --
    
    Now that #2661 has been merged, could you extend SchemaRegistryService instead of AbstractControllerService (which you'll still get as SchemaRegistryService's parent)? That way you'll have immediate access to the same properties and logic as other schema-registry-aware processors to give a consistent UX.


---

[GitHub] nifi pull request #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2615#discussion_r209696028
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml ---
    @@ -127,8 +133,113 @@
                 <version>5.6.8</version>
                 <scope>compile</scope>
             </dependency>
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-avro-record-utils</artifactId>
    +            <version>1.7.0-SNAPSHOT</version>
    --- End diff --
    
    Done.


---

[GitHub] nifi issue #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2615
  
    @mattyb149 changed the query model per the discussion above and changed the tests to be Groovy so that the inline JSON, etc. would be a lot cleaner to read.


---

[GitHub] nifi pull request #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by alopresto <gi...@git.apache.org>.
Github user alopresto commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2615#discussion_r210029087
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml ---
    @@ -127,8 +133,113 @@
                 <version>5.6.8</version>
                 <scope>compile</scope>
             </dependency>
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-avro-record-utils</artifactId>
    +            <version>1.7.0-SNAPSHOT</version>
    +            <scope>compile</scope>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-schema-registry-service-api</artifactId>
    +            <scope>compile</scope>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.mockito</groupId>
    +            <artifactId>mockito-all</artifactId>
    +            <scope>test</scope>
    +        </dependency>
         </dependencies>
     
    +    <build>
    --- End diff --
    
    If there is nothing in `src/test/java`, the Groovy tests won't be detected unless a plugin references them directly. In this case, the `build-helper-maven-plugin` is accomplishing that. In other locations, the `maven-compiler-plugin` is set to use `groovy-eclipse-compiler` to achieve the same result.


---

[GitHub] nifi pull request #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2615#discussion_r217898996
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml ---
    @@ -150,6 +172,7 @@
                                 <httpPort>9400</httpPort>
                                 <version>5.6.2</version>
                                 <timeout>90</timeout>
    +                            <pathInitScript>src/test/resources/setup.script</pathInitScript>
    --- End diff --
    
    Resolved.


---

[GitHub] nifi pull request #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2615#discussion_r189676414
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java ---
    @@ -0,0 +1,253 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.elasticsearch;
    +
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.lookup.LookupFailureException;
    +import org.apache.nifi.lookup.LookupService;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.reporting.InitializationException;
    +import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.nifi.serialization.SchemaRegistryService;
    +import org.apache.nifi.serialization.SimpleRecordSchema;
    +import org.apache.nifi.serialization.record.MapRecord;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordField;
    +import org.apache.nifi.serialization.record.RecordFieldType;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.serialization.record.type.RecordDataType;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +public class ElasticSearchLookupService extends SchemaRegistryService implements LookupService {
    +    public static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder()
    +        .name("el-rest-client-service")
    +        .displayName("Client Service")
    +        .description("An ElasticSearch client service to use for running queries.")
    +        .identifiesControllerService(ElasticSearchClientService.class)
    +        .required(true)
    +        .build();
    +    public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
    +        .name("el-lookup-index")
    +        .displayName("Index")
    +        .description("The name of the index to read from")
    +        .required(true)
    +        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
    +        .name("el-lookup-type")
    +        .displayName("Type")
    +        .description("The type of this document (used by Elasticsearch for indexing and searching)")
    +        .required(false)
    +        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +
    +    public static final PropertyDescriptor RECORD_SCHEMA_NAME = new PropertyDescriptor.Builder()
    --- End diff --
    
    You don't need this property anymore, as you get one from SchemaRegistryService


---

[GitHub] nifi issue #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2615
  
    Should be all good to go now.


---

[GitHub] nifi pull request #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2615#discussion_r189665908
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml ---
    @@ -120,6 +132,22 @@
                 <version>1.7.0-SNAPSHOT</version>
                 <scope>provided</scope>
             </dependency>
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-avro-record-utils</artifactId>
    +            <version>1.7.0-SNAPSHOT</version>
    +            <scope>compile</scope>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-avro-record-utils</artifactId>
    --- End diff --
    
    Still twice, I can remove on merge


---

[GitHub] nifi pull request #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2615#discussion_r209698134
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml ---
    @@ -127,8 +133,113 @@
                 <version>5.6.8</version>
                 <scope>compile</scope>
             </dependency>
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-avro-record-utils</artifactId>
    +            <version>1.7.0-SNAPSHOT</version>
    +            <scope>compile</scope>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-schema-registry-service-api</artifactId>
    +            <scope>compile</scope>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.mockito</groupId>
    +            <artifactId>mockito-all</artifactId>
    +            <scope>test</scope>
    +        </dependency>
         </dependencies>
     
    +    <build>
    --- End diff --
    
    I'm leaving in the helper plugin for now because for some reason, it won't even detect the groovy test source without it. I'll remove it if you have any suggestions on how to fix that.


---

[GitHub] nifi issue #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2615
  
    @mattyb149 I converted it over to be a subclass of `SchemaRegistryService`. Let me know if it needs anything else.


---

[GitHub] nifi issue #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2615
  
    @mattyb149 @pvillard31 Changed the query model as requested and it's ready for final review AFAICT.


---

[GitHub] nifi issue #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2615
  
    @mattyb149 Refactored the query builder.


---

[GitHub] nifi pull request #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2615#discussion_r217899033
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java ---
    @@ -0,0 +1,258 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.elasticsearch;
    +
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.lookup.LookupFailureException;
    +import org.apache.nifi.lookup.LookupService;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.nifi.serialization.JsonInferenceSchemaRegistryService;
    +import org.apache.nifi.serialization.record.MapRecord;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +public class ElasticSearchLookupService extends JsonInferenceSchemaRegistryService implements LookupService<Record> {
    +    public static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder()
    +        .name("el-rest-client-service")
    +        .displayName("Client Service")
    +        .description("An ElasticSearch client service to use for running queries.")
    +        .identifiesControllerService(ElasticSearchClientService.class)
    +        .required(true)
    +        .build();
    +    public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
    +        .name("el-lookup-index")
    +        .displayName("Index")
    +        .description("The name of the index to read from")
    +        .required(true)
    +        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
    +        .name("el-lookup-type")
    +        .displayName("Type")
    +        .description("The type of this document (used by Elasticsearch for indexing and searching)")
    +        .required(false)
    +        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +    private ElasticSearchClientService clientService;
    +
    +    private String index;
    +    private String type;
    +    private ObjectMapper mapper;
    +
    +    private final List<PropertyDescriptor> DESCRIPTORS;
    +
    +    public ElasticSearchLookupService() {
    +        List<PropertyDescriptor> _desc = new ArrayList<>();
    +        _desc.addAll(super.getSupportedPropertyDescriptors());
    +        _desc.add(CLIENT_SERVICE);
    +        _desc.add(INDEX);
    +        _desc.add(TYPE);
    +        DESCRIPTORS = Collections.unmodifiableList(_desc);
    +    }
    +
    +    @Override
    +    @OnEnabled
    +    public void onEnabled(final ConfigurationContext context) {
    +        clientService = context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class);
    +        index = context.getProperty(INDEX).evaluateAttributeExpressions().getValue();
    +        type  = context.getProperty(TYPE).evaluateAttributeExpressions().getValue();
    +        mapper = new ObjectMapper();
    +
    +        super.onEnabled(context);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return DESCRIPTORS;
    +    }
    +
    +    @Override
    +    public Optional<Record> lookup(Map<String, Object> coordinates) throws LookupFailureException {
    +        Map<String, String> context = coordinates.entrySet().stream()
    +            .collect(Collectors.toMap(
    +                e -> e.getKey(),
    +                e -> e.getValue().toString()
    +            ));
    +        return lookup(coordinates, context);
    +    }
    +
    +    @Override
    +    public Optional<Record> lookup(Map<String, Object> coordinates, Map<String, String> context) throws LookupFailureException {
    +        validateCoordinates(coordinates);
    +
    +        try {
    +            Record record;
    +            if (coordinates.containsKey("_id")) {
    +                record = getById((String)coordinates.get("_id"), context);
    +            } else {
    +                record = getByQuery(coordinates, context);
    +            }
    +
    +            return record == null ? Optional.empty() : Optional.of(record);
    +        } catch (Exception ex) {
    +            getLogger().error("Error during lookup.", ex);
    +            throw new LookupFailureException(ex);
    +        }
    +    }
    +
    +    private RecordSchema getSchemaFromCoordinates(Map<String, Object> coordinates) {
    +        Map<String, String> variables = coordinates.entrySet().stream()
    +            .collect(Collectors.toMap(
    +                e -> e.getKey(),
    +                e -> e.getValue().toString()
    +            ));
    +        try {
    +            return getSchema(variables, null);
    +        } catch (SchemaNotFoundException | IOException e) {
    +            if (getLogger().isDebugEnabled()) {
    +                getLogger().debug("Could not load schema, will create one from the results.", e);
    +            }
    +            return null;
    +        }
    +    }
    +
    +    private void validateCoordinates(Map coordinates) throws LookupFailureException {
    +        List<String> reasons = new ArrayList<>();
    +
    +        if (coordinates.containsKey("_id") && !(coordinates.get("_id") instanceof String)) {
    +            reasons.add("_id was supplied, but it was not a String.");
    +        }
    +
    +        if (coordinates.containsKey("_id") && coordinates.size() > 1) {
    +            reasons.add("When _id is used, it can be the only key used in the lookup.");
    +        }
    +
    +        if (reasons.size() > 0) {
    +            String error = String.join("\n", reasons);
    +            throw new LookupFailureException(error);
    +        }
    +    }
    +
    +    private Record getById(final String _id, Map<String, String> context) throws IOException, LookupFailureException, SchemaNotFoundException {
    +        Map<String, Object> query = new HashMap<String, Object>(){{
    +            put("query", new HashMap<String, Object>() {{
    +                put("match", new HashMap<String, String>(){{
    +                    put("_id", _id);
    +                }});
    +            }});
    +        }};
    +
    +        String json = mapper.writeValueAsString(query);
    +
    +        SearchResponse response = clientService.search(json, index, type);
    +
    +        if (response.getNumberOfHits() > 1) {
    +            throw new LookupFailureException(String.format("Expected 1 response, got %d for query %s",
    +                response.getNumberOfHits(), json));
    +        } else if (response.getNumberOfHits() == 0) {
    +            return null;
    +        }
    +
    +        final Map<String, Object> source = (Map)response.getHits().get(0).get("_source");
    +
    +        RecordSchema toUse = getSchema(context, source, null);
    +
    +        return new MapRecord(toUse, source);
    +    }
    +
    +    Map<String, Object> getNested(String key, Object value) {
    +        String path = key.substring(0, key.lastIndexOf("."));
    +
    +        return new HashMap<String, Object>(){{
    +            put("path", path);
    +            put("query", new HashMap<String, Object>(){{
    +                put("match", new HashMap<String, Object>(){{
    +                    put(key, value);
    +                }});
    +            }});
    +        }};
    +    }
    +
    +    private Map<String, Object> buildQuery(Map<String, Object> coordinates) {
    +        Map<String, Object> query = new HashMap<String, Object>(){{
    +            put("bool", new HashMap<String, Object>(){{
    +                put("must", coordinates.entrySet().stream()
    +                    .map(e -> new HashMap<String, Object>(){{
    +                        if (e.getKey().contains(".")) {
    --- End diff --
    
    > but got an error saying I was trying to do a nested query on a field that wasn't nested.
    
    I think you are. ES can be weird about detecting nested documents. I've only had consistent good results when explicitly defining them. I'll try to set up a test example.


---

[GitHub] nifi pull request #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2615#discussion_r217751008
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java ---
    @@ -0,0 +1,258 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.elasticsearch;
    +
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.lookup.LookupFailureException;
    +import org.apache.nifi.lookup.LookupService;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.nifi.serialization.JsonInferenceSchemaRegistryService;
    +import org.apache.nifi.serialization.record.MapRecord;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +public class ElasticSearchLookupService extends JsonInferenceSchemaRegistryService implements LookupService<Record> {
    +    public static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder()
    +        .name("el-rest-client-service")
    +        .displayName("Client Service")
    +        .description("An ElasticSearch client service to use for running queries.")
    +        .identifiesControllerService(ElasticSearchClientService.class)
    +        .required(true)
    +        .build();
    +    public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
    +        .name("el-lookup-index")
    +        .displayName("Index")
    +        .description("The name of the index to read from")
    +        .required(true)
    +        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
    +        .name("el-lookup-type")
    +        .displayName("Type")
    +        .description("The type of this document (used by Elasticsearch for indexing and searching)")
    +        .required(false)
    +        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +    private ElasticSearchClientService clientService;
    +
    +    private String index;
    +    private String type;
    +    private ObjectMapper mapper;
    +
    +    private final List<PropertyDescriptor> DESCRIPTORS;
    +
    +    ElasticSearchLookupService() {
    --- End diff --
    
    *facepalm*
    
    One fix, coming right up...


---

[GitHub] nifi pull request #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2615#discussion_r189727653
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java ---
    @@ -0,0 +1,253 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.elasticsearch;
    +
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.lookup.LookupFailureException;
    +import org.apache.nifi.lookup.LookupService;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.reporting.InitializationException;
    +import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.nifi.serialization.SchemaRegistryService;
    +import org.apache.nifi.serialization.SimpleRecordSchema;
    +import org.apache.nifi.serialization.record.MapRecord;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordField;
    +import org.apache.nifi.serialization.record.RecordFieldType;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.serialization.record.type.RecordDataType;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +public class ElasticSearchLookupService extends SchemaRegistryService implements LookupService {
    +    public static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder()
    +        .name("el-rest-client-service")
    +        .displayName("Client Service")
    +        .description("An ElasticSearch client service to use for running queries.")
    +        .identifiesControllerService(ElasticSearchClientService.class)
    +        .required(true)
    +        .build();
    +    public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
    +        .name("el-lookup-index")
    +        .displayName("Index")
    +        .description("The name of the index to read from")
    +        .required(true)
    +        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
    +        .name("el-lookup-type")
    +        .displayName("Type")
    +        .description("The type of this document (used by Elasticsearch for indexing and searching)")
    +        .required(false)
    +        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +
    +    public static final PropertyDescriptor RECORD_SCHEMA_NAME = new PropertyDescriptor.Builder()
    +        .name("el-lookup-record-schema-name")
    +        .displayName("Record Schema Name")
    +        .description("If specified, the value will be used to lookup a schema in the configured schema registry.")
    +        .required(false)
    +        .addValidator(Validator.VALID)
    +        .build();
    +
    +    private ElasticSearchClientService clientService;
    +
    +    private String index;
    +    private String type;
    +    private ObjectMapper mapper;
    +
    +    @OnEnabled
    +    public void onEnabled(final ConfigurationContext context) throws InitializationException {
    +        clientService = context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class);
    +        index = context.getProperty(INDEX).getValue();
    +        type  = context.getProperty(TYPE).getValue();
    +        mapper = new ObjectMapper();
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        List<PropertyDescriptor> _desc = new ArrayList<>();
    +        _desc.addAll(super.getSupportedPropertyDescriptors());
    +        _desc.add(CLIENT_SERVICE);
    +        _desc.add(INDEX);
    +        _desc.add(TYPE);
    +        _desc.add(RECORD_SCHEMA_NAME);
    +
    +        return Collections.unmodifiableList(_desc);
    +    }
    +
    +    @Override
    +    public Optional lookup(Map coordinates) throws LookupFailureException {
    --- End diff --
    
    For nested, it could be trickier. I think this would work:
    
    ```
    /user/email => "user.contact.email"
    ```
    
    ```
    {
        "query": {
             "nested": {
                 "path": "user.contact",
                  "query": {
                      "match": {
                          "email": "john.smith@company.com"
                       }
                  }
             }
         }
    }
    ```


---

[GitHub] nifi issue #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2615
  
    @mattyb149 can we close this out?


---

[GitHub] nifi pull request #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2615#discussion_r217884439
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java ---
    @@ -0,0 +1,258 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.elasticsearch;
    +
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.lookup.LookupFailureException;
    +import org.apache.nifi.lookup.LookupService;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.nifi.serialization.JsonInferenceSchemaRegistryService;
    +import org.apache.nifi.serialization.record.MapRecord;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +public class ElasticSearchLookupService extends JsonInferenceSchemaRegistryService implements LookupService<Record> {
    +    public static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder()
    +        .name("el-rest-client-service")
    +        .displayName("Client Service")
    +        .description("An ElasticSearch client service to use for running queries.")
    +        .identifiesControllerService(ElasticSearchClientService.class)
    +        .required(true)
    +        .build();
    +    public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
    +        .name("el-lookup-index")
    +        .displayName("Index")
    +        .description("The name of the index to read from")
    +        .required(true)
    +        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
    +        .name("el-lookup-type")
    +        .displayName("Type")
    +        .description("The type of this document (used by Elasticsearch for indexing and searching)")
    +        .required(false)
    +        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +    private ElasticSearchClientService clientService;
    +
    +    private String index;
    +    private String type;
    +    private ObjectMapper mapper;
    +
    +    private final List<PropertyDescriptor> DESCRIPTORS;
    +
    +    public ElasticSearchLookupService() {
    +        List<PropertyDescriptor> _desc = new ArrayList<>();
    +        _desc.addAll(super.getSupportedPropertyDescriptors());
    +        _desc.add(CLIENT_SERVICE);
    +        _desc.add(INDEX);
    +        _desc.add(TYPE);
    +        DESCRIPTORS = Collections.unmodifiableList(_desc);
    +    }
    +
    +    @Override
    +    @OnEnabled
    +    public void onEnabled(final ConfigurationContext context) {
    +        clientService = context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class);
    +        index = context.getProperty(INDEX).evaluateAttributeExpressions().getValue();
    +        type  = context.getProperty(TYPE).evaluateAttributeExpressions().getValue();
    +        mapper = new ObjectMapper();
    +
    +        super.onEnabled(context);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return DESCRIPTORS;
    +    }
    +
    +    @Override
    +    public Optional<Record> lookup(Map<String, Object> coordinates) throws LookupFailureException {
    +        Map<String, String> context = coordinates.entrySet().stream()
    +            .collect(Collectors.toMap(
    +                e -> e.getKey(),
    +                e -> e.getValue().toString()
    +            ));
    +        return lookup(coordinates, context);
    +    }
    +
    +    @Override
    +    public Optional<Record> lookup(Map<String, Object> coordinates, Map<String, String> context) throws LookupFailureException {
    +        validateCoordinates(coordinates);
    +
    +        try {
    +            Record record;
    +            if (coordinates.containsKey("_id")) {
    +                record = getById((String)coordinates.get("_id"), context);
    +            } else {
    +                record = getByQuery(coordinates, context);
    +            }
    +
    +            return record == null ? Optional.empty() : Optional.of(record);
    +        } catch (Exception ex) {
    +            getLogger().error("Error during lookup.", ex);
    +            throw new LookupFailureException(ex);
    +        }
    +    }
    +
    +    private RecordSchema getSchemaFromCoordinates(Map<String, Object> coordinates) {
    +        Map<String, String> variables = coordinates.entrySet().stream()
    +            .collect(Collectors.toMap(
    +                e -> e.getKey(),
    +                e -> e.getValue().toString()
    +            ));
    +        try {
    +            return getSchema(variables, null);
    +        } catch (SchemaNotFoundException | IOException e) {
    +            if (getLogger().isDebugEnabled()) {
    +                getLogger().debug("Could not load schema, will create one from the results.", e);
    +            }
    +            return null;
    +        }
    +    }
    +
    +    private void validateCoordinates(Map coordinates) throws LookupFailureException {
    +        List<String> reasons = new ArrayList<>();
    +
    +        if (coordinates.containsKey("_id") && !(coordinates.get("_id") instanceof String)) {
    +            reasons.add("_id was supplied, but it was not a String.");
    +        }
    +
    +        if (coordinates.containsKey("_id") && coordinates.size() > 1) {
    +            reasons.add("When _id is used, it can be the only key used in the lookup.");
    +        }
    +
    +        if (reasons.size() > 0) {
    +            String error = String.join("\n", reasons);
    +            throw new LookupFailureException(error);
    +        }
    +    }
    +
    +    private Record getById(final String _id, Map<String, String> context) throws IOException, LookupFailureException, SchemaNotFoundException {
    +        Map<String, Object> query = new HashMap<String, Object>(){{
    +            put("query", new HashMap<String, Object>() {{
    +                put("match", new HashMap<String, String>(){{
    +                    put("_id", _id);
    +                }});
    +            }});
    +        }};
    +
    +        String json = mapper.writeValueAsString(query);
    +
    +        SearchResponse response = clientService.search(json, index, type);
    +
    +        if (response.getNumberOfHits() > 1) {
    +            throw new LookupFailureException(String.format("Expected 1 response, got %d for query %s",
    +                response.getNumberOfHits(), json));
    +        } else if (response.getNumberOfHits() == 0) {
    +            return null;
    +        }
    +
    +        final Map<String, Object> source = (Map)response.getHits().get(0).get("_source");
    +
    +        RecordSchema toUse = getSchema(context, source, null);
    +
    +        return new MapRecord(toUse, source);
    +    }
    +
    +    Map<String, Object> getNested(String key, Object value) {
    +        String path = key.substring(0, key.lastIndexOf("."));
    +
    +        return new HashMap<String, Object>(){{
    +            put("path", path);
    +            put("query", new HashMap<String, Object>(){{
    +                put("match", new HashMap<String, Object>(){{
    +                    put(key, value);
    +                }});
    +            }});
    +        }};
    +    }
    +
    +    private Map<String, Object> buildQuery(Map<String, Object> coordinates) {
    +        Map<String, Object> query = new HashMap<String, Object>(){{
    +            put("bool", new HashMap<String, Object>(){{
    +                put("must", coordinates.entrySet().stream()
    +                    .map(e -> new HashMap<String, Object>(){{
    +                        if (e.getKey().contains(".")) {
    --- End diff --
    
    I'll look into that. Should be able to get something resolved this weekend.


---

[GitHub] nifi pull request #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2615#discussion_r209966944
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java ---
    @@ -0,0 +1,253 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.elasticsearch;
    +
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.lookup.LookupFailureException;
    +import org.apache.nifi.lookup.LookupService;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.reporting.InitializationException;
    +import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.nifi.serialization.SchemaRegistryService;
    +import org.apache.nifi.serialization.SimpleRecordSchema;
    +import org.apache.nifi.serialization.record.MapRecord;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordField;
    +import org.apache.nifi.serialization.record.RecordFieldType;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.serialization.record.type.RecordDataType;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +public class ElasticSearchLookupService extends SchemaRegistryService implements LookupService {
    +    public static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder()
    +        .name("el-rest-client-service")
    +        .displayName("Client Service")
    +        .description("An ElasticSearch client service to use for running queries.")
    +        .identifiesControllerService(ElasticSearchClientService.class)
    +        .required(true)
    +        .build();
    +    public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
    +        .name("el-lookup-index")
    +        .displayName("Index")
    +        .description("The name of the index to read from")
    +        .required(true)
    +        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
    +        .name("el-lookup-type")
    +        .displayName("Type")
    +        .description("The type of this document (used by Elasticsearch for indexing and searching)")
    +        .required(false)
    +        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +
    +    public static final PropertyDescriptor RECORD_SCHEMA_NAME = new PropertyDescriptor.Builder()
    +        .name("el-lookup-record-schema-name")
    +        .displayName("Record Schema Name")
    +        .description("If specified, the value will be used to lookup a schema in the configured schema registry.")
    +        .required(false)
    +        .addValidator(Validator.VALID)
    +        .build();
    +
    +    private ElasticSearchClientService clientService;
    +
    +    private String index;
    +    private String type;
    +    private ObjectMapper mapper;
    +
    +    @OnEnabled
    +    public void onEnabled(final ConfigurationContext context) throws InitializationException {
    +        clientService = context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class);
    +        index = context.getProperty(INDEX).getValue();
    +        type  = context.getProperty(TYPE).getValue();
    +        mapper = new ObjectMapper();
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        List<PropertyDescriptor> _desc = new ArrayList<>();
    +        _desc.addAll(super.getSupportedPropertyDescriptors());
    +        _desc.add(CLIENT_SERVICE);
    +        _desc.add(INDEX);
    +        _desc.add(TYPE);
    +        _desc.add(RECORD_SCHEMA_NAME);
    +
    +        return Collections.unmodifiableList(_desc);
    +    }
    +
    +    @Override
    +    public Optional lookup(Map coordinates) throws LookupFailureException {
    --- End diff --
    
    Yeah, didn't make it in. Good news is that after kicking the tires with Kibana looks like it'll be pretty easy to do if I get some time after work.


---

[GitHub] nifi pull request #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2615#discussion_r209657628
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml ---
    @@ -127,8 +133,113 @@
                 <version>5.6.8</version>
                 <scope>compile</scope>
             </dependency>
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-avro-record-utils</artifactId>
    +            <version>1.7.0-SNAPSHOT</version>
    +            <scope>compile</scope>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-schema-registry-service-api</artifactId>
    +            <scope>compile</scope>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.mockito</groupId>
    +            <artifactId>mockito-all</artifactId>
    +            <scope>test</scope>
    +        </dependency>
         </dependencies>
     
    +    <build>
    --- End diff --
    
    Probably best to remove the groovy and Jacoco stuff, let's get a discussion going on the dev mailing list about code coverage?


---

[GitHub] nifi pull request #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2615#discussion_r191744175
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java ---
    @@ -0,0 +1,266 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.elasticsearch;
    +
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.lookup.LookupFailureException;
    +import org.apache.nifi.lookup.LookupService;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.reporting.InitializationException;
    +import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.nifi.serialization.SchemaRegistryService;
    +import org.apache.nifi.serialization.SimpleRecordSchema;
    +import org.apache.nifi.serialization.record.MapRecord;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordField;
    +import org.apache.nifi.serialization.record.RecordFieldType;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.serialization.record.type.RecordDataType;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +public class ElasticSearchLookupService extends SchemaRegistryService implements LookupService {
    +    public static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder()
    +        .name("el-rest-client-service")
    +        .displayName("Client Service")
    +        .description("An ElasticSearch client service to use for running queries.")
    +        .identifiesControllerService(ElasticSearchClientService.class)
    +        .required(true)
    +        .build();
    +    public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
    +        .name("el-lookup-index")
    +        .displayName("Index")
    +        .description("The name of the index to read from")
    +        .required(true)
    +        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
    +        .name("el-lookup-type")
    +        .displayName("Type")
    +        .description("The type of this document (used by Elasticsearch for indexing and searching)")
    +        .required(false)
    +        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +
    +    public static final PropertyDescriptor RECORD_SCHEMA_NAME = new PropertyDescriptor.Builder()
    +        .name("el-lookup-record-schema-name")
    +        .displayName("Record Schema Name")
    +        .description("If specified, the value will be used to lookup a schema in the configured schema registry.")
    +        .required(false)
    +        .addValidator(Validator.VALID)
    +        .build();
    +
    +    private ElasticSearchClientService clientService;
    +
    +    private String index;
    +    private String type;
    +    private ObjectMapper mapper;
    +
    +    @OnEnabled
    +    public void onEnabled(final ConfigurationContext context) throws InitializationException {
    +        clientService = context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class);
    +        index = context.getProperty(INDEX).getValue();
    +        type  = context.getProperty(TYPE).getValue();
    --- End diff --
    
    ``.evaluateExpressionLanguage()`` ?


---

[GitHub] nifi pull request #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2615#discussion_r192083397
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml ---
    @@ -212,6 +212,31 @@
                         </execution>
                     </executions>
                 </plugin>
    +
    +            <plugin>
    +                <groupId>org.jacoco</groupId>
    --- End diff --
    
    If we were to add a code coverage plugin to Maven, this is probably something that should be added to the root-level pom (and disabled by default?) What was the impetus for including it in a single bundle?


---

[GitHub] nifi pull request #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2615#discussion_r184393411
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java ---
    @@ -0,0 +1,290 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.elasticsearch;
    +
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.PropertyValue;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.controller.AbstractControllerService;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.lookup.LookupFailureException;
    +import org.apache.nifi.lookup.LookupService;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.reporting.InitializationException;
    +import org.apache.nifi.schemaregistry.services.SchemaRegistry;
    +import org.apache.nifi.serialization.SimpleRecordSchema;
    +import org.apache.nifi.serialization.record.MapRecord;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordField;
    +import org.apache.nifi.serialization.record.RecordFieldType;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.serialization.record.SchemaIdentifier;
    +import org.apache.nifi.serialization.record.type.RecordDataType;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +
    +public class ElasticSearchLookupService extends AbstractControllerService implements LookupService {
    +    public static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder()
    +        .name("el-rest-client-service")
    +        .displayName("Client Service")
    +        .description("An ElasticSearch client service to use for running queries.")
    +        .identifiesControllerService(ElasticSearchClientService.class)
    +        .required(true)
    +        .build();
    +    public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
    +        .name("el-lookup-index")
    +        .displayName("Index")
    +        .description("The name of the index to read from")
    +        .required(true)
    +        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
    +        .name("el-lookup-type")
    +        .displayName("Type")
    +        .description("The type of this document (used by Elasticsearch for indexing and searching)")
    +        .required(false)
    +        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor SCHEMA_REGISTRY = new PropertyDescriptor.Builder()
    +        .name("el-lookup-schema-registry")
    +        .displayName("Schema Registry")
    +        .description("If specified, this avro schema will be used for all objects loaded from MongoDB using this service. If left blank, " +
    +                "the service will attempt to determine the schema from the results.")
    +        .required(false)
    +        .identifiesControllerService(SchemaRegistry.class)
    +        .build();
    +
    +    public static final PropertyDescriptor RECORD_SCHEMA_NAME = new PropertyDescriptor.Builder()
    --- End diff --
    
    Most record-based processors (usually because they're in the same NAR) extend from SchemaRegistryService and thus offer the same schema-related properties. I realize you can't do that here (currently), but I think we should do one of two things: 1) Move SchemaRegistryService into the API, or 2) Offer the same properties as other record-based processors, including Schema Access Strategy, Schema Text, Schema Version, etc. I think the first one would be easier and more helpful for anyone creating a record-based processor in another bundle, what do you think?


---

[GitHub] nifi pull request #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2615#discussion_r192049919
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java ---
    @@ -0,0 +1,253 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.elasticsearch;
    +
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.lookup.LookupFailureException;
    +import org.apache.nifi.lookup.LookupService;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.reporting.InitializationException;
    +import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.nifi.serialization.SchemaRegistryService;
    +import org.apache.nifi.serialization.SimpleRecordSchema;
    +import org.apache.nifi.serialization.record.MapRecord;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordField;
    +import org.apache.nifi.serialization.record.RecordFieldType;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.serialization.record.type.RecordDataType;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +public class ElasticSearchLookupService extends SchemaRegistryService implements LookupService {
    +    public static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder()
    +        .name("el-rest-client-service")
    +        .displayName("Client Service")
    +        .description("An ElasticSearch client service to use for running queries.")
    +        .identifiesControllerService(ElasticSearchClientService.class)
    +        .required(true)
    +        .build();
    +    public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
    +        .name("el-lookup-index")
    +        .displayName("Index")
    +        .description("The name of the index to read from")
    +        .required(true)
    +        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
    +        .name("el-lookup-type")
    +        .displayName("Type")
    +        .description("The type of this document (used by Elasticsearch for indexing and searching)")
    +        .required(false)
    +        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +
    +    public static final PropertyDescriptor RECORD_SCHEMA_NAME = new PropertyDescriptor.Builder()
    --- End diff --
    
    Done.


---

[GitHub] nifi pull request #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2615#discussion_r209656707
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml ---
    @@ -127,8 +133,113 @@
                 <version>5.6.8</version>
                 <scope>compile</scope>
             </dependency>
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-avro-record-utils</artifactId>
    +            <version>1.7.0-SNAPSHOT</version>
    --- End diff --
    
    This one should be 1.8.0-SNAPSHOT now, sorry it's taken so long to get through


---

[GitHub] nifi pull request #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2615#discussion_r184689922
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml ---
    @@ -57,6 +57,18 @@
                 <scope>provided</scope>
             </dependency>
     
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-lookup-service-api</artifactId>
    +            <scope>provided</scope>
    +        </dependency>
    +
    +        <dependency>
    +            <groupId>org.apache.avro</groupId>
    +            <artifactId>avro</artifactId>
    +            <version>1.8.2</version>
    --- End diff --
    
    TBH I think that might have just been IntelliJ acting up.


---

[GitHub] nifi pull request #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2615#discussion_r184770080
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java ---
    @@ -0,0 +1,290 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.elasticsearch;
    +
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.PropertyValue;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.controller.AbstractControllerService;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.lookup.LookupFailureException;
    +import org.apache.nifi.lookup.LookupService;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.reporting.InitializationException;
    +import org.apache.nifi.schemaregistry.services.SchemaRegistry;
    +import org.apache.nifi.serialization.SimpleRecordSchema;
    +import org.apache.nifi.serialization.record.MapRecord;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordField;
    +import org.apache.nifi.serialization.record.RecordFieldType;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.serialization.record.SchemaIdentifier;
    +import org.apache.nifi.serialization.record.type.RecordDataType;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +
    +public class ElasticSearchLookupService extends AbstractControllerService implements LookupService {
    +    public static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder()
    +        .name("el-rest-client-service")
    +        .displayName("Client Service")
    +        .description("An ElasticSearch client service to use for running queries.")
    +        .identifiesControllerService(ElasticSearchClientService.class)
    +        .required(true)
    +        .build();
    +    public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
    +        .name("el-lookup-index")
    +        .displayName("Index")
    +        .description("The name of the index to read from")
    +        .required(true)
    +        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
    +        .name("el-lookup-type")
    +        .displayName("Type")
    +        .description("The type of this document (used by Elasticsearch for indexing and searching)")
    +        .required(false)
    +        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor SCHEMA_REGISTRY = new PropertyDescriptor.Builder()
    +        .name("el-lookup-schema-registry")
    +        .displayName("Schema Registry")
    +        .description("If specified, this avro schema will be used for all objects loaded from MongoDB using this service. If left blank, " +
    +                "the service will attempt to determine the schema from the results.")
    +        .required(false)
    +        .identifiesControllerService(SchemaRegistry.class)
    +        .build();
    +
    +    public static final PropertyDescriptor RECORD_SCHEMA_NAME = new PropertyDescriptor.Builder()
    --- End diff --
    
    Ok, I'll give it a shot.


---

[GitHub] nifi pull request #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2615#discussion_r209658382
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java ---
    @@ -0,0 +1,255 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.elasticsearch;
    +
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.lookup.LookupFailureException;
    +import org.apache.nifi.lookup.LookupService;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.reporting.InitializationException;
    +import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.nifi.serialization.SchemaRegistryService;
    +import org.apache.nifi.serialization.SimpleRecordSchema;
    +import org.apache.nifi.serialization.record.MapRecord;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordField;
    +import org.apache.nifi.serialization.record.RecordFieldType;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.serialization.record.type.RecordDataType;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +public class ElasticSearchLookupService extends SchemaRegistryService implements LookupService {
    +    public static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder()
    +        .name("el-rest-client-service")
    +        .displayName("Client Service")
    +        .description("An ElasticSearch client service to use for running queries.")
    +        .identifiesControllerService(ElasticSearchClientService.class)
    +        .required(true)
    +        .build();
    +    public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
    +        .name("el-lookup-index")
    +        .displayName("Index")
    +        .description("The name of the index to read from")
    +        .required(true)
    +        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
    +        .name("el-lookup-type")
    +        .displayName("Type")
    +        .description("The type of this document (used by Elasticsearch for indexing and searching)")
    +        .required(false)
    +        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +    private ElasticSearchClientService clientService;
    +
    +    private String index;
    +    private String type;
    +    private ObjectMapper mapper;
    +
    +    @OnEnabled
    +    public void onEnabled(final ConfigurationContext context) throws InitializationException {
    +        clientService = context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class);
    +        index = context.getProperty(INDEX).evaluateAttributeExpressions().getValue();
    +        type  = context.getProperty(TYPE).evaluateAttributeExpressions().getValue();
    +        mapper = new ObjectMapper();
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        List<PropertyDescriptor> _desc = new ArrayList<>();
    --- End diff --
    
    Just a style nitpick, these can be set up in the constructor or a static block (I think the former is preferred?). Unless they're dynamic the list only needs to be created once, where this method gets called often IIRC.


---

[GitHub] nifi pull request #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2615#discussion_r184851636
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java ---
    @@ -0,0 +1,290 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.elasticsearch;
    +
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.PropertyValue;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.controller.AbstractControllerService;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.lookup.LookupFailureException;
    +import org.apache.nifi.lookup.LookupService;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.reporting.InitializationException;
    +import org.apache.nifi.schemaregistry.services.SchemaRegistry;
    +import org.apache.nifi.serialization.SimpleRecordSchema;
    +import org.apache.nifi.serialization.record.MapRecord;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordField;
    +import org.apache.nifi.serialization.record.RecordFieldType;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.serialization.record.SchemaIdentifier;
    +import org.apache.nifi.serialization.record.type.RecordDataType;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +
    +public class ElasticSearchLookupService extends AbstractControllerService implements LookupService {
    +    public static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder()
    +        .name("el-rest-client-service")
    +        .displayName("Client Service")
    +        .description("An ElasticSearch client service to use for running queries.")
    +        .identifiesControllerService(ElasticSearchClientService.class)
    +        .required(true)
    +        .build();
    +    public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
    +        .name("el-lookup-index")
    +        .displayName("Index")
    +        .description("The name of the index to read from")
    +        .required(true)
    +        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
    +        .name("el-lookup-type")
    +        .displayName("Type")
    +        .description("The type of this document (used by Elasticsearch for indexing and searching)")
    +        .required(false)
    +        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor SCHEMA_REGISTRY = new PropertyDescriptor.Builder()
    +        .name("el-lookup-schema-registry")
    +        .displayName("Schema Registry")
    +        .description("If specified, this avro schema will be used for all objects loaded from MongoDB using this service. If left blank, " +
    +                "the service will attempt to determine the schema from the results.")
    +        .required(false)
    +        .identifiesControllerService(SchemaRegistry.class)
    +        .build();
    +
    +    public static final PropertyDescriptor RECORD_SCHEMA_NAME = new PropertyDescriptor.Builder()
    --- End diff --
    
    @mattyb149 I got started on that, but I'm not sure how this is supposed to work. The only entry point that is standard for `LookupService` is `lookup(Map)`. How did you envision communicating the info for the other lookup strategies? The schema name one makes sense; it can be a property on the service or simply `schema.name` in the `Map` passed to `lookup`. The rest, I'm not sure about.


---

[GitHub] nifi pull request #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2615#discussion_r184690560
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchLookupService_IT.java ---
    @@ -0,0 +1,246 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.elasticsearch.integration;
    +
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.nifi.elasticsearch.ElasticSearchClientService;
    +import org.apache.nifi.elasticsearch.ElasticSearchClientServiceImpl;
    +import org.apache.nifi.elasticsearch.ElasticSearchLookupService;
    +import org.apache.nifi.lookup.LookupFailureException;
    +import org.apache.nifi.schemaregistry.services.SchemaRegistry;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.serialization.record.type.RecordDataType;
    +import org.apache.nifi.util.TestRunner;
    +import org.apache.nifi.util.TestRunners;
    +import org.junit.Assert;
    +import org.junit.Before;
    +import org.junit.Test;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.Optional;
    +
    +public class ElasticSearchLookupService_IT {
    --- End diff --
    
    Ok, I'll add some.


---

[GitHub] nifi issue #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2615
  
    @mattyb149 Done.


---

[GitHub] nifi issue #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2615
  
    [ES_Lookup_Test.xml.txt](https://github.com/apache/nifi/files/2386507/ES_Lookup_Test.xml.txt)
    
    [5051 Kibana Commands](https://github.com/apache/nifi/files/2386508/5051.kibana.txt)
    
    [docker-compose.yml.txt](https://github.com/apache/nifi/files/2386510/docker-compose.yml.txt)
    
    



---

[GitHub] nifi pull request #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2615#discussion_r184392070
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java ---
    @@ -0,0 +1,290 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.elasticsearch;
    +
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.PropertyValue;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.controller.AbstractControllerService;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.lookup.LookupFailureException;
    +import org.apache.nifi.lookup.LookupService;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.reporting.InitializationException;
    +import org.apache.nifi.schemaregistry.services.SchemaRegistry;
    +import org.apache.nifi.serialization.SimpleRecordSchema;
    +import org.apache.nifi.serialization.record.MapRecord;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordField;
    +import org.apache.nifi.serialization.record.RecordFieldType;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.serialization.record.SchemaIdentifier;
    +import org.apache.nifi.serialization.record.type.RecordDataType;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +
    +public class ElasticSearchLookupService extends AbstractControllerService implements LookupService {
    +    public static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder()
    +        .name("el-rest-client-service")
    +        .displayName("Client Service")
    +        .description("An ElasticSearch client service to use for running queries.")
    +        .identifiesControllerService(ElasticSearchClientService.class)
    +        .required(true)
    +        .build();
    +    public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
    +        .name("el-lookup-index")
    +        .displayName("Index")
    +        .description("The name of the index to read from")
    +        .required(true)
    +        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    --- End diff --
    
    Since the lookup is performed on an incoming flow file, is there any reason the Index, Type, etc. properties couldn't support attributes coming from the flow file? If it is this way because the ES Client Service CS can't use them, perhaps we should write up a separate improvement Jira to do something like NIFI-5121.


---

[GitHub] nifi pull request #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2615#discussion_r209707403
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java ---
    @@ -0,0 +1,253 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.elasticsearch;
    +
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.lookup.LookupFailureException;
    +import org.apache.nifi.lookup.LookupService;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.reporting.InitializationException;
    +import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.nifi.serialization.SchemaRegistryService;
    +import org.apache.nifi.serialization.SimpleRecordSchema;
    +import org.apache.nifi.serialization.record.MapRecord;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordField;
    +import org.apache.nifi.serialization.record.RecordFieldType;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.serialization.record.type.RecordDataType;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +public class ElasticSearchLookupService extends SchemaRegistryService implements LookupService {
    +    public static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder()
    +        .name("el-rest-client-service")
    +        .displayName("Client Service")
    +        .description("An ElasticSearch client service to use for running queries.")
    +        .identifiesControllerService(ElasticSearchClientService.class)
    +        .required(true)
    +        .build();
    +    public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
    +        .name("el-lookup-index")
    +        .displayName("Index")
    +        .description("The name of the index to read from")
    +        .required(true)
    +        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
    +        .name("el-lookup-type")
    +        .displayName("Type")
    +        .description("The type of this document (used by Elasticsearch for indexing and searching)")
    +        .required(false)
    +        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +
    +    public static final PropertyDescriptor RECORD_SCHEMA_NAME = new PropertyDescriptor.Builder()
    +        .name("el-lookup-record-schema-name")
    +        .displayName("Record Schema Name")
    +        .description("If specified, the value will be used to lookup a schema in the configured schema registry.")
    +        .required(false)
    +        .addValidator(Validator.VALID)
    +        .build();
    +
    +    private ElasticSearchClientService clientService;
    +
    +    private String index;
    +    private String type;
    +    private ObjectMapper mapper;
    +
    +    @OnEnabled
    +    public void onEnabled(final ConfigurationContext context) throws InitializationException {
    +        clientService = context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class);
    +        index = context.getProperty(INDEX).getValue();
    +        type  = context.getProperty(TYPE).getValue();
    +        mapper = new ObjectMapper();
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        List<PropertyDescriptor> _desc = new ArrayList<>();
    +        _desc.addAll(super.getSupportedPropertyDescriptors());
    +        _desc.add(CLIENT_SERVICE);
    +        _desc.add(INDEX);
    +        _desc.add(TYPE);
    +        _desc.add(RECORD_SCHEMA_NAME);
    +
    +        return Collections.unmodifiableList(_desc);
    +    }
    +
    +    @Override
    +    public Optional lookup(Map coordinates) throws LookupFailureException {
    --- End diff --
    
    TBH, I'm not sure. It's been a while. I'll look at let you know today. Got most of your feedback squared away.


---

[GitHub] nifi issue #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on the issue:

    https://github.com/apache/nifi/pull/2615
  
    Reviewing...


---

[GitHub] nifi pull request #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2615#discussion_r189727019
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java ---
    @@ -0,0 +1,253 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.elasticsearch;
    +
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.lookup.LookupFailureException;
    +import org.apache.nifi.lookup.LookupService;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.reporting.InitializationException;
    +import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.nifi.serialization.SchemaRegistryService;
    +import org.apache.nifi.serialization.SimpleRecordSchema;
    +import org.apache.nifi.serialization.record.MapRecord;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordField;
    +import org.apache.nifi.serialization.record.RecordFieldType;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.serialization.record.type.RecordDataType;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +public class ElasticSearchLookupService extends SchemaRegistryService implements LookupService {
    +    public static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder()
    +        .name("el-rest-client-service")
    +        .displayName("Client Service")
    +        .description("An ElasticSearch client service to use for running queries.")
    +        .identifiesControllerService(ElasticSearchClientService.class)
    +        .required(true)
    +        .build();
    +    public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
    +        .name("el-lookup-index")
    +        .displayName("Index")
    +        .description("The name of the index to read from")
    +        .required(true)
    +        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
    +        .name("el-lookup-type")
    +        .displayName("Type")
    +        .description("The type of this document (used by Elasticsearch for indexing and searching)")
    +        .required(false)
    +        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +
    +    public static final PropertyDescriptor RECORD_SCHEMA_NAME = new PropertyDescriptor.Builder()
    +        .name("el-lookup-record-schema-name")
    +        .displayName("Record Schema Name")
    +        .description("If specified, the value will be used to lookup a schema in the configured schema registry.")
    +        .required(false)
    +        .addValidator(Validator.VALID)
    +        .build();
    +
    +    private ElasticSearchClientService clientService;
    +
    +    private String index;
    +    private String type;
    +    private ObjectMapper mapper;
    +
    +    @OnEnabled
    +    public void onEnabled(final ConfigurationContext context) throws InitializationException {
    +        clientService = context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class);
    +        index = context.getProperty(INDEX).getValue();
    +        type  = context.getProperty(TYPE).getValue();
    +        mapper = new ObjectMapper();
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        List<PropertyDescriptor> _desc = new ArrayList<>();
    +        _desc.addAll(super.getSupportedPropertyDescriptors());
    +        _desc.add(CLIENT_SERVICE);
    +        _desc.add(INDEX);
    +        _desc.add(TYPE);
    +        _desc.add(RECORD_SCHEMA_NAME);
    +
    +        return Collections.unmodifiableList(_desc);
    +    }
    +
    +    @Override
    +    public Optional lookup(Map coordinates) throws LookupFailureException {
    --- End diff --
    
    Yeah, we can do that. I think what it would look like is this:
    
    ```
    {
        "bool": {
            "must": [
                 {
                     "match": {
                         "username": "john.smith"
                     }
                 },
                 {
                     "match": {
                         "email": "john.smith@company.com"
                     }
                 }
             ]
        }
    }
    ```
    
    For input:
    
    ```
    {
        "username": "john.smith",
         "email": "john.smith@company.com"
    }
    ```
    
    as lookup keys.


---

[GitHub] nifi pull request #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2615#discussion_r191742124
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml ---
    @@ -127,8 +133,104 @@
                 <version>5.6.8</version>
                 <scope>compile</scope>
             </dependency>
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-avro-record-utils</artifactId>
    +            <version>1.7.0-SNAPSHOT</version>
    +            <scope>compile</scope>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-schema-registry-service-api</artifactId>
    +            <scope>compile</scope>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.mockito</groupId>
    +            <artifactId>mockito-all</artifactId>
    +            <scope>test</scope>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-avro-record-utils</artifactId>
    +            <version>1.7.0-SNAPSHOT</version>
    +            <scope>compile</scope>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-avro-record-utils</artifactId>
    +            <version>1.7.0-SNAPSHOT</version>
    +            <scope>compile</scope>
    +        </dependency>
         </dependencies>
     
    +    <build>
    +        <plugins>
    +            <plugin>
    +                <groupId>org.apache.maven.plugins</groupId>
    +                <artifactId>maven-compiler-plugin</artifactId>
    +                <executions>
    +                    <!-- Only run for tests -->
    +                    <execution>
    +                        <id>groovy-tests</id>
    +                        <goals>
    +                            <goal>testCompile</goal>
    +                        </goals>
    +                        <configuration>
    +                            <compilerId>groovy-eclipse-compiler</compilerId>
    +                        </configuration>
    +                    </execution>
    +                </executions>
    +                <configuration>
    +                    <source>1.8</source>
    --- End diff --
    
    Do we want to have this kind of configuration in low-level poms? Wondering if it'd be an issue with current modifications to support Java 9/10


---

[GitHub] nifi pull request #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2615#discussion_r184690426
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java ---
    @@ -0,0 +1,290 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.elasticsearch;
    +
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.PropertyValue;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.controller.AbstractControllerService;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.lookup.LookupFailureException;
    +import org.apache.nifi.lookup.LookupService;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.reporting.InitializationException;
    +import org.apache.nifi.schemaregistry.services.SchemaRegistry;
    +import org.apache.nifi.serialization.SimpleRecordSchema;
    +import org.apache.nifi.serialization.record.MapRecord;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordField;
    +import org.apache.nifi.serialization.record.RecordFieldType;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.serialization.record.SchemaIdentifier;
    +import org.apache.nifi.serialization.record.type.RecordDataType;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +
    +public class ElasticSearchLookupService extends AbstractControllerService implements LookupService {
    +    public static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder()
    +        .name("el-rest-client-service")
    +        .displayName("Client Service")
    +        .description("An ElasticSearch client service to use for running queries.")
    +        .identifiesControllerService(ElasticSearchClientService.class)
    +        .required(true)
    +        .build();
    +    public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
    +        .name("el-lookup-index")
    +        .displayName("Index")
    +        .description("The name of the index to read from")
    +        .required(true)
    +        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    --- End diff --
    
    I could definitely see some value to that, but since this is a LookupService implementation, we should discuss it in that context. NIFI-5121 only describes one particular interface, and LookupService is more expansive in use.


---

[GitHub] nifi pull request #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2615#discussion_r192050876
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java ---
    @@ -0,0 +1,266 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.elasticsearch;
    +
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.lookup.LookupFailureException;
    +import org.apache.nifi.lookup.LookupService;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.reporting.InitializationException;
    +import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.nifi.serialization.SchemaRegistryService;
    +import org.apache.nifi.serialization.SimpleRecordSchema;
    +import org.apache.nifi.serialization.record.MapRecord;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordField;
    +import org.apache.nifi.serialization.record.RecordFieldType;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.serialization.record.type.RecordDataType;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +public class ElasticSearchLookupService extends SchemaRegistryService implements LookupService {
    +    public static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder()
    +        .name("el-rest-client-service")
    +        .displayName("Client Service")
    +        .description("An ElasticSearch client service to use for running queries.")
    +        .identifiesControllerService(ElasticSearchClientService.class)
    +        .required(true)
    +        .build();
    +    public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
    +        .name("el-lookup-index")
    +        .displayName("Index")
    +        .description("The name of the index to read from")
    +        .required(true)
    +        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
    +        .name("el-lookup-type")
    +        .displayName("Type")
    +        .description("The type of this document (used by Elasticsearch for indexing and searching)")
    +        .required(false)
    +        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +
    +    public static final PropertyDescriptor RECORD_SCHEMA_NAME = new PropertyDescriptor.Builder()
    +        .name("el-lookup-record-schema-name")
    +        .displayName("Record Schema Name")
    +        .description("If specified, the value will be used to lookup a schema in the configured schema registry.")
    +        .required(false)
    +        .addValidator(Validator.VALID)
    +        .build();
    +
    +    private ElasticSearchClientService clientService;
    +
    +    private String index;
    +    private String type;
    +    private ObjectMapper mapper;
    +
    +    @OnEnabled
    +    public void onEnabled(final ConfigurationContext context) throws InitializationException {
    +        clientService = context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class);
    +        index = context.getProperty(INDEX).getValue();
    +        type  = context.getProperty(TYPE).getValue();
    --- End diff --
    
    Done.


---

[GitHub] nifi issue #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on the issue:

    https://github.com/apache/nifi/pull/2615
  
    +1 LGTM, ran full build with unit tests, tried the lookup service with a nested record and everything worked fine. Thanks for the improvement! Merging to master


---

[GitHub] nifi pull request #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2615#discussion_r184390680
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml ---
    @@ -120,6 +132,22 @@
                 <version>1.7.0-SNAPSHOT</version>
                 <scope>provided</scope>
             </dependency>
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-avro-record-utils</artifactId>
    +            <version>1.7.0-SNAPSHOT</version>
    +            <scope>compile</scope>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-avro-record-utils</artifactId>
    --- End diff --
    
    This is included twice


---

[GitHub] nifi pull request #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2615#discussion_r184396268
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml ---
    @@ -57,6 +57,18 @@
                 <scope>provided</scope>
             </dependency>
     
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-lookup-service-api</artifactId>
    +            <scope>provided</scope>
    +        </dependency>
    +
    +        <dependency>
    +            <groupId>org.apache.avro</groupId>
    +            <artifactId>avro</artifactId>
    +            <version>1.8.2</version>
    --- End diff --
    
    Won't nifi-avro-record-utils bring in the Avro library?


---

[GitHub] nifi pull request #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2615#discussion_r192050600
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml ---
    @@ -127,8 +133,104 @@
                 <version>5.6.8</version>
                 <scope>compile</scope>
             </dependency>
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-avro-record-utils</artifactId>
    +            <version>1.7.0-SNAPSHOT</version>
    +            <scope>compile</scope>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-schema-registry-service-api</artifactId>
    +            <scope>compile</scope>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.mockito</groupId>
    +            <artifactId>mockito-all</artifactId>
    +            <scope>test</scope>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-avro-record-utils</artifactId>
    +            <version>1.7.0-SNAPSHOT</version>
    +            <scope>compile</scope>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-avro-record-utils</artifactId>
    +            <version>1.7.0-SNAPSHOT</version>
    +            <scope>compile</scope>
    +        </dependency>
         </dependencies>
     
    +    <build>
    +        <plugins>
    +            <plugin>
    +                <groupId>org.apache.maven.plugins</groupId>
    +                <artifactId>maven-compiler-plugin</artifactId>
    +                <executions>
    +                    <!-- Only run for tests -->
    +                    <execution>
    +                        <id>groovy-tests</id>
    +                        <goals>
    +                            <goal>testCompile</goal>
    +                        </goals>
    +                        <configuration>
    +                            <compilerId>groovy-eclipse-compiler</compilerId>
    +                        </configuration>
    +                    </execution>
    +                </executions>
    +                <configuration>
    +                    <source>1.8</source>
    --- End diff --
    
    Probably not. Removed.


---

[GitHub] nifi issue #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on the issue:

    https://github.com/apache/nifi/pull/2615
  
    Reviewing...


---

[GitHub] nifi issue #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2615
  
    @mattyb149 Can you comment on the schema detection strategy issue I raised here?


---

[GitHub] nifi pull request #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2615#discussion_r184395918
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchLookupService_IT.java ---
    @@ -0,0 +1,246 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.elasticsearch.integration;
    +
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.nifi.elasticsearch.ElasticSearchClientService;
    +import org.apache.nifi.elasticsearch.ElasticSearchClientServiceImpl;
    +import org.apache.nifi.elasticsearch.ElasticSearchLookupService;
    +import org.apache.nifi.lookup.LookupFailureException;
    +import org.apache.nifi.schemaregistry.services.SchemaRegistry;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.serialization.record.type.RecordDataType;
    +import org.apache.nifi.util.TestRunner;
    +import org.apache.nifi.util.TestRunners;
    +import org.junit.Assert;
    +import org.junit.Before;
    +import org.junit.Test;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.Optional;
    +
    +public class ElasticSearchLookupService_IT {
    --- End diff --
    
    It would be nice to have some unit tests as well, since the integration tests do not get run as part of any automated build. I think you could mock the ES Client Service or something? There appears to be something similar in TestFetchElasticsearchHttp (and the other ES processor unit tests).


---

[GitHub] nifi pull request #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2615#discussion_r184407808
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java ---
    @@ -0,0 +1,290 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.elasticsearch;
    +
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.PropertyValue;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.controller.AbstractControllerService;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.lookup.LookupFailureException;
    +import org.apache.nifi.lookup.LookupService;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.reporting.InitializationException;
    +import org.apache.nifi.schemaregistry.services.SchemaRegistry;
    +import org.apache.nifi.serialization.SimpleRecordSchema;
    +import org.apache.nifi.serialization.record.MapRecord;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordField;
    +import org.apache.nifi.serialization.record.RecordFieldType;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.serialization.record.SchemaIdentifier;
    +import org.apache.nifi.serialization.record.type.RecordDataType;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +
    +public class ElasticSearchLookupService extends AbstractControllerService implements LookupService {
    +    public static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder()
    +        .name("el-rest-client-service")
    +        .displayName("Client Service")
    +        .description("An ElasticSearch client service to use for running queries.")
    +        .identifiesControllerService(ElasticSearchClientService.class)
    +        .required(true)
    +        .build();
    +    public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
    +        .name("el-lookup-index")
    +        .displayName("Index")
    +        .description("The name of the index to read from")
    +        .required(true)
    +        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
    +        .name("el-lookup-type")
    +        .displayName("Type")
    +        .description("The type of this document (used by Elasticsearch for indexing and searching)")
    +        .required(false)
    +        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor SCHEMA_REGISTRY = new PropertyDescriptor.Builder()
    +        .name("el-lookup-schema-registry")
    +        .displayName("Schema Registry")
    +        .description("If specified, this avro schema will be used for all objects loaded from MongoDB using this service. If left blank, " +
    +                "the service will attempt to determine the schema from the results.")
    +        .required(false)
    +        .identifiesControllerService(SchemaRegistry.class)
    +        .build();
    +
    +    public static final PropertyDescriptor RECORD_SCHEMA_NAME = new PropertyDescriptor.Builder()
    --- End diff --
    
    I put up #2661 to address option 1 above


---

[GitHub] nifi pull request #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2615#discussion_r209707241
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/ElasticSearch5ClientService_IT.groovy ---
    @@ -0,0 +1,147 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License") you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.elasticsearch.integration
    +
    +import org.apache.nifi.elasticsearch.DeleteOperationResponse
    +import org.apache.nifi.elasticsearch.ElasticSearchClientService
    +import org.apache.nifi.elasticsearch.ElasticSearchClientServiceImpl
    +import org.apache.nifi.elasticsearch.SearchResponse
    +import org.apache.nifi.util.TestRunner
    +import org.apache.nifi.util.TestRunners
    +import org.junit.After
    +import org.junit.Assert
    +import org.junit.Before
    +import org.junit.Test
    +
    +import static groovy.json.JsonOutput.*
    --- End diff --
    
    Manually fixed that.


---

[GitHub] nifi pull request #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2615#discussion_r217870054
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java ---
    @@ -0,0 +1,258 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.elasticsearch;
    +
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.lookup.LookupFailureException;
    +import org.apache.nifi.lookup.LookupService;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.nifi.serialization.JsonInferenceSchemaRegistryService;
    +import org.apache.nifi.serialization.record.MapRecord;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +public class ElasticSearchLookupService extends JsonInferenceSchemaRegistryService implements LookupService<Record> {
    +    public static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder()
    +        .name("el-rest-client-service")
    +        .displayName("Client Service")
    +        .description("An ElasticSearch client service to use for running queries.")
    +        .identifiesControllerService(ElasticSearchClientService.class)
    +        .required(true)
    +        .build();
    +    public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
    +        .name("el-lookup-index")
    +        .displayName("Index")
    +        .description("The name of the index to read from")
    +        .required(true)
    +        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
    +        .name("el-lookup-type")
    +        .displayName("Type")
    +        .description("The type of this document (used by Elasticsearch for indexing and searching)")
    +        .required(false)
    +        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +    private ElasticSearchClientService clientService;
    +
    +    private String index;
    +    private String type;
    +    private ObjectMapper mapper;
    +
    +    private final List<PropertyDescriptor> DESCRIPTORS;
    +
    +    public ElasticSearchLookupService() {
    +        List<PropertyDescriptor> _desc = new ArrayList<>();
    +        _desc.addAll(super.getSupportedPropertyDescriptors());
    +        _desc.add(CLIENT_SERVICE);
    +        _desc.add(INDEX);
    +        _desc.add(TYPE);
    +        DESCRIPTORS = Collections.unmodifiableList(_desc);
    +    }
    +
    +    @Override
    +    @OnEnabled
    +    public void onEnabled(final ConfigurationContext context) {
    +        clientService = context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class);
    +        index = context.getProperty(INDEX).evaluateAttributeExpressions().getValue();
    +        type  = context.getProperty(TYPE).evaluateAttributeExpressions().getValue();
    +        mapper = new ObjectMapper();
    +
    +        super.onEnabled(context);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return DESCRIPTORS;
    +    }
    +
    +    @Override
    +    public Optional<Record> lookup(Map<String, Object> coordinates) throws LookupFailureException {
    +        Map<String, String> context = coordinates.entrySet().stream()
    +            .collect(Collectors.toMap(
    +                e -> e.getKey(),
    +                e -> e.getValue().toString()
    +            ));
    +        return lookup(coordinates, context);
    +    }
    +
    +    @Override
    +    public Optional<Record> lookup(Map<String, Object> coordinates, Map<String, String> context) throws LookupFailureException {
    +        validateCoordinates(coordinates);
    +
    +        try {
    +            Record record;
    +            if (coordinates.containsKey("_id")) {
    +                record = getById((String)coordinates.get("_id"), context);
    +            } else {
    +                record = getByQuery(coordinates, context);
    +            }
    +
    +            return record == null ? Optional.empty() : Optional.of(record);
    +        } catch (Exception ex) {
    +            getLogger().error("Error during lookup.", ex);
    +            throw new LookupFailureException(ex);
    +        }
    +    }
    +
    +    private RecordSchema getSchemaFromCoordinates(Map<String, Object> coordinates) {
    +        Map<String, String> variables = coordinates.entrySet().stream()
    +            .collect(Collectors.toMap(
    +                e -> e.getKey(),
    +                e -> e.getValue().toString()
    +            ));
    +        try {
    +            return getSchema(variables, null);
    +        } catch (SchemaNotFoundException | IOException e) {
    +            if (getLogger().isDebugEnabled()) {
    +                getLogger().debug("Could not load schema, will create one from the results.", e);
    +            }
    +            return null;
    +        }
    +    }
    +
    +    private void validateCoordinates(Map coordinates) throws LookupFailureException {
    +        List<String> reasons = new ArrayList<>();
    +
    +        if (coordinates.containsKey("_id") && !(coordinates.get("_id") instanceof String)) {
    +            reasons.add("_id was supplied, but it was not a String.");
    +        }
    +
    +        if (coordinates.containsKey("_id") && coordinates.size() > 1) {
    +            reasons.add("When _id is used, it can be the only key used in the lookup.");
    +        }
    +
    +        if (reasons.size() > 0) {
    +            String error = String.join("\n", reasons);
    +            throw new LookupFailureException(error);
    +        }
    +    }
    +
    +    private Record getById(final String _id, Map<String, String> context) throws IOException, LookupFailureException, SchemaNotFoundException {
    +        Map<String, Object> query = new HashMap<String, Object>(){{
    +            put("query", new HashMap<String, Object>() {{
    +                put("match", new HashMap<String, String>(){{
    +                    put("_id", _id);
    +                }});
    +            }});
    +        }};
    +
    +        String json = mapper.writeValueAsString(query);
    +
    +        SearchResponse response = clientService.search(json, index, type);
    +
    +        if (response.getNumberOfHits() > 1) {
    +            throw new LookupFailureException(String.format("Expected 1 response, got %d for query %s",
    +                response.getNumberOfHits(), json));
    +        } else if (response.getNumberOfHits() == 0) {
    +            return null;
    +        }
    +
    +        final Map<String, Object> source = (Map)response.getHits().get(0).get("_source");
    +
    +        RecordSchema toUse = getSchema(context, source, null);
    +
    +        return new MapRecord(toUse, source);
    +    }
    +
    +    Map<String, Object> getNested(String key, Object value) {
    +        String path = key.substring(0, key.lastIndexOf("."));
    +
    +        return new HashMap<String, Object>(){{
    +            put("path", path);
    +            put("query", new HashMap<String, Object>(){{
    +                put("match", new HashMap<String, Object>(){{
    +                    put(key, value);
    +                }});
    +            }});
    +        }};
    +    }
    +
    +    private Map<String, Object> buildQuery(Map<String, Object> coordinates) {
    +        Map<String, Object> query = new HashMap<String, Object>(){{
    +            put("bool", new HashMap<String, Object>(){{
    +                put("must", coordinates.entrySet().stream()
    +                    .map(e -> new HashMap<String, Object>(){{
    +                        if (e.getKey().contains(".")) {
    --- End diff --
    
    I've run the unit and integration tests and the code LGTM, but I'd feel better if I could get an example going where I do the lookup on a field that's not at the top level. I have a document containing a "user" field, which contains other fields such as "name", and "name" contains other fields like "first" and "last". I tried using this with a simple CSV input containing an id and a first name, and tried to use the lookup service to match "user.name.first" and return the value of "user.name.last", but got an error saying I was trying to do a nested query on a field that wasn't nested. I didn't add an explicit mapping for the index, just put the complex JSON docs into ES. Am I configuring it wrong, or is this not supported, or could there be a bug?


---

[GitHub] nifi pull request #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2615#discussion_r191744098
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java ---
    @@ -0,0 +1,266 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.elasticsearch;
    +
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.lookup.LookupFailureException;
    +import org.apache.nifi.lookup.LookupService;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.reporting.InitializationException;
    +import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.nifi.serialization.SchemaRegistryService;
    +import org.apache.nifi.serialization.SimpleRecordSchema;
    +import org.apache.nifi.serialization.record.MapRecord;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordField;
    +import org.apache.nifi.serialization.record.RecordFieldType;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.serialization.record.type.RecordDataType;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +public class ElasticSearchLookupService extends SchemaRegistryService implements LookupService {
    +    public static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder()
    +        .name("el-rest-client-service")
    +        .displayName("Client Service")
    +        .description("An ElasticSearch client service to use for running queries.")
    +        .identifiesControllerService(ElasticSearchClientService.class)
    +        .required(true)
    +        .build();
    +    public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
    +        .name("el-lookup-index")
    +        .displayName("Index")
    +        .description("The name of the index to read from")
    +        .required(true)
    +        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
    +        .name("el-lookup-type")
    +        .displayName("Type")
    +        .description("The type of this document (used by Elasticsearch for indexing and searching)")
    +        .required(false)
    +        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +
    +    public static final PropertyDescriptor RECORD_SCHEMA_NAME = new PropertyDescriptor.Builder()
    +        .name("el-lookup-record-schema-name")
    +        .displayName("Record Schema Name")
    +        .description("If specified, the value will be used to lookup a schema in the configured schema registry.")
    +        .required(false)
    +        .addValidator(Validator.VALID)
    +        .build();
    +
    +    private ElasticSearchClientService clientService;
    +
    +    private String index;
    +    private String type;
    +    private ObjectMapper mapper;
    +
    +    @OnEnabled
    +    public void onEnabled(final ConfigurationContext context) throws InitializationException {
    +        clientService = context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class);
    +        index = context.getProperty(INDEX).getValue();
    --- End diff --
    
    ``.evaluateExpressionLanguage()`` ?


---

[GitHub] nifi pull request #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2615#discussion_r217756404
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java ---
    @@ -0,0 +1,258 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.elasticsearch;
    +
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.lookup.LookupFailureException;
    +import org.apache.nifi.lookup.LookupService;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.nifi.serialization.JsonInferenceSchemaRegistryService;
    +import org.apache.nifi.serialization.record.MapRecord;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +public class ElasticSearchLookupService extends JsonInferenceSchemaRegistryService implements LookupService<Record> {
    +    public static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder()
    +        .name("el-rest-client-service")
    +        .displayName("Client Service")
    +        .description("An ElasticSearch client service to use for running queries.")
    +        .identifiesControllerService(ElasticSearchClientService.class)
    +        .required(true)
    +        .build();
    +    public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
    +        .name("el-lookup-index")
    +        .displayName("Index")
    +        .description("The name of the index to read from")
    +        .required(true)
    +        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
    +        .name("el-lookup-type")
    +        .displayName("Type")
    +        .description("The type of this document (used by Elasticsearch for indexing and searching)")
    +        .required(false)
    +        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +    private ElasticSearchClientService clientService;
    +
    +    private String index;
    +    private String type;
    +    private ObjectMapper mapper;
    +
    +    private final List<PropertyDescriptor> DESCRIPTORS;
    +
    +    ElasticSearchLookupService() {
    --- End diff --
    
    Done. NiFi is able to load it and assign it as the lookupservice for lookuprecord.


---

[GitHub] nifi pull request #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2615#discussion_r218170885
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java ---
    @@ -0,0 +1,258 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.elasticsearch;
    +
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.lookup.LookupFailureException;
    +import org.apache.nifi.lookup.LookupService;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.nifi.serialization.JsonInferenceSchemaRegistryService;
    +import org.apache.nifi.serialization.record.MapRecord;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +public class ElasticSearchLookupService extends JsonInferenceSchemaRegistryService implements LookupService<Record> {
    +    public static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder()
    +        .name("el-rest-client-service")
    +        .displayName("Client Service")
    +        .description("An ElasticSearch client service to use for running queries.")
    +        .identifiesControllerService(ElasticSearchClientService.class)
    +        .required(true)
    +        .build();
    +    public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
    +        .name("el-lookup-index")
    +        .displayName("Index")
    +        .description("The name of the index to read from")
    +        .required(true)
    +        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
    +        .name("el-lookup-type")
    +        .displayName("Type")
    +        .description("The type of this document (used by Elasticsearch for indexing and searching)")
    +        .required(false)
    +        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +    private ElasticSearchClientService clientService;
    +
    +    private String index;
    +    private String type;
    +    private ObjectMapper mapper;
    +
    +    private final List<PropertyDescriptor> DESCRIPTORS;
    +
    +    public ElasticSearchLookupService() {
    +        List<PropertyDescriptor> _desc = new ArrayList<>();
    +        _desc.addAll(super.getSupportedPropertyDescriptors());
    +        _desc.add(CLIENT_SERVICE);
    +        _desc.add(INDEX);
    +        _desc.add(TYPE);
    +        DESCRIPTORS = Collections.unmodifiableList(_desc);
    +    }
    +
    +    @Override
    +    @OnEnabled
    +    public void onEnabled(final ConfigurationContext context) {
    +        clientService = context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class);
    +        index = context.getProperty(INDEX).evaluateAttributeExpressions().getValue();
    +        type  = context.getProperty(TYPE).evaluateAttributeExpressions().getValue();
    +        mapper = new ObjectMapper();
    +
    +        super.onEnabled(context);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return DESCRIPTORS;
    +    }
    +
    +    @Override
    +    public Optional<Record> lookup(Map<String, Object> coordinates) throws LookupFailureException {
    +        Map<String, String> context = coordinates.entrySet().stream()
    +            .collect(Collectors.toMap(
    +                e -> e.getKey(),
    +                e -> e.getValue().toString()
    +            ));
    +        return lookup(coordinates, context);
    +    }
    +
    +    @Override
    +    public Optional<Record> lookup(Map<String, Object> coordinates, Map<String, String> context) throws LookupFailureException {
    +        validateCoordinates(coordinates);
    +
    +        try {
    +            Record record;
    +            if (coordinates.containsKey("_id")) {
    +                record = getById((String)coordinates.get("_id"), context);
    +            } else {
    +                record = getByQuery(coordinates, context);
    +            }
    +
    +            return record == null ? Optional.empty() : Optional.of(record);
    +        } catch (Exception ex) {
    +            getLogger().error("Error during lookup.", ex);
    +            throw new LookupFailureException(ex);
    +        }
    +    }
    +
    +    private RecordSchema getSchemaFromCoordinates(Map<String, Object> coordinates) {
    +        Map<String, String> variables = coordinates.entrySet().stream()
    +            .collect(Collectors.toMap(
    +                e -> e.getKey(),
    +                e -> e.getValue().toString()
    +            ));
    +        try {
    +            return getSchema(variables, null);
    +        } catch (SchemaNotFoundException | IOException e) {
    +            if (getLogger().isDebugEnabled()) {
    +                getLogger().debug("Could not load schema, will create one from the results.", e);
    +            }
    +            return null;
    +        }
    +    }
    +
    +    private void validateCoordinates(Map coordinates) throws LookupFailureException {
    +        List<String> reasons = new ArrayList<>();
    +
    +        if (coordinates.containsKey("_id") && !(coordinates.get("_id") instanceof String)) {
    +            reasons.add("_id was supplied, but it was not a String.");
    +        }
    +
    +        if (coordinates.containsKey("_id") && coordinates.size() > 1) {
    +            reasons.add("When _id is used, it can be the only key used in the lookup.");
    +        }
    +
    +        if (reasons.size() > 0) {
    +            String error = String.join("\n", reasons);
    +            throw new LookupFailureException(error);
    +        }
    +    }
    +
    +    private Record getById(final String _id, Map<String, String> context) throws IOException, LookupFailureException, SchemaNotFoundException {
    +        Map<String, Object> query = new HashMap<String, Object>(){{
    +            put("query", new HashMap<String, Object>() {{
    +                put("match", new HashMap<String, String>(){{
    +                    put("_id", _id);
    +                }});
    +            }});
    +        }};
    +
    +        String json = mapper.writeValueAsString(query);
    +
    +        SearchResponse response = clientService.search(json, index, type);
    +
    +        if (response.getNumberOfHits() > 1) {
    +            throw new LookupFailureException(String.format("Expected 1 response, got %d for query %s",
    +                response.getNumberOfHits(), json));
    +        } else if (response.getNumberOfHits() == 0) {
    +            return null;
    +        }
    +
    +        final Map<String, Object> source = (Map)response.getHits().get(0).get("_source");
    +
    +        RecordSchema toUse = getSchema(context, source, null);
    +
    +        return new MapRecord(toUse, source);
    +    }
    +
    +    Map<String, Object> getNested(String key, Object value) {
    +        String path = key.substring(0, key.lastIndexOf("."));
    +
    +        return new HashMap<String, Object>(){{
    +            put("path", path);
    +            put("query", new HashMap<String, Object>(){{
    +                put("match", new HashMap<String, Object>(){{
    +                    put(key, value);
    +                }});
    +            }});
    +        }};
    +    }
    +
    +    private Map<String, Object> buildQuery(Map<String, Object> coordinates) {
    +        Map<String, Object> query = new HashMap<String, Object>(){{
    +            put("bool", new HashMap<String, Object>(){{
    +                put("must", coordinates.entrySet().stream()
    +                    .map(e -> new HashMap<String, Object>(){{
    +                        if (e.getKey().contains(".")) {
    --- End diff --
    
    See my comment below. It has a sample flow, the commands for Kibana and a docker compose file.


---

[GitHub] nifi pull request #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2615#discussion_r209699519
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java ---
    @@ -0,0 +1,255 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.elasticsearch;
    +
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.lookup.LookupFailureException;
    +import org.apache.nifi.lookup.LookupService;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.reporting.InitializationException;
    +import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.nifi.serialization.SchemaRegistryService;
    +import org.apache.nifi.serialization.SimpleRecordSchema;
    +import org.apache.nifi.serialization.record.MapRecord;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordField;
    +import org.apache.nifi.serialization.record.RecordFieldType;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.serialization.record.type.RecordDataType;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +public class ElasticSearchLookupService extends SchemaRegistryService implements LookupService {
    +    public static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder()
    +        .name("el-rest-client-service")
    +        .displayName("Client Service")
    +        .description("An ElasticSearch client service to use for running queries.")
    +        .identifiesControllerService(ElasticSearchClientService.class)
    +        .required(true)
    +        .build();
    +    public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
    +        .name("el-lookup-index")
    +        .displayName("Index")
    +        .description("The name of the index to read from")
    +        .required(true)
    +        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
    +        .name("el-lookup-type")
    +        .displayName("Type")
    +        .description("The type of this document (used by Elasticsearch for indexing and searching)")
    +        .required(false)
    +        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +    private ElasticSearchClientService clientService;
    +
    +    private String index;
    +    private String type;
    +    private ObjectMapper mapper;
    +
    +    @OnEnabled
    +    public void onEnabled(final ConfigurationContext context) throws InitializationException {
    +        clientService = context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class);
    +        index = context.getProperty(INDEX).evaluateAttributeExpressions().getValue();
    +        type  = context.getProperty(TYPE).evaluateAttributeExpressions().getValue();
    +        mapper = new ObjectMapper();
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        List<PropertyDescriptor> _desc = new ArrayList<>();
    --- End diff --
    
    Done.


---

[GitHub] nifi pull request #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2615#discussion_r192169823
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml ---
    @@ -212,6 +212,31 @@
                         </execution>
                     </executions>
                 </plugin>
    +
    +            <plugin>
    +                <groupId>org.jacoco</groupId>
    --- End diff --
    
    I'm not sure how well that would work at root level because there are plenty of integration tests that have to be run to get a full sense of code coverage. So maybe I should back this out or one of you can drop it when rebasing for a merge if you think it makes more sense to add a root level profile for code coverage.


---

[GitHub] nifi pull request #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2615#discussion_r189753159
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java ---
    @@ -0,0 +1,253 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.elasticsearch;
    +
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.lookup.LookupFailureException;
    +import org.apache.nifi.lookup.LookupService;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.reporting.InitializationException;
    +import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.nifi.serialization.SchemaRegistryService;
    +import org.apache.nifi.serialization.SimpleRecordSchema;
    +import org.apache.nifi.serialization.record.MapRecord;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordField;
    +import org.apache.nifi.serialization.record.RecordFieldType;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.serialization.record.type.RecordDataType;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +public class ElasticSearchLookupService extends SchemaRegistryService implements LookupService {
    +    public static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder()
    +        .name("el-rest-client-service")
    +        .displayName("Client Service")
    +        .description("An ElasticSearch client service to use for running queries.")
    +        .identifiesControllerService(ElasticSearchClientService.class)
    +        .required(true)
    +        .build();
    +    public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
    +        .name("el-lookup-index")
    +        .displayName("Index")
    +        .description("The name of the index to read from")
    +        .required(true)
    +        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
    +        .name("el-lookup-type")
    +        .displayName("Type")
    +        .description("The type of this document (used by Elasticsearch for indexing and searching)")
    +        .required(false)
    +        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +
    +    public static final PropertyDescriptor RECORD_SCHEMA_NAME = new PropertyDescriptor.Builder()
    +        .name("el-lookup-record-schema-name")
    +        .displayName("Record Schema Name")
    +        .description("If specified, the value will be used to lookup a schema in the configured schema registry.")
    +        .required(false)
    +        .addValidator(Validator.VALID)
    +        .build();
    +
    +    private ElasticSearchClientService clientService;
    +
    +    private String index;
    +    private String type;
    +    private ObjectMapper mapper;
    +
    +    @OnEnabled
    +    public void onEnabled(final ConfigurationContext context) throws InitializationException {
    +        clientService = context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class);
    +        index = context.getProperty(INDEX).getValue();
    +        type  = context.getProperty(TYPE).getValue();
    +        mapper = new ObjectMapper();
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        List<PropertyDescriptor> _desc = new ArrayList<>();
    +        _desc.addAll(super.getSupportedPropertyDescriptors());
    +        _desc.add(CLIENT_SERVICE);
    +        _desc.add(INDEX);
    +        _desc.add(TYPE);
    +        _desc.add(RECORD_SCHEMA_NAME);
    +
    +        return Collections.unmodifiableList(_desc);
    +    }
    +
    +    @Override
    +    public Optional lookup(Map coordinates) throws LookupFailureException {
    --- End diff --
    
    @mattyb149 I'm going to work on knocking the basic query builder changes out tonight. Could be a big change, so I apologize in advance :)


---

[GitHub] nifi pull request #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2615#discussion_r189705031
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java ---
    @@ -0,0 +1,253 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.elasticsearch;
    +
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.lookup.LookupFailureException;
    +import org.apache.nifi.lookup.LookupService;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.reporting.InitializationException;
    +import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.nifi.serialization.SchemaRegistryService;
    +import org.apache.nifi.serialization.SimpleRecordSchema;
    +import org.apache.nifi.serialization.record.MapRecord;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordField;
    +import org.apache.nifi.serialization.record.RecordFieldType;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.serialization.record.type.RecordDataType;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +public class ElasticSearchLookupService extends SchemaRegistryService implements LookupService {
    +    public static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder()
    +        .name("el-rest-client-service")
    +        .displayName("Client Service")
    +        .description("An ElasticSearch client service to use for running queries.")
    +        .identifiesControllerService(ElasticSearchClientService.class)
    +        .required(true)
    +        .build();
    +    public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
    +        .name("el-lookup-index")
    +        .displayName("Index")
    +        .description("The name of the index to read from")
    +        .required(true)
    +        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
    +        .name("el-lookup-type")
    +        .displayName("Type")
    +        .description("The type of this document (used by Elasticsearch for indexing and searching)")
    +        .required(false)
    +        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +
    +    public static final PropertyDescriptor RECORD_SCHEMA_NAME = new PropertyDescriptor.Builder()
    --- End diff --
    
    Thought I removed that.


---

[GitHub] nifi pull request #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/nifi/pull/2615


---

[GitHub] nifi pull request #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2615#discussion_r209658506
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java ---
    @@ -0,0 +1,253 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.elasticsearch;
    +
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.lookup.LookupFailureException;
    +import org.apache.nifi.lookup.LookupService;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.reporting.InitializationException;
    +import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.nifi.serialization.SchemaRegistryService;
    +import org.apache.nifi.serialization.SimpleRecordSchema;
    +import org.apache.nifi.serialization.record.MapRecord;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordField;
    +import org.apache.nifi.serialization.record.RecordFieldType;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.serialization.record.type.RecordDataType;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +public class ElasticSearchLookupService extends SchemaRegistryService implements LookupService {
    +    public static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder()
    +        .name("el-rest-client-service")
    +        .displayName("Client Service")
    +        .description("An ElasticSearch client service to use for running queries.")
    +        .identifiesControllerService(ElasticSearchClientService.class)
    +        .required(true)
    +        .build();
    +    public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
    +        .name("el-lookup-index")
    +        .displayName("Index")
    +        .description("The name of the index to read from")
    +        .required(true)
    +        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
    +        .name("el-lookup-type")
    +        .displayName("Type")
    +        .description("The type of this document (used by Elasticsearch for indexing and searching)")
    +        .required(false)
    +        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +
    +    public static final PropertyDescriptor RECORD_SCHEMA_NAME = new PropertyDescriptor.Builder()
    +        .name("el-lookup-record-schema-name")
    +        .displayName("Record Schema Name")
    +        .description("If specified, the value will be used to lookup a schema in the configured schema registry.")
    +        .required(false)
    +        .addValidator(Validator.VALID)
    +        .build();
    +
    +    private ElasticSearchClientService clientService;
    +
    +    private String index;
    +    private String type;
    +    private ObjectMapper mapper;
    +
    +    @OnEnabled
    +    public void onEnabled(final ConfigurationContext context) throws InitializationException {
    +        clientService = context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class);
    +        index = context.getProperty(INDEX).getValue();
    +        type  = context.getProperty(TYPE).getValue();
    +        mapper = new ObjectMapper();
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        List<PropertyDescriptor> _desc = new ArrayList<>();
    +        _desc.addAll(super.getSupportedPropertyDescriptors());
    +        _desc.add(CLIENT_SERVICE);
    +        _desc.add(INDEX);
    +        _desc.add(TYPE);
    +        _desc.add(RECORD_SCHEMA_NAME);
    +
    +        return Collections.unmodifiableList(_desc);
    +    }
    +
    +    @Override
    +    public Optional lookup(Map coordinates) throws LookupFailureException {
    --- End diff --
    
    Did those changes make it into the latest PR?


---

[GitHub] nifi pull request #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2615#discussion_r217746687
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java ---
    @@ -0,0 +1,258 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.elasticsearch;
    +
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.lookup.LookupFailureException;
    +import org.apache.nifi.lookup.LookupService;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.nifi.serialization.JsonInferenceSchemaRegistryService;
    +import org.apache.nifi.serialization.record.MapRecord;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +public class ElasticSearchLookupService extends JsonInferenceSchemaRegistryService implements LookupService<Record> {
    +    public static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder()
    +        .name("el-rest-client-service")
    +        .displayName("Client Service")
    +        .description("An ElasticSearch client service to use for running queries.")
    +        .identifiesControllerService(ElasticSearchClientService.class)
    +        .required(true)
    +        .build();
    +    public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
    +        .name("el-lookup-index")
    +        .displayName("Index")
    +        .description("The name of the index to read from")
    +        .required(true)
    +        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
    +        .name("el-lookup-type")
    +        .displayName("Type")
    +        .description("The type of this document (used by Elasticsearch for indexing and searching)")
    +        .required(false)
    +        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +    private ElasticSearchClientService clientService;
    +
    +    private String index;
    +    private String type;
    +    private ObjectMapper mapper;
    +
    +    private final List<PropertyDescriptor> DESCRIPTORS;
    +
    +    ElasticSearchLookupService() {
    --- End diff --
    
    I think this has to be public for ServiceLoader to find it, I'm getting errors when trying to load it into NiFi.


---

[GitHub] nifi pull request #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2615#discussion_r209658892
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/ElasticSearch5ClientService_IT.groovy ---
    @@ -0,0 +1,147 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License") you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.elasticsearch.integration
    +
    +import org.apache.nifi.elasticsearch.DeleteOperationResponse
    +import org.apache.nifi.elasticsearch.ElasticSearchClientService
    +import org.apache.nifi.elasticsearch.ElasticSearchClientServiceImpl
    +import org.apache.nifi.elasticsearch.SearchResponse
    +import org.apache.nifi.util.TestRunner
    +import org.apache.nifi.util.TestRunners
    +import org.junit.After
    +import org.junit.Assert
    +import org.junit.Before
    +import org.junit.Test
    +
    +import static groovy.json.JsonOutput.*
    --- End diff --
    
    This shouldn't pass CheckStyle as we don't allow star imports in Java, we probably just don't have an existing (or complete) CheckStyle rule for Groovy files.


---

[GitHub] nifi pull request #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2615#discussion_r217869883
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml ---
    @@ -150,6 +172,7 @@
                                 <httpPort>9400</httpPort>
                                 <version>5.6.2</version>
                                 <timeout>90</timeout>
    +                            <pathInitScript>src/test/resources/setup.script</pathInitScript>
    --- End diff --
    
    I don't think this works if you run -Pintegration-tests from the bundle level, but it does from the client-service level


---

[GitHub] nifi issue #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2615
  
    @mattyb149 can we merge?


---

[GitHub] nifi pull request #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2615#discussion_r191742961
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java ---
    @@ -0,0 +1,253 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.elasticsearch;
    +
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.lookup.LookupFailureException;
    +import org.apache.nifi.lookup.LookupService;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.reporting.InitializationException;
    +import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.nifi.serialization.SchemaRegistryService;
    +import org.apache.nifi.serialization.SimpleRecordSchema;
    +import org.apache.nifi.serialization.record.MapRecord;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordField;
    +import org.apache.nifi.serialization.record.RecordFieldType;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.serialization.record.type.RecordDataType;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +public class ElasticSearchLookupService extends SchemaRegistryService implements LookupService {
    +    public static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder()
    +        .name("el-rest-client-service")
    +        .displayName("Client Service")
    +        .description("An ElasticSearch client service to use for running queries.")
    +        .identifiesControllerService(ElasticSearchClientService.class)
    +        .required(true)
    +        .build();
    +    public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
    +        .name("el-lookup-index")
    +        .displayName("Index")
    +        .description("The name of the index to read from")
    +        .required(true)
    +        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
    +        .name("el-lookup-type")
    +        .displayName("Type")
    +        .description("The type of this document (used by Elasticsearch for indexing and searching)")
    +        .required(false)
    +        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +
    +    public static final PropertyDescriptor RECORD_SCHEMA_NAME = new PropertyDescriptor.Builder()
    --- End diff --
    
    Do you want to remove it @MikeThomsen ?


---

[GitHub] nifi pull request #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2615#discussion_r184689786
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java ---
    @@ -0,0 +1,290 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.elasticsearch;
    +
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.PropertyValue;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.controller.AbstractControllerService;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.lookup.LookupFailureException;
    +import org.apache.nifi.lookup.LookupService;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.reporting.InitializationException;
    +import org.apache.nifi.schemaregistry.services.SchemaRegistry;
    +import org.apache.nifi.serialization.SimpleRecordSchema;
    +import org.apache.nifi.serialization.record.MapRecord;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordField;
    +import org.apache.nifi.serialization.record.RecordFieldType;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.serialization.record.SchemaIdentifier;
    +import org.apache.nifi.serialization.record.type.RecordDataType;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +
    +public class ElasticSearchLookupService extends AbstractControllerService implements LookupService {
    +    public static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder()
    +        .name("el-rest-client-service")
    +        .displayName("Client Service")
    +        .description("An ElasticSearch client service to use for running queries.")
    +        .identifiesControllerService(ElasticSearchClientService.class)
    +        .required(true)
    +        .build();
    +    public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
    +        .name("el-lookup-index")
    +        .displayName("Index")
    +        .description("The name of the index to read from")
    +        .required(true)
    +        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
    +        .name("el-lookup-type")
    +        .displayName("Type")
    +        .description("The type of this document (used by Elasticsearch for indexing and searching)")
    +        .required(false)
    +        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor SCHEMA_REGISTRY = new PropertyDescriptor.Builder()
    +        .name("el-lookup-schema-registry")
    +        .displayName("Schema Registry")
    +        .description("If specified, this avro schema will be used for all objects loaded from MongoDB using this service. If left blank, " +
    +                "the service will attempt to determine the schema from the results.")
    +        .required(false)
    +        .identifiesControllerService(SchemaRegistry.class)
    +        .build();
    +
    +    public static final PropertyDescriptor RECORD_SCHEMA_NAME = new PropertyDescriptor.Builder()
    +        .name("el-lookup-record-schema-name")
    +        .displayName("Record Schema Name")
    +        .description("If specified, the value will be used to lookup a schema in the configured schema registry.")
    +        .required(false)
    +        .addValidator(Validator.VALID)
    +        .build();
    +
    +    static final List<PropertyDescriptor> lookupDescriptors;
    +
    +    static {
    +        List<PropertyDescriptor> _desc = new ArrayList<>();
    +        _desc.add(CLIENT_SERVICE);
    +        _desc.add(INDEX);
    +        _desc.add(TYPE);
    +        _desc.add(SCHEMA_REGISTRY);
    +        _desc.add(RECORD_SCHEMA_NAME);
    +
    +        lookupDescriptors = Collections.unmodifiableList(_desc);
    +    }
    +
    +    private ElasticSearchClientService clientService;
    +
    +    private String index;
    +    private String type;
    +    private ObjectMapper mapper;
    +    private RecordSchema recordSchema;
    +
    +    @Override
    +    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
    +        List<ValidationResult> problems = new ArrayList<>();
    +
    +        PropertyValue registry = validationContext.getProperty(SCHEMA_REGISTRY);
    +        PropertyValue schemaName = validationContext.getProperty(RECORD_SCHEMA_NAME);
    +
    +        if (registry.isSet() && !schemaName.isSet()) {
    +            problems.add(new ValidationResult.Builder()
    +                    .explanation("If the registry is set, the schema name parameter must be set too.")
    +                    .build());
    +        } else if (!registry.isSet() && schemaName.isSet()) {
    +            problems.add(new ValidationResult.Builder()
    +                    .explanation("If the schema name is set, the schema registry parameter must be set too.")
    +                    .build());
    +        }
    +
    +        return problems;
    +    }
    +
    +
    +    @OnEnabled
    +    public void onEnabled(final ConfigurationContext context) throws InitializationException {
    +        clientService = context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class);
    +        index = context.getProperty(INDEX).getValue();
    +        type  = context.getProperty(TYPE).getValue();
    +        mapper = new ObjectMapper();
    +
    +        SchemaRegistry registry = context.getProperty(SCHEMA_REGISTRY).asControllerService(SchemaRegistry.class);
    +        final String name = context.getProperty(RECORD_SCHEMA_NAME).getValue();
    +        if (registry != null) {
    +            try {
    +                SchemaIdentifier identifier = SchemaIdentifier.builder()
    +                    .name(name)
    +                    .build();
    +                recordSchema = registry.retrieveSchema(identifier);
    +            } catch (Exception ex) {
    +                getLogger().error(String.format("Could not find schema named %s", name), ex);
    +                throw new InitializationException(ex);
    +            }
    +        }
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return lookupDescriptors;
    +    }
    +
    +    @Override
    +    public Optional lookup(Map coordinates) throws LookupFailureException {
    +        validateCoordinates(coordinates);
    +
    +        try {
    +            Record record;
    +            if (coordinates.containsKey("_id")) {
    +                record = getById((String)coordinates.get("_id"));
    +            } else {
    +                record = getByQuery((String)coordinates.get("query"));
    +            }
    +
    +            return record == null ? Optional.empty() : Optional.of(record);
    +        } catch (IOException ex) {
    +            getLogger().error("Error during lookup.", ex);
    +            throw new LookupFailureException(ex);
    +        }
    +    }
    +
    +    private void validateCoordinates(Map coordinates) throws LookupFailureException {
    +        List<String> reasons = new ArrayList<>();
    +
    +        if (coordinates.containsKey("_id") && !(coordinates.get("_id") instanceof String)) {
    +            reasons.add("_id was supplied, but it was not a String.");
    +        } else if (coordinates.containsKey("query") && !(coordinates.get("query") instanceof String)) {
    +            reasons.add("query was supplied, but it was not a String.");
    +        } else if (!coordinates.containsKey("_id") && !coordinates.containsKey("query")) {
    +            reasons.add("Either \"_id\" or \"query\" must be supplied as keys to lookup(Map)");
    +        } else if (coordinates.containsKey("_id") && coordinates.containsKey("query")) {
    +            reasons.add("\"_id\" and \"query\" cannot be used at the same time as keys.");
    +        }
    +
    +        if (reasons.size() > 0) {
    +            String error = String.join("\n", reasons);
    +            throw new LookupFailureException(error);
    +        }
    +    }
    +
    +    private Record getById(final String _id) throws IOException, LookupFailureException {
    +        Map<String, Object> query = new HashMap<String, Object>(){{
    +            put("query", new HashMap<String, Object>() {{
    +                put("match", new HashMap<String, String>(){{
    +                    put("_id", _id);
    +                }});
    +            }});
    +        }};
    +
    +        String json = mapper.writeValueAsString(query);
    +
    +        SearchResponse response = clientService.search(json, index, type);
    +
    +        if (response.getNumberOfHits() > 1) {
    +            throw new LookupFailureException(String.format("Expected 1 response, got %d for query %s",
    +                response.getNumberOfHits(), json));
    +        } else if (response.getNumberOfHits() == 0) {
    +            return null;
    +        }
    +
    +        final Map<String, Object> source = (Map)response.getHits().get(0).get("_source");
    +
    +        RecordSchema toUse = recordSchema != null ? recordSchema : convertSchema(source);
    +
    +        return new MapRecord(toUse, source);
    +    }
    +
    +    private Record getByQuery(final String query) throws LookupFailureException {
    +        Map<String, Object> parsed;
    +        try {
    +            parsed = mapper.readValue(query, Map.class);
    +            parsed.remove("from");
    +            parsed.remove("aggs");
    +            parsed.put("size", 1);
    +
    +            final String json = mapper.writeValueAsString(parsed);
    +
    +            SearchResponse response = clientService.search(json, index, type);
    +
    +            if (response.getNumberOfHits() == 0) {
    +                return null;
    +            } else {
    +                final Map<String, Object> source = (Map)response.getHits().get(0).get("_source");
    +                RecordSchema toUse = recordSchema != null ? recordSchema : convertSchema(source);
    +                return new MapRecord(toUse, source);
    +            }
    +
    +        } catch (IOException e) {
    +            throw new LookupFailureException(e);
    +        }
    +    }
    +
    +    private RecordSchema convertSchema(Map<String, Object> result) {
    --- End diff --
    
    I agree. I just wrote up [a Jira ticket](https://issues.apache.org/jira/browse/NIFI-5127) for this. Let's table it for now because we need to think about things like date strings.


---

[GitHub] nifi pull request #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2615#discussion_r189696292
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java ---
    @@ -0,0 +1,253 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.elasticsearch;
    +
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.lookup.LookupFailureException;
    +import org.apache.nifi.lookup.LookupService;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.reporting.InitializationException;
    +import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.nifi.serialization.SchemaRegistryService;
    +import org.apache.nifi.serialization.SimpleRecordSchema;
    +import org.apache.nifi.serialization.record.MapRecord;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordField;
    +import org.apache.nifi.serialization.record.RecordFieldType;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.serialization.record.type.RecordDataType;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +public class ElasticSearchLookupService extends SchemaRegistryService implements LookupService {
    +    public static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder()
    +        .name("el-rest-client-service")
    +        .displayName("Client Service")
    +        .description("An ElasticSearch client service to use for running queries.")
    +        .identifiesControllerService(ElasticSearchClientService.class)
    +        .required(true)
    +        .build();
    +    public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
    +        .name("el-lookup-index")
    +        .displayName("Index")
    +        .description("The name of the index to read from")
    +        .required(true)
    +        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
    +        .name("el-lookup-type")
    +        .displayName("Type")
    +        .description("The type of this document (used by Elasticsearch for indexing and searching)")
    +        .required(false)
    +        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +
    +    public static final PropertyDescriptor RECORD_SCHEMA_NAME = new PropertyDescriptor.Builder()
    +        .name("el-lookup-record-schema-name")
    +        .displayName("Record Schema Name")
    +        .description("If specified, the value will be used to lookup a schema in the configured schema registry.")
    +        .required(false)
    +        .addValidator(Validator.VALID)
    +        .build();
    +
    +    private ElasticSearchClientService clientService;
    +
    +    private String index;
    +    private String type;
    +    private ObjectMapper mapper;
    +
    +    @OnEnabled
    +    public void onEnabled(final ConfigurationContext context) throws InitializationException {
    +        clientService = context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class);
    +        index = context.getProperty(INDEX).getValue();
    +        type  = context.getProperty(TYPE).getValue();
    +        mapper = new ObjectMapper();
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        List<PropertyDescriptor> _desc = new ArrayList<>();
    +        _desc.addAll(super.getSupportedPropertyDescriptors());
    +        _desc.add(CLIENT_SERVICE);
    +        _desc.add(INDEX);
    +        _desc.add(TYPE);
    +        _desc.add(RECORD_SCHEMA_NAME);
    +
    +        return Collections.unmodifiableList(_desc);
    +    }
    +
    +    @Override
    +    public Optional lookup(Map coordinates) throws LookupFailureException {
    --- End diff --
    
    I kinda thought this LookupService would behave a bit like the Mongo one, where you could give it multiple keys and it would do the query based on that (the fields and the values for each record). This one seems a bit awkward to me, as the user would have to build up their own query field in each record, putting the value they want to match inside a JSON query body.
    
    Is there a different use case here, or could/should we make it more consistent with the other "NoSQL" lookup service(s)? We'd have to generate the query body but that shouldn't be too hard. Also you'd only be able to query top-level fields for lookup, but that seems like it would cover most use cases. If there is a way to specify a nested field for lookup (such as a qualified name with period delimiters), we could do that (although we'd likely have to use a "nested" operator in the generated query), seems like a good (but separate) improvement. Thoughts?


---

[GitHub] nifi issue #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2615
  
    @pvillard31 @mattyb149 changes checked in.


---

[GitHub] nifi pull request #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2615#discussion_r218162804
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java ---
    @@ -0,0 +1,258 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.elasticsearch;
    +
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.lookup.LookupFailureException;
    +import org.apache.nifi.lookup.LookupService;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.nifi.serialization.JsonInferenceSchemaRegistryService;
    +import org.apache.nifi.serialization.record.MapRecord;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +public class ElasticSearchLookupService extends JsonInferenceSchemaRegistryService implements LookupService<Record> {
    +    public static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder()
    +        .name("el-rest-client-service")
    +        .displayName("Client Service")
    +        .description("An ElasticSearch client service to use for running queries.")
    +        .identifiesControllerService(ElasticSearchClientService.class)
    +        .required(true)
    +        .build();
    +    public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
    +        .name("el-lookup-index")
    +        .displayName("Index")
    +        .description("The name of the index to read from")
    +        .required(true)
    +        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
    +        .name("el-lookup-type")
    +        .displayName("Type")
    +        .description("The type of this document (used by Elasticsearch for indexing and searching)")
    +        .required(false)
    +        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +    private ElasticSearchClientService clientService;
    +
    +    private String index;
    +    private String type;
    +    private ObjectMapper mapper;
    +
    +    private final List<PropertyDescriptor> DESCRIPTORS;
    +
    +    public ElasticSearchLookupService() {
    +        List<PropertyDescriptor> _desc = new ArrayList<>();
    +        _desc.addAll(super.getSupportedPropertyDescriptors());
    +        _desc.add(CLIENT_SERVICE);
    +        _desc.add(INDEX);
    +        _desc.add(TYPE);
    +        DESCRIPTORS = Collections.unmodifiableList(_desc);
    +    }
    +
    +    @Override
    +    @OnEnabled
    +    public void onEnabled(final ConfigurationContext context) {
    +        clientService = context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class);
    +        index = context.getProperty(INDEX).evaluateAttributeExpressions().getValue();
    +        type  = context.getProperty(TYPE).evaluateAttributeExpressions().getValue();
    +        mapper = new ObjectMapper();
    +
    +        super.onEnabled(context);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return DESCRIPTORS;
    +    }
    +
    +    @Override
    +    public Optional<Record> lookup(Map<String, Object> coordinates) throws LookupFailureException {
    +        Map<String, String> context = coordinates.entrySet().stream()
    +            .collect(Collectors.toMap(
    +                e -> e.getKey(),
    +                e -> e.getValue().toString()
    +            ));
    +        return lookup(coordinates, context);
    +    }
    +
    +    @Override
    +    public Optional<Record> lookup(Map<String, Object> coordinates, Map<String, String> context) throws LookupFailureException {
    +        validateCoordinates(coordinates);
    +
    +        try {
    +            Record record;
    +            if (coordinates.containsKey("_id")) {
    +                record = getById((String)coordinates.get("_id"), context);
    +            } else {
    +                record = getByQuery(coordinates, context);
    +            }
    +
    +            return record == null ? Optional.empty() : Optional.of(record);
    +        } catch (Exception ex) {
    +            getLogger().error("Error during lookup.", ex);
    +            throw new LookupFailureException(ex);
    +        }
    +    }
    +
    +    private RecordSchema getSchemaFromCoordinates(Map<String, Object> coordinates) {
    +        Map<String, String> variables = coordinates.entrySet().stream()
    +            .collect(Collectors.toMap(
    +                e -> e.getKey(),
    +                e -> e.getValue().toString()
    +            ));
    +        try {
    +            return getSchema(variables, null);
    +        } catch (SchemaNotFoundException | IOException e) {
    +            if (getLogger().isDebugEnabled()) {
    +                getLogger().debug("Could not load schema, will create one from the results.", e);
    +            }
    +            return null;
    +        }
    +    }
    +
    +    private void validateCoordinates(Map coordinates) throws LookupFailureException {
    +        List<String> reasons = new ArrayList<>();
    +
    +        if (coordinates.containsKey("_id") && !(coordinates.get("_id") instanceof String)) {
    +            reasons.add("_id was supplied, but it was not a String.");
    +        }
    +
    +        if (coordinates.containsKey("_id") && coordinates.size() > 1) {
    +            reasons.add("When _id is used, it can be the only key used in the lookup.");
    +        }
    +
    +        if (reasons.size() > 0) {
    +            String error = String.join("\n", reasons);
    +            throw new LookupFailureException(error);
    +        }
    +    }
    +
    +    private Record getById(final String _id, Map<String, String> context) throws IOException, LookupFailureException, SchemaNotFoundException {
    +        Map<String, Object> query = new HashMap<String, Object>(){{
    +            put("query", new HashMap<String, Object>() {{
    +                put("match", new HashMap<String, String>(){{
    +                    put("_id", _id);
    +                }});
    +            }});
    +        }};
    +
    +        String json = mapper.writeValueAsString(query);
    +
    +        SearchResponse response = clientService.search(json, index, type);
    +
    +        if (response.getNumberOfHits() > 1) {
    +            throw new LookupFailureException(String.format("Expected 1 response, got %d for query %s",
    +                response.getNumberOfHits(), json));
    +        } else if (response.getNumberOfHits() == 0) {
    +            return null;
    +        }
    +
    +        final Map<String, Object> source = (Map)response.getHits().get(0).get("_source");
    +
    +        RecordSchema toUse = getSchema(context, source, null);
    +
    +        return new MapRecord(toUse, source);
    +    }
    +
    +    Map<String, Object> getNested(String key, Object value) {
    +        String path = key.substring(0, key.lastIndexOf("."));
    +
    +        return new HashMap<String, Object>(){{
    +            put("path", path);
    +            put("query", new HashMap<String, Object>(){{
    +                put("match", new HashMap<String, Object>(){{
    +                    put(key, value);
    +                }});
    +            }});
    +        }};
    +    }
    +
    +    private Map<String, Object> buildQuery(Map<String, Object> coordinates) {
    +        Map<String, Object> query = new HashMap<String, Object>(){{
    +            put("bool", new HashMap<String, Object>(){{
    +                put("must", coordinates.entrySet().stream()
    +                    .map(e -> new HashMap<String, Object>(){{
    +                        if (e.getKey().contains(".")) {
    --- End diff --
    
    I'm still getting the same error (nested object under path [user.name] is not of nested type) on my flow. I tried yours but I don't have any documents/mappings in ES (such as a doc with "subfield.longfield), can you share an example doc I can put in there? I have my own ES so I didn't start up the Docker Compose stuff you attached.


---

[GitHub] nifi pull request #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2615#discussion_r210070170
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml ---
    @@ -127,8 +133,113 @@
                 <version>5.6.8</version>
                 <scope>compile</scope>
             </dependency>
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-avro-record-utils</artifactId>
    +            <version>1.7.0-SNAPSHOT</version>
    +            <scope>compile</scope>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-schema-registry-service-api</artifactId>
    +            <scope>compile</scope>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.mockito</groupId>
    +            <artifactId>mockito-all</artifactId>
    +            <scope>test</scope>
    +        </dependency>
         </dependencies>
     
    +    <build>
    --- End diff --
    
    @alopresto thanks for explaining that. I just added a .gitignore file into src/test/java and that did the trick.


---

[GitHub] nifi pull request #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2615#discussion_r217899955
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java ---
    @@ -0,0 +1,258 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.elasticsearch;
    +
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.lookup.LookupFailureException;
    +import org.apache.nifi.lookup.LookupService;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.nifi.serialization.JsonInferenceSchemaRegistryService;
    +import org.apache.nifi.serialization.record.MapRecord;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +public class ElasticSearchLookupService extends JsonInferenceSchemaRegistryService implements LookupService<Record> {
    +    public static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder()
    +        .name("el-rest-client-service")
    +        .displayName("Client Service")
    +        .description("An ElasticSearch client service to use for running queries.")
    +        .identifiesControllerService(ElasticSearchClientService.class)
    +        .required(true)
    +        .build();
    +    public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
    +        .name("el-lookup-index")
    +        .displayName("Index")
    +        .description("The name of the index to read from")
    +        .required(true)
    +        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
    +        .name("el-lookup-type")
    +        .displayName("Type")
    +        .description("The type of this document (used by Elasticsearch for indexing and searching)")
    +        .required(false)
    +        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +    private ElasticSearchClientService clientService;
    +
    +    private String index;
    +    private String type;
    +    private ObjectMapper mapper;
    +
    +    private final List<PropertyDescriptor> DESCRIPTORS;
    +
    +    public ElasticSearchLookupService() {
    +        List<PropertyDescriptor> _desc = new ArrayList<>();
    +        _desc.addAll(super.getSupportedPropertyDescriptors());
    +        _desc.add(CLIENT_SERVICE);
    +        _desc.add(INDEX);
    +        _desc.add(TYPE);
    +        DESCRIPTORS = Collections.unmodifiableList(_desc);
    +    }
    +
    +    @Override
    +    @OnEnabled
    +    public void onEnabled(final ConfigurationContext context) {
    +        clientService = context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class);
    +        index = context.getProperty(INDEX).evaluateAttributeExpressions().getValue();
    +        type  = context.getProperty(TYPE).evaluateAttributeExpressions().getValue();
    +        mapper = new ObjectMapper();
    +
    +        super.onEnabled(context);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return DESCRIPTORS;
    +    }
    +
    +    @Override
    +    public Optional<Record> lookup(Map<String, Object> coordinates) throws LookupFailureException {
    +        Map<String, String> context = coordinates.entrySet().stream()
    +            .collect(Collectors.toMap(
    +                e -> e.getKey(),
    +                e -> e.getValue().toString()
    +            ));
    +        return lookup(coordinates, context);
    +    }
    +
    +    @Override
    +    public Optional<Record> lookup(Map<String, Object> coordinates, Map<String, String> context) throws LookupFailureException {
    +        validateCoordinates(coordinates);
    +
    +        try {
    +            Record record;
    +            if (coordinates.containsKey("_id")) {
    +                record = getById((String)coordinates.get("_id"), context);
    +            } else {
    +                record = getByQuery(coordinates, context);
    +            }
    +
    +            return record == null ? Optional.empty() : Optional.of(record);
    +        } catch (Exception ex) {
    +            getLogger().error("Error during lookup.", ex);
    +            throw new LookupFailureException(ex);
    +        }
    +    }
    +
    +    private RecordSchema getSchemaFromCoordinates(Map<String, Object> coordinates) {
    +        Map<String, String> variables = coordinates.entrySet().stream()
    +            .collect(Collectors.toMap(
    +                e -> e.getKey(),
    +                e -> e.getValue().toString()
    +            ));
    +        try {
    +            return getSchema(variables, null);
    +        } catch (SchemaNotFoundException | IOException e) {
    +            if (getLogger().isDebugEnabled()) {
    +                getLogger().debug("Could not load schema, will create one from the results.", e);
    +            }
    +            return null;
    +        }
    +    }
    +
    +    private void validateCoordinates(Map coordinates) throws LookupFailureException {
    +        List<String> reasons = new ArrayList<>();
    +
    +        if (coordinates.containsKey("_id") && !(coordinates.get("_id") instanceof String)) {
    +            reasons.add("_id was supplied, but it was not a String.");
    +        }
    +
    +        if (coordinates.containsKey("_id") && coordinates.size() > 1) {
    +            reasons.add("When _id is used, it can be the only key used in the lookup.");
    +        }
    +
    +        if (reasons.size() > 0) {
    +            String error = String.join("\n", reasons);
    +            throw new LookupFailureException(error);
    +        }
    +    }
    +
    +    private Record getById(final String _id, Map<String, String> context) throws IOException, LookupFailureException, SchemaNotFoundException {
    +        Map<String, Object> query = new HashMap<String, Object>(){{
    +            put("query", new HashMap<String, Object>() {{
    +                put("match", new HashMap<String, String>(){{
    +                    put("_id", _id);
    +                }});
    +            }});
    +        }};
    +
    +        String json = mapper.writeValueAsString(query);
    +
    +        SearchResponse response = clientService.search(json, index, type);
    +
    +        if (response.getNumberOfHits() > 1) {
    +            throw new LookupFailureException(String.format("Expected 1 response, got %d for query %s",
    +                response.getNumberOfHits(), json));
    +        } else if (response.getNumberOfHits() == 0) {
    +            return null;
    +        }
    +
    +        final Map<String, Object> source = (Map)response.getHits().get(0).get("_source");
    +
    +        RecordSchema toUse = getSchema(context, source, null);
    +
    +        return new MapRecord(toUse, source);
    +    }
    +
    +    Map<String, Object> getNested(String key, Object value) {
    +        String path = key.substring(0, key.lastIndexOf("."));
    +
    +        return new HashMap<String, Object>(){{
    +            put("path", path);
    +            put("query", new HashMap<String, Object>(){{
    +                put("match", new HashMap<String, Object>(){{
    +                    put(key, value);
    +                }});
    +            }});
    +        }};
    +    }
    +
    +    private Map<String, Object> buildQuery(Map<String, Object> coordinates) {
    +        Map<String, Object> query = new HashMap<String, Object>(){{
    +            put("bool", new HashMap<String, Object>(){{
    +                put("must", coordinates.entrySet().stream()
    +                    .map(e -> new HashMap<String, Object>(){{
    +                        if (e.getKey().contains(".")) {
    --- End diff --
    
    So, I got it working and will share some artifacts tomorrow if I get a chance so you can watch them in action. I'm thinking some of the behavior still needs a second opinion on the flexibility/user-friendliness.


---

[GitHub] nifi pull request #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2615#discussion_r184394354
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java ---
    @@ -0,0 +1,290 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.elasticsearch;
    +
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.PropertyValue;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.controller.AbstractControllerService;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.lookup.LookupFailureException;
    +import org.apache.nifi.lookup.LookupService;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.reporting.InitializationException;
    +import org.apache.nifi.schemaregistry.services.SchemaRegistry;
    +import org.apache.nifi.serialization.SimpleRecordSchema;
    +import org.apache.nifi.serialization.record.MapRecord;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordField;
    +import org.apache.nifi.serialization.record.RecordFieldType;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.serialization.record.SchemaIdentifier;
    +import org.apache.nifi.serialization.record.type.RecordDataType;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +
    +public class ElasticSearchLookupService extends AbstractControllerService implements LookupService {
    +    public static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder()
    +        .name("el-rest-client-service")
    +        .displayName("Client Service")
    +        .description("An ElasticSearch client service to use for running queries.")
    +        .identifiesControllerService(ElasticSearchClientService.class)
    +        .required(true)
    +        .build();
    +    public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
    +        .name("el-lookup-index")
    +        .displayName("Index")
    +        .description("The name of the index to read from")
    +        .required(true)
    +        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
    +        .name("el-lookup-type")
    +        .displayName("Type")
    +        .description("The type of this document (used by Elasticsearch for indexing and searching)")
    +        .required(false)
    +        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor SCHEMA_REGISTRY = new PropertyDescriptor.Builder()
    +        .name("el-lookup-schema-registry")
    +        .displayName("Schema Registry")
    +        .description("If specified, this avro schema will be used for all objects loaded from MongoDB using this service. If left blank, " +
    +                "the service will attempt to determine the schema from the results.")
    +        .required(false)
    +        .identifiesControllerService(SchemaRegistry.class)
    +        .build();
    +
    +    public static final PropertyDescriptor RECORD_SCHEMA_NAME = new PropertyDescriptor.Builder()
    +        .name("el-lookup-record-schema-name")
    +        .displayName("Record Schema Name")
    +        .description("If specified, the value will be used to lookup a schema in the configured schema registry.")
    +        .required(false)
    +        .addValidator(Validator.VALID)
    +        .build();
    +
    +    static final List<PropertyDescriptor> lookupDescriptors;
    +
    +    static {
    +        List<PropertyDescriptor> _desc = new ArrayList<>();
    +        _desc.add(CLIENT_SERVICE);
    +        _desc.add(INDEX);
    +        _desc.add(TYPE);
    +        _desc.add(SCHEMA_REGISTRY);
    +        _desc.add(RECORD_SCHEMA_NAME);
    +
    +        lookupDescriptors = Collections.unmodifiableList(_desc);
    +    }
    +
    +    private ElasticSearchClientService clientService;
    +
    +    private String index;
    +    private String type;
    +    private ObjectMapper mapper;
    +    private RecordSchema recordSchema;
    +
    +    @Override
    +    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
    +        List<ValidationResult> problems = new ArrayList<>();
    +
    +        PropertyValue registry = validationContext.getProperty(SCHEMA_REGISTRY);
    +        PropertyValue schemaName = validationContext.getProperty(RECORD_SCHEMA_NAME);
    +
    +        if (registry.isSet() && !schemaName.isSet()) {
    +            problems.add(new ValidationResult.Builder()
    +                    .explanation("If the registry is set, the schema name parameter must be set too.")
    +                    .build());
    +        } else if (!registry.isSet() && schemaName.isSet()) {
    +            problems.add(new ValidationResult.Builder()
    +                    .explanation("If the schema name is set, the schema registry parameter must be set too.")
    +                    .build());
    +        }
    +
    +        return problems;
    +    }
    +
    +
    +    @OnEnabled
    +    public void onEnabled(final ConfigurationContext context) throws InitializationException {
    +        clientService = context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class);
    +        index = context.getProperty(INDEX).getValue();
    +        type  = context.getProperty(TYPE).getValue();
    +        mapper = new ObjectMapper();
    +
    +        SchemaRegistry registry = context.getProperty(SCHEMA_REGISTRY).asControllerService(SchemaRegistry.class);
    +        final String name = context.getProperty(RECORD_SCHEMA_NAME).getValue();
    +        if (registry != null) {
    +            try {
    +                SchemaIdentifier identifier = SchemaIdentifier.builder()
    +                    .name(name)
    +                    .build();
    +                recordSchema = registry.retrieveSchema(identifier);
    +            } catch (Exception ex) {
    +                getLogger().error(String.format("Could not find schema named %s", name), ex);
    +                throw new InitializationException(ex);
    +            }
    +        }
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return lookupDescriptors;
    +    }
    +
    +    @Override
    +    public Optional lookup(Map coordinates) throws LookupFailureException {
    +        validateCoordinates(coordinates);
    +
    +        try {
    +            Record record;
    +            if (coordinates.containsKey("_id")) {
    +                record = getById((String)coordinates.get("_id"));
    +            } else {
    +                record = getByQuery((String)coordinates.get("query"));
    +            }
    +
    +            return record == null ? Optional.empty() : Optional.of(record);
    +        } catch (IOException ex) {
    +            getLogger().error("Error during lookup.", ex);
    +            throw new LookupFailureException(ex);
    +        }
    +    }
    +
    +    private void validateCoordinates(Map coordinates) throws LookupFailureException {
    +        List<String> reasons = new ArrayList<>();
    +
    +        if (coordinates.containsKey("_id") && !(coordinates.get("_id") instanceof String)) {
    +            reasons.add("_id was supplied, but it was not a String.");
    +        } else if (coordinates.containsKey("query") && !(coordinates.get("query") instanceof String)) {
    +            reasons.add("query was supplied, but it was not a String.");
    +        } else if (!coordinates.containsKey("_id") && !coordinates.containsKey("query")) {
    +            reasons.add("Either \"_id\" or \"query\" must be supplied as keys to lookup(Map)");
    +        } else if (coordinates.containsKey("_id") && coordinates.containsKey("query")) {
    +            reasons.add("\"_id\" and \"query\" cannot be used at the same time as keys.");
    +        }
    +
    +        if (reasons.size() > 0) {
    +            String error = String.join("\n", reasons);
    +            throw new LookupFailureException(error);
    +        }
    +    }
    +
    +    private Record getById(final String _id) throws IOException, LookupFailureException {
    +        Map<String, Object> query = new HashMap<String, Object>(){{
    +            put("query", new HashMap<String, Object>() {{
    +                put("match", new HashMap<String, String>(){{
    +                    put("_id", _id);
    +                }});
    +            }});
    +        }};
    +
    +        String json = mapper.writeValueAsString(query);
    +
    +        SearchResponse response = clientService.search(json, index, type);
    +
    +        if (response.getNumberOfHits() > 1) {
    +            throw new LookupFailureException(String.format("Expected 1 response, got %d for query %s",
    +                response.getNumberOfHits(), json));
    +        } else if (response.getNumberOfHits() == 0) {
    +            return null;
    +        }
    +
    +        final Map<String, Object> source = (Map)response.getHits().get(0).get("_source");
    +
    +        RecordSchema toUse = recordSchema != null ? recordSchema : convertSchema(source);
    +
    +        return new MapRecord(toUse, source);
    +    }
    +
    +    private Record getByQuery(final String query) throws LookupFailureException {
    +        Map<String, Object> parsed;
    +        try {
    +            parsed = mapper.readValue(query, Map.class);
    +            parsed.remove("from");
    +            parsed.remove("aggs");
    +            parsed.put("size", 1);
    +
    +            final String json = mapper.writeValueAsString(parsed);
    +
    +            SearchResponse response = clientService.search(json, index, type);
    +
    +            if (response.getNumberOfHits() == 0) {
    +                return null;
    +            } else {
    +                final Map<String, Object> source = (Map)response.getHits().get(0).get("_source");
    +                RecordSchema toUse = recordSchema != null ? recordSchema : convertSchema(source);
    +                return new MapRecord(toUse, source);
    +            }
    +
    +        } catch (IOException e) {
    +            throw new LookupFailureException(e);
    +        }
    +    }
    +
    +    private RecordSchema convertSchema(Map<String, Object> result) {
    --- End diff --
    
    Should this be in a utilities class (if there isn't already such a method in one)? Seems pretty helpful for JSON-to-schema conversions (or any Map-to-Schema) in general.


---

[GitHub] nifi pull request #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2615#discussion_r217884483
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml ---
    @@ -150,6 +172,7 @@
                                 <httpPort>9400</httpPort>
                                 <version>5.6.2</version>
                                 <timeout>90</timeout>
    +                            <pathInitScript>src/test/resources/setup.script</pathInitScript>
    --- End diff --
    
    Verified that's the case. Going to have to look into that.


---

[GitHub] nifi pull request #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2615#discussion_r191741767
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml ---
    @@ -120,6 +132,22 @@
                 <version>1.7.0-SNAPSHOT</version>
                 <scope>provided</scope>
             </dependency>
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-avro-record-utils</artifactId>
    +            <version>1.7.0-SNAPSHOT</version>
    +            <scope>compile</scope>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-avro-record-utils</artifactId>
    --- End diff --
    
    Comment is still valid @MikeThomsen 


---

[GitHub] nifi pull request #2615: NIFI-5051 Created ElasticSearch lookup service.

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2615#discussion_r192050863
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java ---
    @@ -0,0 +1,266 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.elasticsearch;
    +
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.lookup.LookupFailureException;
    +import org.apache.nifi.lookup.LookupService;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.reporting.InitializationException;
    +import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.nifi.serialization.SchemaRegistryService;
    +import org.apache.nifi.serialization.SimpleRecordSchema;
    +import org.apache.nifi.serialization.record.MapRecord;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordField;
    +import org.apache.nifi.serialization.record.RecordFieldType;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.serialization.record.type.RecordDataType;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +public class ElasticSearchLookupService extends SchemaRegistryService implements LookupService {
    +    public static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder()
    +        .name("el-rest-client-service")
    +        .displayName("Client Service")
    +        .description("An ElasticSearch client service to use for running queries.")
    +        .identifiesControllerService(ElasticSearchClientService.class)
    +        .required(true)
    +        .build();
    +    public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
    +        .name("el-lookup-index")
    +        .displayName("Index")
    +        .description("The name of the index to read from")
    +        .required(true)
    +        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
    +        .name("el-lookup-type")
    +        .displayName("Type")
    +        .description("The type of this document (used by Elasticsearch for indexing and searching)")
    +        .required(false)
    +        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +
    +    public static final PropertyDescriptor RECORD_SCHEMA_NAME = new PropertyDescriptor.Builder()
    +        .name("el-lookup-record-schema-name")
    +        .displayName("Record Schema Name")
    +        .description("If specified, the value will be used to lookup a schema in the configured schema registry.")
    +        .required(false)
    +        .addValidator(Validator.VALID)
    +        .build();
    +
    +    private ElasticSearchClientService clientService;
    +
    +    private String index;
    +    private String type;
    +    private ObjectMapper mapper;
    +
    +    @OnEnabled
    +    public void onEnabled(final ConfigurationContext context) throws InitializationException {
    +        clientService = context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class);
    +        index = context.getProperty(INDEX).getValue();
    --- End diff --
    
    Done.


---