Posted to issues@nifi.apache.org by pvillard31 <gi...@git.apache.org> on 2018/03/16 09:30:49 UTC

[GitHub] nifi pull request #2558: NIFI-4982 - Add a DistributedMapCacheLookupService

GitHub user pvillard31 opened a pull request:

    https://github.com/apache/nifi/pull/2558

    NIFI-4982 - Add a DistributedMapCacheLookupService

    Thank you for submitting a contribution to Apache NiFi.
    
    In order to streamline the review of the contribution we ask you
    to ensure the following steps have been taken:
    
    ### For all changes:
    - [ ] Is there a JIRA ticket associated with this PR? Is it referenced 
         in the commit message?
    
    - [ ] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
    
    - [ ] Has your PR been rebased against the latest commit within the target branch (typically master)?
    
    - [ ] Is your initial contribution a single, squashed commit?
    
    ### For code changes:
    - [ ] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
    - [ ] Have you written or updated unit tests to verify your changes?
    - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? 
    - [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
    - [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
    - [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?
    
    ### For documentation related changes:
    - [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
    
    ### Note:
    Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/pvillard31/nifi mapcachelookup

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/2558.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2558
    
----
commit f13e83043ce1c86df933d182902c3bba10a8c0c1
Author: Pierre Villard <pi...@...>
Date:   2018-03-15T20:28:26Z

    NIFI-4982 - Add a DistributedMapCacheLookupService

----


---

[GitHub] nifi pull request #2558: NIFI-4982 - Add a DistributedMapCacheLookupService

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2558#discussion_r180771298
  
    --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/DistributedMapCacheLookupService.java ---
    @@ -0,0 +1,127 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.lookup;
    +
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +import java.util.stream.Stream;
    +
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.controller.AbstractControllerService;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.distributed.cache.client.Deserializer;
    +import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
    +import org.apache.nifi.distributed.cache.client.Serializer;
    +import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
    +import org.apache.nifi.distributed.cache.client.exception.SerializationException;
    +
    +@Tags({"lookup", "enrich", "key", "value", "map", "cache", "distributed"})
    +@CapabilityDescription("Allows to choose a distributed map cache client to retrieve the value associated to a key. "
    +    + "The coordinates that are passed to the lookup must contain the key 'key'.")
    +public class DistributedMapCacheLookupService extends AbstractControllerService implements StringLookupService {
    +    private static final String KEY = "key";
    +    private static final Set<String> REQUIRED_KEYS = Stream.of(KEY).collect(Collectors.toSet());
    +
    +    private volatile DistributedMapCacheClient cache;
    +    private final Serializer<String> keySerializer = new StringSerializer();
    +    private final Deserializer<String> valueDeserializer = new StringDeserializer();
    +
    +    public static final PropertyDescriptor PROP_DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder()
    +            .name("distributed-map-cache-service")
    +            .displayName("Distributed Cache Service")
    +            .description("The Controller Service that is used to get the cached values.")
    +            .required(true)
    +            .identifiesControllerService(DistributedMapCacheClient.class)
    +            .build();
    +
    +    @Override
    +    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    +        return new PropertyDescriptor.Builder()
    +            .name(propertyDescriptorName)
    +            .required(false)
    +            .dynamic(true)
    +            .addValidator(Validator.VALID)
    +            .expressionLanguageSupported(true)
    --- End diff --
    
    As of your PR #2205, this should be changed to indicate the Expression Language scope :)


---

[GitHub] nifi pull request #2558: NIFI-4982 - Add a DistributedMapCacheLookupService

Posted by zenfenan <gi...@git.apache.org>.
Github user zenfenan commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2558#discussion_r195933469
  
    --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/DistributedMapCacheLookupService.java ---
    @@ -0,0 +1,155 @@
    +
    +package org.apache.nifi.lookup;
    +
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import java.nio.charset.Charset;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +import java.util.stream.Stream;
    +
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.controller.AbstractControllerService;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.distributed.cache.client.Deserializer;
    +import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
    +import org.apache.nifi.distributed.cache.client.Serializer;
    +import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
    +import org.apache.nifi.distributed.cache.client.exception.SerializationException;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +
    +@Tags({"lookup", "enrich", "key", "value", "map", "cache", "distributed"})
    +@CapabilityDescription("Allows to choose a distributed map cache client to retrieve the value associated to a key. "
    +    + "The coordinates that are passed to the lookup must contain the key 'key'.")
    +public class DistributedMapCacheLookupService extends AbstractControllerService implements StringLookupService {
    +
    +    private static final List<Charset> STANDARD_CHARSETS = Arrays.asList(
    +            StandardCharsets.UTF_8,
    +            StandardCharsets.US_ASCII,
    +            StandardCharsets.ISO_8859_1,
    +            StandardCharsets.UTF_16,
    +            StandardCharsets.UTF_16LE,
    +            StandardCharsets.UTF_16BE);
    +
    +    private static final String KEY = "key";
    +    private static final Set<String> REQUIRED_KEYS = Stream.of(KEY).collect(Collectors.toSet());
    +
    +    private volatile DistributedMapCacheClient cache;
    +    private volatile static Charset charset;
    +    private final Serializer<String> keySerializer = new StringSerializer();
    +    private final Deserializer<String> valueDeserializer = new StringDeserializer();
    +
    +    public static final PropertyDescriptor PROP_DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder()
    +            .name("distributed-map-cache-service")
    +            .displayName("Distributed Cache Service")
    +            .description("The Controller Service that is used to get the cached values.")
    +            .required(true)
    +            .identifiesControllerService(DistributedMapCacheClient.class)
    +            .build();
    +
    +    public static final PropertyDescriptor CHARACTER_ENCODING = new PropertyDescriptor.Builder()
    +            .name("character-encoding")
    +            .displayName("Character Encoding")
    +            .description("Specifies a character encoding to use.")
    +            .required(true)
    +            .allowableValues(getStandardCharsetNames())
    +            .defaultValue(StandardCharsets.UTF_8.displayName())
    +            .build();
    +
    +    private static Set<String> getStandardCharsetNames() {
    +        return STANDARD_CHARSETS.stream().map(c -> c.displayName()).collect(Collectors.toSet());
    +    }
    +
    +    @Override
    +    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    +        return new PropertyDescriptor.Builder()
    +            .name(propertyDescriptorName)
    +            .required(false)
    +            .dynamic(true)
    +            .addValidator(Validator.VALID)
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .build();
    +    }
    +
    +    @OnEnabled
    +    public void cacheConfiguredValues(final ConfigurationContext context) {
    --- End diff --
    
    Nitpick: `cacheConfiguredValues` doesn't actually cache anything here, so I thought `onEnabled` would be a good candidate for the method name.


---

[GitHub] nifi pull request #2558: NIFI-4982 - Add a DistributedMapCacheLookupService

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2558#discussion_r180771987
  
    --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/DistributedMapCacheLookupService.java ---
    @@ -0,0 +1,127 @@
    +
    +package org.apache.nifi.lookup;
    +
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +import java.util.stream.Stream;
    +
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.controller.AbstractControllerService;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.distributed.cache.client.Deserializer;
    +import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
    +import org.apache.nifi.distributed.cache.client.Serializer;
    +import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
    +import org.apache.nifi.distributed.cache.client.exception.SerializationException;
    +
    +@Tags({"lookup", "enrich", "key", "value", "map", "cache", "distributed"})
    +@CapabilityDescription("Allows to choose a distributed map cache client to retrieve the value associated to a key. "
    +    + "The coordinates that are passed to the lookup must contain the key 'key'.")
    +public class DistributedMapCacheLookupService extends AbstractControllerService implements StringLookupService {
    +    private static final String KEY = "key";
    +    private static final Set<String> REQUIRED_KEYS = Stream.of(KEY).collect(Collectors.toSet());
    +
    +    private volatile DistributedMapCacheClient cache;
    +    private final Serializer<String> keySerializer = new StringSerializer();
    +    private final Deserializer<String> valueDeserializer = new StringDeserializer();
    +
    +    public static final PropertyDescriptor PROP_DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder()
    +            .name("distributed-map-cache-service")
    +            .displayName("Distributed Cache Service")
    +            .description("The Controller Service that is used to get the cached values.")
    +            .required(true)
    +            .identifiesControllerService(DistributedMapCacheClient.class)
    +            .build();
    +
    +    @Override
    +    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    +        return new PropertyDescriptor.Builder()
    +            .name(propertyDescriptorName)
    +            .required(false)
    +            .dynamic(true)
    +            .addValidator(Validator.VALID)
    +            .expressionLanguageSupported(true)
    +            .build();
    +    }
    +
    +    @OnEnabled
    +    public void cacheConfiguredValues(final ConfigurationContext context) {
    +        cache = context.getProperty(PROP_DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        final List<PropertyDescriptor> descriptors = new ArrayList<>();
    +        descriptors.add(PROP_DISTRIBUTED_CACHE_SERVICE);
    +        return descriptors;
    +    }
    +
    +    @Override
    +    public Optional<String> lookup(final Map<String, Object> coordinates) {
    +        if (coordinates == null) {
    +            return Optional.empty();
    +        }
    +
    +        final String key = coordinates.get(KEY).toString();
    +        if (key == null) {
    +            return Optional.empty();
    +        }
    +
    +        try {
    +            return Optional.ofNullable(cache.get(key, keySerializer, valueDeserializer));
    +        } catch (IOException e) {
    +            getLogger().error("Error while trying to get the value from distributed map cache with key = " + key, e);
    +            return Optional.empty();
    +        }
    +    }
    +
    +    @Override
    +    public Set<String> getRequiredKeys() {
    +        return REQUIRED_KEYS;
    +    }
    +
    +    public static class StringDeserializer implements Deserializer<String> {
    --- End diff --
    
    Should these serializer/deserializer classes be moved up to a utilities class or an API package? Also, do we support character sets other than UTF-8?
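
On the character-set question, here is a minimal sketch of what a charset-aware String serializer and deserializer pair could look like. The `SketchSerializer` and `SketchDeserializer` interfaces are hypothetical stand-ins declared locally so the example compiles without NiFi on the classpath; they are only assumed to resemble the `Serializer` and `Deserializer` interfaces imported in the diff. This is an illustration, not the code from the PR.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

final class CharsetCodecSketch {

    // Hypothetical stand-in for a cache-client serializer interface.
    interface SketchSerializer<T> {
        void serialize(T value, OutputStream output) throws IOException;
    }

    // Hypothetical stand-in for a cache-client deserializer interface.
    interface SketchDeserializer<T> {
        T deserialize(byte[] input) throws IOException;
    }

    // Encodes a String with the configured charset instead of a hard-coded UTF-8.
    static final class CharsetStringSerializer implements SketchSerializer<String> {
        private final Charset charset;

        CharsetStringSerializer(final Charset charset) {
            this.charset = charset;
        }

        @Override
        public void serialize(final String value, final OutputStream output) throws IOException {
            output.write(value.getBytes(charset));
        }
    }

    // Decodes bytes with the same charset. Treating an empty payload as
    // "no value" (null) is an assumption made for this sketch.
    static final class CharsetStringDeserializer implements SketchDeserializer<String> {
        private final Charset charset;

        CharsetStringDeserializer(final Charset charset) {
            this.charset = charset;
        }

        @Override
        public String deserialize(final byte[] input) {
            return (input == null || input.length == 0) ? null : new String(input, charset);
        }
    }

    // Serializes and then deserializes a value with one charset.
    static String roundTrip(final String value, final Charset charset) {
        final ByteArrayOutputStream out = new ByteArrayOutputStream();
        try {
            new CharsetStringSerializer(charset).serialize(value, out);
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
        return new CharsetStringDeserializer(charset).deserialize(out.toByteArray());
    }

    public static void main(final String[] args) {
        // "cl\u00e9" contains a non-ASCII character, so the charset choice matters.
        System.out.println(roundTrip("cl\u00e9", StandardCharsets.UTF_16));
        System.out.println(roundTrip("cl\u00e9", StandardCharsets.US_ASCII));
    }
}
```

Passing the charset through the constructor keeps each instance immutable, so a serializer can be shared between threads without a mutable static field.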


---

[GitHub] nifi pull request #2558: NIFI-4982 - Add a DistributedMapCacheLookupService

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2558#discussion_r180771727
  
    --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/DistributedMapCacheLookupService.java ---
    @@ -0,0 +1,127 @@
    +
    +package org.apache.nifi.lookup;
    +
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +import java.util.stream.Stream;
    +
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.controller.AbstractControllerService;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.distributed.cache.client.Deserializer;
    +import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
    +import org.apache.nifi.distributed.cache.client.Serializer;
    +import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
    +import org.apache.nifi.distributed.cache.client.exception.SerializationException;
    +
    +@Tags({"lookup", "enrich", "key", "value", "map", "cache", "distributed"})
    +@CapabilityDescription("Allows to choose a distributed map cache client to retrieve the value associated to a key. "
    +    + "The coordinates that are passed to the lookup must contain the key 'key'.")
    +public class DistributedMapCacheLookupService extends AbstractControllerService implements StringLookupService {
    +    private static final String KEY = "key";
    --- End diff --
    
    Does requiring a single key "key" mean we can only do one lookup at a time? Perhaps we should not require any keys and let the dynamic properties define the lookup keys?


---

[GitHub] nifi pull request #2558: NIFI-4982 - Add a DistributedMapCacheLookupService

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/nifi/pull/2558


---

[GitHub] nifi issue #2558: NIFI-4982 - Add a DistributedMapCacheLookupService

Posted by zenfenan <gi...@git.apache.org>.
Github user zenfenan commented on the issue:

    https://github.com/apache/nifi/pull/2558
  
    Thanks @ijokarumawak and @mattyb149 for the review.
    
    Thanks @pvillard31, everything looks good. Merged to master.


---

[GitHub] nifi pull request #2558: NIFI-4982 - Add a DistributedMapCacheLookupService

Posted by ijokarumawak <gi...@git.apache.org>.
Github user ijokarumawak commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2558#discussion_r194971580
  
    --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/DistributedMapCacheLookupService.java ---
    @@ -0,0 +1,127 @@
    +
    +package org.apache.nifi.lookup;
    +
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +import java.util.stream.Stream;
    +
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.controller.AbstractControllerService;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.distributed.cache.client.Deserializer;
    +import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
    +import org.apache.nifi.distributed.cache.client.Serializer;
    +import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
    +import org.apache.nifi.distributed.cache.client.exception.SerializationException;
    +
    +@Tags({"lookup", "enrich", "key", "value", "map", "cache", "distributed"})
    +@CapabilityDescription("Allows to choose a distributed map cache client to retrieve the value associated to a key. "
    +    + "The coordinates that are passed to the lookup must contain the key 'key'.")
    +public class DistributedMapCacheLookupService extends AbstractControllerService implements StringLookupService {
    +    private static final String KEY = "key";
    --- End diff --
    
    The LookupAttribute processor loops through its dynamic properties to fetch multiple keys, but it only supports a single coordinate per lookup.
    
    |Dynamic property name|Dynamic property value|Result|
    |------------------------|------------------------|------|
    |resultA|`${attributeAKey}`|Look up the cache using the value of the `attributeAKey` FlowFile attribute and put the result in the `resultA` attribute|
    |resultB|`${attributeBKey}`|Look up the cache using the value of the `attributeBKey` FlowFile attribute and put the result in the `resultB` attribute|
    
    Since this LookupService implements StringLookupService, I think it is correct to require a "key" to fetch the String.
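
The interaction described above can be sketched roughly as follows. An in-memory `Map` stands in for the distributed cache, and the class name `SingleKeyLookupSketch` and the `enrich` method are hypothetical names that only mimic a processor looping over dynamic properties; no NiFi API is used. The values passed to `enrich` are assumed to be Expression Language results that have already been evaluated. Checking the coordinate for null before calling `toString()` is a choice made for this sketch.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

final class SingleKeyLookupSketch {

    // The single coordinate name the lookup requires.
    private static final String KEY = "key";

    // Stand-in for the distributed map cache; a real service would call a cache client.
    private final Map<String, String> cache;

    SingleKeyLookupSketch(final Map<String, String> cache) {
        this.cache = cache;
    }

    // One lookup takes one coordinate map and returns at most one String.
    Optional<String> lookup(final Map<String, Object> coordinates) {
        if (coordinates == null) {
            return Optional.empty();
        }
        final Object key = coordinates.get(KEY);
        if (key == null) {
            return Optional.empty();
        }
        return Optional.ofNullable(cache.get(key.toString()));
    }

    // Each dynamic property triggers its own single-key lookup:
    // property name = result attribute, property value = evaluated lookup key.
    Map<String, String> enrich(final Map<String, String> dynamicProperties) {
        final Map<String, String> results = new LinkedHashMap<>();
        for (final Map.Entry<String, String> property : dynamicProperties.entrySet()) {
            final Map<String, Object> coordinates = new HashMap<>();
            coordinates.put(KEY, property.getValue());
            lookup(coordinates).ifPresent(value -> results.put(property.getKey(), value));
        }
        return results;
    }

    public static void main(final String[] args) {
        final Map<String, String> cache = new HashMap<>();
        cache.put("a1", "valueA");
        cache.put("b1", "valueB");

        final Map<String, String> dynamicProperties = new LinkedHashMap<>();
        dynamicProperties.put("resultA", "a1");
        dynamicProperties.put("resultB", "b1");

        System.out.println(new SingleKeyLookupSketch(cache).enrich(dynamicProperties));
    }
}
```

Multiple values are fetched by repeating the single-key lookup once per dynamic property, which is why requiring only the "key" coordinate does not limit a flow to one lookup.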


---

[GitHub] nifi issue #2558: NIFI-4982 - Add a DistributedMapCacheLookupService

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on the issue:

    https://github.com/apache/nifi/pull/2558
  
    A template to help with reviewing/testing this PR is available in the JIRA: https://issues.apache.org/jira/secure/attachment/12914839/distributedMapCacheLookup.xml


---

[GitHub] nifi pull request #2558: NIFI-4982 - Add a DistributedMapCacheLookupService

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2558#discussion_r195999042
  
    --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/DistributedMapCacheLookupService.java ---
    @@ -0,0 +1,155 @@
    +
    +package org.apache.nifi.lookup;
    +
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import java.nio.charset.Charset;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +import java.util.stream.Stream;
    +
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.controller.AbstractControllerService;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.distributed.cache.client.Deserializer;
    +import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
    +import org.apache.nifi.distributed.cache.client.Serializer;
    +import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
    +import org.apache.nifi.distributed.cache.client.exception.SerializationException;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +
    +@Tags({"lookup", "enrich", "key", "value", "map", "cache", "distributed"})
    +@CapabilityDescription("Allows to choose a distributed map cache client to retrieve the value associated to a key. "
    +    + "The coordinates that are passed to the lookup must contain the key 'key'.")
    +public class DistributedMapCacheLookupService extends AbstractControllerService implements StringLookupService {
    +
    +    private static final List<Charset> STANDARD_CHARSETS = Arrays.asList(
    +            StandardCharsets.UTF_8,
    +            StandardCharsets.US_ASCII,
    +            StandardCharsets.ISO_8859_1,
    +            StandardCharsets.UTF_16,
    +            StandardCharsets.UTF_16LE,
    +            StandardCharsets.UTF_16BE);
    +
    +    private static final String KEY = "key";
    +    private static final Set<String> REQUIRED_KEYS = Stream.of(KEY).collect(Collectors.toSet());
    +
    +    private volatile DistributedMapCacheClient cache;
    +    private volatile static Charset charset;
    +    private final Serializer<String> keySerializer = new StringSerializer();
    +    private final Deserializer<String> valueDeserializer = new StringDeserializer();
    +
    +    public static final PropertyDescriptor PROP_DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder()
    +            .name("distributed-map-cache-service")
    +            .displayName("Distributed Cache Service")
    +            .description("The Controller Service that is used to get the cached values.")
    +            .required(true)
    +            .identifiesControllerService(DistributedMapCacheClient.class)
    +            .build();
    +
    +    public static final PropertyDescriptor CHARACTER_ENCODING = new PropertyDescriptor.Builder()
    +            .name("character-encoding")
    +            .displayName("Character Encoding")
    +            .description("Specifies a character encoding to use.")
    +            .required(true)
    +            .allowableValues(getStandardCharsetNames())
    +            .defaultValue(StandardCharsets.UTF_8.displayName())
    +            .build();
    +
    +    private static Set<String> getStandardCharsetNames() {
    +        return STANDARD_CHARSETS.stream().map(c -> c.displayName()).collect(Collectors.toSet());
    +    }
    +
    +    @Override
    +    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    +        return new PropertyDescriptor.Builder()
    +            .name(propertyDescriptorName)
    +            .required(false)
    +            .dynamic(true)
    +            .addValidator(Validator.VALID)
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .build();
    +    }
    +
    +    @OnEnabled
    +    public void cacheConfiguredValues(final ConfigurationContext context) {
    --- End diff --
    
    True - done!


---

[GitHub] nifi pull request #2558: NIFI-4982 - Add a DistributedMapCacheLookupService

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2558#discussion_r180772442
  
    --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestDistributedMapCacheLookupService.java ---
    @@ -0,0 +1,131 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.lookup;
    +
    +import static org.hamcrest.CoreMatchers.instanceOf;
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertThat;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.Optional;
    +
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.controller.AbstractControllerService;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.distributed.cache.client.Deserializer;
    +import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
    +import org.apache.nifi.distributed.cache.client.Serializer;
    +import org.apache.nifi.reporting.InitializationException;
    +import org.apache.nifi.util.TestRunner;
    +import org.apache.nifi.util.TestRunners;
    +import org.junit.Test;
    +
    +public class TestDistributedMapCacheLookupService {
    +
    +    final static Optional<String> EMPTY_STRING = Optional.empty();
    +
    +    @Test
    +    public void testDistributedMapCacheLookupService() throws InitializationException {
    +        final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
    +        final DistributedMapCacheLookupService service = new DistributedMapCacheLookupService();
    +        final DistributedMapCacheClient client = new DistributedMapCacheClientImpl();
    +
    +        runner.addControllerService("client", client);
    +        runner.addControllerService("lookup-service", service);
    +        runner.setProperty(service, DistributedMapCacheLookupService.PROP_DISTRIBUTED_CACHE_SERVICE, "client");
    +
    +        runner.enableControllerService(client);
    +        runner.enableControllerService(service);
    +
    +        runner.assertValid(service);
    +
    +        assertThat(service, instanceOf(LookupService.class));
    +
    +        final Optional<String> get = service.lookup(Collections.singletonMap("key", "myKey"));
    +        assertEquals(Optional.of("myValue"), get);
    +
    +        final Optional<String> absent = service.lookup(Collections.singletonMap("key", "absentKey"));
    +        assertEquals(EMPTY_STRING, absent);
    +    }
    +
    +    static final class DistributedMapCacheClientImpl extends AbstractControllerService implements DistributedMapCacheClient {
    +
    +        private Map<String, String> map = new HashMap<String, String>();
    +
    +        @OnEnabled
    +        public void onEnabled(final ConfigurationContext context) {
    +            map.put("myKey", "myValue");
    +        }
    +
    +        @Override
    +        public void close() throws IOException {
    +        }
    +
    +        @Override
    +        public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
    +        }
    +
    +        @Override
    +        protected java.util.List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +            return new ArrayList<>();
    +        }
    +
    +        @Override
    +        public <K, V> boolean putIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
    +            throw new IOException("not implemented");
    --- End diff --
    
    Nitpick, but these unimplemented stub methods should probably throw UnsupportedOperationException rather than IOException :P
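
    A minimal sketch of that suggestion, using a hypothetical simplified stub (`InMemoryCacheStub` is an illustrative name, not the PR's `DistributedMapCacheClientImpl`, and it does not implement the real `DistributedMapCacheClient` interface, whose methods also declare `IOException`). The operation the test exercises is implemented; the unused one signals that it is unsupported instead of pretending an I/O failure occurred:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the test's in-memory cache client.
final class InMemoryCacheStub {

    private final Map<String, String> map = new HashMap<>();

    InMemoryCacheStub() {
        // Same fixture entry the test above relies on.
        map.put("myKey", "myValue");
    }

    // The only operation the lookup test needs; returns null for absent keys.
    String get(final String key) {
        return map.get(key);
    }

    // Not needed by the test: an unchecked UnsupportedOperationException says
    // "this stub does not do that", whereas an IOException would suggest a
    // genuine I/O problem to whoever reads the failure.
    boolean putIfAbsent(final String key, final String value) {
        throw new UnsupportedOperationException("putIfAbsent is not implemented by this test stub");
    }
}
```

    Because UnsupportedOperationException is unchecked, the overriding methods in the real test class could keep their `throws IOException` clauses unchanged.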


---

[GitHub] nifi issue #2558: NIFI-4982 - Add a DistributedMapCacheLookupService

Posted by ijokarumawak <gi...@git.apache.org>.
Github user ijokarumawak commented on the issue:

    https://github.com/apache/nifi/pull/2558
  
    No further comments from me, I'm +1. @zenfenan please merge. Thanks!


---