You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2017/06/01 19:31:50 UTC

nifi git commit: NIFI-4003: Expose configuration option for cache size and duration. NIFI-4003: Addressed remaining spots where the client does not cache information.

Repository: nifi
Updated Branches:
  refs/heads/master 2595d816c -> 067e9dfeb


NIFI-4003: Expose configuration option for cache size and duration. NIFI-4003: Addressed remaining spots where the client does not cache information.

This closes #1879.

Signed-off-by: Bryan Bende <bb...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/067e9dfe
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/067e9dfe
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/067e9dfe

Branch: refs/heads/master
Commit: 067e9dfeb0a7830a13003a04e4cfc3fd800f019b
Parents: 2595d81
Author: Mark Payne <ma...@hotmail.com>
Authored: Thu Jun 1 13:29:29 2017 -0400
Committer: Bryan Bende <bb...@apache.org>
Committed: Thu Jun 1 15:31:35 2017 -0400

----------------------------------------------------------------------
 .../nifi-hwx-schema-registry-service/pom.xml    |  20 +++
 .../hortonworks/HortonworksSchemaRegistry.java  | 168 ++++++++++-------
 .../TestHortonworksSchemaRegistry.java          | 180 +++++++++++++++++++
 .../src/test/resources/empty-schema.avsc        |   6 +
 4 files changed, 314 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/067e9dfe/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/pom.xml
index 574831b..79dbc84 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/pom.xml
@@ -196,5 +196,25 @@ limitations under the License.
 			<artifactId>junit</artifactId>
 			<scope>test</scope>
 		</dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <scope>test</scope>
+        </dependency>
 	</dependencies>
+    
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.rat</groupId>
+                <artifactId>apache-rat-plugin</artifactId>
+                <configuration>
+                    <excludes combine.children="append">
+                        <exclude>src/test/resources/empty-schema.avsc</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+    
 </project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/067e9dfe/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java
index a83327d..d2289a2 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java
@@ -16,9 +16,7 @@
  */
 package org.apache.nifi.schemaregistry.hortonworks;
 
