You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by mattyb149 <gi...@git.apache.org> on 2018/04/11 14:24:09 UTC

[GitHub] nifi pull request #2558: NIFI-4982 - Add a DistributedMapCacheLookupService

Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2558#discussion_r180771987
  
    --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/DistributedMapCacheLookupService.java ---
    @@ -0,0 +1,127 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.lookup;
    +
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +import java.util.stream.Stream;
    +
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.controller.AbstractControllerService;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.distributed.cache.client.Deserializer;
    +import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
    +import org.apache.nifi.distributed.cache.client.Serializer;
    +import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
    +import org.apache.nifi.distributed.cache.client.exception.SerializationException;
    +
    +@Tags({"lookup", "enrich", "key", "value", "map", "cache", "distributed"})
    +@CapabilityDescription("Allows to choose a distributed map cache client to retrieve the value associated to a key. "
    +    + "The coordinates that are passed to the lookup must contain the key 'key'.")
    +public class DistributedMapCacheLookupService extends AbstractControllerService implements StringLookupService {
    +    private static final String KEY = "key";
    +    private static final Set<String> REQUIRED_KEYS = Stream.of(KEY).collect(Collectors.toSet());
    +
    +    private volatile DistributedMapCacheClient cache;
    +    private final Serializer<String> keySerializer = new StringSerializer();
    +    private final Deserializer<String> valueDeserializer = new StringDeserializer();
    +
    +    public static final PropertyDescriptor PROP_DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder()
    +            .name("distributed-map-cache-service")
    +            .displayName("Distributed Cache Service")
    +            .description("The Controller Service that is used to get the cached values.")
    +            .required(true)
    +            .identifiesControllerService(DistributedMapCacheClient.class)
    +            .build();
    +
    +    @Override
    +    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    +        return new PropertyDescriptor.Builder()
    +            .name(propertyDescriptorName)
    +            .required(false)
    +            .dynamic(true)
    +            .addValidator(Validator.VALID)
    +            .expressionLanguageSupported(true)
    +            .build();
    +    }
    +
    +    @OnEnabled
    +    public void cacheConfiguredValues(final ConfigurationContext context) {
    +        cache = context.getProperty(PROP_DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        final List<PropertyDescriptor> descriptors = new ArrayList<>();
    +        descriptors.add(PROP_DISTRIBUTED_CACHE_SERVICE);
    +        return descriptors;
    +    }
    +
    +    @Override
    +    public Optional<String> lookup(final Map<String, Object> coordinates) {
    +        if (coordinates == null) {
    +            return Optional.empty();
    +        }
    +
    +        final String key = coordinates.get(KEY).toString();
    +        if (key == null) {
    +            return Optional.empty();
    +        }
    +
    +        try {
    +            return Optional.ofNullable(cache.get(key, keySerializer, valueDeserializer));
    +        } catch (IOException e) {
    +            getLogger().error("Error while trying to get the value from distributed map cache with key = " + key, e);
    +            return Optional.empty();
    +        }
    +    }
    +
    +    @Override
    +    public Set<String> getRequiredKeys() {
    +        return REQUIRED_KEYS;
    +    }
    +
    +    public static class StringDeserializer implements Deserializer<String> {
    --- End diff --
    
    Should these be moved up to a utilities class or API package?  Also, do we support character sets other than UTF-8?


---