You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by br...@apache.org on 2017/05/02 18:41:00 UTC

[3/5] nifi git commit: NIFI-1833 - Addressed issues from PR review.

http://git-wip-us.apache.org/repos/asf/nifi/blob/26d90fbc/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java
deleted file mode 100644
index ee7e237..0000000
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java
+++ /dev/null
@@ -1,527 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.processors.standard;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.commons.io.Charsets;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.state.Scope;
-import org.apache.nifi.components.state.StateMap;
-import org.apache.nifi.controller.AbstractControllerService;
-import org.apache.nifi.distributed.cache.client.Deserializer;
-import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
-import org.apache.nifi.distributed.cache.client.Serializer;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processors.standard.util.ListableEntity;
-import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.state.MockStateManager;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-public class TestAbstractListProcessor {
-
-    static final long DEFAULT_SLEEP_MILLIS = TimeUnit.NANOSECONDS.toMillis(AbstractListProcessor.LISTING_LAG_NANOS * 2);
-
-    @Rule
-    public final TemporaryFolder testFolder = new TemporaryFolder();
-
-    /**
-     * Adds two entities whose timestamp is older than twice the listing lag and checks that
-     * both are emitted on the first run, and that a later run with no new entities emits nothing.
-     */
-    @Test
-    public void testAllExistingEntriesEmittedOnFirstIteration() throws Exception {
-        final long oldTimestamp = System.nanoTime() - (AbstractListProcessor.LISTING_LAG_NANOS * 2);
-
-        // These entries exist before the processor runs for the first time.
-        final ConcreteListProcessor proc = new ConcreteListProcessor();
-        proc.addEntity("name", "id", oldTimestamp);
-        proc.addEntity("name", "id2", oldTimestamp);
-
-        // First run: the entries listed above should be emitted because they already existed.
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-
-        runner.run();
-        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2);
-        runner.clearTransferState();
-
-        // Ensure we have covered the necessary lag period to avoid issues where the processor was immediately scheduled to run again
-        Thread.sleep(DEFAULT_SLEEP_MILLIS);
-
-        // Run again without introducing any new entries
-        runner.run();
-        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
-    }
-
-    /**
-     * Checks that entities stamped with the current time are not emitted on the run that first
-     * lists them, but are emitted on a run performed after sleeping for DEFAULT_SLEEP_MILLIS.
-     */
-    @Test
-    public void testPreviouslySkippedEntriesEmittedOnNextIteration() throws Exception {
-        final ConcreteListProcessor proc = new ConcreteListProcessor();
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-        // Initial run with nothing to list
-        runner.run();
-
-        final long initialTimestamp = System.nanoTime();
-
-        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
-        proc.addEntity("name", "id", initialTimestamp);
-        proc.addEntity("name", "id2", initialTimestamp);
-        runner.run();
-
-        // First run that sees the entries above: they are expected to be skipped to avoid write synchronization issues
-        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
-        runner.clearTransferState();
-
-        // Ensure we have covered the necessary lag period to avoid issues where the processor was immediately scheduled to run again
-        Thread.sleep(DEFAULT_SLEEP_MILLIS);
-
-        // Run again without introducing any new entries; the two skipped entries should now be emitted
-        runner.run();
-        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2);
-    }
-
-    /**
-     * Checks that, once two entities have been emitted, re-adding entities with the same or an
-     * older timestamp produces no output, while an entity with a newer timestamp
-     * (initialTimestamp + 1) produces exactly one FlowFile.
-     */
-    @Test
-    public void testOnlyNewEntriesEmitted() throws Exception {
-        final ConcreteListProcessor proc = new ConcreteListProcessor();
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-        // Initial run with nothing to list
-        runner.run();
-
-        final long initialTimestamp = System.nanoTime();
-
-        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
-        proc.addEntity("name", "id", initialTimestamp);
-        proc.addEntity("name", "id2", initialTimestamp);
-        runner.run();
-
-        // First run that sees the entries above: they are expected to be skipped to avoid write synchronization issues
-        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
-        runner.clearTransferState();
-
-        // Ensure we have covered the necessary lag period to avoid issues where the processor was immediately scheduled to run again
-        Thread.sleep(DEFAULT_SLEEP_MILLIS);
-
-        // Running again, our two previously seen files are now cleared to be released
-        runner.run();
-        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2);
-        runner.clearTransferState();
-
-        // Verify that an entity carrying the already-seen timestamp does not show up as new
-        proc.addEntity("name", "id2", initialTimestamp);
-        runner.run();
-        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
-        runner.clearTransferState();
-
-        // An entity older than the already-seen timestamp must not be emitted either
-        proc.addEntity("name", "id3", initialTimestamp - 1);
-        runner.run();
-        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
-        runner.clearTransferState();
-
-        // Repeating the already-seen timestamp once more still yields nothing
-        proc.addEntity("name", "id2", initialTimestamp);
-        runner.run();
-        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
-        runner.clearTransferState();
-
-        // Now an entity with a timestamp newer than the one already seen enters
-        proc.addEntity("name", "id2", initialTimestamp + 1);
-
-        // It should show up
-        runner.run();
-        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1);
-        runner.clearTransferState();
-    }
-
-    /**
-     * Emulates a restart: cluster state is pre-populated with listing and processed timestamps
-     * equal to the timestamp of the two existing entities before the processor ever runs.
-     * The test expects those entities never to be emitted, and only an entity stamped
-     * initialTimestamp + 1 to produce a FlowFile.
-     */
-    @Test
-    public void testHandleRestartWithEntriesAlreadyTransferredAndNoneNew() throws Exception {
-        final ConcreteListProcessor proc = new ConcreteListProcessor();
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-
-        final long initialTimestamp = System.nanoTime();
-
-        proc.addEntity("name", "id", initialTimestamp);
-        proc.addEntity("name", "id2", initialTimestamp);
-
-        // Emulate having state but not having had the processor run such as in a restart
-        final Map<String, String> preexistingState = new HashMap<>();
-        preexistingState.put(AbstractListProcessor.LISTING_TIMESTAMP_KEY, Long.toString(initialTimestamp));
-        preexistingState.put(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY, Long.toString(initialTimestamp));
-        runner.getStateManager().setState(preexistingState, Scope.CLUSTER);
-
-        // run for the first time
-        runner.run();
-
-        // First run, the above listed entries would be skipped
-        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
-        runner.clearTransferState();
-
-        // Ensure we have covered the necessary lag period to avoid issues where the processor was immediately scheduled to run again
-        Thread.sleep(DEFAULT_SLEEP_MILLIS);
-
-        // Running again after the lag period, the entries are still expected to be skipped
-        // because the pre-existing state already records their timestamp as processed
-        runner.run();
-        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
-        runner.clearTransferState();
-
-        // Verify that an entity carrying the already-recorded timestamp does not show up as new
-        proc.addEntity("name", "id2", initialTimestamp);
-        runner.run();
-        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
-        runner.clearTransferState();
-
-        // An entity older than the recorded timestamp must not be emitted either
-        proc.addEntity("name", "id3", initialTimestamp - 1);
-        runner.run();
-        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
-        runner.clearTransferState();
-
-        // Repeating the recorded timestamp once more still yields nothing
-        proc.addEntity("name", "id2", initialTimestamp);
-        runner.run();
-        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
-        runner.clearTransferState();
-
-        // Now an entity with a timestamp newer than the recorded one enters
-        proc.addEntity("name", "id2", initialTimestamp + 1);
-
-        // It should now show up
-        runner.run();
-        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1);
-        runner.clearTransferState();
-    }
-
-    /**
-     * With a distributed cache service configured, checks the CLUSTER-scoped state after each
-     * of two runs: first the listing timestamp is set while the processed timestamp is "0",
-     * then (after sleeping DEFAULT_SLEEP_MILLIS) both equal the entity's timestamp.
-     */
-    @Test
-    public void testStateStoredInClusterStateManagement() throws Exception {
-        final ConcreteListProcessor proc = new ConcreteListProcessor();
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-        final DistributedCache cache = new DistributedCache();
-        runner.addControllerService("cache", cache);
-        runner.enableControllerService(cache);
-        runner.setProperty(AbstractListProcessor.DISTRIBUTED_CACHE_SERVICE, "cache");
-
-        final long initialTimestamp = System.nanoTime();
-
-        proc.addEntity("name", "id", initialTimestamp);
-        runner.run();
-
-        final Map<String, String> expectedState = new HashMap<>();
-        // After the first run: the entity has been listed, but the processed timestamp is still "0"
-        expectedState.put(AbstractListProcessor.LISTING_TIMESTAMP_KEY, String.valueOf(initialTimestamp));
-        expectedState.put(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY, "0");
-        runner.getStateManager().assertStateEquals(expectedState, Scope.CLUSTER);
-
-        Thread.sleep(DEFAULT_SLEEP_MILLIS);
-
-        runner.run();
-        // After the second run: the processed timestamp has caught up with the listing timestamp
-        expectedState.put(AbstractListProcessor.LISTING_TIMESTAMP_KEY, String.valueOf(initialTimestamp));
-        expectedState.put(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY, String.valueOf(initialTimestamp));
-        runner.getStateManager().assertStateEquals(expectedState, Scope.CLUSTER);
-    }
-
-    /**
-     * Seeds the mock cache with a legacy JSON listing entry stored under the key
-     * "&lt;processor identifier&gt;.lastListingTime./path" and checks that, after one run, the
-     * CLUSTER-scoped state holds both timestamp keys set to the legacy "latestTimestamp" (1492).
-     */
-    @Test
-    public void testStateMigratedFromCacheService() throws InitializationException {
-        final ConcreteListProcessor proc = new ConcreteListProcessor();
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-        final DistributedCache cache = new DistributedCache();
-        runner.addControllerService("cache", cache);
-        runner.enableControllerService(cache);
-        runner.setProperty(AbstractListProcessor.DISTRIBUTED_CACHE_SERVICE, "cache");
-
-        // Legacy state as it would have been stored in the cache service; "/path" matches ConcreteListProcessor.getPath()
-        final String serviceState = "{\"latestTimestamp\":1492,\"matchingIdentifiers\":[\"id\"]}";
-        final String cacheKey = runner.getProcessor().getIdentifier() + ".lastListingTime./path";
-        cache.stored.put(cacheKey, serviceState);
-
-        runner.run();
-
-        final MockStateManager stateManager = runner.getStateManager();
-        final Map<String, String> expectedState = new HashMap<>();
-        // Ensure only the timestamp is migrated (the matching identifiers are not part of the expected state)
-        expectedState.put(AbstractListProcessor.LISTING_TIMESTAMP_KEY, "1492");
-        expectedState.put(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY, "1492");
-        stateManager.assertStateEquals(expectedState, Scope.CLUSTER);
-    }
-
-    /**
-     * Runs the processor once with no entities, no cache service and no pre-existing state,
-     * and checks that the CLUSTER-scoped state is an empty map afterwards.
-     */
-    @Test
-    public void testNoStateToMigrate() throws Exception {
-        final ConcreteListProcessor proc = new ConcreteListProcessor();
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-
-        runner.run();
-
-        final MockStateManager stateManager = runner.getStateManager();
-        // Nothing was listed and nothing was migrated, so no state is expected
-        final Map<String, String> expectedState = new HashMap<>();
-        stateManager.assertStateEquals(expectedState, Scope.CLUSTER);
-    }
-
-    /**
-     * Writes a legacy local persistence file in the "&lt;path&gt;=&lt;JSON listing&gt;" properties format,
-     * runs the processor once, and checks that the file has been removed and that the
-     * CLUSTER-scoped state holds both timestamp keys set to the legacy "latestTimestamp" (1492).
-     */
-    @Test
-    public void testStateMigratedFromLocalFile() throws Exception {
-        final ConcreteListProcessor proc = new ConcreteListProcessor();
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-
-        // Create a file that we will populate with the desired state
-        final File persistenceFile = testFolder.newFile(proc.persistenceFilename);
-        // Override the processor's internal persistence file
-        proc.persistenceFile = persistenceFile;
-
-        // Local File persistence was a properties file format of <key>=<JSON entity listing representation>
-        // Our ConcreteListProcessor is centered around files which are provided for a given path
-        final String serviceState = proc.getPath(runner.getProcessContext()) + "={\"latestTimestamp\":1492,\"matchingIdentifiers\":[\"id\"]}";
-
-        // Create a persistence file of the format anticipated
-        try (FileOutputStream fos = new FileOutputStream(persistenceFile)) {
-            fos.write(serviceState.getBytes(Charsets.UTF_8));
-        }
-
-        runner.run();
-
-        // Verify the local persistence file is removed
-        Assert.assertFalse("Failed to remove persistence file", persistenceFile.exists());
-
-        // Verify the state manager now maintains the associated state
-        final Map<String, String> expectedState = new HashMap<>();
-        // Ensure only the timestamp is migrated (the matching identifiers are not part of the expected state)
-        expectedState.put(AbstractListProcessor.LISTING_TIMESTAMP_KEY, "1492");
-        expectedState.put(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY, "1492");
-        runner.getStateManager().assertStateEquals(expectedState, Scope.CLUSTER);
-    }
-
-    /**
-     * Checks that clearing the CLUSTER-scoped state makes the processor emit the same
-     * entities again: two entities are emitted once, not re-emitted on the following run,
-     * and then emitted a second time after the state has been cleared.
-     */
-    @Test
-    public void testResumeListingAfterClearingState() throws Exception {
-        final ConcreteListProcessor proc = new ConcreteListProcessor();
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-        // Initial run with nothing to list
-        runner.run();
-
-        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
-
-        final long initialEventTimestamp = System.nanoTime();
-        proc.addEntity("name", "id", initialEventTimestamp);
-        proc.addEntity("name", "id2", initialEventTimestamp);
-
-        // Add entities but these should not be transferred as they are the latest values
-        runner.run();
-        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
-
-        // After providing a pause in listings, the files should now transfer
-        Thread.sleep(DEFAULT_SLEEP_MILLIS);
-
-        runner.run();
-        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2);
-        runner.clearTransferState();
-
-        // Verify entities are not transferred again for the given state
-        runner.run();
-        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
-        runner.clearTransferState();
-
-        // Clear state for this processor, eradicating timestamp
-        runner.getStateManager().clear(Scope.CLUSTER);
-        Assert.assertEquals("State is not empty for this component after clearing", 0, runner.getStateManager().getState(Scope.CLUSTER).toMap().size());
-
-        // Ensure the original files are now transferred again.
-        runner.run();
-        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2);
-        runner.clearTransferState();
-    }
-
-    /**
-     * With a distributed cache service configured, checks that a single run of the processor
-     * results in exactly one call to the cache's {@code get} method (counted by
-     * {@code DistributedCache.fetchCount}).
-     */
-    @Test
-    public void testFetchOnStart() throws InitializationException {
-        final ConcreteListProcessor proc = new ConcreteListProcessor();
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-        final DistributedCache cache = new DistributedCache();
-        runner.addControllerService("cache", cache);
-        runner.enableControllerService(cache);
-        runner.setProperty(AbstractListProcessor.DISTRIBUTED_CACHE_SERVICE, "cache");
-
-        runner.run();
-
-        // fetchCount is incremented on every DistributedCache.get() call
-        assertEquals(1, cache.fetchCount);
-    }
-
-    /**
-     * Checks the version and contents of the CLUSTER-scoped state as entities are emitted:
-     * after the first two entities are emitted the state is at version 2 with both timestamp
-     * keys equal to their timestamp; after a newer entity is emitted it is at version 3 with
-     * both keys equal to the newer timestamp. The state map holds exactly two keys each time.
-     */
-    @Test
-    public void testOnlyNewStateStored() throws Exception {
-        final ConcreteListProcessor proc = new ConcreteListProcessor();
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-        // Initial run with nothing to list
-        runner.run();
-
-        final long initialTimestamp = System.nanoTime();
-
-        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
-        proc.addEntity("name", "id", initialTimestamp);
-        proc.addEntity("name", "id2", initialTimestamp);
-
-        // The freshly added entities are listed but not yet emitted
-        runner.run();
-        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
-        runner.clearTransferState();
-
-        Thread.sleep(DEFAULT_SLEEP_MILLIS);
-
-        // After the pause both entities are emitted
-        runner.run();
-        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2);
-        runner.clearTransferState();
-
-        final StateMap stateMap = runner.getStateManager().getState(Scope.CLUSTER);
-        assertEquals(2, stateMap.getVersion());
-
-        final Map<String, String> map = stateMap.toMap();
-        // Ensure only the two timestamp keys are stored, both equal to the entities' timestamp
-        assertEquals(2, map.size());
-        assertEquals(Long.toString(initialTimestamp), map.get(AbstractListProcessor.LISTING_TIMESTAMP_KEY));
-        assertEquals(Long.toString(initialTimestamp), map.get(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY));
-
-        // An entity with a newer timestamp is expected to be emitted on the very next run
-        proc.addEntity("new name", "new id", initialTimestamp + 1);
-        runner.run();
-
-        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1);
-        runner.clearTransferState();
-
-        StateMap updatedStateMap = runner.getStateManager().getState(Scope.CLUSTER);
-        assertEquals(3, updatedStateMap.getVersion());
-
-        assertEquals(2, updatedStateMap.toMap().size());
-        assertEquals(Long.toString(initialTimestamp + 1), updatedStateMap.get(AbstractListProcessor.LISTING_TIMESTAMP_KEY));
-        // Processed timestamp is now caught up
-        assertEquals(Long.toString(initialTimestamp + 1), updatedStateMap.get(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY));
-    }
-
-    /**
-     * In-memory stand-in for a {@link DistributedMapCacheClient}, backed by a plain
-     * {@link HashMap}. Serializers and deserializers passed to the methods are ignored;
-     * keys and values are stored as the objects handed in. Tests may seed {@link #stored}
-     * directly and inspect {@link #fetchCount} to see how often {@code get} was invoked.
-     */
-    private static class DistributedCache extends AbstractControllerService implements DistributedMapCacheClient {
-        private final Map<Object, Object> stored = new HashMap<>();
-        // Number of get() invocations made against this cache
-        private int fetchCount = 0;
-
-        /** Not needed by the tests; never stores anything and always reports {@code false}. */
-        @Override
-        public <K, V> boolean putIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
-            return false;
-        }
-
-        /** Not needed by the tests; never stores anything and always returns {@code null}. */
-        @Override
-        public <K, V> V getAndPutIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer, Deserializer<V> valueDeserializer) throws IOException {
-            return null;
-        }
-
-        /** Not needed by the tests; always reports {@code false} regardless of the stored content. */
-        @Override
-        public <K> boolean containsKey(K key, Serializer<K> keySerializer) throws IOException {
-            return false;
-        }
-
-        /** Stores {@code value} under {@code key}, replacing any previous value. */
-        @Override
-        public <K, V> void put(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
-            stored.put(key, value);
-        }
-
-        /**
-         * Returns the value stored under {@code key} (or {@code null}) and counts the call
-         * in {@link #fetchCount}.
-         */
-        @Override
-        @SuppressWarnings("unchecked")
-        public <K, V> V get(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException {
-            fetchCount++;
-            return (V) stored.get(key);
-        }
-
-        /** Nothing to release. */
-        @Override
-        public void close() throws IOException {
-        }
-
-        /**
-         * Removes the entry for {@code key}.
-         *
-         * @return {@code true} if a non-null value had been stored under the key
-         */
-        @Override
-        public <K> boolean remove(K key, Serializer<K> serializer) throws IOException {
-            final Object value = stored.remove(key);
-            return value != null;
-        }
-
-        /**
-         * Removes every entry whose key, converted with {@code toString()}, fully matches
-         * the given regular expression.
-         *
-         * @param regex the regular expression the whole key string must match
-         * @return the number of entries removed
-         */
-        @Override
-        public long removeByPattern(String regex) throws IOException {
-            // Collect the matching KEYS: Map.remove(Object) expects a key, so collecting the
-            // values (as was done previously) left the matched entries in place.
-            final List<Object> matchedKeys = new ArrayList<>();
-            final Pattern p = Pattern.compile(regex);
-            for (Object key : stored.keySet()) {
-                // Key must be backed by something that can be converted into a String
-                Matcher m = p.matcher(key.toString());
-                if (m.matches()) {
-                    matchedKeys.add(key);
-                }
-            }
-            // Remove only after the iteration has finished so the key set is not modified while iterating
-            final long numRemoved = matchedKeys.size();
-            matchedKeys.forEach(stored::remove);
-            return numRemoved;
-        }
-    }
-
-
-    /**
-     * Minimal {@link AbstractListProcessor} used by the tests in this class. Its "listing" is
-     * simply the in-memory list of entities registered through {@link #addEntity}; the path is
-     * fixed to "/path", state is kept in CLUSTER scope and no FlowFile attributes are created.
-     */
-    private static class ConcreteListProcessor extends AbstractListProcessor<ListableEntity> {
-        // Location of the legacy local state file; tests may point persistenceFile elsewhere
-        public final String persistenceFilename = "ListProcessor-local-state-" + UUID.randomUUID() + ".json";
-        public String persistenceFolder = "target/";
-        public File persistenceFile = new File(persistenceFolder + persistenceFilename);
-
-        // Everything returned by performListing, in insertion order
-        private final List<ListableEntity> entities = new ArrayList<>();
-
-        /**
-         * Registers an entity that every subsequent listing will report.
-         *
-         * @param name       value returned by the entity's {@code getName()}
-         * @param identifier value returned by the entity's {@code getIdentifier()}
-         * @param timestamp  value returned by the entity's {@code getTimestamp()}
-         */
-        public void addEntity(final String name, final String identifier, final long timestamp) {
-            entities.add(new ListableEntity() {
-                @Override
-                public long getTimestamp() {
-                    return timestamp;
-                }
-
-                @Override
-                public String getIdentifier() {
-                    return identifier;
-                }
-
-                @Override
-                public String getName() {
-                    return name;
-                }
-            });
-        }
-
-        /** Returns a read-only view of all registered entities; both arguments are ignored. */
-        @Override
-        protected List<ListableEntity> performListing(final ProcessContext context, final Long minTimestamp) throws IOException {
-            return Collections.unmodifiableList(entities);
-        }
-
-        /** The listed path is always "/path". */
-        @Override
-        protected String getPath(final ProcessContext context) {
-            return "/path";
-        }
-
-        /** Returns the current value of {@link #persistenceFile}. */
-        @Override
-        protected File getPersistenceFile() {
-            return persistenceFile;
-        }
-
-        /** State is always kept in CLUSTER scope. */
-        @Override
-        protected Scope getStateScope(final ProcessContext context) {
-            return Scope.CLUSTER;
-        }
-
-        /** No property change ever requires the listing to be reset. */
-        @Override
-        protected boolean isListingResetNecessary(PropertyDescriptor property) {
-            return false;
-        }
-
-        /** No attributes are attached to emitted FlowFiles. */
-        @Override
-        protected Map<String, String> createAttributes(final ListableEntity entity, final ProcessContext context) {
-            return Collections.emptyMap();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/26d90fbc/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java
index 951aab0..26dcbbf 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java
@@ -40,6 +40,7 @@ import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.list.AbstractListProcessor;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;

http://git-wip-us.apache.org/repos/asf/nifi/blob/26d90fbc/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 06d155a..53b46a1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1009,6 +1009,11 @@
             </dependency>
             <dependency>
                 <groupId>org.apache.nifi</groupId>
+                <artifactId>nifi-distributed-cache-client-service-api</artifactId>
+                <version>1.2.0-SNAPSHOT</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.nifi</groupId>
                 <artifactId>nifi-standard-nar</artifactId>
                 <version>1.2.0-SNAPSHOT</version>
                 <type>nar</type>