-import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
@@ -64,15 +62,18 @@ import com.hortonworks.registries.schemaregistry.errors.SchemaNotFoundException;
 public class HortonworksSchemaRegistry extends AbstractControllerService implements SchemaRegistry {
     private static final Set<SchemaField> schemaFields = EnumSet.of(SchemaField.SCHEMA_NAME, SchemaField.SCHEMA_TEXT,
         SchemaField.SCHEMA_TEXT_FORMAT, SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION);
+
     private final ConcurrentMap<Tuple<SchemaIdentifier, String>, RecordSchema> schemaNameToSchemaMap = new ConcurrentHashMap<>();
-    private final ConcurrentMap<String, Tuple<SchemaVersionInfo, Long>> schemaVersionCache = new ConcurrentHashMap<>();
+    private final ConcurrentMap<String, Tuple<SchemaVersionInfo, Long>> schemaVersionByNameCache = new ConcurrentHashMap<>();
+    private final ConcurrentMap<SchemaVersionKey, Tuple<SchemaVersionInfo, Long>> schemaVersionByKeyCache = new ConcurrentHashMap<>();
 
     private static final String LOGICAL_TYPE_DATE = "date";
     private static final String LOGICAL_TYPE_TIME_MILLIS = "time-millis";
     private static final String LOGICAL_TYPE_TIME_MICROS = "time-micros";
     private static final String LOGICAL_TYPE_TIMESTAMP_MILLIS = "timestamp-millis";
     private static final String LOGICAL_TYPE_TIMESTAMP_MICROS = "timestamp-micros";
-    private static final long VERSION_INFO_CACHE_NANOS = TimeUnit.MINUTES.toNanos(1L);
+
+    private volatile long versionInfoCacheNanos;
 
     static final PropertyDescriptor URL = new PropertyDescriptor.Builder()
         .name("url")
@@ -83,33 +84,50 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
         .required(true)
         .build();
 
+    static final PropertyDescriptor CACHE_SIZE = new PropertyDescriptor.Builder()
+        .name("cache-size")
+        .displayName("Cache Size")
+        .description("Specifies how many Schemas should be cached from the Hortonworks Schema Registry")
+        .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+        .defaultValue("1000")
+        .required(true)
+        .build();
+
+    static final PropertyDescriptor CACHE_EXPIRATION = new PropertyDescriptor.Builder()
+        .name("cache-expiration")
+        .displayName("Cache Expiration")
+        .description("Specifies how long a Schema that is cached should remain in the cache. Once this time period elapses, a "
+            + "cached version of a schema will no longer be used, and the service will have to communicate with the "
+            + "Hortonworks Schema Registry again in order to obtain the schema.")
+        .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+        .defaultValue("1 hour")
+        .required(true)
+        .build();
 
-    private static final List<PropertyDescriptor> propertyDescriptors = Collections.singletonList(URL);
     private volatile SchemaRegistryClient schemaRegistryClient;
     private volatile boolean initialized;
     private volatile Map<String, Object> schemaRegistryConfig;
 
-    public HortonworksSchemaRegistry() {
-    }
-
 
     @OnEnabled
     public void enable(final ConfigurationContext context) throws InitializationException {
         schemaRegistryConfig = new HashMap<>();
 
+        versionInfoCacheNanos = context.getProperty(CACHE_EXPIRATION).asTimePeriod(TimeUnit.NANOSECONDS);
+
         // The below properties may or may not need to be exposed to the end
         // user. We just need to watch usage patterns to see if sensible default
         // can satisfy NiFi requirements
         String urlValue = context.getProperty(URL).evaluateAttributeExpressions().getValue();
-        if (urlValue == null || urlValue.trim().length() == 0){
-            throw new IllegalArgumentException("'Schema Registry URL' must not  be nul or empty.");
+        if (urlValue == null || urlValue.trim().isEmpty()) {
+            throw new IllegalArgumentException("'Schema Registry URL' must not be null or empty.");
         }
 
         schemaRegistryConfig.put(SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name(), urlValue);
         schemaRegistryConfig.put(SchemaRegistryClient.Configuration.CLASSLOADER_CACHE_SIZE.name(), 10L);
-        schemaRegistryConfig.put(SchemaRegistryClient.Configuration.CLASSLOADER_CACHE_EXPIRY_INTERVAL_SECS.name(), 5000L);
-        schemaRegistryConfig.put(SchemaRegistryClient.Configuration.SCHEMA_VERSION_CACHE_SIZE.name(), 1000L);
-        schemaRegistryConfig.put(SchemaRegistryClient.Configuration.SCHEMA_VERSION_CACHE_EXPIRY_INTERVAL_SECS.name(), 60 * 60 * 1000L);
+        schemaRegistryConfig.put(SchemaRegistryClient.Configuration.CLASSLOADER_CACHE_EXPIRY_INTERVAL_SECS.name(), context.getProperty(CACHE_EXPIRATION).asTimePeriod(TimeUnit.SECONDS));
+        schemaRegistryConfig.put(SchemaRegistryClient.Configuration.SCHEMA_VERSION_CACHE_SIZE.name(), context.getProperty(CACHE_SIZE).asInteger());
+        schemaRegistryConfig.put(SchemaRegistryClient.Configuration.SCHEMA_VERSION_CACHE_EXPIRY_INTERVAL_SECS.name(), context.getProperty(CACHE_EXPIRATION).asTimePeriod(TimeUnit.SECONDS));
     }
 
 
@@ -126,11 +144,15 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return propertyDescriptors;
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(URL);
+        properties.add(CACHE_SIZE);
+        properties.add(CACHE_EXPIRATION);
+        return properties;
     }
 
 
-    private synchronized SchemaRegistryClient getClient() {
+    protected synchronized SchemaRegistryClient getClient() {
         if (!initialized) {
             schemaRegistryClient = new SchemaRegistryClient(schemaRegistryConfig);
             initialized = true;
@@ -142,14 +164,14 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
     private SchemaVersionInfo getLatestSchemaVersionInfo(final SchemaRegistryClient client, final String schemaName) throws org.apache.nifi.schema.access.SchemaNotFoundException {
         try {
             // Try to fetch the SchemaVersionInfo from the cache.
-            final Tuple<SchemaVersionInfo, Long> timestampedVersionInfo = schemaVersionCache.get(schemaName);
+            final Tuple<SchemaVersionInfo, Long> timestampedVersionInfo = schemaVersionByNameCache.get(schemaName);
 
             // Determine if the timestampedVersionInfo is expired
             boolean fetch = false;
             if (timestampedVersionInfo == null) {
                 fetch = true;
             } else {
-                final long minTimestamp = System.nanoTime() - VERSION_INFO_CACHE_NANOS;
+                final long minTimestamp = System.nanoTime() - versionInfoCacheNanos;
                 fetch = timestampedVersionInfo.getValue() < minTimestamp;
             }
 
@@ -166,7 +188,41 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
 
             // Store new version in cache.
             final Tuple<SchemaVersionInfo, Long> tuple = new Tuple<>(versionInfo, System.nanoTime());
-            schemaVersionCache.put(schemaName, tuple);
+            schemaVersionByNameCache.put(schemaName, tuple);
+            return versionInfo;
+        } catch (final SchemaNotFoundException e) {
+            throw new org.apache.nifi.schema.access.SchemaNotFoundException(e);
+        }
+    }
+
+    private SchemaVersionInfo getSchemaVersionInfo(final SchemaRegistryClient client, final SchemaVersionKey key) throws org.apache.nifi.schema.access.SchemaNotFoundException {
+        try {
+            // Try to fetch the SchemaVersionInfo from the cache.
+            final Tuple<SchemaVersionInfo, Long> timestampedVersionInfo = schemaVersionByKeyCache.get(key);
+
+            // Determine if the timestampedVersionInfo is expired
+            boolean fetch = false;
+            if (timestampedVersionInfo == null) {
+                fetch = true;
+            } else {
+                final long minTimestamp = System.nanoTime() - versionInfoCacheNanos;
+                fetch = timestampedVersionInfo.getValue() < minTimestamp;
+            }
+
+            // If not expired, use what we got from the cache
+            if (!fetch) {
+                return timestampedVersionInfo.getKey();
+            }
+
+            // schema version info was expired or not found in cache. Fetch from schema registry
+            final SchemaVersionInfo versionInfo = client.getSchemaVersionInfo(key);
+            if (versionInfo == null) {
+                throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with name '" + key.getSchemaName() + "' and version " + key.getVersion());
+            }
+
+            // Store new version in cache.
+            final Tuple<SchemaVersionInfo, Long> tuple = new Tuple<>(versionInfo, System.nanoTime());
+            schemaVersionByKeyCache.put(key, tuple);
             return versionInfo;
         } catch (final SchemaNotFoundException e) {
             throw new org.apache.nifi.schema.access.SchemaNotFoundException(e);
@@ -211,58 +267,50 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
 
 
     @Override
-    public String retrieveSchemaText(final long schemaId, final int version) throws IOException, org.apache.nifi.schema.access.SchemaNotFoundException {
-        try {
-            final SchemaRegistryClient client = getClient();
-            final SchemaMetadataInfo info = client.getSchemaMetadataInfo(schemaId);
-            if (info == null) {
-                throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with ID '" + schemaId + "' and version '" + version + "'");
-            }
-
-            final SchemaMetadata metadata = info.getSchemaMetadata();
-            final String schemaName = metadata.getName();
+    public String retrieveSchemaText(final long schemaId, final int version) throws org.apache.nifi.schema.access.SchemaNotFoundException {
+        final SchemaRegistryClient client = getClient();
+        final SchemaMetadataInfo info = client.getSchemaMetadataInfo(schemaId);
+        if (info == null) {
+            throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with ID '" + schemaId + "' and version '" + version + "'");
+        }
 
-            final SchemaVersionKey schemaVersionKey = new SchemaVersionKey(schemaName, version);
-            final SchemaVersionInfo versionInfo = client.getSchemaVersionInfo(schemaVersionKey);
-            if (versionInfo == null) {
-                throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with ID '" + schemaId + "' and version '" + version + "'");
-            }
+        final SchemaMetadata metadata = info.getSchemaMetadata();
+        final String schemaName = metadata.getName();
 
-            return versionInfo.getSchemaText();
-        } catch (final SchemaNotFoundException e) {
-            throw new org.apache.nifi.schema.access.SchemaNotFoundException(e);
+        final SchemaVersionKey schemaVersionKey = new SchemaVersionKey(schemaName, version);
+        final SchemaVersionInfo versionInfo = getSchemaVersionInfo(client, schemaVersionKey);
+        if (versionInfo == null) {
+            throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with ID '" + schemaId + "' and version '" + version + "'");
         }
+
+        return versionInfo.getSchemaText();
     }
 
     @Override
-    public RecordSchema retrieveSchema(final long schemaId, final int version) throws IOException, org.apache.nifi.schema.access.SchemaNotFoundException {
-        try {
-            final SchemaRegistryClient client = getClient();
-            final SchemaMetadataInfo info = client.getSchemaMetadataInfo(schemaId);
-            if (info == null) {
-                throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with ID '" + schemaId + "' and version '" + version + "'");
-            }
+    public RecordSchema retrieveSchema(final long schemaId, final int version) throws org.apache.nifi.schema.access.SchemaNotFoundException {
+        final SchemaRegistryClient client = getClient();
+        final SchemaMetadataInfo info = client.getSchemaMetadataInfo(schemaId);
+        if (info == null) {
+            throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with ID '" + schemaId + "' and version '" + version + "'");
+        }
 
-            final SchemaMetadata metadata = info.getSchemaMetadata();
-            final String schemaName = metadata.getName();
+        final SchemaMetadata metadata = info.getSchemaMetadata();
+        final String schemaName = metadata.getName();
 
-            final SchemaVersionKey schemaVersionKey = new SchemaVersionKey(schemaName, version);
-            final SchemaVersionInfo versionInfo = client.getSchemaVersionInfo(schemaVersionKey);
-            if (versionInfo == null) {
-                throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with ID '" + schemaId + "' and version '" + version + "'");
-            }
+        final SchemaVersionKey schemaVersionKey = new SchemaVersionKey(schemaName, version);
+        final SchemaVersionInfo versionInfo = getSchemaVersionInfo(client, schemaVersionKey);
+        if (versionInfo == null) {
+            throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with ID '" + schemaId + "' and version '" + version + "'");
+        }
 
-            final String schemaText = versionInfo.getSchemaText();
+        final String schemaText = versionInfo.getSchemaText();
 
-            final SchemaIdentifier schemaIdentifier = SchemaIdentifier.of(schemaName, schemaId, version);
-            final Tuple<SchemaIdentifier, String> tuple = new Tuple<>(schemaIdentifier, schemaText);
-            return schemaNameToSchemaMap.computeIfAbsent(tuple, t -> {
-                final Schema schema = new Schema.Parser().parse(schemaText);
-                return createRecordSchema(schema, schemaText, schemaIdentifier);
-            });
-        } catch (final SchemaNotFoundException e) {
-            throw new org.apache.nifi.schema.access.SchemaNotFoundException(e);
-        }
+        final SchemaIdentifier schemaIdentifier = SchemaIdentifier.of(schemaName, schemaId, version);
+        final Tuple<SchemaIdentifier, String> tuple = new Tuple<>(schemaIdentifier, schemaText);
+        return schemaNameToSchemaMap.computeIfAbsent(tuple, t -> {
+            final Schema schema = new Schema.Parser().parse(schemaText);
+            return createRecordSchema(schema, schemaText, schemaIdentifier);
+        });
     }
 
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/067e9dfe/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/test/java/org/apache/nifi/schemaregistry/hortonworks/TestHortonworksSchemaRegistry.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/test/java/org/apache/nifi/schemaregistry/hortonworks/TestHortonworksSchemaRegistry.java b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/test/java/org/apache/nifi/schemaregistry/hortonworks/TestHortonworksSchemaRegistry.java
new file mode 100644
index 0000000..40f2ba7
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/test/java/org/apache/nifi/schemaregistry/hortonworks/TestHortonworksSchemaRegistry.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.schemaregistry.hortonworks;
+
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+
+import java.lang.reflect.Constructor;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.MockConfigurationContext;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import com.hortonworks.registries.schemaregistry.SchemaCompatibility;
+import com.hortonworks.registries.schemaregistry.SchemaMetadata;
+import com.hortonworks.registries.schemaregistry.SchemaMetadataInfo;
+import com.hortonworks.registries.schemaregistry.SchemaVersionInfo;
+import com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient;
+import com.hortonworks.registries.schemaregistry.errors.SchemaNotFoundException;
+
+public class TestHortonworksSchemaRegistry {
+    private HortonworksSchemaRegistry registry;
+    private SchemaRegistryClient client;
+
+    private final Map<String, SchemaVersionInfo> schemaVersionInfoMap = new HashMap<>();
+    private final Map<String, SchemaMetadataInfo> schemaMetadataInfoMap = new HashMap<>();
+
+    @Before
+    public void setup() throws SchemaNotFoundException {
+        schemaVersionInfoMap.clear();
+        schemaMetadataInfoMap.clear();
+
+        client = mock(SchemaRegistryClient.class);
+        doAnswer(new Answer<SchemaVersionInfo>() {
+            @Override
+            public SchemaVersionInfo answer(final InvocationOnMock invocation) throws Throwable {
+                final String schemaName = invocation.getArgumentAt(0, String.class);
+                final SchemaVersionInfo info = schemaVersionInfoMap.get(schemaName);
+
+                if (info == null) {
+                    throw new SchemaNotFoundException();
+                }
+
+                return info;
+            }
+        }).when(client).getLatestSchemaVersionInfo(any(String.class));
+
+        doAnswer(new Answer<SchemaMetadataInfo>() {
+            @Override
+            public SchemaMetadataInfo answer(InvocationOnMock invocation) throws Throwable {
+                final String schemaName = invocation.getArgumentAt(0, String.class);
+                final SchemaMetadataInfo info = schemaMetadataInfoMap.get(schemaName);
+
+                if (info == null) {
+                    throw new SchemaNotFoundException();
+                }
+
+                return info;
+            }
+        }).when(client).getSchemaMetadataInfo(any(String.class));
+
+        registry = new HortonworksSchemaRegistry() {
+            @Override
+            protected synchronized SchemaRegistryClient getClient() {
+                return client;
+            }
+        };
+    }
+
+    @Test
+    public void testCacheUsed() throws Exception {
+        final String text = new String(Files.readAllBytes(Paths.get("src/test/resources/empty-schema.avsc")));
+        final SchemaVersionInfo info = new SchemaVersionInfo(1, text, 2L, "description");
+        schemaVersionInfoMap.put("unit-test", info);
+
+        final SchemaMetadata metadata = new SchemaMetadata.Builder("unit-test")
+            .compatibility(SchemaCompatibility.NONE)
+            .evolve(true)
+            .schemaGroup("group")
+            .type("AVRO")
+            .build();
+
+        final Constructor<SchemaMetadataInfo> ctr = SchemaMetadataInfo.class.getDeclaredConstructor(SchemaMetadata.class, Long.class, Long.class);
+        ctr.setAccessible(true);
+
+        final SchemaMetadataInfo schemaMetadataInfo = ctr.newInstance(metadata, 1L, System.currentTimeMillis());
+
+        schemaMetadataInfoMap.put("unit-test", schemaMetadataInfo);
+
+        final Map<PropertyDescriptor, String> properties = new HashMap<>();
+        properties.put(HortonworksSchemaRegistry.URL, "http://localhost:44444");
+        properties.put(HortonworksSchemaRegistry.CACHE_EXPIRATION, "5 mins");
+        properties.put(HortonworksSchemaRegistry.CACHE_SIZE, "1000");
+
+        final ConfigurationContext configurationContext = new MockConfigurationContext(properties, null);
+        registry.enable(configurationContext);
+
+        for (int i = 0; i < 10000; i++) {
+            final RecordSchema schema = registry.retrieveSchema("unit-test");
+            assertNotNull(schema);
+        }
+
+        Mockito.verify(client, Mockito.times(1)).getLatestSchemaVersionInfo(any(String.class));
+    }
+
+    @Test
+    @Ignore("This can be useful for manual testing/debugging, but will keep ignored for now because we don't want automated builds to run this, since it depends on timing")
+    public void testCacheExpires() throws Exception {
+        final String text = new String(Files.readAllBytes(Paths.get("src/test/resources/empty-schema.avsc")));
+        final SchemaVersionInfo info = new SchemaVersionInfo(1, text, 2L, "description");
+        schemaVersionInfoMap.put("unit-test", info);
+
+        final SchemaMetadata metadata = new SchemaMetadata.Builder("unit-test")
+            .compatibility(SchemaCompatibility.NONE)
+            .evolve(true)
+            .schemaGroup("group")
+            .type("AVRO")
+            .build();
+
+        final Constructor<SchemaMetadataInfo> ctr = SchemaMetadataInfo.class.getDeclaredConstructor(SchemaMetadata.class, Long.class, Long.class);
+        ctr.setAccessible(true);
+
+        final SchemaMetadataInfo schemaMetadataInfo = ctr.newInstance(metadata, 1L, System.currentTimeMillis());
+
+        schemaMetadataInfoMap.put("unit-test", schemaMetadataInfo);
+
+        final Map<PropertyDescriptor, String> properties = new HashMap<>();
+        properties.put(HortonworksSchemaRegistry.URL, "http://localhost:44444");
+        properties.put(HortonworksSchemaRegistry.CACHE_EXPIRATION, "1 sec");
+        properties.put(HortonworksSchemaRegistry.CACHE_SIZE, "1000");
+
+        final ConfigurationContext configurationContext = new MockConfigurationContext(properties, null);
+        registry.enable(configurationContext);
+
+        for (int i = 0; i < 2; i++) {
+            final RecordSchema schema = registry.retrieveSchema("unit-test");
+            assertNotNull(schema);
+        }
+
+        Mockito.verify(client, Mockito.times(1)).getLatestSchemaVersionInfo(any(String.class));
+
+        Thread.sleep(2000L);
+
+        for (int i = 0; i < 2; i++) {
+            final RecordSchema schema = registry.retrieveSchema("unit-test");
+            assertNotNull(schema);
+        }
+
+        Mockito.verify(client, Mockito.times(2)).getLatestSchemaVersionInfo(any(String.class));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/067e9dfe/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/test/resources/empty-schema.avsc
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/test/resources/empty-schema.avsc b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/test/resources/empty-schema.avsc
new file mode 100644
index 0000000..67098d0
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/test/resources/empty-schema.avsc
@@ -0,0 +1,6 @@
+{
+	"name": "unitTest",
+	"namespace": "org.apache.nifi",
+	"type": "record",
+	"fields": []
+}
\ No newline at end of file