You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mt...@apache.org on 2019/02/14 13:02:46 UTC

[nifi] branch master updated: NIFI-5947 Elasticsearch lookup service compatible with LookupAttribute NIFI-5947 Made the lookup key for ElasticSearchStringLookupService more unique to the service (was previously 'id'); change by Mike Thomsen

This is an automated email from the ASF dual-hosted git repository.

mthomsen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 45b32e3  NIFI-5947 Elasticsearch lookup service compatible with LookupAttribute NIFI-5947 Made the lookup key for ElasticSearchStringLookupService more unique to the service (was previously 'id'); change by Mike Thomsen
45b32e3 is described below

commit 45b32e3bc11feea1958902cecc3c84b26b5b2b20
Author: Alex Savitsky <al...@scotiabank.com>
AuthorDate: Thu Jan 10 12:18:10 2019 -0500

    NIFI-5947 Elasticsearch lookup service compatible with LookupAttribute
    NIFI-5947 Made the lookup key for ElasticSearchStringLookupService more unique to the service (was previously 'id'); change by Mike Thomsen
    
    This closes #3268.
    
    Signed-off-by: Mike Thomsen <mi...@gmail.com>
---
 .../elasticsearch/ElasticSearchLookupService.java  |   5 +
 .../ElasticSearchStringLookupService.java          | 101 +++++++++++++++++++++
 .../org.apache.nifi.controller.ControllerService   |   3 +-
 .../ElasticSearchStringLookupServiceTest.groovy    |  59 ++++++++++++
 .../TestElasticSearchClientService.groovy          |  16 ++--
 5 files changed, 176 insertions(+), 8 deletions(-)

diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java
index c864771..fd5a3e3 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java
@@ -19,6 +19,8 @@ package org.apache.nifi.elasticsearch;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.jayway.jsonpath.JsonPath;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnEnabled;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationResult;
@@ -46,6 +48,9 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
+@CapabilityDescription("Lookup a record from Elasticsearch Server associated with the specified document ID. " +
+        "The coordinates that are passed to the lookup must contain the key 'id'.")
+@Tags({"lookup", "enrich", "record", "elasticsearch"})
 public class ElasticSearchLookupService extends JsonInferenceSchemaRegistryService implements LookupService<Record> {
     public static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder()
         .name("el-rest-client-service")
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchStringLookupService.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchStringLookupService.java
new file mode 100644
index 0000000..0ff9672
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchStringLookupService.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License") you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.lookup.LookupFailureException;
+import org.apache.nifi.lookup.StringLookupService;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+@CapabilityDescription("Lookup a string value from Elasticsearch Server associated with the specified document ID. " +
+        "The coordinates that are passed to the lookup must contain the key 'id'.")
+@Tags({"lookup", "enrich", "value", "key", "elasticsearch"})
+public class ElasticSearchStringLookupService extends AbstractControllerService implements StringLookupService {
+    public static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder()
+            .name("el-rest-client-service")
+            .displayName("Client Service")
+            .description("An ElasticSearch client service to use for running queries.")
+            .identifiesControllerService(ElasticSearchClientService.class)
+            .required(true)
+            .build();
+    public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
+            .name("el-lookup-index")
+            .displayName("Index")
+            .description("The name of the index to read from")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
+            .name("el-lookup-type")
+            .displayName("Type")
+            .description("The type of this document (used by Elasticsearch for indexing and searching)")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    private static final List<PropertyDescriptor> DESCRIPTORS = Arrays.asList(CLIENT_SERVICE, INDEX, TYPE);
+    private static final ObjectMapper mapper = new ObjectMapper();
+    public static final String ID = "es_document_id";
+    private ElasticSearchClientService esClient;
+    private String index;
+    private String type;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return DESCRIPTORS;
+    }
+
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) {
+        esClient = context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class);
+        index = context.getProperty(INDEX).evaluateAttributeExpressions().getValue();
+        type = context.getProperty(TYPE).evaluateAttributeExpressions().getValue();
+    }
+
+    @Override
+    public Optional<String> lookup(Map<String, Object> coordinates) throws LookupFailureException {
+        try {
+            final String id = (String) coordinates.get(ID);
+            final Map<String, Object> enums = esClient.get(index, type, id);
+            return Optional.of(mapper.writeValueAsString(enums));
+        } catch (IOException e) {
+            throw new LookupFailureException(e);
+        }
+    }
+
+    @Override
+    public Set<String> getRequiredKeys() {
+        return Collections.singleton(ID);
+    }
+}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
index 65745fb..56d640f 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -13,4 +13,5 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 org.apache.nifi.elasticsearch.ElasticSearchLookupService
-org.apache.nifi.elasticsearch.ElasticSearchClientServiceImpl
\ No newline at end of file
+org.apache.nifi.elasticsearch.ElasticSearchStringLookupService
+org.apache.nifi.elasticsearch.ElasticSearchClientServiceImpl
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/ElasticSearchStringLookupServiceTest.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/ElasticSearchStringLookupServiceTest.groovy
new file mode 100644
index 0000000..f3ad4b9
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/ElasticSearchStringLookupServiceTest.groovy
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License") you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.elasticsearch.integration
+
+import org.apache.nifi.elasticsearch.ElasticSearchClientService
+import org.apache.nifi.elasticsearch.ElasticSearchStringLookupService
+import org.apache.nifi.util.TestRunner
+import org.apache.nifi.util.TestRunners
+import org.junit.Assert
+import org.junit.Before
+import org.junit.Test
+
+class ElasticSearchStringLookupServiceTest {
+	ElasticSearchClientService mockClientService
+	ElasticSearchStringLookupService lookupService
+	TestRunner runner
+
+	@Before
+	void setup() throws Exception {
+		mockClientService = new TestElasticSearchClientService()
+		lookupService = new ElasticSearchStringLookupService()
+		runner = TestRunners.newTestRunner(TestControllerServiceProcessor.class)
+		runner.addControllerService("clientService", mockClientService)
+		runner.addControllerService("lookupService", lookupService)
+		runner.enableControllerService(mockClientService)
+		runner.setProperty(lookupService, ElasticSearchStringLookupService.CLIENT_SERVICE, "clientService")
+		runner.setProperty(lookupService, ElasticSearchStringLookupService.INDEX, "users")
+		runner.setProperty(TestControllerServiceProcessor.CLIENT_SERVICE, "clientService")
+		runner.setProperty(TestControllerServiceProcessor.LOOKUP_SERVICE, "lookupService")
+		runner.enableControllerService(lookupService)
+	}
+
+	@Test
+	void simpleLookupTest() throws Exception {
+		def coordinates = [ (ElasticSearchStringLookupService.ID): "12345" ]
+
+		Optional<String> result = lookupService.lookup(coordinates)
+
+		Assert.assertNotNull(result)
+		Assert.assertTrue(result.isPresent())
+		String json = result.get()
+		Assert.assertEquals('{"username":"john.smith","password":"testing1234","email":"john.smith@test.com","position":"Software Engineer"}', json)
+	}
+}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/TestElasticSearchClientService.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/TestElasticSearchClientService.groovy
index 3b5fc0a..899cedb 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/TestElasticSearchClientService.groovy
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/TestElasticSearchClientService.groovy
@@ -25,6 +25,13 @@ import org.apache.nifi.elasticsearch.IndexOperationResponse
 import org.apache.nifi.elasticsearch.SearchResponse
 
 class TestElasticSearchClientService extends AbstractControllerService implements ElasticSearchClientService {
+    Map data = [
+        "username": "john.smith",
+        "password": "testing1234",
+        "email": "john.smith@test.com",
+        "position": "Software Engineer"
+    ]
+
     @Override
     IndexOperationResponse add(IndexOperationRequest operation) throws IOException {
         return null
@@ -52,18 +59,13 @@ class TestElasticSearchClientService extends AbstractControllerService implement
 
     @Override
     Map<String, Object> get(String index, String type, String id) throws IOException {
-        return null
+        return data
     }
 
     @Override
     SearchResponse search(String query, String index, String type) throws IOException {
         List hits = [[
-            "_source": [
-                "username": "john.smith",
-                "password": "testing1234",
-                "email": "john.smith@test.com",
-                "position": "Software Engineer"
-            ]
+            "_source": data
         ]]
         return new SearchResponse(hits, null, 1, 100, false)
     }