Posted to commits@commons.apache.org by gg...@apache.org on 2022/08/29 11:16:49 UTC

[commons-jcs] branch master updated: Javadoc @see tags do not need to use a FQCN for classes in java.lang

This is an automated email from the ASF dual-hosted git repository.

ggregory pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/commons-jcs.git


The following commit(s) were added to refs/heads/master by this push:
     new 62ee38cc Javadoc @see tags do not need to use a FQCN for classes in java.lang
62ee38cc is described below

commit 62ee38cc8fad67346f912a80606a0896ac089861
Author: Gary Gregory <gg...@rocketsoftware.com>
AuthorDate: Mon Aug 29 07:16:45 2022 -0400

    Javadoc @see tags do not need to use a FQCN for classes in java.lang
---
 .../AbstractAuxiliaryCacheAttributes.java          |  292 +-
 .../auxiliary/disk/indexed/IndexedDiskCache.java   | 3296 ++++++++++----------
 .../disk/indexed/IndexedDiskElementDescriptor.java |  232 +-
 .../lateral/socket/tcp/LateralTCPListener.java     | 1584 +++++-----
 .../jcs3/auxiliary/remote/RemoteCacheNoWait.java   | 1032 +++---
 .../jcs3/auxiliary/remote/RemoteLocation.java      |    2 +-
 .../jcs3/engine/CompositeCacheAttributes.java      |  884 +++---
 .../commons/jcs3/engine/ElementAttributes.java     |  918 +++---
 .../AbstractDoubleLinkedListMemoryCache.java       | 1020 +++---
 .../jcs3/utils/discovery/UDPCleanupRunner.java     |  112 +-
 .../commons/jcs3/utils/struct/AbstractLRUMap.java  | 1070 +++----
 .../jcs3/auxiliary/disk/DiskTestObject.java        |  162 +-
 12 files changed, 5302 insertions(+), 5302 deletions(-)
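
For context, a minimal illustration of the convention this commit applies (an editor's sketch; the class SeeTagExample is hypothetical and not part of commons-jcs): java.lang is always in scope, so Javadoc resolves its classes without a fully qualified class name, and the two @see tags below link to the same target.

    // Illustrative sketch only; not part of this commit.
    /**
     * @see java.lang.Object#clone()
     * @see Object#clone()
     */
    public class SeeTagExample implements Cloneable
    {
        @Override
        public Object clone() throws CloneNotSupportedException
        {
            return super.clone();
        }
    }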

diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/AbstractAuxiliaryCacheAttributes.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/AbstractAuxiliaryCacheAttributes.java
index 757409a8..f3f166ac 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/AbstractAuxiliaryCacheAttributes.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/AbstractAuxiliaryCacheAttributes.java
@@ -1,146 +1,146 @@
-package org.apache.commons.jcs3.auxiliary;
-
-import org.apache.commons.jcs3.engine.behavior.ICacheEventQueue;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * This has common attributes used by all auxiliaries.
- */
-public abstract class AbstractAuxiliaryCacheAttributes
-    implements AuxiliaryCacheAttributes
-{
-    /** Don't change */
-    private static final long serialVersionUID = -6594609334959187673L;
-
-    /** cacheName */
-    private String cacheName;
-
-    /** name */
-    private String name;
-
-    /** eventQueueType -- pooled, or single threaded */
-    private ICacheEventQueue.QueueType eventQueueType;
-
-    /** Event queue pool name, used when the queue type is POOLED */
-    private String eventQueuePoolName;
-
-    /**
-     * @param name
-     */
-    @Override
-    public void setCacheName( final String name )
-    {
-        this.cacheName = name;
-    }
-
-    /**
-     * Gets the cacheName attribute of the AuxiliaryCacheAttributes object
-     * <p>
-     * @return The cacheName value
-     */
-    @Override
-    public String getCacheName()
-    {
-        return this.cacheName;
-    }
-
-    /**
-     * This is the name of the auxiliary in the configuration file.
-     * <p>
-     * @see org.apache.commons.jcs3.auxiliary.AuxiliaryCacheAttributes#setName(java.lang.String)
-     */
-    @Override
-    public void setName( final String s )
-    {
-        this.name = s;
-    }
-
-    /**
-     * Gets the name attribute of the AuxiliaryCacheAttributes object
-     * <p>
-     * @return The name value
-     */
-    @Override
-    public String getName()
-    {
-        return this.name;
-    }
-
-    /**
-     * SINGLE is the default. If you choose POOLED, the value of EventQueuePoolName will be used
-     * <p>
-     * @param queueType SINGLE or POOLED
-     */
-    @Override
-    public void setEventQueueType( final ICacheEventQueue.QueueType queueType )
-    {
-        this.eventQueueType = queueType;
-    }
-
-    /**
-     * @return SINGLE or POOLED
-     */
-    @Override
-    public ICacheEventQueue.QueueType getEventQueueType()
-    {
-        return eventQueueType;
-    }
-
-    /**
-     * If you choose a POOLED event queue type, the value of EventQueuePoolName will be used. This
-     * is ignored if the pool type is SINGLE.
-     * <p>
-     * @param s the name of the event queue pool to use
-     */
-    @Override
-    public void setEventQueuePoolName( final String s )
-    {
-        eventQueuePoolName = s;
-    }
-
-    /**
-     * Gets the pool name to use. If a pool is not found by this name, the thread pool manager will
-     * return a default configuration.
-     * <p>
-     * @return name of thread pool to use for this auxiliary
-     */
-    @Override
-    public String getEventQueuePoolName()
-    {
-        return eventQueuePoolName;
-    }
-
-    /**
-     * @see java.lang.Object#clone()
-     */
-    @Override
-    public AbstractAuxiliaryCacheAttributes clone()
-    {
-        try
-        {
-            return (AbstractAuxiliaryCacheAttributes)super.clone();
-        }
-        catch (final CloneNotSupportedException e)
-        {
-            throw new RuntimeException("Clone not supported. This should never happen.", e);
-        }
-    }
-}
+package org.apache.commons.jcs3.auxiliary;
+
+import org.apache.commons.jcs3.engine.behavior.ICacheEventQueue;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * This has common attributes used by all auxiliaries.
+ */
+public abstract class AbstractAuxiliaryCacheAttributes
+    implements AuxiliaryCacheAttributes
+{
+    /** Don't change */
+    private static final long serialVersionUID = -6594609334959187673L;
+
+    /** cacheName */
+    private String cacheName;
+
+    /** name */
+    private String name;
+
+    /** eventQueueType -- pooled, or single threaded */
+    private ICacheEventQueue.QueueType eventQueueType;
+
+    /** Event queue pool name, used when the queue type is POOLED */
+    private String eventQueuePoolName;
+
+    /**
+     * @param name
+     */
+    @Override
+    public void setCacheName( final String name )
+    {
+        this.cacheName = name;
+    }
+
+    /**
+     * Gets the cacheName attribute of the AuxiliaryCacheAttributes object
+     * <p>
+     * @return The cacheName value
+     */
+    @Override
+    public String getCacheName()
+    {
+        return this.cacheName;
+    }
+
+    /**
+     * This is the name of the auxiliary in the configuration file.
+     * <p>
+     * @see org.apache.commons.jcs3.auxiliary.AuxiliaryCacheAttributes#setName(String)
+     */
+    @Override
+    public void setName( final String s )
+    {
+        this.name = s;
+    }
+
+    /**
+     * Gets the name attribute of the AuxiliaryCacheAttributes object
+     * <p>
+     * @return The name value
+     */
+    @Override
+    public String getName()
+    {
+        return this.name;
+    }
+
+    /**
+     * SINGLE is the default. If you choose POOLED, the value of EventQueuePoolName will be used
+     * <p>
+     * @param queueType SINGLE or POOLED
+     */
+    @Override
+    public void setEventQueueType( final ICacheEventQueue.QueueType queueType )
+    {
+        this.eventQueueType = queueType;
+    }
+
+    /**
+     * @return SINGLE or POOLED
+     */
+    @Override
+    public ICacheEventQueue.QueueType getEventQueueType()
+    {
+        return eventQueueType;
+    }
+
+    /**
+     * If you choose a POOLED event queue type, the value of EventQueuePoolName will be used. This
+     * is ignored if the pool type is SINGLE.
+     * <p>
+     * @param s the name of the event queue pool to use
+     */
+    @Override
+    public void setEventQueuePoolName( final String s )
+    {
+        eventQueuePoolName = s;
+    }
+
+    /**
+     * Gets the pool name to use. If a pool is not found by this name, the thread pool manager will
+     * return a default configuration.
+     * <p>
+     * @return name of thread pool to use for this auxiliary
+     */
+    @Override
+    public String getEventQueuePoolName()
+    {
+        return eventQueuePoolName;
+    }
+
+    /**
+     * @see Object#clone()
+     */
+    @Override
+    public AbstractAuxiliaryCacheAttributes clone()
+    {
+        try
+        {
+            return (AbstractAuxiliaryCacheAttributes)super.clone();
+        }
+        catch (final CloneNotSupportedException e)
+        {
+            throw new RuntimeException("Clone not supported. This should never happen.", e);
+        }
+    }
+}
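
An aside on the clone() idiom above (an editor's sketch; MyAttributes is hypothetical and not part of commons-jcs): when Cloneable is implemented somewhere in the class hierarchy, super.clone() cannot actually throw, so wrapping CloneNotSupportedException in an unchecked exception and narrowing the return type keeps the method convenient for callers.

    // Illustrative sketch of the covariant clone() pattern; not part of this commit.
    public class MyAttributes implements Cloneable
    {
        private String name;

        public String getName() { return name; }
        public void setName(final String name) { this.name = name; }

        @Override
        public MyAttributes clone()
        {
            try
            {
                // Safe: this class implements Cloneable, so super.clone() succeeds.
                return (MyAttributes) super.clone();
            }
            catch (final CloneNotSupportedException e)
            {
                throw new RuntimeException("Clone not supported. This should never happen.", e);
            }
        }
    }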
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/disk/indexed/IndexedDiskCache.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/disk/indexed/IndexedDiskCache.java
index ce2e94f4..f63638d5 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/disk/indexed/IndexedDiskCache.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/disk/indexed/IndexedDiskCache.java
@@ -1,1648 +1,1648 @@
-package org.apache.commons.jcs3.auxiliary.disk.indexed;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
-import java.nio.file.Files;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.apache.commons.jcs3.auxiliary.AuxiliaryCacheAttributes;
-import org.apache.commons.jcs3.auxiliary.disk.AbstractDiskCache;
-import org.apache.commons.jcs3.auxiliary.disk.behavior.IDiskCacheAttributes.DiskLimitType;
-import org.apache.commons.jcs3.engine.behavior.ICacheElement;
-import org.apache.commons.jcs3.engine.behavior.IElementSerializer;
-import org.apache.commons.jcs3.engine.control.group.GroupAttrName;
-import org.apache.commons.jcs3.engine.control.group.GroupId;
-import org.apache.commons.jcs3.engine.logging.behavior.ICacheEvent;
-import org.apache.commons.jcs3.engine.logging.behavior.ICacheEventLogger;
-import org.apache.commons.jcs3.engine.stats.StatElement;
-import org.apache.commons.jcs3.engine.stats.Stats;
-import org.apache.commons.jcs3.engine.stats.behavior.IStatElement;
-import org.apache.commons.jcs3.engine.stats.behavior.IStats;
-import org.apache.commons.jcs3.log.Log;
-import org.apache.commons.jcs3.log.LogManager;
-import org.apache.commons.jcs3.utils.struct.AbstractLRUMap;
-import org.apache.commons.jcs3.utils.struct.LRUMap;
-import org.apache.commons.jcs3.utils.timing.ElapsedTimer;
-
-/**
- * Disk cache that uses a RandomAccessFile with keys stored in memory. The maximum number of keys
- * stored in memory is configurable. The disk cache tries to recycle spots on disk to limit file
- * expansion.
- */
-public class IndexedDiskCache<K, V> extends AbstractDiskCache<K, V>
-{
-    /** The logger */
-    private static final Log log = LogManager.getLog(IndexedDiskCache.class);
-
-    /** Cache name used in log messages */
-    protected final String logCacheName;
-
-    /** The name of the file where the data is stored */
-    private final String fileName;
-
-    /** The IndexedDisk manages reads and writes to the data file. */
-    private IndexedDisk dataFile;
-
-    /** The IndexedDisk manages reads and writes to the key file. */
-    private IndexedDisk keyFile;
-
-    /** Map containing the keys and disk offsets. */
-    private final Map<K, IndexedDiskElementDescriptor> keyHash;
-
-    /** The maximum number of keys that we will keep in memory. */
-    private final int maxKeySize;
-
-    /** A handle on the data file. */
-    private File rafDir;
-
-    /** Should we keep adding to the recycle bin. False during optimization. */
-    private boolean doRecycle = true;
-
-    /** Should we optimize real time */
-    private boolean isRealTimeOptimizationEnabled = true;
-
-    /** Should we optimize on shutdown. */
-    private boolean isShutdownOptimizationEnabled = true;
-
-    /** are we currently optimizing the files */
-    private boolean isOptimizing;
-
-    /** The number of times the file has been optimized. */
-    private int timesOptimized;
-
-    /** The thread optimizing the file. */
-    private volatile Thread currentOptimizationThread;
-
-    /** used for counting the number of remove requests */
-    private int removeCount;
-
-    /** Should we queue puts. True when optimizing. We write the queue post optimization. */
-    private boolean queueInput;
-
-    /** list where puts made during optimization are queued */
-    private final ConcurrentSkipListSet<IndexedDiskElementDescriptor> queuedPutList;
-
-    /** RECYCLE BIN -- array of empty spots */
-    private final ConcurrentSkipListSet<IndexedDiskElementDescriptor> recycle;
-
-    /** User configurable parameters */
-    private final IndexedDiskCacheAttributes cattr;
-
-    /** How many slots have we recycled. */
-    private int recycleCnt;
-
-    /** How many items were there on startup. */
-    private int startupSize;
-
-    /** the number of bytes free on disk. */
-    private final AtomicLong bytesFree = new AtomicLong();
-
-    /** mode we are working on (size or count limited) */
-    private DiskLimitType diskLimitType = DiskLimitType.COUNT;
-
-    /** simple stat */
-    private final AtomicInteger hitCount = new AtomicInteger(0);
-
-    /**
-     * Use this lock to synchronize reads and writes to the underlying storage mechanism.
-     */
-    protected ReentrantReadWriteLock storageLock = new ReentrantReadWriteLock();
-
-    /**
-     * Constructor for the DiskCache object.
-     * <p>
-     *
-     * @param cacheAttributes
-     */
-    public IndexedDiskCache(final IndexedDiskCacheAttributes cacheAttributes)
-    {
-        this(cacheAttributes, null);
-    }
-
-    /**
-     * Constructor for the DiskCache object.
-     * <p>
-     *
-     * @param cattr
-     * @param elementSerializer
-     *            used if supplied; the superclass will not set a null value
-     */
-    public IndexedDiskCache(final IndexedDiskCacheAttributes cattr, final IElementSerializer elementSerializer)
-    {
-        super(cattr);
-
-        setElementSerializer(elementSerializer);
-
-        this.cattr = cattr;
-        this.maxKeySize = cattr.getMaxKeySize();
-        this.isRealTimeOptimizationEnabled = cattr.getOptimizeAtRemoveCount() > 0;
-        this.isShutdownOptimizationEnabled = cattr.isOptimizeOnShutdown();
-        this.logCacheName = "Region [" + getCacheName() + "] ";
-        this.diskLimitType = cattr.getDiskLimitType();
-        // Make a clean file name
-        this.fileName = getCacheName().replaceAll("[^a-zA-Z0-9-_\\.]", "_");
-        this.keyHash = createInitialKeyMap();
-        this.queuedPutList = new ConcurrentSkipListSet<>(new PositionComparator());
-        this.recycle = new ConcurrentSkipListSet<>();
-
-        try
-        {
-            initializeFileSystem(cattr);
-            initializeKeysAndData(cattr);
-
-            // Initialization finished successfully, so set alive to true.
-            setAlive(true);
-            log.info("{0}: Indexed Disk Cache is alive.", logCacheName);
-
-            // TODO: Should we improve detection of whether or not the file should be optimized.
-            if (isRealTimeOptimizationEnabled && !keyHash.isEmpty())
-            {
-                // Kick off a real time optimization, in case we didn't do a final optimization.
-                doOptimizeRealTime();
-            }
-        }
-        catch (final IOException e)
-        {
-            log.error("{0}: Failure initializing for fileName: {1} and directory: {2}",
-                    logCacheName, fileName, this.rafDir.getAbsolutePath(), e);
-        }
-    }
-
-    /**
-     * Tries to create the root directory if it does not already exist.
-     * <p>
-     *
-     * @param cattr
-     */
-    private void initializeFileSystem(final IndexedDiskCacheAttributes cattr)
-    {
-        this.rafDir = cattr.getDiskPath();
-        log.info("{0}: Cache file root directory: {1}", logCacheName, rafDir);
-    }
-
-    /**
-     * Creates the key and data disk caches.
-     * <p>
-     * Loads any keys if they are present and ClearDiskOnStartup is false.
-     * <p>
-     *
-     * @param cattr
-     * @throws IOException
-     */
-    private void initializeKeysAndData(final IndexedDiskCacheAttributes cattr) throws IOException
-    {
-        this.dataFile = new IndexedDisk(new File(rafDir, fileName + ".data"), getElementSerializer());
-        this.keyFile = new IndexedDisk(new File(rafDir, fileName + ".key"), getElementSerializer());
-
-        if (cattr.isClearDiskOnStartup())
-        {
-            log.info("{0}: ClearDiskOnStartup is set to true. Ignoring any persisted data.",
-                    logCacheName);
-            initializeEmptyStore();
-        }
-        else if (!keyFile.isEmpty())
-        {
-            // If the key file has contents, try to initialize the keys
-            // from it. If no keys are loaded, reset the data file.
-            initializeStoreFromPersistedData();
-        }
-        else
-        {
-            // Otherwise start with a new empty map for the keys, and reset
-            // the data file if it has contents.
-            initializeEmptyStore();
-        }
-    }
-
-    /**
-     * Initializes an empty disk cache.
-     * <p>
-     *
-     * @throws IOException
-     */
-    private void initializeEmptyStore() throws IOException
-    {
-        this.keyHash.clear();
-
-        if (!dataFile.isEmpty())
-        {
-            dataFile.reset();
-        }
-    }
-
-    /**
-     * Loads any persisted data and checks for consistency. If there is a consistency issue, the
-     * files are cleared.
-     * <p>
-     *
-     * @throws IOException
-     */
-    private void initializeStoreFromPersistedData() throws IOException
-    {
-        loadKeys();
-
-        if (keyHash.isEmpty())
-        {
-            dataFile.reset();
-        }
-        else
-        {
-            final boolean isOk = checkKeyDataConsistency(false);
-            if (!isOk)
-            {
-                keyHash.clear();
-                keyFile.reset();
-                dataFile.reset();
-                log.warn("{0}: Corruption detected. Resetting data and keys files.", logCacheName);
-            }
-            else
-            {
-                synchronized (this)
-                {
-                    startupSize = keyHash.size();
-                }
-            }
-        }
-    }
-
-    /**
-     * Loads the keys from the .key file. The keys are stored in a HashMap on disk. This is
-     * converted into an LRUMap.
-     */
-    protected void loadKeys()
-    {
-        log.debug("{0}: Loading keys for {1}", () -> logCacheName, () -> keyFile.toString());
-
-        storageLock.writeLock().lock();
-
-        try
-        {
-            // clear a key map to use.
-            keyHash.clear();
-
-            final HashMap<K, IndexedDiskElementDescriptor> keys = keyFile.readObject(
-                new IndexedDiskElementDescriptor(0, (int) keyFile.length() - IndexedDisk.HEADER_SIZE_BYTES));
-
-            if (keys != null)
-            {
-                log.debug("{0}: Found {1} in keys file.", logCacheName, keys.size());
-
-                keyHash.putAll(keys);
-
-                log.info("{0}: Loaded keys from [{1}], key count: {2}; up to {3} will be available.",
-                        () -> logCacheName, () -> fileName, keyHash::size, () -> maxKeySize);
-            }
-
-            if (log.isTraceEnabled())
-            {
-                dump(false);
-            }
-        }
-        catch (final Exception e)
-        {
-            log.error("{0}: Problem loading keys for file {1}", logCacheName, fileName, e);
-        }
-        finally
-        {
-            storageLock.writeLock().unlock();
-        }
-    }
-
-    /**
-     * Check for minimal consistency between the keys and the datafile. Makes sure no starting
-     * positions in the keys exceed the file length.
-     * <p>
-     * The caller should take the appropriate action if the keys and data are not consistent.
-     *
-     * @param checkForDedOverlaps
-     *            if <code>true</code>, do a more thorough check by checking for
-     *            data overlap
-     * @return <code>true</code> if the test passes
-     */
-    private boolean checkKeyDataConsistency(final boolean checkForDedOverlaps)
-    {
-        final ElapsedTimer timer = new ElapsedTimer();
-        log.debug("{0}: Performing inital consistency check", logCacheName);
-
-        boolean isOk = true;
-        try
-        {
-            final long fileLength = dataFile.length();
-
-            final IndexedDiskElementDescriptor corruptDed = keyHash.values().stream()
-                .filter(ded -> ded.pos + IndexedDisk.HEADER_SIZE_BYTES + ded.len > fileLength)
-                .findFirst()
-                .orElse(null);
-
-            if (corruptDed != null)
-            {
-                isOk = false;
-                log.warn("{0}: The dataFile is corrupted!\n raf.length() = {1}\n ded.pos = {2}",
-                        logCacheName, fileLength, corruptDed.pos);
-            }
-            else if (checkForDedOverlaps)
-            {
-                isOk = checkForDedOverlaps(createPositionSortedDescriptorList());
-            }
-        }
-        catch (final IOException e)
-        {
-            log.error(e);
-            isOk = false;
-        }
-
-        log.info("{0}: Finished inital consistency check, isOk = {1} in {2}",
-                logCacheName, isOk, timer.getElapsedTimeString());
-
-        return isOk;
-    }
-
-    /**
-     * Detects any overlapping elements. This expects a sorted list.
-     * <p>
-     * The total length of an item is IndexedDisk.HEADER_SIZE_BYTES + ded.len.
-     * <p>
-     *
-     * @param sortedDescriptors
-     * @return false if there are overlaps.
-     */
-    protected boolean checkForDedOverlaps(final IndexedDiskElementDescriptor[] sortedDescriptors)
-    {
-        final ElapsedTimer timer = new ElapsedTimer();
-        boolean isOk = true;
-        long expectedNextPos = 0;
-        for (final IndexedDiskElementDescriptor ded : sortedDescriptors) {
-            if (expectedNextPos > ded.pos)
-            {
-                log.error("{0}: Corrupt file: overlapping deds {1}", logCacheName, ded);
-                isOk = false;
-                break;
-            }
-            expectedNextPos = ded.pos + IndexedDisk.HEADER_SIZE_BYTES + ded.len;
-        }
-        log.debug("{0}: Check for DED overlaps took {1} ms.", () -> logCacheName,
-                timer::getElapsedTime);
-
-        return isOk;
-    }
-
-    /**
-     * Saves key file to disk. This converts the LRUMap to a HashMap for deserialization.
-     */
-    protected void saveKeys()
-    {
-        try
-        {
-            log.info("{0}: Saving keys to: {1}, key count: {2}",
-                    () -> logCacheName, () -> fileName, keyHash::size);
-
-            keyFile.reset();
-
-            final HashMap<K, IndexedDiskElementDescriptor> keys = new HashMap<>(keyHash);
-            if (!keys.isEmpty())
-            {
-                keyFile.writeObject(keys, 0);
-            }
-
-            log.info("{0}: Finished saving keys.", logCacheName);
-        }
-        catch (final IOException e)
-        {
-            log.error("{0}: Problem storing keys.", logCacheName, e);
-        }
-    }
-
-    /**
-     * Update the disk cache. Called from the Queue. Makes sure the Item has not been retrieved from
-     * purgatory while in queue for disk. Remove items from purgatory when they go to disk.
-     * <p>
-     *
-     * @param ce
-     *            The ICacheElement&lt;K, V&gt; to put to disk.
-     */
-    @Override
-    protected void processUpdate(final ICacheElement<K, V> ce)
-    {
-        if (!isAlive())
-        {
-            log.error("{0}: No longer alive; aborting put of key = {1}",
-                    () -> logCacheName, ce::getKey);
-            return;
-        }
-
-        log.debug("{0}: Storing element on disk, key: {1}",
-                () -> logCacheName, ce::getKey);
-
-        // old element with same key
-        IndexedDiskElementDescriptor old = null;
-
-        try
-        {
-            IndexedDiskElementDescriptor ded = null;
-            final byte[] data = getElementSerializer().serialize(ce);
-
-            // make sure this only locks for one particular cache region
-            storageLock.writeLock().lock();
-            try
-            {
-                old = keyHash.get(ce.getKey());
-
-                // Item with the same key already exists in file.
-                // Try to reuse the location if possible.
-                if (old != null && data.length <= old.len)
-                {
-                    // Reuse the old ded. The defrag relies on ded updates by reference, not
-                    // replacement.
-                    ded = old;
-                    ded.len = data.length;
-                }
-                else
-                {
-                    // we need this to compare in the recycle bin
-                    ded = new IndexedDiskElementDescriptor(dataFile.length(), data.length);
-
-                    if (doRecycle)
-                    {
-                        final IndexedDiskElementDescriptor rep = recycle.ceiling(ded);
-                        if (rep != null)
-                        {
-                            // remove element from recycle bin
-                            recycle.remove(rep);
-                            ded = rep;
-                            ded.len = data.length;
-                            recycleCnt++;
-                            this.adjustBytesFree(ded, false);
-                            log.debug("{0}: using recycled ded {1} rep.len = {2} ded.len = {3}",
-                                    logCacheName, ded.pos, rep.len, ded.len);
-                        }
-                    }
-
-                    // Put it in the map
-                    keyHash.put(ce.getKey(), ded);
-
-                    if (queueInput)
-                    {
-                        queuedPutList.add(ded);
-                        log.debug("{0}: added to queued put list. {1}",
-                                () -> logCacheName, queuedPutList::size);
-                    }
-
-                    // add the old slot to the recycle bin
-                    if (old != null)
-                    {
-                        addToRecycleBin(old);
-                    }
-                }
-
-                dataFile.write(ded, data);
-            }
-            finally
-            {
-                storageLock.writeLock().unlock();
-            }
-
-            log.debug("{0}: Put to file: {1}, key: {2}, position: {3}, size: {4}",
-                    logCacheName, fileName, ce.getKey(), ded.pos, ded.len);
-        }
-        catch (final IOException e)
-        {
-            log.error("{0}: Failure updating element, key: {1} old: {2}",
-                    logCacheName, ce.getKey(), old, e);
-        }
-    }
-
-    /**
-     * Gets the key, then goes to disk to get the object.
-     * <p>
-     *
-     * @param key
-     * @return ICacheElement&lt;K, V&gt; or null
-     * @see AbstractDiskCache#doGet
-     */
-    @Override
-    protected ICacheElement<K, V> processGet(final K key)
-    {
-        if (!isAlive())
-        {
-            log.error("{0}: No longer alive so returning null for key = {1}",
-                    logCacheName, key);
-            return null;
-        }
-
-        log.debug("{0}: Trying to get from disk: {1}", logCacheName, key);
-
-        ICacheElement<K, V> object = null;
-        try
-        {
-            storageLock.readLock().lock();
-            try
-            {
-                object = readElement(key);
-            }
-            finally
-            {
-                storageLock.readLock().unlock();
-            }
-
-            if (object != null)
-            {
-                hitCount.incrementAndGet();
-            }
-        }
-        catch (final IOException ioe)
-        {
-            log.error("{0}: Failure getting from disk, key = {1}", logCacheName, key, ioe);
-            reset();
-        }
-        return object;
-    }
-
-    /**
-     * Gets matching items from the cache.
-     * <p>
-     *
-     * @param pattern
-     * @return a map of K key to ICacheElement&lt;K, V&gt; element, or an empty map if there is no
-     *         data in cache matching keys
-     */
-    @Override
-    public Map<K, ICacheElement<K, V>> processGetMatching(final String pattern)
-    {
-        final Map<K, ICacheElement<K, V>> elements = new HashMap<>();
-        Set<K> keyArray = null;
-        storageLock.readLock().lock();
-        try
-        {
-            keyArray = new HashSet<>(keyHash.keySet());
-        }
-        finally
-        {
-            storageLock.readLock().unlock();
-        }
-
-        final Set<K> matchingKeys = getKeyMatcher().getMatchingKeysFromArray(pattern, keyArray);
-
-        for (final K key : matchingKeys)
-        {
-            final ICacheElement<K, V> element = processGet(key);
-            if (element != null)
-            {
-                elements.put(key, element);
-            }
-        }
-        return elements;
-    }
-
-    /**
-     * Reads the item from disk.
-     * <p>
-     *
-     * @param key
-     * @return ICacheElement
-     * @throws IOException
-     */
-    private ICacheElement<K, V> readElement(final K key) throws IOException
-    {
-        final IndexedDiskElementDescriptor ded = keyHash.get(key);
-
-        if (ded != null)
-        {
-            log.debug("{0}: Found on disk, key: ", logCacheName, key);
-
-            try
-            {
-                return dataFile.readObject(ded);
-                // TODO consider checking key equality and throwing if there is a failure
-            }
-            catch (final IOException e)
-            {
-                log.error("{0}: IO Exception, Problem reading object from file", logCacheName, e);
-                throw e;
-            }
-            catch (final Exception e)
-            {
-                log.error("{0}: Exception, Problem reading object from file", logCacheName, e);
-                throw new IOException(logCacheName + "Problem reading object from disk.", e);
-            }
-        }
-
-        return null;
-    }
-
-    /**
-     * Return the keys in this cache.
-     * <p>
-     *
-     * @see org.apache.commons.jcs3.auxiliary.disk.AbstractDiskCache#getKeySet()
-     */
-    @Override
-    public Set<K> getKeySet() throws IOException
-    {
-        final HashSet<K> keys = new HashSet<>();
-
-        storageLock.readLock().lock();
-
-        try
-        {
-            keys.addAll(this.keyHash.keySet());
-        }
-        finally
-        {
-            storageLock.readLock().unlock();
-        }
-
-        return keys;
-    }
-
-    /**
-     * Returns true if the removal was successful, or false if there is nothing to remove. The
-     * current implementation always results in a disk orphan.
-     * <p>
-     *
-     * @return true if at least one item was removed.
-     * @param key
-     */
-    @Override
-    protected boolean processRemove(final K key)
-    {
-        if (!isAlive())
-        {
-            log.error("{0}: No longer alive so returning false for key = {1}", logCacheName, key);
-            return false;
-        }
-
-        if (key == null)
-        {
-            return false;
-        }
-
-        boolean removed = false;
-        try
-        {
-            storageLock.writeLock().lock();
-
-            if (key instanceof String && key.toString().endsWith(NAME_COMPONENT_DELIMITER))
-            {
-                removed = performPartialKeyRemoval((String) key);
-            }
-            else if (key instanceof GroupAttrName && ((GroupAttrName<?>) key).attrName == null)
-            {
-                removed = performGroupRemoval(((GroupAttrName<?>) key).groupId);
-            }
-            else
-            {
-                removed = performSingleKeyRemoval(key);
-            }
-        }
-        finally
-        {
-            storageLock.writeLock().unlock();
-        }
-
-        // this increments the remove count.
-        // there is no reason to call this if an item was not removed.
-        if (removed)
-        {
-            doOptimizeRealTime();
-        }
-
-        return removed;
-    }
-
-    /**
-     * Iterates over the keyset. Builds a list of matches. Removes all the keys in the list. Does
-     * not remove via the iterator, since the map impl may not support it.
-     * <p>
-     * This operates under a lock obtained in doRemove().
-     * <p>
-     *
-     * @param key
-     * @return true if there was a match
-     */
-    private boolean performPartialKeyRemoval(final String key)
-    {
-        boolean removed = false;
-
-        // remove all keys of the same name hierarchy.
-        final List<K> itemsToRemove = new LinkedList<>();
-
-        for (final K k : keyHash.keySet())
-        {
-            if (k instanceof String && k.toString().startsWith(key))
-            {
-                itemsToRemove.add(k);
-            }
-        }
-
-        // remove matches.
-        for (final K fullKey : itemsToRemove)
-        {
-            // Don't add to recycle bin here
-            // https://issues.apache.org/jira/browse/JCS-67
-            performSingleKeyRemoval(fullKey);
-            removed = true;
-            // TODO this needs to update the remove count separately
-        }
-
-        return removed;
-    }
-
-    /**
-     * Remove all elements from the group. This does not use the iterator to remove. It builds a
-     * list of group elements and then removes them one by one.
-     * <p>
-     * This operates under a lock obtained in doRemove().
-     * <p>
-     *
-     * @param key
-     * @return true if an element was removed
-     */
-    private boolean performGroupRemoval(final GroupId key)
-    {
-        boolean removed = false;
-
-        // remove all keys of the same name group.
-        final List<K> itemsToRemove = new LinkedList<>();
-
-        // remove all keys of the same name hierarchy.
-        for (final K k : keyHash.keySet())
-        {
-            if (k instanceof GroupAttrName && ((GroupAttrName<?>) k).groupId.equals(key))
-            {
-                itemsToRemove.add(k);
-            }
-        }
-
-        // remove matches.
-        for (final K fullKey : itemsToRemove)
-        {
-            // Don't add to recycle bin here
-            // https://issues.apache.org/jira/browse/JCS-67
-            performSingleKeyRemoval(fullKey);
-            removed = true;
-            // TODO this needs to update the remove count separately
-        }
-
-        return removed;
-    }
-
-    /**
-     * Removes an individual key from the cache.
-     * <p>
-     * This operates under a lock obtained in doRemove().
-     * <p>
-     *
-     * @param key
-     * @return true if an item was removed.
-     */
-    private boolean performSingleKeyRemoval(final K key)
-    {
-        final boolean removed;
-        // remove single item.
-        final IndexedDiskElementDescriptor ded = keyHash.remove(key);
-        removed = ded != null;
-        addToRecycleBin(ded);
-
-        log.debug("{0}: Disk removal: Removed from key hash, key [{1}] removed = {2}",
-                logCacheName, key, removed);
-        return removed;
-    }
-
-    /**
-     * Remove all the items from the disk cache by resetting everything.
-     */
-    @Override
-    public void processRemoveAll()
-    {
-        final ICacheEvent<String> cacheEvent =
-                createICacheEvent(getCacheName(), "all", ICacheEventLogger.REMOVEALL_EVENT);
-        try
-        {
-            reset();
-        }
-        finally
-        {
-            logICacheEvent(cacheEvent);
-        }
-    }
-
-    /**
-     * Reset effectively clears the disk cache, creating new files, recycle bins, and keymaps.
-     * <p>
-     * It can be used as a last resort to handle errors, to force a content update, or for removeAll.
-     */
-    private void reset()
-    {
-        log.info("{0}: Resetting cache", logCacheName);
-
-        try
-        {
-            storageLock.writeLock().lock();
-
-            if (dataFile != null)
-            {
-                dataFile.close();
-            }
-
-            final File dataFileTemp = new File(rafDir, fileName + ".data");
-            Files.delete(dataFileTemp.toPath());
-
-            if (keyFile != null)
-            {
-                keyFile.close();
-            }
-            final File keyFileTemp = new File(rafDir, fileName + ".key");
-            Files.delete(keyFileTemp.toPath());
-
-            dataFile = new IndexedDisk(dataFileTemp, getElementSerializer());
-            keyFile = new IndexedDisk(keyFileTemp, getElementSerializer());
-
-            this.recycle.clear();
-            this.keyHash.clear();
-        }
-        catch (final IOException e)
-        {
-            log.error("{0}: Failure resetting state", logCacheName, e);
-        }
-        finally
-        {
-            storageLock.writeLock().unlock();
-        }
-    }
-
-    /**
-     * Creates the map of keys to descriptors that record the index positions on disk.
-     *
-     * @return a new empty Map for keys and IndexedDiskElementDescriptors
-     */
-    private Map<K, IndexedDiskElementDescriptor> createInitialKeyMap()
-    {
-        Map<K, IndexedDiskElementDescriptor> keyMap = null;
-        if (maxKeySize >= 0)
-        {
-            if (this.diskLimitType == DiskLimitType.COUNT)
-            {
-                keyMap = new LRUMapCountLimited(maxKeySize);
-            }
-            else
-            {
-                keyMap = new LRUMapSizeLimited(maxKeySize);
-            }
-
-            log.info("{0}: Set maxKeySize to: \"{1}\"", logCacheName, maxKeySize);
-        }
-        else
-        {
-            // If no max size, use a plain map for memory and processing efficiency.
-            keyMap = new HashMap<>();
-            // keyHash = Collections.synchronizedMap( new HashMap() );
-            log.info("{0}: Set maxKeySize to unlimited", logCacheName);
-        }
-
-        return keyMap;
-    }
-
-    /**
-     * Dispose of the disk cache in a background thread. Joins against this thread to put a cap on
-     * the disposal time.
-     * <p>
-     * TODO make dispose window configurable.
-     */
-    @Override
-    public void processDispose()
-    {
-        final ICacheEvent<String> cacheEvent = createICacheEvent(getCacheName(), "none", ICacheEventLogger.DISPOSE_EVENT);
-        try
-        {
-            final Thread t = new Thread(this::disposeInternal, "IndexedDiskCache-DisposalThread");
-            t.start();
-            // wait up to 60 seconds for dispose and then quit if not done.
-            try
-            {
-                t.join(60 * 1000);
-            }
-            catch (final InterruptedException ex)
-            {
-                log.error("{0}: Interrupted while waiting for disposal thread to finish.",
-                        logCacheName, ex);
-            }
-        }
-        finally
-        {
-            logICacheEvent(cacheEvent);
-        }
-    }
-
-    /**
-     * Internal method that handles the disposal.
-     */
-    protected void disposeInternal()
-    {
-        if (!isAlive())
-        {
-            log.error("{0}: Not alive and dispose was called, filename: {1}",
-                    logCacheName, fileName);
-            return;
-        }
-
-        // Prevents any interaction with the cache while we're shutting down.
-        setAlive(false);
-
-        final Thread optimizationThread = currentOptimizationThread;
-        if (isRealTimeOptimizationEnabled && optimizationThread != null)
-        {
-            // Join with the current optimization thread.
-            log.debug("{0}: In dispose, optimization already in progress; waiting for completion.",
-                    logCacheName);
-
-            try
-            {
-                optimizationThread.join();
-            }
-            catch (final InterruptedException e)
-            {
-                log.error("{0}: Unable to join current optimization thread.",
-                        logCacheName, e);
-            }
-        }
-        else if (isShutdownOptimizationEnabled && this.getBytesFree() > 0)
-        {
-            optimizeFile();
-        }
-
-        saveKeys();
-
-        try
-        {
-            log.debug("{0}: Closing files, base filename: {1}", logCacheName,
-                    fileName);
-            dataFile.close();
-            dataFile = null;
-            keyFile.close();
-            keyFile = null;
-        }
-        catch (final IOException e)
-        {
-            log.error("{0}: Failure closing files in dispose, filename: {1}",
-                    logCacheName, fileName, e);
-        }
-
-        log.info("{0}: Shutdown complete.", logCacheName);
-    }
-
-    /**
-     * Add descriptor to recycle bin if it is not null. Adds the length of the item to the bytes
-     * free.
-     * <p>
-     * This is called in three places: (1) When an item is removed. All item removals funnel down to the performSingleKeyRemoval method.
-     * (2) When an item on disk is updated with a value that will not fit in the previous slot. (3) When the max key size is
-     * reached, the freed slot will be added.
-     * <p>
-     *
-     * @param ded
-     */
-    protected void addToRecycleBin(final IndexedDiskElementDescriptor ded)
-    {
-        // reuse the spot
-        if (ded != null)
-        {
-            storageLock.readLock().lock();
-
-            try
-            {
-                adjustBytesFree(ded, true);
-
-                if (doRecycle)
-                {
-                    recycle.add(ded);
-                    log.debug("{0}: recycled ded {1}", logCacheName, ded);
-                }
-            }
-            finally
-            {
-                storageLock.readLock().unlock();
-            }
-        }
-    }
-
-    /**
-     * Performs the check for optimization, and if it is required, do it.
-     */
-    protected void doOptimizeRealTime()
-    {
-        if (isRealTimeOptimizationEnabled && !isOptimizing
-            && removeCount++ >= cattr.getOptimizeAtRemoveCount())
-        {
-            isOptimizing = true;
-
-            log.info("{0}: Optimizing file. removeCount [{1}] OptimizeAtRemoveCount [{2}]",
-                    logCacheName, removeCount, cattr.getOptimizeAtRemoveCount());
-
-            if (currentOptimizationThread == null)
-            {
-                storageLock.writeLock().lock();
-
-                try
-                {
-                    if (currentOptimizationThread == null)
-                    {
-                        currentOptimizationThread = new Thread(() -> {
-                            optimizeFile();
-                            currentOptimizationThread = null;
-                        }, "IndexedDiskCache-OptimizationThread");
-                    }
-                }
-                finally
-                {
-                    storageLock.writeLock().unlock();
-                }
-
-                if (currentOptimizationThread != null)
-                {
-                    currentOptimizationThread.start();
-                }
-            }
-        }
-    }
-
-    /**
-     * File optimization is handled by this method. It works as follows:
-     * <ol>
-     * <li>Shutdown recycling and turn on queuing of puts.</li>
-     * <li>Take a snapshot of the current descriptors. If there are any removes, ignore them, as they will be compacted during the
-     * next optimization.</li>
-     * <li>Optimize the snapshot. For each descriptor:
-     * <ol>
-     * <li>Obtain the write-lock.</li>
-     * <li>Shift the element on the disk, in order to compact out the free space.</li>
-     * <li>Release the write-lock. This allows elements to still be accessible during optimization.</li>
-     * </ol>
-     * </li>
-     * <li>Obtain the write-lock.</li>
-     * <li>All queued puts are made at the end of the file. Optimize these under a single write-lock.</li>
-     * <li>Truncate the file.</li>
-     * <li>Release the write-lock.</li>
-     * <li>Restore system to standard operation.</li>
-     * </ol>
-     */
-    protected void optimizeFile()
-    {
-        final ElapsedTimer timer = new ElapsedTimer();
-        timesOptimized++;
-        log.info("{0}: Beginning Optimization #{1}", logCacheName, timesOptimized);
-
-        // CREATE SNAPSHOT
-        IndexedDiskElementDescriptor[] defragList = null;
-
-        storageLock.writeLock().lock();
-
-        try
-        {
-            queueInput = true;
-            // shut off recycle while we're optimizing,
-            doRecycle = false;
-            defragList = createPositionSortedDescriptorList();
-        }
-        finally
-        {
-            storageLock.writeLock().unlock();
-        }
-
-        // Defrag the file outside of the write lock. This allows a move to be made,
-        // and yet have the element still accessible for reading or writing.
-        long expectedNextPos = defragFile(defragList, 0);
-
-        // ADD THE QUEUED ITEMS to the end and then truncate
-        storageLock.writeLock().lock();
-
-        try
-        {
-            try
-            {
-                if (!queuedPutList.isEmpty())
-                {
-                    defragList = queuedPutList.toArray(new IndexedDiskElementDescriptor[queuedPutList.size()]);
-
-                    // pack them at the end
-                    expectedNextPos = defragFile(defragList, expectedNextPos);
-                }
-                // TRUNCATE THE FILE
-                dataFile.truncate(expectedNextPos);
-            }
-            catch (final IOException e)
-            {
-                log.error("{0}: Error optimizing queued puts.", logCacheName, e);
-            }
-
-            // RESTORE NORMAL OPERATION
-            removeCount = 0;
-            resetBytesFree();
-            this.recycle.clear();
-            queuedPutList.clear();
-            queueInput = false;
-            // turn recycle back on.
-            doRecycle = true;
-            isOptimizing = false;
-        }
-        finally
-        {
-            storageLock.writeLock().unlock();
-        }
-
-        log.info("{0}: Finished #{1}, Optimization took {2}",
-                logCacheName, timesOptimized, timer.getElapsedTimeString());
-    }
-
-    /**
-     * Defragments the file in place by compacting out the free space (i.e., moving records
-     * forward). If there were no gaps the resulting file would be the same size as the previous
-     * file. This must be supplied an ordered defragList.
-     * <p>
-     *
-     * @param defragList
-     *            sorted list of descriptors for optimization
-     * @param startingPos
-     *            the start position in the file
-     * @return this is the potential new file end
-     */
-    private long defragFile(final IndexedDiskElementDescriptor[] defragList, final long startingPos)
-    {
-        final ElapsedTimer timer = new ElapsedTimer();
-        long preFileSize = 0;
-        long postFileSize = 0;
-        long expectedNextPos = 0;
-        try
-        {
-            preFileSize = this.dataFile.length();
-            // find the first gap in the disk and start defragging.
-            expectedNextPos = startingPos;
-            for (final IndexedDiskElementDescriptor element : defragList) {
-                storageLock.writeLock().lock();
-                try
-                {
-                    if (expectedNextPos != element.pos)
-                    {
-                        dataFile.move(element, expectedNextPos);
-                    }
-                    expectedNextPos = element.pos + IndexedDisk.HEADER_SIZE_BYTES + element.len;
-                }
-                finally
-                {
-                    storageLock.writeLock().unlock();
-                }
-            }
-
-            postFileSize = this.dataFile.length();
-
-            // this is the potential new file end
-            return expectedNextPos;
-        }
-        catch (final IOException e)
-        {
-            log.error("{0}: Error occurred during defragmentation.", logCacheName, e);
-        }
-        finally
-        {
-            log.info("{0}: Defragmentation took {1}. File Size (before={2}) (after={3}) (truncating to {4})",
-                    logCacheName, timer.getElapsedTimeString(), preFileSize, postFileSize, expectedNextPos);
-        }
-
-        return 0;
-    }
-
-    /**
-     * Creates a snapshot of the IndexedDiskElementDescriptors in the keyHash and returns them
-     * sorted by position in the dataFile.
-     * <p>
-     *
-     * @return IndexedDiskElementDescriptor[]
-     */
-    private IndexedDiskElementDescriptor[] createPositionSortedDescriptorList()
-    {
-        final List<IndexedDiskElementDescriptor> defragList = new ArrayList<>(keyHash.values());
-        Collections.sort(defragList, (ded1, ded2) -> Long.compare(ded1.pos, ded2.pos));
-
-        return defragList.toArray(new IndexedDiskElementDescriptor[0]);
-    }
-
-    /**
-     * Returns the current cache size.
-     * <p>
-     *
-     * @return The size value
-     */
-    @Override
-    public int getSize()
-    {
-        return keyHash.size();
-    }
-
-    /**
-     * Returns the size of the recycle bin in number of elements.
-     * <p>
-     *
-     * @return The number of items in the bin.
-     */
-    protected int getRecyleBinSize()
-    {
-        return this.recycle.size();
-    }
-
-    /**
-     * Returns the number of times we have used spots from the recycle bin.
-     * <p>
-     *
-     * @return The number of spots used.
-     */
-    protected int getRecyleCount()
-    {
-        return this.recycleCnt;
-    }
-
-    /**
-     * Returns the number of bytes that are free. When an item is removed, its length is recorded.
-     * When a spot is used from the recycle bin, the length of the item stored is recorded.
-     * <p>
-     *
-     * @return The number bytes free on the disk file.
-     */
-    protected long getBytesFree()
-    {
-        return this.bytesFree.get();
-    }
-
-    /**
-     * Resets the number of bytes that are free.
-     */
-    private void resetBytesFree()
-    {
-        this.bytesFree.set(0);
-    }
-
-    /**
-     * To subtract, pass false for add.
-     * <p>
-     *
-     * @param ded
-     * @param add
-     */
-    private void adjustBytesFree(final IndexedDiskElementDescriptor ded, final boolean add)
-    {
-        if (ded != null)
-        {
-            final int amount = ded.len + IndexedDisk.HEADER_SIZE_BYTES;
-
-            if (add)
-            {
-                this.bytesFree.addAndGet(amount);
-            }
-            else
-            {
-                this.bytesFree.addAndGet(-amount);
-            }
-        }
-    }
-
-    /**
-     * This is for debugging and testing.
-     * <p>
-     *
-     * @return the length of the data file.
-     * @throws IOException
-     */
-    protected long getDataFileSize() throws IOException
-    {
-        long size = 0;
-
-        storageLock.readLock().lock();
-
-        try
-        {
-            if (dataFile != null)
-            {
-                size = dataFile.length();
-            }
-        }
-        finally
-        {
-            storageLock.readLock().unlock();
-        }
-
-        return size;
-    }
-
-    /**
-     * For debugging. This dumps the values by default.
-     */
-    public void dump()
-    {
-        dump(true);
-    }
-
-    /**
-     * For debugging.
-     * <p>
-     *
-     * @param dumpValues
-     *            A boolean indicating if values should be dumped.
-     */
-    public void dump(final boolean dumpValues)
-    {
-        if (log.isTraceEnabled())
-        {
-            log.trace("{0}: [dump] Number of keys: {1}", logCacheName, keyHash.size());
-
-            for (final Map.Entry<K, IndexedDiskElementDescriptor> e : keyHash.entrySet())
-            {
-                final K key = e.getKey();
-                final IndexedDiskElementDescriptor ded = e.getValue();
-
-                log.trace("{0}: [dump] Disk element, key: {1}, pos: {2}, len: {3}" +
-                        (dumpValues ? ", val: " + get(key) : ""),
-                        logCacheName, key, ded.pos, ded.len);
-            }
-        }
-    }
-
-    /**
-     * @return Returns the AuxiliaryCacheAttributes.
-     */
-    @Override
-    public AuxiliaryCacheAttributes getAuxiliaryCacheAttributes()
-    {
-        return this.cattr;
-    }
-
-    /**
-     * Returns info about the disk cache.
-     * <p>
-     *
-     * @see org.apache.commons.jcs3.auxiliary.AuxiliaryCache#getStatistics()
-     */
-    @Override
-    public synchronized IStats getStatistics()
-    {
-        final IStats stats = new Stats();
-        stats.setTypeName("Indexed Disk Cache");
-
-        final ArrayList<IStatElement<?>> elems = new ArrayList<>();
-
-        elems.add(new StatElement<>("Is Alive", Boolean.valueOf(isAlive())));
-        elems.add(new StatElement<>("Key Map Size", Integer.valueOf(this.keyHash != null ? this.keyHash.size() : -1)));
-        try
-        {
-            elems.add(
-                    new StatElement<>("Data File Length", Long.valueOf(this.dataFile != null ? this.dataFile.length() : -1L)));
-        }
-        catch (final IOException e)
-        {
-            log.error(e);
-        }
-        elems.add(new StatElement<>("Max Key Size", this.maxKeySize));
-        elems.add(new StatElement<>("Hit Count", this.hitCount));
-        elems.add(new StatElement<>("Bytes Free", this.bytesFree));
-        elems.add(new StatElement<>("Optimize Operation Count", Integer.valueOf(this.removeCount)));
-        elems.add(new StatElement<>("Times Optimized", Integer.valueOf(this.timesOptimized)));
-        elems.add(new StatElement<>("Recycle Count", Integer.valueOf(this.recycleCnt)));
-        elems.add(new StatElement<>("Recycle Bin Size", Integer.valueOf(this.recycle.size())));
-        elems.add(new StatElement<>("Startup Size", Integer.valueOf(this.startupSize)));
-
-        // get the stats from the super too
-        final IStats sStats = super.getStatistics();
-        elems.addAll(sStats.getStatElements());
-
-        stats.setStatElements(elems);
-
-        return stats;
-    }
-
-    /**
-     * This is exposed for testing.
-     * <p>
-     *
-     * @return Returns the timesOptimized.
-     */
-    protected int getTimesOptimized()
-    {
-        return timesOptimized;
-    }
-
-    /**
-     * This is used by the event logging.
-     * <p>
-     *
-     * @return the location of the disk, either path or ip.
-     */
-    @Override
-    protected String getDiskLocation()
-    {
-        return dataFile.getFilePath();
-    }
-
-    /**
-     * Compares IndexedDiskElementDescriptors based on their position.
-     * <p>
-     * @deprecated Use lambda instead
-     */
-    @Deprecated
-    protected static final class PositionComparator implements Comparator<IndexedDiskElementDescriptor>, Serializable
-    {
-        /** serialVersionUID */
-        private static final long serialVersionUID = -8387365338590814113L;
-
-        /**
-         * Compares two descriptors based on position.
-         * <p>
-         *
-         * @see java.util.Comparator#compare(java.lang.Object, java.lang.Object)
-         */
-        @Override
-        public int compare(final IndexedDiskElementDescriptor ded1, final IndexedDiskElementDescriptor ded2)
-        {
-            return Long.compare(ded1.pos, ded2.pos);
-        }
-    }
-
-    /**
-     * Class for recycling and LRU. This implements the LRU overflow callback, so we can add items
-     * to the recycle bin. This class counts the size of the content to decide when to throw away an element.
-     */
-    public class LRUMapSizeLimited extends AbstractLRUMap<K, IndexedDiskElementDescriptor>
-    {
-        /**
-         * <code>tag</code> tells us which map we are working on.
-         */
-        public static final String TAG = "orig";
-
-        // size of the content in kB
-        private final AtomicInteger contentSize;
-        private final int maxSize;
-
-        /**
-         * Default
-         */
-        public LRUMapSizeLimited()
-        {
-            this(-1);
-        }
-
-        /**
-         * @param maxKeySize
-         */
-        public LRUMapSizeLimited(final int maxKeySize)
-        {
-            this.maxSize = maxKeySize;
-            this.contentSize = new AtomicInteger(0);
-        }
-
-        // keep the content size in kB, so 2^31 kB is a reasonable value
-        private void subLengthFromCacheSize(final IndexedDiskElementDescriptor value)
-        {
-            contentSize.addAndGet((value.len + IndexedDisk.HEADER_SIZE_BYTES) / -1024 - 1);
-        }
-
-        // keep the content size in kB, so 2^31 kB is a reasonable value
-        private void addLengthToCacheSize(final IndexedDiskElementDescriptor value)
-        {
-            contentSize.addAndGet((value.len + IndexedDisk.HEADER_SIZE_BYTES) / 1024 + 1);
-        }
-
-        @Override
-        public IndexedDiskElementDescriptor put(final K key, final IndexedDiskElementDescriptor value)
-        {
-            IndexedDiskElementDescriptor oldValue = null;
-
-            try
-            {
-                oldValue = super.put(key, value);
-            }
-            finally
-            {
-                // keep the content size in kB, so 2^31 kB is a reasonable value
-                if (value != null)
-                {
-                    addLengthToCacheSize(value);
-                }
-                if (oldValue != null)
-                {
-                    subLengthFromCacheSize(oldValue);
-                }
-            }
-
-            return oldValue;
-        }
-
-        @Override
-        public IndexedDiskElementDescriptor remove(final Object key)
-        {
-            IndexedDiskElementDescriptor value = null;
-
-            try
-            {
-                value = super.remove(key);
-                return value;
-            }
-            finally
-            {
-                if (value != null)
-                {
-                    subLengthFromCacheSize(value);
-                }
-            }
-        }
-
-        /**
-         * This is called when the max key size is reached. The least recently used item will be
-         * passed here. We will store the position and size of the spot on disk in the recycle bin.
-         * <p>
-         *
-         * @param key
-         * @param value
-         */
-        @Override
-        protected void processRemovedLRU(final K key, final IndexedDiskElementDescriptor value)
-        {
-            if (value != null)
-            {
-                subLengthFromCacheSize(value);
-            }
-
-            addToRecycleBin(value);
-
-            log.debug("{0}: Removing key: [{1}] from key store.", logCacheName, key);
-            log.debug("{0}: Key store size: [{1}].", logCacheName, this.size());
-
-            doOptimizeRealTime();
-        }
-
-        @Override
-        protected boolean shouldRemove()
-        {
-            return maxSize > 0 && contentSize.get() > maxSize && !this.isEmpty();
-        }
-    }
-
-    /**
-     * Class for recycling and LRU. This implements the LRU overflow callback, so we can add items
-     * to the recycle bin. This class counts the elements to decide when to throw away an element.
-     */
-
-    public class LRUMapCountLimited extends LRUMap<K, IndexedDiskElementDescriptor>
-    // implements Serializable
-    {
-        public LRUMapCountLimited(final int maxKeySize)
-        {
-            super(maxKeySize);
-        }
-
-        /**
-         * This is called when the max key size is reached. The least recently used item will be
-         * passed here. We will store the position and size of the spot on disk in the recycle bin.
-         * <p>
-         *
-         * @param key
-         * @param value
-         */
-        @Override
-        protected void processRemovedLRU(final K key, final IndexedDiskElementDescriptor value)
-        {
-            addToRecycleBin(value);
-            log.debug("{0}: Removing key: [{1}] from key store.", logCacheName, key);
-            log.debug("{0}: Key store size: [{1}].", logCacheName, this.size());
-
-            doOptimizeRealTime();
-        }
-    }
-}
+package org.apache.commons.jcs3.auxiliary.disk.indexed;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.commons.jcs3.auxiliary.AuxiliaryCacheAttributes;
+import org.apache.commons.jcs3.auxiliary.disk.AbstractDiskCache;
+import org.apache.commons.jcs3.auxiliary.disk.behavior.IDiskCacheAttributes.DiskLimitType;
+import org.apache.commons.jcs3.engine.behavior.ICacheElement;
+import org.apache.commons.jcs3.engine.behavior.IElementSerializer;
+import org.apache.commons.jcs3.engine.control.group.GroupAttrName;
+import org.apache.commons.jcs3.engine.control.group.GroupId;
+import org.apache.commons.jcs3.engine.logging.behavior.ICacheEvent;
+import org.apache.commons.jcs3.engine.logging.behavior.ICacheEventLogger;
+import org.apache.commons.jcs3.engine.stats.StatElement;
+import org.apache.commons.jcs3.engine.stats.Stats;
+import org.apache.commons.jcs3.engine.stats.behavior.IStatElement;
+import org.apache.commons.jcs3.engine.stats.behavior.IStats;
+import org.apache.commons.jcs3.log.Log;
+import org.apache.commons.jcs3.log.LogManager;
+import org.apache.commons.jcs3.utils.struct.AbstractLRUMap;
+import org.apache.commons.jcs3.utils.struct.LRUMap;
+import org.apache.commons.jcs3.utils.timing.ElapsedTimer;
+
+/**
+ * Disk cache that uses a RandomAccessFile with keys stored in memory. The maximum number of keys
+ * stored in memory is configurable. The disk cache tries to recycle spots on disk to limit file
+ * expansion.
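+ * <p>
+ * A minimal configuration sketch (illustrative only; the setters below are the
+ * standard attribute accessors, but verify them against the release in use):
+ * <pre>{@code
+ * IndexedDiskCacheAttributes attrs = new IndexedDiskCacheAttributes();
+ * attrs.setCacheName("testRegion");
+ * attrs.setDiskPath("target/indexed-disk-cache");
+ * attrs.setMaxKeySize(10000);
+ * IndexedDiskCache<String, String> cache = new IndexedDiskCache<>(attrs);
+ * }</pre>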
+ */
+public class IndexedDiskCache<K, V> extends AbstractDiskCache<K, V>
+{
+    /** The logger */
+    private static final Log log = LogManager.getLog(IndexedDiskCache.class);
+
+    /** Cache name used in log messages */
+    protected final String logCacheName;
+
+    /** The name of the file where the data is stored */
+    private final String fileName;
+
+    /** The IndexedDisk manages reads and writes to the data file. */
+    private IndexedDisk dataFile;
+
+    /** The IndexedDisk manages reads and writes to the key file. */
+    private IndexedDisk keyFile;
+
+    /** Map containing the keys and disk offsets. */
+    private final Map<K, IndexedDiskElementDescriptor> keyHash;
+
+    /** The maximum number of keys that we will keep in memory. */
+    private final int maxKeySize;
+
+    /** A handle on the data file. */
+    private File rafDir;
+
+    /** Should we keep adding to the recycle bin. False during optimization. */
+    private boolean doRecycle = true;
+
+    /** Should we optimize real time */
+    private boolean isRealTimeOptimizationEnabled = true;
+
+    /** Should we optimize on shutdown. */
+    private boolean isShutdownOptimizationEnabled = true;
+
+    /** are we currently optimizing the files */
+    private boolean isOptimizing;
+
+    /** The number of times the file has been optimized. */
+    private int timesOptimized;
+
+    /** The thread optimizing the file. */
+    private volatile Thread currentOptimizationThread;
+
+    /** Used for counting the number of removes since the last optimization. */
+    private int removeCount;
+
+    /** Should we queue puts. True when optimizing. We write the queue post optimization. */
+    private boolean queueInput;
+
+    /** List of descriptors for puts made while an optimization is in progress */
+    private final ConcurrentSkipListSet<IndexedDiskElementDescriptor> queuedPutList;
+
+    /** RECYCLE BIN -- set of empty spots */
+    private final ConcurrentSkipListSet<IndexedDiskElementDescriptor> recycle;
+
+    /** User configurable parameters */
+    private final IndexedDiskCacheAttributes cattr;
+
+    /** How many slots have we recycled. */
+    private int recycleCnt;
+
+    /** How many items were there on startup. */
+    private int startupSize;
+
+    /** the number of bytes free on disk. */
+    private final AtomicLong bytesFree = new AtomicLong();
+
+    /** Mode we are working on (size or count limited) */
+    private DiskLimitType diskLimitType = DiskLimitType.COUNT;
+
+    /** simple stat */
+    private final AtomicInteger hitCount = new AtomicInteger(0);
+
+    /**
+     * Use this lock to synchronize reads and writes to the underlying storage mechanism.
+     */
+    protected ReentrantReadWriteLock storageLock = new ReentrantReadWriteLock();
+
+    /**
+     * Constructor for the DiskCache object.
+     * <p>
+     *
+     * @param cacheAttributes
+     */
+    public IndexedDiskCache(final IndexedDiskCacheAttributes cacheAttributes)
+    {
+        this(cacheAttributes, null);
+    }
+
+    /**
+     * Constructor for the DiskCache object.
+     * <p>
+     *
+     * @param cattr
+     * @param elementSerializer
+     *            used if supplied; a null value will not override the default serializer
+     */
+    public IndexedDiskCache(final IndexedDiskCacheAttributes cattr, final IElementSerializer elementSerializer)
+    {
+        super(cattr);
+
+        setElementSerializer(elementSerializer);
+
+        this.cattr = cattr;
+        this.maxKeySize = cattr.getMaxKeySize();
+        this.isRealTimeOptimizationEnabled = cattr.getOptimizeAtRemoveCount() > 0;
+        this.isShutdownOptimizationEnabled = cattr.isOptimizeOnShutdown();
+        this.logCacheName = "Region [" + getCacheName() + "] ";
+        this.diskLimitType = cattr.getDiskLimitType();
+        // Make a clean file name
+        this.fileName = getCacheName().replaceAll("[^a-zA-Z0-9-_\\.]", "_");
+        this.keyHash = createInitialKeyMap();
+        this.queuedPutList = new ConcurrentSkipListSet<>(new PositionComparator());
+        this.recycle = new ConcurrentSkipListSet<>();
+
+        try
+        {
+            initializeFileSystem(cattr);
+            initializeKeysAndData(cattr);
+
+            // Initialization finished successfully, so set alive to true.
+            setAlive(true);
+            log.info("{0}: Indexed Disk Cache is alive.", logCacheName);
+
+            // TODO: Should we improve detection of whether the file should be optimized?
+            if (isRealTimeOptimizationEnabled && !keyHash.isEmpty())
+            {
+                // Kick off a real time optimization, in case we didn't do a final optimization.
+                doOptimizeRealTime();
+            }
+        }
+        catch (final IOException e)
+        {
+            log.error("{0}: Failure initializing for fileName: {1} and directory: {2}",
+                    logCacheName, fileName, this.rafDir.getAbsolutePath(), e);
+        }
+    }
+
+    /**
+     * Tries to create the root directory if it does not already exist.
+     * <p>
+     *
+     * @param cattr
+     */
+    private void initializeFileSystem(final IndexedDiskCacheAttributes cattr)
+    {
+        this.rafDir = cattr.getDiskPath();
+        log.info("{0}: Cache file root directory: {1}", logCacheName, rafDir);
+    }
+
+    /**
+     * Creates the key and data disk caches.
+     * <p>
+     * Loads any keys if they are present and ClearDiskOnStartup is false.
+     * <p>
+     *
+     * @param cattr
+     * @throws IOException
+     */
+    private void initializeKeysAndData(final IndexedDiskCacheAttributes cattr) throws IOException
+    {
+        this.dataFile = new IndexedDisk(new File(rafDir, fileName + ".data"), getElementSerializer());
+        this.keyFile = new IndexedDisk(new File(rafDir, fileName + ".key"), getElementSerializer());
+
+        if (cattr.isClearDiskOnStartup())
+        {
+            log.info("{0}: ClearDiskOnStartup is set to true. Ignoring any persisted data.",
+                    logCacheName);
+            initializeEmptyStore();
+        }
+        else if (!keyFile.isEmpty())
+        {
+            // If the key file has contents, try to initialize the keys
+            // from it. If no keys are loaded, reset the data file.
+            initializeStoreFromPersistedData();
+        }
+        else
+        {
+            // Otherwise start with a new empty map for the keys, and reset
+            // the data file if it has contents.
+            initializeEmptyStore();
+        }
+    }
+
+    /**
+     * Initializes an empty disk cache.
+     * <p>
+     *
+     * @throws IOException
+     */
+    private void initializeEmptyStore() throws IOException
+    {
+        this.keyHash.clear();
+
+        if (!dataFile.isEmpty())
+        {
+            dataFile.reset();
+        }
+    }
+
+    /**
+     * Loads any persisted data and checks for consistency. If there is a consistency issue, the
+     * files are cleared.
+     * <p>
+     *
+     * @throws IOException
+     */
+    private void initializeStoreFromPersistedData() throws IOException
+    {
+        loadKeys();
+
+        if (keyHash.isEmpty())
+        {
+            dataFile.reset();
+        }
+        else
+        {
+            final boolean isOk = checkKeyDataConsistency(false);
+            if (!isOk)
+            {
+                keyHash.clear();
+                keyFile.reset();
+                dataFile.reset();
+                log.warn("{0}: Corruption detected. Resetting data and keys files.", logCacheName);
+            }
+            else
+            {
+                synchronized (this)
+                {
+                    startupSize = keyHash.size();
+                }
+            }
+        }
+    }
+
+    /**
+     * Loads the keys from the .key file. The keys are stored in a HashMap on disk. This is
+     * converted into a LRUMap.
+     */
+    protected void loadKeys()
+    {
+        log.debug("{0}: Loading keys for {1}", () -> logCacheName, () -> keyFile.toString());
+
+        storageLock.writeLock().lock();
+
+        try
+        {
+            // clear the key map before loading.
+            keyHash.clear();
+
+            final HashMap<K, IndexedDiskElementDescriptor> keys = keyFile.readObject(
+                new IndexedDiskElementDescriptor(0, (int) keyFile.length() - IndexedDisk.HEADER_SIZE_BYTES));
+
+            if (keys != null)
+            {
+                log.debug("{0}: Found {1} in keys file.", logCacheName, keys.size());
+
+                keyHash.putAll(keys);
+
+                log.info("{0}: Loaded keys from [{1}], key count: {2}; up to {3} will be available.",
+                        () -> logCacheName, () -> fileName, keyHash::size, () -> maxKeySize);
+            }
+
+            if (log.isTraceEnabled())
+            {
+                dump(false);
+            }
+        }
+        catch (final Exception e)
+        {
+            log.error("{0}: Problem loading keys for file {1}", logCacheName, fileName, e);
+        }
+        finally
+        {
+            storageLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Check for minimal consistency between the keys and the datafile. Makes sure no starting
+     * positions in the keys exceed the file length.
+     * <p>
+     * The caller should take the appropriate action if the keys and data are not consistent.
+     *
+     * @param checkForDedOverlaps
+     *            if <code>true</code>, do a more thorough check by checking for
+     *            data overlap
+     * @return <code>true</code> if the test passes
+     */
+    private boolean checkKeyDataConsistency(final boolean checkForDedOverlaps)
+    {
+        final ElapsedTimer timer = new ElapsedTimer();
+        log.debug("{0}: Performing inital consistency check", logCacheName);
+
+        boolean isOk = true;
+        try
+        {
+            final long fileLength = dataFile.length();
+
+            final IndexedDiskElementDescriptor corruptDed = keyHash.values().stream()
+                .filter(ded -> ded.pos + IndexedDisk.HEADER_SIZE_BYTES + ded.len > fileLength)
+                .findFirst()
+                .orElse(null);
+
+            if (corruptDed != null)
+            {
+                isOk = false;
+                log.warn("{0}: The dataFile is corrupted!\n raf.length() = {1}\n ded.pos = {2}",
+                        logCacheName, fileLength, corruptDed.pos);
+            }
+            else if (checkForDedOverlaps)
+            {
+                isOk = checkForDedOverlaps(createPositionSortedDescriptorList());
+            }
+        }
+        catch (final IOException e)
+        {
+            log.error(e);
+            isOk = false;
+        }
+
+        log.info("{0}: Finished inital consistency check, isOk = {1} in {2}",
+                logCacheName, isOk, timer.getElapsedTimeString());
+
+        return isOk;
+    }
+
+    /**
+     * Detects any overlapping elements. This expects a sorted list.
+     * <p>
+     * The total length of an item is IndexedDisk.HEADER_SIZE_BYTES + ded.len.
+     * <p>
+     *
+     * @param sortedDescriptors
+     * @return false if there are overlaps.
+     */
+    protected boolean checkForDedOverlaps(final IndexedDiskElementDescriptor[] sortedDescriptors)
+    {
+        final ElapsedTimer timer = new ElapsedTimer();
+        boolean isOk = true;
+        long expectedNextPos = 0;
+        for (final IndexedDiskElementDescriptor ded : sortedDescriptors) {
+            if (expectedNextPos > ded.pos)
+            {
+                log.error("{0}: Corrupt file: overlapping deds {1}", logCacheName, ded);
+                isOk = false;
+                break;
+            }
+            expectedNextPos = ded.pos + IndexedDisk.HEADER_SIZE_BYTES + ded.len;
+        }
+        log.debug("{0}: Check for DED overlaps took {1} ms.", () -> logCacheName,
+                timer::getElapsedTime);
+
+        return isOk;
+    }
+
+    /**
+     * Saves the key file to disk. This converts the LRUMap to a HashMap for serialization.
+     */
+    protected void saveKeys()
+    {
+        try
+        {
+            log.info("{0}: Saving keys to: {1}, key count: {2}",
+                    () -> logCacheName, () -> fileName, keyHash::size);
+
+            keyFile.reset();
+
+            final HashMap<K, IndexedDiskElementDescriptor> keys = new HashMap<>(keyHash);
+            if (!keys.isEmpty())
+            {
+                keyFile.writeObject(keys, 0);
+            }
+
+            log.info("{0}: Finished saving keys.", logCacheName);
+        }
+        catch (final IOException e)
+        {
+            log.error("{0}: Problem storing keys.", logCacheName, e);
+        }
+    }
+
+    /**
+     * Update the disk cache. Called from the Queue. Makes sure the Item has not been retrieved from
+     * purgatory while in queue for disk. Remove items from purgatory when they go to disk.
+     * <p>
+     *
+     * @param ce
+     *            The ICacheElement&lt;K, V&gt; to put to disk.
+     */
+    @Override
+    protected void processUpdate(final ICacheElement<K, V> ce)
+    {
+        if (!isAlive())
+        {
+            log.error("{0}: No longer alive; aborting put of key = {1}",
+                    () -> logCacheName, ce::getKey);
+            return;
+        }
+
+        log.debug("{0}: Storing element on disk, key: {1}",
+                () -> logCacheName, ce::getKey);
+
+        // old element with same key
+        IndexedDiskElementDescriptor old = null;
+
+        try
+        {
+            IndexedDiskElementDescriptor ded = null;
+            final byte[] data = getElementSerializer().serialize(ce);
+
+            // make sure this only locks for one particular cache region
+            storageLock.writeLock().lock();
+            try
+            {
+                old = keyHash.get(ce.getKey());
+
+                // Item with the same key already exists in file.
+                // Try to reuse the location if possible.
+                if (old != null && data.length <= old.len)
+                {
+                    // Reuse the old ded. The defrag relies on ded updates by reference, not
+                    // replacement.
+                    ded = old;
+                    ded.len = data.length;
+                }
+                else
+                {
+                    // create a descriptor at the end of the file; it doubles as the search key for the recycle bin
+                    ded = new IndexedDiskElementDescriptor(dataFile.length(), data.length);
+
+                    if (doRecycle)
+                    {
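+                        // ceiling() returns the smallest recycled descriptor that
+                        // orders at or above the new one; since descriptors order
+                        // primarily by length, this is a free slot big enough to
+                        // hold the serialized data (an assumption worth checking
+                        // against IndexedDiskElementDescriptor.compareTo).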
+                        final IndexedDiskElementDescriptor rep = recycle.ceiling(ded);
+                        if (rep != null)
+                        {
+                            // remove element from recycle bin
+                            recycle.remove(rep);
+                            ded = rep;
+                            ded.len = data.length;
+                            recycleCnt++;
+                            this.adjustBytesFree(ded, false);
+                            log.debug("{0}: using recycled ded {1} rep.len = {2} ded.len = {3}",
+                                    logCacheName, ded.pos, rep.len, ded.len);
+                        }
+                    }
+
+                    // Put it in the map
+                    keyHash.put(ce.getKey(), ded);
+
+                    if (queueInput)
+                    {
+                        queuedPutList.add(ded);
+                        log.debug("{0}: added to queued put list. {1}",
+                                () -> logCacheName, queuedPutList::size);
+                    }
+
+                    // add the old slot to the recycle bin
+                    if (old != null)
+                    {
+                        addToRecycleBin(old);
+                    }
+                }
+
+                dataFile.write(ded, data);
+            }
+            finally
+            {
+                storageLock.writeLock().unlock();
+            }
+
+            log.debug("{0}: Put to file: {1}, key: {2}, position: {3}, size: {4}",
+                    logCacheName, fileName, ce.getKey(), ded.pos, ded.len);
+        }
+        catch (final IOException e)
+        {
+            log.error("{0}: Failure updating element, key: {1} old: {2}",
+                    logCacheName, ce.getKey(), old, e);
+        }
+    }
+
+    /**
+     * Gets the key, then goes to disk to get the object.
+     * <p>
+     *
+     * @param key
+     * @return ICacheElement&lt;K, V&gt; or null
+     * @see AbstractDiskCache#doGet
+     */
+    @Override
+    protected ICacheElement<K, V> processGet(final K key)
+    {
+        if (!isAlive())
+        {
+            log.error("{0}: No longer alive so returning null for key = {1}",
+                    logCacheName, key);
+            return null;
+        }
+
+        log.debug("{0}: Trying to get from disk: {1}", logCacheName, key);
+
+        ICacheElement<K, V> object = null;
+        try
+        {
+            storageLock.readLock().lock();
+            try
+            {
+                object = readElement(key);
+            }
+            finally
+            {
+                storageLock.readLock().unlock();
+            }
+
+            if (object != null)
+            {
+                hitCount.incrementAndGet();
+            }
+        }
+        catch (final IOException ioe)
+        {
+            log.error("{0}: Failure getting from disk, key = {1}", logCacheName, key, ioe);
+            reset();
+        }
+        return object;
+    }
+
+    /**
+     * Gets matching items from the cache.
+     * <p>
+     *
+     * @param pattern
+     * @return a map of K key to ICacheElement&lt;K, V&gt; element, or an empty map if there is no
+     *         data in cache matching keys
+     */
+    @Override
+    public Map<K, ICacheElement<K, V>> processGetMatching(final String pattern)
+    {
+        final Map<K, ICacheElement<K, V>> elements = new HashMap<>();
+        Set<K> keyArray = null;
+        storageLock.readLock().lock();
+        try
+        {
+            keyArray = new HashSet<>(keyHash.keySet());
+        }
+        finally
+        {
+            storageLock.readLock().unlock();
+        }
+
+        final Set<K> matchingKeys = getKeyMatcher().getMatchingKeysFromArray(pattern, keyArray);
+
+        for (final K key : matchingKeys)
+        {
+            final ICacheElement<K, V> element = processGet(key);
+            if (element != null)
+            {
+                elements.put(key, element);
+            }
+        }
+        return elements;
+    }
+
+    /**
+     * Reads the item from disk.
+     * <p>
+     *
+     * @param key
+     * @return ICacheElement
+     * @throws IOException
+     */
+    private ICacheElement<K, V> readElement(final K key) throws IOException
+    {
+        final IndexedDiskElementDescriptor ded = keyHash.get(key);
+
+        if (ded != null)
+        {
+            log.debug("{0}: Found on disk, key: ", logCacheName, key);
+
+            try
+            {
+                return dataFile.readObject(ded);
+                // TODO consider checking key equality and throwing if there is a failure
+            }
+            catch (final IOException e)
+            {
+                log.error("{0}: IO Exception, Problem reading object from file", logCacheName, e);
+                throw e;
+            }
+            catch (final Exception e)
+            {
+                log.error("{0}: Exception, Problem reading object from file", logCacheName, e);
+                throw new IOException(logCacheName + "Problem reading object from disk.", e);
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * Return the keys in this cache.
+     * <p>
+     *
+     * @see org.apache.commons.jcs3.auxiliary.disk.AbstractDiskCache#getKeySet()
+     */
+    @Override
+    public Set<K> getKeySet() throws IOException
+    {
+        final HashSet<K> keys = new HashSet<>();
+
+        storageLock.readLock().lock();
+
+        try
+        {
+            keys.addAll(this.keyHash.keySet());
+        }
+        finally
+        {
+            storageLock.readLock().unlock();
+        }
+
+        return keys;
+    }
+
+    /**
+     * Returns true if the removal was successful; or false if there is nothing to remove. Current
+     * implementation always results in a disk orphan.
+     * <p>
+     *
+     * @return true if at least one item was removed.
+     * @param key
+     */
+    @Override
+    protected boolean processRemove(final K key)
+    {
+        if (!isAlive())
+        {
+            log.error("{0}: No longer alive so returning false for key = {1}", logCacheName, key);
+            return false;
+        }
+
+        if (key == null)
+        {
+            return false;
+        }
+
+        boolean removed = false;
+        try
+        {
+            storageLock.writeLock().lock();
+
+            if (key instanceof String && key.toString().endsWith(NAME_COMPONENT_DELIMITER))
+            {
+                removed = performPartialKeyRemoval((String) key);
+            }
+            else if (key instanceof GroupAttrName && ((GroupAttrName<?>) key).attrName == null)
+            {
+                removed = performGroupRemoval(((GroupAttrName<?>) key).groupId);
+            }
+            else
+            {
+                removed = performSingleKeyRemoval(key);
+            }
+        }
+        finally
+        {
+            storageLock.writeLock().unlock();
+        }
+
+        // this increments the remove count.
+        // there is no reason to call this if an item was not removed.
+        if (removed)
+        {
+            doOptimizeRealTime();
+        }
+
+        return removed;
+    }
+
+    /**
+     * Iterates over the keyset. Builds a list of matches. Removes all the keys in the list. Does
+     * not remove via the iterator, since the map impl may not support it.
+     * <p>
+     * This operates under a lock obtained in doRemove().
+     * <p>
+     *
+     * @param key
+     * @return true if there was a match
+     */
+    private boolean performPartialKeyRemoval(final String key)
+    {
+        boolean removed = false;
+
+        // remove all keys of the same name hierarchy.
+        final List<K> itemsToRemove = new LinkedList<>();
+
+        for (final K k : keyHash.keySet())
+        {
+            if (k instanceof String && k.toString().startsWith(key))
+            {
+                itemsToRemove.add(k);
+            }
+        }
+
+        // remove matches.
+        for (final K fullKey : itemsToRemove)
+        {
+            // Don't add to recycle bin here
+            // https://issues.apache.org/jira/browse/JCS-67
+            performSingleKeyRemoval(fullKey);
+            removed = true;
+            // TODO this needs to update the remove count separately
+        }
+
+        return removed;
+    }
+
+    /**
+     * Remove all elements from the group. This does not use the iterator to remove. It builds a
+     * list of group elements and then removes them one by one.
+     * <p>
+     * This operates under a lock obtained in doRemove().
+     * <p>
+     *
+     * @param key
+     * @return true if an element was removed
+     */
+    private boolean performGroupRemoval(final GroupId key)
+    {
+        boolean removed = false;
+
+        // remove all keys of the same name group.
+        final List<K> itemsToRemove = new LinkedList<>();
+
+        // collect all keys that belong to this group.
+        for (final K k : keyHash.keySet())
+        {
+            if (k instanceof GroupAttrName && ((GroupAttrName<?>) k).groupId.equals(key))
+            {
+                itemsToRemove.add(k);
+            }
+        }
+
+        // remove matches.
+        for (final K fullKey : itemsToRemove)
+        {
+            // Don't add to recycle bin here
+            // https://issues.apache.org/jira/browse/JCS-67
+            performSingleKeyRemoval(fullKey);
+            removed = true;
+            // TODO this needs to update the remove count separately
+        }
+
+        return removed;
+    }
+
+    /**
+     * Removes an individual key from the cache.
+     * <p>
+     * This operates under a lock obtained in doRemove().
+     * <p>
+     *
+     * @param key
+     * @return true if an item was removed.
+     */
+    private boolean performSingleKeyRemoval(final K key)
+    {
+        final boolean removed;
+        // remove single item.
+        final IndexedDiskElementDescriptor ded = keyHash.remove(key);
+        removed = ded != null;
+        addToRecycleBin(ded);
+
+        log.debug("{0}: Disk removal: Removed from key hash, key [{1}] removed = {2}",
+                logCacheName, key, removed);
+        return removed;
+    }
+
+    /**
+     * Remove all the items from the disk cache by resetting everything.
+     */
+    @Override
+    public void processRemoveAll()
+    {
+        final ICacheEvent<String> cacheEvent =
+                createICacheEvent(getCacheName(), "all", ICacheEventLogger.REMOVEALL_EVENT);
+        try
+        {
+            reset();
+        }
+        finally
+        {
+            logICacheEvent(cacheEvent);
+        }
+    }
+
+    /**
+     * Reset effectively clears the disk cache, creating new files, recycle bins, and keymaps.
+     * <p>
+     * It can be used as a last resort to handle errors, to force a content update, or to remove all items.
+     */
+    private void reset()
+    {
+        log.info("{0}: Resetting cache", logCacheName);
+
+        try
+        {
+            storageLock.writeLock().lock();
+
+            if (dataFile != null)
+            {
+                dataFile.close();
+            }
+
+            final File dataFileTemp = new File(rafDir, fileName + ".data");
+            Files.delete(dataFileTemp.toPath());
+
+            if (keyFile != null)
+            {
+                keyFile.close();
+            }
+            final File keyFileTemp = new File(rafDir, fileName + ".key");
+            Files.delete(keyFileTemp.toPath());
+
+            dataFile = new IndexedDisk(dataFileTemp, getElementSerializer());
+            keyFile = new IndexedDisk(keyFileTemp, getElementSerializer());
+
+            this.recycle.clear();
+            this.keyHash.clear();
+        }
+        catch (final IOException e)
+        {
+            log.error("{0}: Failure resetting state", logCacheName, e);
+        }
+        finally
+        {
+            storageLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Creates the map from keys to descriptors that contain the index position on disk.
+     *
+     * @return a new empty Map for keys and IndexedDiskElementDescriptors
+     */
+    private Map<K, IndexedDiskElementDescriptor> createInitialKeyMap()
+    {
+        Map<K, IndexedDiskElementDescriptor> keyMap = null;
+        if (maxKeySize >= 0)
+        {
+            if (this.diskLimitType == DiskLimitType.COUNT)
+            {
+                keyMap = new LRUMapCountLimited(maxKeySize);
+            }
+            else
+            {
+                keyMap = new LRUMapSizeLimited(maxKeySize);
+            }
+
+            log.info("{0}: Set maxKeySize to: \"{1}\"", logCacheName, maxKeySize);
+        }
+        else
+        {
+            // If no max size, use a plain map for memory and processing efficiency.
+            keyMap = new HashMap<>();
+            // keyHash = Collections.synchronizedMap( new HashMap() );
+            log.info("{0}: Set maxKeySize to unlimited", logCacheName);
+        }
+
+        return keyMap;
+    }
+
+    /**
+     * Dispose of the disk cache in a background thread. Joins against this thread to put a cap on
+     * the disposal time.
+     * <p>
+     * TODO make dispose window configurable.
+     */
+    @Override
+    public void processDispose()
+    {
+        final ICacheEvent<String> cacheEvent = createICacheEvent(getCacheName(), "none", ICacheEventLogger.DISPOSE_EVENT);
+        try
+        {
+            final Thread t = new Thread(this::disposeInternal, "IndexedDiskCache-DisposalThread");
+            t.start();
+            // wait up to 60 seconds for dispose and then quit if not done.
+            try
+            {
+                t.join(60 * 1000);
+            }
+            catch (final InterruptedException ex)
+            {
+                log.error("{0}: Interrupted while waiting for disposal thread to finish.",
+                        logCacheName, ex);
+            }
+        }
+        finally
+        {
+            logICacheEvent(cacheEvent);
+        }
+    }
+
+    /**
+     * Internal method that handles the disposal.
+     */
+    protected void disposeInternal()
+    {
+        if (!isAlive())
+        {
+            log.error("{0}: Not alive and dispose was called, filename: {1}",
+                    logCacheName, fileName);
+            return;
+        }
+
+        // Prevents any interaction with the cache while we're shutting down.
+        setAlive(false);
+
+        final Thread optimizationThread = currentOptimizationThread;
+        if (isRealTimeOptimizationEnabled && optimizationThread != null)
+        {
+            // Join with the current optimization thread.
+            log.debug("{0}: In dispose, optimization already in progress; waiting for completion.",
+                    logCacheName);
+
+            try
+            {
+                optimizationThread.join();
+            }
+            catch (final InterruptedException e)
+            {
+                log.error("{0}: Unable to join current optimization thread.",
+                        logCacheName, e);
+            }
+        }
+        else if (isShutdownOptimizationEnabled && this.getBytesFree() > 0)
+        {
+            optimizeFile();
+        }
+
+        saveKeys();
+
+        try
+        {
+            log.debug("{0}: Closing files, base filename: {1}", logCacheName,
+                    fileName);
+            dataFile.close();
+            dataFile = null;
+            keyFile.close();
+            keyFile = null;
+        }
+        catch (final IOException e)
+        {
+            log.error("{0}: Failure closing files in dispose, filename: {1}",
+                    logCacheName, fileName, e);
+        }
+
+        log.info("{0}: Shutdown complete.", logCacheName);
+    }
+
+    /**
+     * Adds the descriptor to the recycle bin if it is not null. Adds the length of the item to the bytes
+     * free.
+     * <p>
+     * This is called in three places: (1) When an item is removed. All item removals funnel down to the performSingleKeyRemoval method.
+     * (2) When an item on disk is updated with a value that will not fit in the previous slot. (3) When the max key size is
+     * reached, the freed slot will be added.
+     * <p>
+     *
+     * @param ded
+     */
+    protected void addToRecycleBin(final IndexedDiskElementDescriptor ded)
+    {
+        // reuse the spot
+        if (ded != null)
+        {
+            storageLock.readLock().lock();
+
+            try
+            {
+                adjustBytesFree(ded, true);
+
+                if (doRecycle)
+                {
+                    recycle.add(ded);
+                    log.debug("{0}: recycled ded {1}", logCacheName, ded);
+                }
+            }
+            finally
+            {
+                storageLock.readLock().unlock();
+            }
+        }
+    }
+
+    /**
+     * Checks whether an optimization is required and, if so, performs it.
+     */
+    protected void doOptimizeRealTime()
+    {
+        if (isRealTimeOptimizationEnabled && !isOptimizing
+            && removeCount++ >= cattr.getOptimizeAtRemoveCount())
+        {
+            isOptimizing = true;
+
+            log.info("{0}: Optimizing file. removeCount [{1}] OptimizeAtRemoveCount [{2}]",
+                    logCacheName, removeCount, cattr.getOptimizeAtRemoveCount());
+
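+            // Double-checked creation: the thread reference is tested once without
+            // the lock and again under the write lock, so only one optimization
+            // thread is created and started at a time.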
+            if (currentOptimizationThread == null)
+            {
+                storageLock.writeLock().lock();
+
+                try
+                {
+                    if (currentOptimizationThread == null)
+                    {
+                        currentOptimizationThread = new Thread(() -> {
+                            optimizeFile();
+                            currentOptimizationThread = null;
+                        }, "IndexedDiskCache-OptimizationThread");
+                    }
+                }
+                finally
+                {
+                    storageLock.writeLock().unlock();
+                }
+
+                if (currentOptimizationThread != null)
+                {
+                    currentOptimizationThread.start();
+                }
+            }
+        }
+    }
+
+    /**
+     * File optimization is handled by this method. It works as follows:
+     * <ol>
+     * <li>Shutdown recycling and turn on queuing of puts.</li>
+     * <li>Take a snapshot of the current descriptors. If there are any removes, ignore them, as they will be compacted during the
+     * next optimization.</li>
+     * <li>Optimize the snapshot. For each descriptor:
+     * <ol>
+     * <li>Obtain the write-lock.</li>
+     * <li>Shift the element on the disk, in order to compact out the free space.</li>
+     * <li>Release the write-lock. This allows elements to still be accessible during optimization.</li>
+     * </ol>
+     * </li>
+     * <li>Obtain the write-lock.</li>
+     * <li>All queued puts are made at the end of the file. Optimize these under a single write-lock.</li>
+     * <li>Truncate the file.</li>
+     * <li>Release the write-lock.</li>
+     * <li>Restore system to standard operation.</li>
+     * </ol>
+     */
+    protected void optimizeFile()
+    {
+        final ElapsedTimer timer = new ElapsedTimer();
+        timesOptimized++;
+        log.info("{0}: Beginning Optimization #{1}", logCacheName, timesOptimized);
+
+        // CREATE SNAPSHOT
+        IndexedDiskElementDescriptor[] defragList = null;
+
+        storageLock.writeLock().lock();
+
+        try
+        {
+            queueInput = true;
+            // shut off recycling while we're optimizing.
+            doRecycle = false;
+            defragList = createPositionSortedDescriptorList();
+        }
+        finally
+        {
+            storageLock.writeLock().unlock();
+        }
+
+        // Defrag the file outside of the write lock. This allows a move to be made,
+        // and yet have the element still accessible for reading or writing.
+        long expectedNextPos = defragFile(defragList, 0);
+
+        // ADD THE QUEUED ITEMS to the end and then truncate
+        storageLock.writeLock().lock();
+
+        try
+        {
+            try
+            {
+                if (!queuedPutList.isEmpty())
+                {
+                    defragList = queuedPutList.toArray(new IndexedDiskElementDescriptor[queuedPutList.size()]);
+
+                    // pack them at the end
+                    expectedNextPos = defragFile(defragList, expectedNextPos);
+                }
+                // TRUNCATE THE FILE
+                dataFile.truncate(expectedNextPos);
+            }
+            catch (final IOException e)
+            {
+                log.error("{0}: Error optimizing queued puts.", logCacheName, e);
+            }
+
+            // RESTORE NORMAL OPERATION
+            removeCount = 0;
+            resetBytesFree();
+            this.recycle.clear();
+            queuedPutList.clear();
+            queueInput = false;
+            // turn recycle back on.
+            doRecycle = true;
+            isOptimizing = false;
+        }
+        finally
+        {
+            storageLock.writeLock().unlock();
+        }
+
+        log.info("{0}: Finished #{1}, Optimization took {2}",
+                logCacheName, timesOptimized, timer.getElapsedTimeString());
+    }
+
+    /**
+     * Defragments the file in place by compacting out the free space (i.e., moving records
+     * forward). If there were no gaps the resulting file would be the same size as the previous
+     * file. This must be supplied an ordered defragList.
+     * <p>
+     *
+     * @param defragList
+     *            sorted list of descriptors for optimization
+     * @param startingPos
+     *            the start position in the file
+     * @return this is the potential new file end
+     */
+    private long defragFile(final IndexedDiskElementDescriptor[] defragList, final long startingPos)
+    {
+        final ElapsedTimer timer = new ElapsedTimer();
+        long preFileSize = 0;
+        long postFileSize = 0;
+        long expectedNextPos = 0;
+        try
+        {
+            preFileSize = this.dataFile.length();
+            // find the first gap in the disk and start defragging.
+            expectedNextPos = startingPos;
+            for (final IndexedDiskElementDescriptor element : defragList) {
+                storageLock.writeLock().lock();
+                try
+                {
+                    if (expectedNextPos != element.pos)
+                    {
+                        dataFile.move(element, expectedNextPos);
+                    }
+                    expectedNextPos = element.pos + IndexedDisk.HEADER_SIZE_BYTES + element.len;
+                }
+                finally
+                {
+                    storageLock.writeLock().unlock();
+                }
+            }
+
+            postFileSize = this.dataFile.length();
+
+            // this is the potential new file end
+            return expectedNextPos;
+        }
+        catch (final IOException e)
+        {
+            log.error("{0}: Error occurred during defragmentation.", logCacheName, e);
+        }
+        finally
+        {
+            log.info("{0}: Defragmentation took {1}. File Size (before={2}) (after={3}) (truncating to {4})",
+                    logCacheName, timer.getElapsedTimeString(), preFileSize, postFileSize, expectedNextPos);
+        }
+
+        return 0;
+    }
+
+    /**
+     * Creates a snapshot of the IndexedDiskElementDescriptors in the keyHash and returns them
+     * sorted by position in the dataFile.
+     * <p>
+     *
+     * @return IndexedDiskElementDescriptor[]
+     */
+    private IndexedDiskElementDescriptor[] createPositionSortedDescriptorList()
+    {
+        final List<IndexedDiskElementDescriptor> defragList = new ArrayList<>(keyHash.values());
+        Collections.sort(defragList, (ded1, ded2) -> Long.compare(ded1.pos, ded2.pos));
+
+        return defragList.toArray(new IndexedDiskElementDescriptor[0]);
+    }
+
+    /**
+     * Returns the current cache size.
+     * <p>
+     *
+     * @return The size value
+     */
+    @Override
+    public int getSize()
+    {
+        return keyHash.size();
+    }
+
+    /**
+     * Returns the size of the recycle bin in number of elements.
+     * <p>
+     *
+     * @return The number of items in the bin.
+     */
+    protected int getRecyleBinSize()
+    {
+        return this.recycle.size();
+    }
+
+    /**
+     * Returns the number of times we have used spots from the recycle bin.
+     * <p>
+     *
+     * @return The number of spots used.
+     */
+    protected int getRecyleCount()
+    {
+        return this.recycleCnt;
+    }
+
+    /**
+     * Returns the number of bytes that are free. When an item is removed, its length is recorded.
+     * When a spot is used from the recycle bin, the length of the item stored is recorded.
+     * <p>
+     *
+     * @return The number of bytes free on the disk file.
+     */
+    protected long getBytesFree()
+    {
+        return this.bytesFree.get();
+    }
+
+    /**
+     * Resets the number of bytes that are free.
+     */
+    private void resetBytesFree()
+    {
+        this.bytesFree.set(0);
+    }
+
+    /**
+     * To subtract, pass false for add.
+     * <p>
+     *
+     * @param ded
+     * @param add
+     */
+    private void adjustBytesFree(final IndexedDiskElementDescriptor ded, final boolean add)
+    {
+        if (ded != null)
+        {
+            final int amount = ded.len + IndexedDisk.HEADER_SIZE_BYTES;
+
+            if (add)
+            {
+                this.bytesFree.addAndGet(amount);
+            }
+            else
+            {
+                this.bytesFree.addAndGet(-amount);
+            }
+        }
+    }
+
+    /**
+     * This is for debugging and testing.
+     * <p>
+     *
+     * @return the length of the data file.
+     * @throws IOException
+     */
+    protected long getDataFileSize() throws IOException
+    {
+        long size = 0;
+
+        storageLock.readLock().lock();
+
+        try
+        {
+            if (dataFile != null)
+            {
+                size = dataFile.length();
+            }
+        }
+        finally
+        {
+            storageLock.readLock().unlock();
+        }
+
+        return size;
+    }
+
+    /**
+     * For debugging. This dumps the values by default.
+     */
+    public void dump()
+    {
+        dump(true);
+    }
+
+    /**
+     * For debugging.
+     * <p>
+     *
+     * @param dumpValues
+     *            A boolean indicating if values should be dumped.
+     */
+    public void dump(final boolean dumpValues)
+    {
+        if (log.isTraceEnabled())
+        {
+            log.trace("{0}: [dump] Number of keys: {1}", logCacheName, keyHash.size());
+
+            for (final Map.Entry<K, IndexedDiskElementDescriptor> e : keyHash.entrySet())
+            {
+                final K key = e.getKey();
+                final IndexedDiskElementDescriptor ded = e.getValue();
+
+                log.trace("{0}: [dump] Disk element, key: {1}, pos: {2}, len: {3}" +
+                        (dumpValues ? ", val: " + get(key) : ""),
+                        logCacheName, key, ded.pos, ded.len);
+            }
+        }
+    }
+
+    /**
+     * @return Returns the AuxiliaryCacheAttributes.
+     */
+    @Override
+    public AuxiliaryCacheAttributes getAuxiliaryCacheAttributes()
+    {
+        return this.cattr;
+    }
+
+    /**
+     * Returns info about the disk cache.
+     * <p>
+     *
+     * @see org.apache.commons.jcs3.auxiliary.AuxiliaryCache#getStatistics()
+     */
+    @Override
+    public synchronized IStats getStatistics()
+    {
+        final IStats stats = new Stats();
+        stats.setTypeName("Indexed Disk Cache");
+
+        final ArrayList<IStatElement<?>> elems = new ArrayList<>();
+
+        elems.add(new StatElement<>("Is Alive", Boolean.valueOf(isAlive())));
+        elems.add(new StatElement<>("Key Map Size", Integer.valueOf(this.keyHash != null ? this.keyHash.size() : -1)));
+        try
+        {
+            elems.add(
+                    new StatElement<>("Data File Length", Long.valueOf(this.dataFile != null ? this.dataFile.length() : -1L)));
+        }
+        catch (final IOException e)
+        {
+            log.error(e);
+        }
+        elems.add(new StatElement<>("Max Key Size", this.maxKeySize));
+        elems.add(new StatElement<>("Hit Count", this.hitCount));
+        elems.add(new StatElement<>("Bytes Free", this.bytesFree));
+        elems.add(new StatElement<>("Optimize Operation Count", Integer.valueOf(this.removeCount)));
+        elems.add(new StatElement<>("Times Optimized", Integer.valueOf(this.timesOptimized)));
+        elems.add(new StatElement<>("Recycle Count", Integer.valueOf(this.recycleCnt)));
+        elems.add(new StatElement<>("Recycle Bin Size", Integer.valueOf(this.recycle.size())));
+        elems.add(new StatElement<>("Startup Size", Integer.valueOf(this.startupSize)));
+
+        // get the stats from the super too
+        final IStats sStats = super.getStatistics();
+        elems.addAll(sStats.getStatElements());
+
+        stats.setStatElements(elems);
+
+        return stats;
+    }
+
+    /**
+     * This is exposed for testing.
+     * <p>
+     *
+     * @return Returns the timesOptimized.
+     */
+    protected int getTimesOptimized()
+    {
+        return timesOptimized;
+    }
+
+    /**
+     * This is used by the event logging.
+     * <p>
+     *
+     * @return the location of the disk, either path or ip.
+     */
+    @Override
+    protected String getDiskLocation()
+    {
+        return dataFile.getFilePath();
+    }
+
+    /**
+     * Compares IndexedDiskElementDescriptors based on their position.
+     * <p>
+     * @deprecated Use lambda instead
+     */
+    @Deprecated
+    protected static final class PositionComparator implements Comparator<IndexedDiskElementDescriptor>, Serializable
+    {
+        /** serialVersionUID */
+        private static final long serialVersionUID = -8387365338590814113L;
+
+        /**
+         * Compares two descriptors based on position.
+         * <p>
+         *
+         * @see java.util.Comparator#compare(Object, Object)
+         */
+        @Override
+        public int compare(final IndexedDiskElementDescriptor ded1, final IndexedDiskElementDescriptor ded2)
+        {
+            return Long.compare(ded1.pos, ded2.pos);
+        }
+    }
+
+    /**
+     * Class for recycling and LRU. This implements the LRU overflow callback, so we can add items
+     * to the recycle bin. This class counts the size of the content to decide when to throw away an element.
+     */
+    public class LRUMapSizeLimited extends AbstractLRUMap<K, IndexedDiskElementDescriptor>
+    {
+        /**
+         * <code>tag</code> tells us which map we are working on.
+         */
+        public static final String TAG = "orig";
+
+        // size of the content in kB
+        private final AtomicInteger contentSize;
+        private final int maxSize;
+
+        /**
+         * Default
+         */
+        public LRUMapSizeLimited()
+        {
+            this(-1);
+        }
+
+        /**
+         * @param maxKeySize
+         */
+        public LRUMapSizeLimited(final int maxKeySize)
+        {
+            this.maxSize = maxKeySize;
+            this.contentSize = new AtomicInteger(0);
+        }
+
+        // keep the content size in kB, so 2^31 kB is a reasonable value
+        private void subLengthFromCacheSize(final IndexedDiskElementDescriptor value)
+        {
+            contentSize.addAndGet((value.len + IndexedDisk.HEADER_SIZE_BYTES) / -1024 - 1);
+        }
+
+        // keep the content size in kB, so 2^31 kB is a reasonable value
+        private void addLengthToCacheSize(final IndexedDiskElementDescriptor value)
+        {
+            contentSize.addAndGet((value.len + IndexedDisk.HEADER_SIZE_BYTES) / 1024 + 1);
+        }
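+
+        // Rounding example (illustrative, assuming the 4-byte record header):
+        // a 100-byte entry counts as (100 + 4) / 1024 + 1 = 1 kB, and a
+        // 2048-byte entry counts as (2048 + 4) / 1024 + 1 = 3 kB, so every
+        // entry contributes at least 1 kB and the counter stays a safe
+        // overestimate.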
+
+        @Override
+        public IndexedDiskElementDescriptor put(final K key, final IndexedDiskElementDescriptor value)
+        {
+            IndexedDiskElementDescriptor oldValue = null;
+
+            try
+            {
+                oldValue = super.put(key, value);
+            }
+            finally
+            {
+                // keep the content size in kB, so 2^31 kB is a reasonable value
+                if (value != null)
+                {
+                    addLengthToCacheSize(value);
+                }
+                if (oldValue != null)
+                {
+                    subLengthFromCacheSize(oldValue);
+                }
+            }
+
+            return oldValue;
+        }
+
+        @Override
+        public IndexedDiskElementDescriptor remove(final Object key)
+        {
+            IndexedDiskElementDescriptor value = null;
+
+            try
+            {
+                value = super.remove(key);
+                return value;
+            }
+            finally
+            {
+                if (value != null)
+                {
+                    subLengthFromCacheSize(value);
+                }
+            }
+        }
+
+        /**
+         * This is called when the max key size is reached. The least recently used item will be
+         * passed here. We will store the position and size of the spot on disk in the recycle bin.
+         * <p>
+         *
+         * @param key
+         * @param value
+         */
+        @Override
+        protected void processRemovedLRU(final K key, final IndexedDiskElementDescriptor value)
+        {
+            if (value != null)
+            {
+                subLengthFromCacheSize(value);
+            }
+
+            addToRecycleBin(value);
+
+            log.debug("{0}: Removing key: [{1}] from key store.", logCacheName, key);
+            log.debug("{0}: Key store size: [{1}].", logCacheName, this.size());
+
+            doOptimizeRealTime();
+        }
+
+        @Override
+        protected boolean shouldRemove()
+        {
+            return maxSize > 0 && contentSize.get() > maxSize && !this.isEmpty();
+        }
+    }
+
+    /**
+     * Class for recycling and LRU. This implements the LRU overflow callback, so we can add items
+     * to the recycle bin. This class counts the elements to decide when to evict an element.
+     */
+    public class LRUMapCountLimited extends LRUMap<K, IndexedDiskElementDescriptor>
+    // implements Serializable
+    {
+        /**
+         * @param maxKeySize the maximum number of keys to hold
+         */
+        public LRUMapCountLimited(final int maxKeySize)
+        {
+            super(maxKeySize);
+        }
+
+        /**
+         * This is called when the max key size is reached. The least recently used item will be
+         * passed here. We will store the position and size of the spot on disk in the recycle bin.
+         * <p>
+         *
+         * @param key
+         * @param value
+         */
+        @Override
+        protected void processRemovedLRU(final K key, final IndexedDiskElementDescriptor value)
+        {
+            addToRecycleBin(value);
+            log.debug("{0}: Removing key: [{1}] from key store.", logCacheName, key);
+            log.debug("{0}: Key store size: [{1}].", logCacheName, this.size());
+
+            doOptimizeRealTime();
+        }
+    }
+}
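The two inner maps above back the two disk-limit modes: LRUMapCountLimited caps the number of keys, while LRUMapSizeLimited caps the stored content size in kB. Which one the cache builds is a configuration matter; a sketch of the relevant region attributes, assuming the standard IndexedDiskCacheAttributes property names (MaxKeySize and DiskLimitType):

    jcs.auxiliary.DC.attributes.MaxKeySize=10000
    # COUNT (the default) selects the count-limited map; SIZE selects the
    # size-limited map and reinterprets MaxKeySize as a content budget in kB
    jcs.auxiliary.DC.attributes.DiskLimitType=SIZE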
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/disk/indexed/IndexedDiskElementDescriptor.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/disk/indexed/IndexedDiskElementDescriptor.java
index b74e50fb..c2fdd7e2 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/disk/indexed/IndexedDiskElementDescriptor.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/disk/indexed/IndexedDiskElementDescriptor.java
@@ -1,116 +1,116 @@
-package org.apache.commons.jcs3.auxiliary.disk.indexed;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.io.Serializable;
-
-/**
- * Disk objects are located by descriptor entries. These are saved on shutdown and loaded into
- * memory on startup.
- */
-public class IndexedDiskElementDescriptor
-    implements Serializable, Comparable<IndexedDiskElementDescriptor>
-{
-    /** Don't change */
-    private static final long serialVersionUID = -3029163572847659450L;
-
-    /** Position of the cache data entry on disk. */
-    long pos;
-
-    /** Number of bytes the serialized form of the cache data takes. */
-    int len;
-
-    /**
-     * Constructs a usable disk element descriptor.
-     * <p>
-     * @param pos
-     * @param len
-     */
-    public IndexedDiskElementDescriptor( final long pos, final int len )
-    {
-        this.pos = pos;
-        this.len = len;
-    }
-
-    /**
-     * @return debug string
-     */
-    @Override
-    public String toString()
-    {
-        final StringBuilder buf = new StringBuilder();
-        buf.append( "[DED: " );
-        buf.append( " pos = " + pos );
-        buf.append( " len = " + len );
-        buf.append( "]" );
-        return buf.toString();
-    }
-
-    /**
-     * @see java.lang.Object#hashCode()
-     */
-    @Override
-    public int hashCode()
-    {
-        return Long.valueOf(this.pos).hashCode() ^ Integer.valueOf(len).hashCode();
-    }
-
-    /**
-     * @see java.lang.Object#equals(java.lang.Object)
-     */
-    @Override
-    public boolean equals(final Object o)
-    {
-    	if (o == null)
-    	{
-    		return false;
-    	}
-        if (o instanceof IndexedDiskElementDescriptor)
-        {
-    		final IndexedDiskElementDescriptor ided = (IndexedDiskElementDescriptor)o;
-            return pos == ided.pos && len == ided.len;
-        }
-
-        return false;
-    }
-
-    /**
-     * Compares based on length, then on pos descending.
-     * <p>
-     * @param o Object
-     * @return int
-     */
-    @Override
-    public int compareTo( final IndexedDiskElementDescriptor o )
-    {
-        if ( o == null )
-        {
-            return 1;
-        }
-
-        int lenCompare = Integer.compare(len, o.len);
-        if (lenCompare == 0)
-        {
-            return Long.compare(o.pos, pos);
-        }
-
-        return lenCompare;
-    }
-}
+package org.apache.commons.jcs3.auxiliary.disk.indexed;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.Serializable;
+
+/**
+ * Disk objects are located by descriptor entries. These are saved on shutdown and loaded into
+ * memory on startup.
+ */
+public class IndexedDiskElementDescriptor
+    implements Serializable, Comparable<IndexedDiskElementDescriptor>
+{
+    /** Don't change */
+    private static final long serialVersionUID = -3029163572847659450L;
+
+    /** Position of the cache data entry on disk. */
+    long pos;
+
+    /** Number of bytes the serialized form of the cache data takes. */
+    int len;
+
+    /**
+     * Constructs a usable disk element descriptor.
+     * <p>
+     * @param pos position of the cache data entry on disk
+     * @param len number of bytes the serialized form of the cache data takes
+     */
+    public IndexedDiskElementDescriptor( final long pos, final int len )
+    {
+        this.pos = pos;
+        this.len = len;
+    }
+
+    /**
+     * @return debug string
+     */
+    @Override
+    public String toString()
+    {
+        final StringBuilder buf = new StringBuilder();
+        buf.append( "[DED: " );
+        buf.append( " pos = " + pos );
+        buf.append( " len = " + len );
+        buf.append( "]" );
+        return buf.toString();
+    }
+
+    /**
+     * @see Object#hashCode()
+     */
+    @Override
+    public int hashCode()
+    {
+        return Long.valueOf(this.pos).hashCode() ^ Integer.valueOf(len).hashCode();
+    }
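A side note on the boxing above: per the JDK hash contracts, Long.valueOf(pos).hashCode() equals Long.hashCode(pos), and Integer.valueOf(len).hashCode() is simply len, so an allocation-free equivalent would look like this sketch:

    @Override
    public int hashCode()
    {
        // Long.hashCode(l) == (int) (l ^ (l >>> 32)); Integer's hash is the value itself
        return Long.hashCode(pos) ^ len;
    }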
+
+    /**
+     * @see Object#equals(Object)
+     */
+    @Override
+    public boolean equals(final Object o)
+    {
+        if (o == null)
+        {
+            return false;
+        }
+        if (o instanceof IndexedDiskElementDescriptor)
+        {
+            final IndexedDiskElementDescriptor ided = (IndexedDiskElementDescriptor)o;
+            return pos == ided.pos && len == ided.len;
+        }
+
+        return false;
+    }
+
+    /**
+     * Compares based on length, then on pos descending.
+     * <p>
+     * @param o the descriptor to compare to
+     * @return a negative integer, zero, or a positive integer per the ordering above
+     */
+    @Override
+    public int compareTo( final IndexedDiskElementDescriptor o )
+    {
+        if ( o == null )
+        {
+            return 1;
+        }
+
+        int lenCompare = Integer.compare(len, o.len);
+        if (lenCompare == 0)
+        {
+            return Long.compare(o.pos, pos);
+        }
+
+        return lenCompare;
+    }
+}
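The ordering defined by compareTo, ascending by length with ties broken by descending position, is a natural fit for best-fit lookups in a sorted recycle bin: the smallest spot that still fits is found first. A minimal sketch of the resulting sort order, assuming the usual java.util imports:

    // descriptors sort by length; equal lengths sort by descending position
    final List<IndexedDiskElementDescriptor> bin = new ArrayList<>(Arrays.asList(
        new IndexedDiskElementDescriptor(100L, 8),
        new IndexedDiskElementDescriptor(400L, 8),
        new IndexedDiskElementDescriptor(0L, 4)));
    Collections.sort(bin);
    // resulting order: (pos=0, len=4), (pos=400, len=8), (pos=100, len=8)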
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPListener.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPListener.java
index 4d1154da..8ac7de9e 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPListener.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPListener.java
@@ -1,792 +1,792 @@
-package org.apache.commons.jcs3.auxiliary.lateral.socket.tcp;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.net.SocketAddress;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.commons.jcs3.auxiliary.lateral.LateralElementDescriptor;
-import org.apache.commons.jcs3.auxiliary.lateral.behavior.ILateralCacheListener;
-import org.apache.commons.jcs3.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
-import org.apache.commons.jcs3.engine.CacheInfo;
-import org.apache.commons.jcs3.engine.behavior.ICacheElement;
-import org.apache.commons.jcs3.engine.behavior.ICompositeCacheManager;
-import org.apache.commons.jcs3.engine.behavior.IElementSerializer;
-import org.apache.commons.jcs3.engine.behavior.IShutdownObserver;
-import org.apache.commons.jcs3.engine.control.CompositeCache;
-import org.apache.commons.jcs3.log.Log;
-import org.apache.commons.jcs3.log.LogManager;
-import org.apache.commons.jcs3.utils.serialization.StandardSerializer;
-
-/**
- * Listens for connections from other TCP lateral caches and handles them. The initialization method
- * starts a listening thread, which creates a socket server. When messages are received they are
- * passed to a pooled executor which then calls the appropriate handle method.
- */
-public class LateralTCPListener<K, V>
-    implements ILateralCacheListener<K, V>, IShutdownObserver
-{
-    /** The logger */
-    private static final Log log = LogManager.getLog( LateralTCPListener.class );
-
-    /** How long the server will block on an accept(). 0 is infinite. */
-    private static final int acceptTimeOut = 1000;
-
-    /** The CacheHub this listener is associated with */
-    private transient ICompositeCacheManager cacheManager;
-
-    /** Map of available instances, keyed by port */
-    private static final ConcurrentHashMap<String, ILateralCacheListener<?, ?>> instances =
-        new ConcurrentHashMap<>();
-
-    /** Configuration attributes */
-    private ITCPLateralCacheAttributes tcpLateralCacheAttributes;
-
-    /** The listener thread */
-    private Thread listenerThread;
-
-    /**
-     * Serializer for reading and writing
-     */
-    private IElementSerializer serializer;
-
-    /** put count */
-    private int putCnt;
-
-    /** remove count */
-    private int removeCnt;
-
-    /** get count */
-    private int getCnt;
-
-    /**
-     * Use the vmid by default. This can be set for testing. If we ever need to run more than one
-     * per vm, then we need a new technique.
-     */
-    private long listenerId = CacheInfo.listenerId;
-
-    /** is this shut down? */
-    private final AtomicBoolean shutdown = new AtomicBoolean();
-
-    /** is this terminated? */
-    private final AtomicBoolean terminated = new AtomicBoolean();
-
-    /**
-     * Gets the instance attribute of the LateralCacheTCPListener class.
-     * <p>
-     * @param ilca ITCPLateralCacheAttributes
-     * @param cacheMgr
-     * @return The instance value
-     * @deprecated Specify serializer
-     */
-    @Deprecated
-    @SuppressWarnings("unchecked") // Need to cast because of common map for all instances
-    public static <K, V> LateralTCPListener<K, V>
-        getInstance( final ITCPLateralCacheAttributes ilca, final ICompositeCacheManager cacheMgr)
-    {
-        return (LateralTCPListener<K, V>) instances.computeIfAbsent(
-                String.valueOf( ilca.getTcpListenerPort() ),
-                k -> {
-                    final LateralTCPListener<K, V> newIns = new LateralTCPListener<>( ilca, new StandardSerializer() );
-
-                    newIns.init();
-                    newIns.setCacheManager( cacheMgr );
-
-                    log.info("Created new listener {0}", ilca::getTcpListenerPort);
-
-                    return newIns;
-                });
-    }
-    
-    /**
-     * Gets the instance attribute of the LateralCacheTCPListener class.
-     * <p>
-     * @param ilca ITCPLateralCacheAttributes
-     * @param cacheMgr
-     * @param serializer the serializer to use when receiving
-     * @return The instance value
-     */
-    @SuppressWarnings("unchecked") // Need to cast because of common map for all instances
-    public static <K, V> LateralTCPListener<K, V>
-        getInstance( final ITCPLateralCacheAttributes ilca, final ICompositeCacheManager cacheMgr, final IElementSerializer serializer )
-    {
-        return (LateralTCPListener<K, V>) instances.computeIfAbsent(
-                String.valueOf( ilca.getTcpListenerPort() ),
-                k -> {
-                    final LateralTCPListener<K, V> newIns = new LateralTCPListener<>( ilca, serializer );
-
-                    newIns.init();
-                    newIns.setCacheManager( cacheMgr );
-
-                    log.info("Created new listener {0}", ilca::getTcpListenerPort);
-
-                    return newIns;
-                });
-    }
-
-    /**
-     * Only need one since it does work for all regions, just reference by multiple region names.
-     * <p>
-     * @param ilca
-     * @param serializer the serializer to use when receiving
-     */
-    protected LateralTCPListener( final ITCPLateralCacheAttributes ilca, final IElementSerializer serializer )
-    {
-        this.setTcpLateralCacheAttributes( ilca );
-        this.serializer = serializer;
-    }
-
-    /**
-     * This starts the ListenerThread on the specified port.
-     */
-    @Override
-    public synchronized void init()
-    {
-        try
-        {
-            final int port = getTcpLateralCacheAttributes().getTcpListenerPort();
-            final String host = getTcpLateralCacheAttributes().getTcpListenerHost();
-
-            terminated.set(false);
-            shutdown.set(false);
-
-            final ServerSocketChannel serverSocket = ServerSocketChannel.open();
-
-            SocketAddress endPoint;
-
-            if (host != null && !host.isEmpty())
-            {
-                log.info( "Listening on {0}:{1}", host, port );
-                //Bind the SocketAddress with host and port
-                endPoint = new InetSocketAddress(host, port);
-            }
-            else
-            {
-                log.info( "Listening on port {0}", port );
-                endPoint = new InetSocketAddress(port);
-            }
-
-            serverSocket.bind(endPoint);
-            serverSocket.configureBlocking(false);
-
-            listenerThread = new Thread(() -> runListener(serverSocket),
-                    "JCS-LateralTCPListener-" + host + ":" + port);
-            listenerThread.setDaemon(true);
-            listenerThread.start();
-        }
-        catch ( final IOException ex )
-        {
-            throw new IllegalStateException( ex );
-        }
-    }
-
-    /**
-     * Let the lateral cache set a listener_id. Since there is only one listener for all the
-     * regions and every region gets registered? the id shouldn't be set if it isn't zero. If it is
-     * we assume that it is a reconnect.
-     * <p>
-     * By default, the listener id is the vmid.
-     * <p>
-     * The service should set this value. This value will never be changed by a server we connect
-     * to. It needs to be non static, for unit tests.
-     * <p>
-     * The service will use the value it sets in all send requests to the sender.
-     * <p>
-     * @param id The new listenerId value
-     * @throws IOException
-     */
-    @Override
-    public void setListenerId( final long id )
-        throws IOException
-    {
-        this.listenerId = id;
-        log.debug( "set listenerId = {0}", id );
-    }
-
-    /**
-     * Gets the listenerId attribute of the LateralCacheTCPListener object
-     * <p>
-     * @return The listenerId value
-     * @throws IOException
-     */
-    @Override
-    public long getListenerId()
-        throws IOException
-    {
-        return this.listenerId;
-    }
-
-    /**
-     * Increments the put count. Gets the cache that was injected by the lateral factory. Calls put
-     * on the cache.
-     * <p>
-     * @see org.apache.commons.jcs3.engine.behavior.ICacheListener#handlePut(org.apache.commons.jcs3.engine.behavior.ICacheElement)
-     */
-    @Override
-    public void handlePut( final ICacheElement<K, V> element )
-        throws IOException
-    {
-        putCnt++;
-        if ( log.isInfoEnabled() && getPutCnt() % 100 == 0 )
-        {
-            log.info( "Put Count (port {0}) = {1}",
-                    () -> getTcpLateralCacheAttributes().getTcpListenerPort(),
-                    this::getPutCnt);
-        }
-
-        log.debug( "handlePut> cacheName={0}, key={1}",
-                element::getCacheName, element::getKey);
-
-        getCache( element.getCacheName() ).localUpdate( element );
-    }
-
-    /**
-     * Increments the remove count. Gets the cache that was injected by the lateral factory. Calls
-     * remove on the cache.
-     * <p>
-     * @see org.apache.commons.jcs3.engine.behavior.ICacheListener#handleRemove(java.lang.String,
-     *      Object)
-     */
-    @Override
-    public void handleRemove( final String cacheName, final K key )
-        throws IOException
-    {
-        removeCnt++;
-        if ( log.isInfoEnabled() && getRemoveCnt() % 100 == 0 )
-        {
-            log.info( "Remove Count = {0}", this::getRemoveCnt);
-        }
-
-        log.debug( "handleRemove> cacheName={0}, key={1}", cacheName, key );
-
-        getCache( cacheName ).localRemove( key );
-    }
-
-    /**
-     * Gets the cache that was injected by the lateral factory. Calls removeAll on the cache.
-     * <p>
-     * @see org.apache.commons.jcs3.engine.behavior.ICacheListener#handleRemoveAll(java.lang.String)
-     */
-    @Override
-    public void handleRemoveAll( final String cacheName )
-        throws IOException
-    {
-        log.debug( "handleRemoveAll> cacheName={0}", cacheName );
-
-        getCache( cacheName ).localRemoveAll();
-    }
-
-    /**
-     * Gets the cache that was injected by the lateral factory. Calls get on the cache.
-     * <p>
-     * @param cacheName
-     * @param key
-     * @return a ICacheElement
-     * @throws IOException
-     */
-    public ICacheElement<K, V> handleGet( final String cacheName, final K key )
-        throws IOException
-    {
-        getCnt++;
-        if ( log.isInfoEnabled() && getGetCnt() % 100 == 0 )
-        {
-            log.info( "Get Count (port {0}) = {1}",
-                    () -> getTcpLateralCacheAttributes().getTcpListenerPort(),
-                    this::getGetCnt);
-        }
-
-        log.debug( "handleGet> cacheName={0}, key={1}", cacheName, key );
-
-        return getCache( cacheName ).localGet( key );
-    }
-
-    /**
-     * Gets the cache that was injected by the lateral factory. Calls get on the cache.
-     * <p>
-     * @param cacheName the name of the cache
-     * @param pattern the matching pattern
-     * @return Map
-     * @throws IOException
-     */
-    public Map<K, ICacheElement<K, V>> handleGetMatching( final String cacheName, final String pattern )
-        throws IOException
-    {
-        getCnt++;
-        if ( log.isInfoEnabled() && getGetCnt() % 100 == 0 )
-        {
-            log.info( "GetMatching Count (port {0}) = {1}",
-                    () -> getTcpLateralCacheAttributes().getTcpListenerPort(),
-                    this::getGetCnt);
-        }
-
-        log.debug( "handleGetMatching> cacheName={0}, pattern={1}", cacheName, pattern );
-
-        return getCache( cacheName ).localGetMatching( pattern );
-    }
-
-    /**
-     * Gets the cache that was injected by the lateral factory. Calls getKeySet on the cache.
-     * <p>
-     * @param cacheName the name of the cache
-     * @return a set of keys
-     * @throws IOException
-     */
-    public Set<K> handleGetKeySet( final String cacheName ) throws IOException
-    {
-    	return getCache( cacheName ).getKeySet(true);
-    }
-
-    /**
-     * This marks this instance as terminated.
-     * <p>
-     * @see org.apache.commons.jcs3.engine.behavior.ICacheListener#handleDispose(java.lang.String)
-     */
-    @Override
-    public void handleDispose( final String cacheName )
-        throws IOException
-    {
-        log.info( "handleDispose > cacheName={0} | Ignoring message. "
-                + "Do not dispose from remote.", cacheName );
-
-        // TODO handle active deregistration, rather than passive detection
-        dispose();
-    }
-
-    @Override
-    public synchronized void dispose()
-    {
-        if (terminated.compareAndSet(false, true))
-        {
-            notify();
-            listenerThread.interrupt();
-        }
-    }
-
-    /**
-     * Gets the cacheManager attribute of the LateralCacheTCPListener object.
-     * <p>
-     * Normally this is set by the factory. If it wasn't set the listener defaults to the expected
-     * singleton behavior of the cache manager.
-     * <p>
-     * @param name
-     * @return CompositeCache
-     */
-    protected CompositeCache<K, V> getCache( final String name )
-    {
-        return getCacheManager().getCache( name );
-    }
-
-    /**
-     * This is roughly the number of updates the lateral has received.
-     * <p>
-     * @return Returns the putCnt.
-     */
-    public int getPutCnt()
-    {
-        return putCnt;
-    }
-
-    /**
-     * @return Returns the getCnt.
-     */
-    public int getGetCnt()
-    {
-        return getCnt;
-    }
-
-    /**
-     * @return Returns the removeCnt.
-     */
-    public int getRemoveCnt()
-    {
-        return removeCnt;
-    }
-
-    /**
-     * @param cacheMgr The cacheMgr to set.
-     */
-    @Override
-    public void setCacheManager( final ICompositeCacheManager cacheMgr )
-    {
-        this.cacheManager = cacheMgr;
-    }
-
-    /**
-     * @return Returns the cacheMgr.
-     */
-    @Override
-    public ICompositeCacheManager getCacheManager()
-    {
-        return cacheManager;
-    }
-
-    /**
-     * @param tcpLateralCacheAttributes The tcpLateralCacheAttributes to set.
-     */
-    public void setTcpLateralCacheAttributes( final ITCPLateralCacheAttributes tcpLateralCacheAttributes )
-    {
-        this.tcpLateralCacheAttributes = tcpLateralCacheAttributes;
-    }
-
-    /**
-     * @return Returns the tcpLateralCacheAttributes.
-     */
-    public ITCPLateralCacheAttributes getTcpLateralCacheAttributes()
-    {
-        return tcpLateralCacheAttributes;
-    }
-
-    /**
-     * Processes commands from the server socket. There should be one listener for each configured
-     * TCP lateral.
-     * @deprecated No longer used
-     */
-    @Deprecated
-    public class ListenerThread
-        extends Thread
-    {
-        /** The socket listener */
-        private final ServerSocket serverSocket;
-
-        /**
-         * Constructor
-         *
-         * @param serverSocket
-         */
-        public ListenerThread(final ServerSocket serverSocket)
-        {
-            this.serverSocket = serverSocket;
-        }
-
-        /** Main processing method for the ListenerThread object */
-        @Override
-        public void run()
-        {
-            runListener(serverSocket.getChannel());
-        }
-    }
-
-    /**
-     * Processes commands from the server socket. There should be one listener for each configured
-     * TCP lateral.
-     */
-    private void runListener(final ServerSocketChannel serverSocket)
-    {
-        try (Selector selector = Selector.open())
-        {
-            serverSocket.register(selector, SelectionKey.OP_ACCEPT);
-            log.debug("Waiting for clients to connect");
-
-            // Check to see if we've been asked to exit, and exit
-            while (!terminated.get())
-            {
-                int activeKeys = selector.select(acceptTimeOut);
-                if (activeKeys == 0)
-                {
-                    continue;
-                }
-
-                for (Iterator<SelectionKey> i = selector.selectedKeys().iterator(); i.hasNext();)
-                {
-                    if (terminated.get())
-                    {
-                        break;
-                    }
-
-                    SelectionKey key = i.next();
-
-                    if (!key.isValid())
-                    {
-                        continue;
-                    }
-
-                    if (key.isAcceptable())
-                    {
-                        ServerSocketChannel server = (ServerSocketChannel) key.channel();
-                        SocketChannel client = server.accept();
-                        if (client == null)
-                        {
-                            //may happen in non-blocking mode
-                            continue;
-                        }
-
-                        log.info("Connected to client at {0}", client.getRemoteAddress());
-
-                        client.configureBlocking(false);
-                        client.register(selector, SelectionKey.OP_READ);
-                    }
-
-                    if (key.isReadable())
-                    {
-                        handleClient(key);
-                    }
-
-                    i.remove();
-                }
-            }
-
-            log.debug("Thread terminated, exiting gracefully");
-
-            //close all registered channels
-            selector.keys().forEach(key -> {
-                try
-                {
-                    key.channel().close();
-                }
-                catch (IOException e)
-                {
-                    log.warn("Problem closing channel", e);
-                }
-            });
-        }
-        catch (final IOException e)
-        {
-            log.error( "Exception caught in TCP listener", e );
-        }
-        finally
-        {
-            try
-            {
-                serverSocket.close();
-            }
-            catch (IOException e)
-            {
-                log.error( "Exception closing TCP listener", e );
-            }
-        }
-    }
-
-    /**
-     * A Separate thread that runs when a command comes into the LateralTCPReceiver.
-     * @deprecated No longer used
-     */
-    @Deprecated
-    public class ConnectionHandler
-        implements Runnable
-    {
-        /** The socket connection, passed in via constructor */
-        private final Socket socket;
-
-        /**
-         * Construct for a given socket
-         * @param socket
-         */
-        public ConnectionHandler( final Socket socket )
-        {
-            this.socket = socket;
-        }
-
-        /**
-         * Main processing method for the LateralTCPReceiverConnection object
-         */
-        @Override
-        public void run()
-        {
-            try (InputStream is = socket.getInputStream())
-            {
-                while ( true )
-                {
-                    final LateralElementDescriptor<K, V> led =
-                            serializer.deSerializeFrom(is, null);
-
-                    if ( led == null )
-                    {
-                        log.debug( "LateralElementDescriptor is null" );
-                        continue;
-                    }
-                    if ( led.getRequesterId() == getListenerId() )
-                    {
-                        log.debug( "from self" );
-                    }
-                    else
-                    {
-                        log.debug( "receiving LateralElementDescriptor from another led = {0}",
-                                led );
-
-                        Object obj = handleElement(led);
-                        if (obj != null)
-                        {
-                            OutputStream os = socket.getOutputStream();
-                            serializer.serializeTo(obj, os);
-                            os.flush();
-                        }
-                    }
-                }
-            }
-            catch (final IOException e)
-            {
-                log.info("Caught {0}, closing connection.", e.getClass().getSimpleName(), e);
-            }
-            catch (final ClassNotFoundException e)
-            {
-                log.error( "Deserialization failed reading from socket", e );
-            }
-        }
-    }
-
-    /**
-     * A Separate thread that runs when a command comes into the LateralTCPReceiver.
-     */
-    private void handleClient(final SelectionKey key)
-    {
-        final SocketChannel socketChannel = (SocketChannel) key.channel();
-
-        try
-        {
-            final LateralElementDescriptor<K, V> led =
-                    serializer.deSerializeFrom(socketChannel, null);
-
-            if ( led == null )
-            {
-                log.debug("LateralElementDescriptor is null");
-                return;
-            }
-
-            if ( led.getRequesterId() == getListenerId() )
-            {
-                log.debug( "from self" );
-            }
-            else
-            {
-                log.debug( "receiving LateralElementDescriptor from another led = {0}",
-                        led );
-
-                Object obj = handleElement(led);
-                if (obj != null)
-                {
-                    serializer.serializeTo(obj, socketChannel);
-                }
-            }
-        }
-        catch (final IOException e)
-        {
-            log.info("Caught {0}, closing connection.", e.getClass().getSimpleName(), e);
-            try
-            {
-                socketChannel.close();
-            }
-            catch (IOException e1)
-            {
-                log.error("Error while closing connection", e );
-            }
-        }
-        catch (final ClassNotFoundException e)
-        {
-            log.error( "Deserialization failed reading from socket", e );
-        }
-    }
-
-    /**
-     * This calls the appropriate method, based on the command sent in the Lateral element
-     * descriptor.
-     * <p>
-     * @param led the lateral element
-     * @return a possible response
-     * @throws IOException
-     */
-    private Object handleElement(final LateralElementDescriptor<K, V> led) throws IOException
-    {
-        final String cacheName = led.getPayload().getCacheName();
-        final K key = led.getPayload().getKey();
-        Object obj = null;
-
-        switch (led.getCommand())
-        {
-            case UPDATE:
-                handlePut(led.getPayload());
-                break;
-
-            case REMOVE:
-                // if a hashcode was given and filtering is on
-                // check to see if they are the same
-                // if so, then don't remove, otherwise issue a remove
-                if (led.getValHashCode() != -1 &&
-                    getTcpLateralCacheAttributes().isFilterRemoveByHashCode())
-                {
-                    final ICacheElement<K, V> test = getCache( cacheName ).localGet( key );
-                    if ( test != null )
-                    {
-                        if ( test.getVal().hashCode() == led.getValHashCode() )
-                        {
-                            log.debug( "Filtering detected identical hashCode [{0}], "
-                                    + "not issuing a remove for led {1}",
-                                    led.getValHashCode(), led );
-                            return null;
-                        }
-                        log.debug( "Different hashcodes, in cache [{0}] sent [{1}]",
-                                test.getVal()::hashCode, led::getValHashCode );
-                    }
-                }
-                handleRemove( cacheName, key );
-                break;
-
-            case REMOVEALL:
-                handleRemoveAll( cacheName );
-                break;
-
-            case GET:
-                obj = handleGet( cacheName, key );
-                break;
-
-            case GET_MATCHING:
-                obj = handleGetMatching( cacheName, (String) key );
-                break;
-
-            case GET_KEYSET:
-                obj = handleGetKeySet(cacheName);
-                break;
-
-            default: break;
-        }
-
-        return obj;
-    }
-
-    /**
-     * Shuts down the receiver.
-     */
-    @Override
-    public void shutdown()
-    {
-        if ( shutdown.compareAndSet(false, true) )
-        {
-            log.info( "Shutting down TCP Lateral receiver." );
-            dispose();
-        }
-        else
-        {
-            log.debug( "Shutdown already called." );
-        }
-    }
-}
+package org.apache.commons.jcs3.auxiliary.lateral.socket.tcp;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.jcs3.auxiliary.lateral.LateralElementDescriptor;
+import org.apache.commons.jcs3.auxiliary.lateral.behavior.ILateralCacheListener;
+import org.apache.commons.jcs3.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
+import org.apache.commons.jcs3.engine.CacheInfo;
+import org.apache.commons.jcs3.engine.behavior.ICacheElement;
+import org.apache.commons.jcs3.engine.behavior.ICompositeCacheManager;
+import org.apache.commons.jcs3.engine.behavior.IElementSerializer;
+import org.apache.commons.jcs3.engine.behavior.IShutdownObserver;
+import org.apache.commons.jcs3.engine.control.CompositeCache;
+import org.apache.commons.jcs3.log.Log;
+import org.apache.commons.jcs3.log.LogManager;
+import org.apache.commons.jcs3.utils.serialization.StandardSerializer;
+
+/**
+ * Listens for connections from other TCP lateral caches and handles them. The initialization method
+ * starts a listening thread, which creates a socket server. When messages are received they are
+ * passed to a pooled executor which then calls the appropriate handle method.
+ */
+public class LateralTCPListener<K, V>
+    implements ILateralCacheListener<K, V>, IShutdownObserver
+{
+    /** The logger */
+    private static final Log log = LogManager.getLog( LateralTCPListener.class );
+
+    /** How long the server will block on an accept(). 0 is infinite. */
+    private static final int acceptTimeOut = 1000;
+
+    /** The CacheHub this listener is associated with */
+    private transient ICompositeCacheManager cacheManager;
+
+    /** Map of available instances, keyed by port */
+    private static final ConcurrentHashMap<String, ILateralCacheListener<?, ?>> instances =
+        new ConcurrentHashMap<>();
+
+    /** Configuration attributes */
+    private ITCPLateralCacheAttributes tcpLateralCacheAttributes;
+
+    /** The listener thread */
+    private Thread listenerThread;
+
+    /**
+     * Serializer for reading and writing
+     */
+    private IElementSerializer serializer;
+
+    /** put count */
+    private int putCnt;
+
+    /** remove count */
+    private int removeCnt;
+
+    /** get count */
+    private int getCnt;
+
+    /**
+     * Use the vmid by default. This can be set for testing. If we ever need to run more than one
+     * per vm, then we need a new technique.
+     */
+    private long listenerId = CacheInfo.listenerId;
+
+    /** is this shut down? */
+    private final AtomicBoolean shutdown = new AtomicBoolean();
+
+    /** is this terminated? */
+    private final AtomicBoolean terminated = new AtomicBoolean();
+
+    /**
+     * Gets the instance attribute of the LateralCacheTCPListener class.
+     * <p>
+     * @param ilca ITCPLateralCacheAttributes
+     * @param cacheMgr the cache manager
+     * @return The instance value
+     * @deprecated Use the overload that takes an IElementSerializer
+     */
+    @Deprecated
+    @SuppressWarnings("unchecked") // Need to cast because of common map for all instances
+    public static <K, V> LateralTCPListener<K, V>
+        getInstance( final ITCPLateralCacheAttributes ilca, final ICompositeCacheManager cacheMgr)
+    {
+        return (LateralTCPListener<K, V>) instances.computeIfAbsent(
+                String.valueOf( ilca.getTcpListenerPort() ),
+                k -> {
+                    final LateralTCPListener<K, V> newIns = new LateralTCPListener<>( ilca, new StandardSerializer() );
+
+                    newIns.init();
+                    newIns.setCacheManager( cacheMgr );
+
+                    log.info("Created new listener {0}", ilca::getTcpListenerPort);
+
+                    return newIns;
+                });
+    }
+    
+    /**
+     * Gets the instance attribute of the LateralCacheTCPListener class.
+     * <p>
+     * @param ilca ITCPLateralCacheAttributes
+     * @param cacheMgr the cache manager
+     * @param serializer the serializer to use when receiving
+     * @return The instance value
+     */
+    @SuppressWarnings("unchecked") // Need to cast because of common map for all instances
+    public static <K, V> LateralTCPListener<K, V>
+        getInstance( final ITCPLateralCacheAttributes ilca, final ICompositeCacheManager cacheMgr, final IElementSerializer serializer )
+    {
+        return (LateralTCPListener<K, V>) instances.computeIfAbsent(
+                String.valueOf( ilca.getTcpListenerPort() ),
+                k -> {
+                    final LateralTCPListener<K, V> newIns = new LateralTCPListener<>( ilca, serializer );
+
+                    newIns.init();
+                    newIns.setCacheManager( cacheMgr );
+
+                    log.info("Created new listener {0}", ilca::getTcpListenerPort);
+
+                    return newIns;
+                });
+    }
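A usage sketch for the factory above. Listeners are pooled per listener port, so repeated calls with the same port return the same instance. TCPLateralCacheAttributes is assumed to be the concrete ITCPLateralCacheAttributes implementation, and cacheMgr an ICompositeCacheManager obtained elsewhere; the port is hypothetical:

    final ITCPLateralCacheAttributes attrs = new TCPLateralCacheAttributes(); // assumed concrete class
    attrs.setTcpListenerPort(1110);                                           // hypothetical port
    final LateralTCPListener<String, String> listener =
        LateralTCPListener.getInstance(attrs, cacheMgr, new StandardSerializer());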
+
+    /**
+     * Only one listener is needed, since it works for all regions; it is simply referenced by multiple region names.
+     * <p>
+     * @param ilca
+     * @param serializer the serializer to use when receiving
+     */
+    protected LateralTCPListener( final ITCPLateralCacheAttributes ilca, final IElementSerializer serializer )
+    {
+        this.setTcpLateralCacheAttributes( ilca );
+        this.serializer = serializer;
+    }
+
+    /**
+     * This starts the ListenerThread on the specified port.
+     */
+    @Override
+    public synchronized void init()
+    {
+        try
+        {
+            final int port = getTcpLateralCacheAttributes().getTcpListenerPort();
+            final String host = getTcpLateralCacheAttributes().getTcpListenerHost();
+
+            terminated.set(false);
+            shutdown.set(false);
+
+            final ServerSocketChannel serverSocket = ServerSocketChannel.open();
+
+            SocketAddress endPoint;
+
+            if (host != null && !host.isEmpty())
+            {
+                log.info( "Listening on {0}:{1}", host, port );
+                //Bind the SocketAddress with host and port
+                endPoint = new InetSocketAddress(host, port);
+            }
+            else
+            {
+                log.info( "Listening on port {0}", port );
+                endPoint = new InetSocketAddress(port);
+            }
+
+            serverSocket.bind(endPoint);
+            serverSocket.configureBlocking(false);
+
+            listenerThread = new Thread(() -> runListener(serverSocket),
+                    "JCS-LateralTCPListener-" + host + ":" + port);
+            listenerThread.setDaemon(true);
+            listenerThread.start();
+        }
+        catch ( final IOException ex )
+        {
+            throw new IllegalStateException( ex );
+        }
+    }
+
+    /**
+     * Lets the lateral cache set a listener_id. Since there is only one listener for all the
+     * regions, and every region gets registered, the id shouldn't be set if it isn't zero. If it is,
+     * we assume that it is a reconnect.
+     * <p>
+     * By default, the listener id is the vmid.
+     * <p>
+     * The service should set this value. This value will never be changed by a server we connect
+     * to. It needs to be non static, for unit tests.
+     * <p>
+     * The service will use the value it sets in all send requests to the sender.
+     * <p>
+     * @param id The new listenerId value
+     * @throws IOException
+     */
+    @Override
+    public void setListenerId( final long id )
+        throws IOException
+    {
+        this.listenerId = id;
+        log.debug( "set listenerId = {0}", id );
+    }
+
+    /**
+     * Gets the listenerId attribute of the LateralCacheTCPListener object
+     * <p>
+     * @return The listenerId value
+     * @throws IOException
+     */
+    @Override
+    public long getListenerId()
+        throws IOException
+    {
+        return this.listenerId;
+    }
+
+    /**
+     * Increments the put count. Gets the cache that was injected by the lateral factory. Calls put
+     * on the cache.
+     * <p>
+     * @see org.apache.commons.jcs3.engine.behavior.ICacheListener#handlePut(org.apache.commons.jcs3.engine.behavior.ICacheElement)
+     */
+    @Override
+    public void handlePut( final ICacheElement<K, V> element )
+        throws IOException
+    {
+        putCnt++;
+        if ( log.isInfoEnabled() && getPutCnt() % 100 == 0 )
+        {
+            log.info( "Put Count (port {0}) = {1}",
+                    () -> getTcpLateralCacheAttributes().getTcpListenerPort(),
+                    this::getPutCnt);
+        }
+
+        log.debug( "handlePut> cacheName={0}, key={1}",
+                element::getCacheName, element::getKey);
+
+        getCache( element.getCacheName() ).localUpdate( element );
+    }
+
+    /**
+     * Increments the remove count. Gets the cache that was injected by the lateral factory. Calls
+     * remove on the cache.
+     * <p>
+     * @see org.apache.commons.jcs3.engine.behavior.ICacheListener#handleRemove(String,
+     *      Object)
+     */
+    @Override
+    public void handleRemove( final String cacheName, final K key )
+        throws IOException
+    {
+        removeCnt++;
+        if ( log.isInfoEnabled() && getRemoveCnt() % 100 == 0 )
+        {
+            log.info( "Remove Count = {0}", this::getRemoveCnt);
+        }
+
+        log.debug( "handleRemove> cacheName={0}, key={1}", cacheName, key );
+
+        getCache( cacheName ).localRemove( key );
+    }
+
+    /**
+     * Gets the cache that was injected by the lateral factory. Calls removeAll on the cache.
+     * <p>
+     * @see org.apache.commons.jcs3.engine.behavior.ICacheListener#handleRemoveAll(String)
+     */
+    @Override
+    public void handleRemoveAll( final String cacheName )
+        throws IOException
+    {
+        log.debug( "handleRemoveAll> cacheName={0}", cacheName );
+
+        getCache( cacheName ).localRemoveAll();
+    }
+
+    /**
+     * Gets the cache that was injected by the lateral factory. Calls get on the cache.
+     * <p>
+     * @param cacheName
+     * @param key
+     * @return an ICacheElement
+     * @throws IOException
+     */
+    public ICacheElement<K, V> handleGet( final String cacheName, final K key )
+        throws IOException
+    {
+        getCnt++;
+        if ( log.isInfoEnabled() && getGetCnt() % 100 == 0 )
+        {
+            log.info( "Get Count (port {0}) = {1}",
+                    () -> getTcpLateralCacheAttributes().getTcpListenerPort(),
+                    this::getGetCnt);
+        }
+
+        log.debug( "handleGet> cacheName={0}, key={1}", cacheName, key );
+
+        return getCache( cacheName ).localGet( key );
+    }
+
+    /**
+     * Gets the cache that was injected by the lateral factory. Calls getMatching on the cache.
+     * <p>
+     * @param cacheName the name of the cache
+     * @param pattern the matching pattern
+     * @return Map
+     * @throws IOException
+     */
+    public Map<K, ICacheElement<K, V>> handleGetMatching( final String cacheName, final String pattern )
+        throws IOException
+    {
+        getCnt++;
+        if ( log.isInfoEnabled() && getGetCnt() % 100 == 0 )
+        {
+            log.info( "GetMatching Count (port {0}) = {1}",
+                    () -> getTcpLateralCacheAttributes().getTcpListenerPort(),
+                    this::getGetCnt);
+        }
+
+        log.debug( "handleGetMatching> cacheName={0}, pattern={1}", cacheName, pattern );
+
+        return getCache( cacheName ).localGetMatching( pattern );
+    }
+
+    /**
+     * Gets the cache that was injected by the lateral factory. Calls getKeySet on the cache.
+     * <p>
+     * @param cacheName the name of the cache
+     * @return a set of keys
+     * @throws IOException
+     */
+    public Set<K> handleGetKeySet( final String cacheName ) throws IOException
+    {
+        return getCache( cacheName ).getKeySet(true);
+    }
+
+    /**
+     * This marks this instance as terminated.
+     * <p>
+     * @see org.apache.commons.jcs3.engine.behavior.ICacheListener#handleDispose(String)
+     */
+    @Override
+    public void handleDispose( final String cacheName )
+        throws IOException
+    {
+        log.info( "handleDispose > cacheName={0} | Ignoring message. "
+                + "Do not dispose from remote.", cacheName );
+
+        // TODO handle active deregistration, rather than passive detection
+        dispose();
+    }
+
+    @Override
+    public synchronized void dispose()
+    {
+        if (terminated.compareAndSet(false, true))
+        {
+            notify();
+            listenerThread.interrupt();
+        }
+    }
+
+    /**
+     * Gets the named cache region from the cache manager associated with this listener.
+     * <p>
+     * Normally the manager is set by the factory. If it wasn't set, the listener defaults to the
+     * expected singleton behavior of the cache manager.
+     * <p>
+     * @param name the region name
+     * @return CompositeCache
+     */
+    protected CompositeCache<K, V> getCache( final String name )
+    {
+        return getCacheManager().getCache( name );
+    }
+
+    /**
+     * This is roughly the number of updates the lateral has received.
+     * <p>
+     * @return Returns the putCnt.
+     */
+    public int getPutCnt()
+    {
+        return putCnt;
+    }
+
+    /**
+     * @return Returns the getCnt.
+     */
+    public int getGetCnt()
+    {
+        return getCnt;
+    }
+
+    /**
+     * @return Returns the removeCnt.
+     */
+    public int getRemoveCnt()
+    {
+        return removeCnt;
+    }
+
+    /**
+     * @param cacheMgr The cacheMgr to set.
+     */
+    @Override
+    public void setCacheManager( final ICompositeCacheManager cacheMgr )
+    {
+        this.cacheManager = cacheMgr;
+    }
+
+    /**
+     * @return Returns the cacheMgr.
+     */
+    @Override
+    public ICompositeCacheManager getCacheManager()
+    {
+        return cacheManager;
+    }
+
+    /**
+     * @param tcpLateralCacheAttributes The tcpLateralCacheAttributes to set.
+     */
+    public void setTcpLateralCacheAttributes( final ITCPLateralCacheAttributes tcpLateralCacheAttributes )
+    {
+        this.tcpLateralCacheAttributes = tcpLateralCacheAttributes;
+    }
+
+    /**
+     * @return Returns the tcpLateralCacheAttributes.
+     */
+    public ITCPLateralCacheAttributes getTcpLateralCacheAttributes()
+    {
+        return tcpLateralCacheAttributes;
+    }
+
+    /**
+     * Processes commands from the server socket. There should be one listener for each configured
+     * TCP lateral.
+     * @deprecated No longer used
+     */
+    @Deprecated
+    public class ListenerThread
+        extends Thread
+    {
+        /** The socket listener */
+        private final ServerSocket serverSocket;
+
+        /**
+         * Constructor
+         *
+         * @param serverSocket
+         */
+        public ListenerThread(final ServerSocket serverSocket)
+        {
+            this.serverSocket = serverSocket;
+        }
+
+        /** Main processing method for the ListenerThread object */
+        @Override
+        public void run()
+        {
+            runListener(serverSocket.getChannel());
+        }
+    }
+
+    /**
+     * Processes commands from the server socket. There should be one listener for each configured
+     * TCP lateral.
+     */
+    private void runListener(final ServerSocketChannel serverSocket)
+    {
+        try (Selector selector = Selector.open())
+        {
+            serverSocket.register(selector, SelectionKey.OP_ACCEPT);
+            log.debug("Waiting for clients to connect");
+
+            // Check to see if we've been asked to exit, and exit
+            while (!terminated.get())
+            {
+                int activeKeys = selector.select(acceptTimeOut);
+                if (activeKeys == 0)
+                {
+                    continue;
+                }
+
+                for (Iterator<SelectionKey> i = selector.selectedKeys().iterator(); i.hasNext();)
+                {
+                    if (terminated.get())
+                    {
+                        break;
+                    }
+
+                    SelectionKey key = i.next();
+
+                    if (!key.isValid())
+                    {
+                        continue;
+                    }
+
+                    if (key.isAcceptable())
+                    {
+                        ServerSocketChannel server = (ServerSocketChannel) key.channel();
+                        SocketChannel client = server.accept();
+                        if (client == null)
+                        {
+                            //may happen in non-blocking mode
+                            continue;
+                        }
+
+                        log.info("Connected to client at {0}", client.getRemoteAddress());
+
+                        client.configureBlocking(false);
+                        client.register(selector, SelectionKey.OP_READ);
+                    }
+
+                    if (key.isReadable())
+                    {
+                        handleClient(key);
+                    }
+
+                    i.remove();
+                }
+            }
+
+            log.debug("Thread terminated, exiting gracefully");
+
+            //close all registered channels
+            selector.keys().forEach(key -> {
+                try
+                {
+                    key.channel().close();
+                }
+                catch (IOException e)
+                {
+                    log.warn("Problem closing channel", e);
+                }
+            });
+        }
+        catch (final IOException e)
+        {
+            log.error( "Exception caught in TCP listener", e );
+        }
+        finally
+        {
+            try
+            {
+                serverSocket.close();
+            }
+            catch (IOException e)
+            {
+                log.error( "Exception closing TCP listener", e );
+            }
+        }
+    }
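For the sending side implied by this loop, each message is simply one serialized LateralElementDescriptor written to the socket; a minimal sketch using only the serializer calls shown in this class. Here 'led' stands for a descriptor built by the sender (its construction is elided), 'serializer' is an IElementSerializer such as StandardSerializer, and the peer address is hypothetical:

    // inside a method declared to throw IOException
    try (Socket socket = new Socket("localhost", 1110))  // hypothetical peer address
    {
        final OutputStream os = socket.getOutputStream();
        serializer.serializeTo(led, os);  // one serialized descriptor per message
        os.flush();
    }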
+
+    /**
+     * A separate thread that runs when a command comes into the LateralTCPReceiver.
+     * @deprecated No longer used
+     */
+    @Deprecated
+    public class ConnectionHandler
+        implements Runnable
+    {
+        /** The socket connection, passed in via constructor */
+        private final Socket socket;
+
+        /**
+         * Construct for a given socket
+         * @param socket
+         */
+        public ConnectionHandler( final Socket socket )
+        {
+            this.socket = socket;
+        }
+
+        /**
+         * Main processing method for the LateralTCPReceiverConnection object
+         */
+        @Override
+        public void run()
+        {
+            try (InputStream is = socket.getInputStream())
+            {
+                while ( true )
+                {
+                    final LateralElementDescriptor<K, V> led =
+                            serializer.deSerializeFrom(is, null);
+
+                    if ( led == null )
+                    {
+                        log.debug( "LateralElementDescriptor is null" );
+                        continue;
+                    }
+                    if ( led.getRequesterId() == getListenerId() )
+                    {
+                        log.debug( "from self" );
+                    }
+                    else
+                    {
+                        log.debug( "receiving LateralElementDescriptor from another led = {0}",
+                                led );
+
+                        Object obj = handleElement(led);
+                        if (obj != null)
+                        {
+                            OutputStream os = socket.getOutputStream();
+                            serializer.serializeTo(obj, os);
+                            os.flush();
+                        }
+                    }
+                }
+            }
+            catch (final IOException e)
+            {
+                log.info("Caught {0}, closing connection.", e.getClass().getSimpleName(), e);
+            }
+            catch (final ClassNotFoundException e)
+            {
+                log.error( "Deserialization failed reading from socket", e );
+            }
+        }
+    }
+
+    /**
+     * Handles a command that has arrived on a client channel registered with the selector.
+     * @param key the selection key for the readable client channel
+     */
+    private void handleClient(final SelectionKey key)
+    {
+        final SocketChannel socketChannel = (SocketChannel) key.channel();
+
+        try
+        {
+            final LateralElementDescriptor<K, V> led =
+                    serializer.deSerializeFrom(socketChannel, null);
+
+            if ( led == null )
+            {
+                log.debug("LateralElementDescriptor is null");
+                return;
+            }
+
+            if ( led.getRequesterId() == getListenerId() )
+            {
+                log.debug( "from self" );
+            }
+            else
+            {
+                log.debug( "receiving LateralElementDescriptor from another led = {0}",
+                        led );
+
+                Object obj = handleElement(led);
+                if (obj != null)
+                {
+                    serializer.serializeTo(obj, socketChannel);
+                }
+            }
+        }
+        catch (final IOException e)
+        {
+            log.info("Caught {0}, closing connection.", e.getClass().getSimpleName(), e);
+            try
+            {
+                socketChannel.close();
+            }
+            catch (IOException e1)
+            {
+                log.error("Error while closing connection", e );
+            }
+        }
+        catch (final ClassNotFoundException e)
+        {
+            log.error( "Deserialization failed reading from socket", e );
+        }
+    }
+
+    /**
+     * This calls the appropriate method, based on the command sent in the Lateral element
+     * descriptor.
+     * <p>
+     * @param led the lateral element
+     * @return a possible response
+     * @throws IOException
+     */
+    private Object handleElement(final LateralElementDescriptor<K, V> led) throws IOException
+    {
+        final String cacheName = led.getPayload().getCacheName();
+        final K key = led.getPayload().getKey();
+        Object obj = null;
+
+        switch (led.getCommand())
+        {
+            case UPDATE:
+                handlePut(led.getPayload());
+                break;
+
+            case REMOVE:
+                // if a hashcode was given and filtering is on
+                // check to see if they are the same
+                // if so, then don't remove, otherwise issue a remove
+                if (led.getValHashCode() != -1 &&
+                    getTcpLateralCacheAttributes().isFilterRemoveByHashCode())
+                {
+                    final ICacheElement<K, V> test = getCache( cacheName ).localGet( key );
+                    if ( test != null )
+                    {
+                        if ( test.getVal().hashCode() == led.getValHashCode() )
+                        {
+                            log.debug( "Filtering detected identical hashCode [{0}], "
+                                    + "not issuing a remove for led {1}",
+                                    led.getValHashCode(), led );
+                            return null;
+                        }
+                        log.debug( "Different hashcodes, in cache [{0}] sent [{1}]",
+                                test.getVal()::hashCode, led::getValHashCode );
+                    }
+                }
+                handleRemove( cacheName, key );
+                break;
+
+            case REMOVEALL:
+                handleRemoveAll( cacheName );
+                break;
+
+            case GET:
+                obj = handleGet( cacheName, key );
+                break;
+
+            case GET_MATCHING:
+                obj = handleGetMatching( cacheName, (String) key );
+                break;
+
+            case GET_KEYSET:
+                obj = handleGetKeySet(cacheName);
+                break;
+
+            default: break;
+        }
+
+        return obj;
+    }
+
+    /**
+     * Shuts down the receiver.
+     */
+    @Override
+    public void shutdown()
+    {
+        if ( shutdown.compareAndSet(false, true) )
+        {
+            log.info( "Shutting down TCP Lateral receiver." );
+            dispose();
+        }
+        else
+        {
+            log.debug( "Shutdown already called." );
+        }
+    }
+}
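
The accept/read loop above follows the standard java.nio selector pattern: one
non-blocking ServerSocketChannel registered for OP_ACCEPT, each accepted client
re-registered for OP_READ, a null check on accept() for non-blocking mode, and
removal of each key once handled. For readers new to that pattern, here is a
minimal, self-contained sketch reduced to a byte-echo loop; the class name and
port are illustrative, and it deliberately omits JCS's serialized
LateralElementDescriptor protocol, per-client error handling, and shutdown.

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class SelectorLoopSketch
{
    public static void main(final String[] args) throws IOException
    {
        final Selector selector = Selector.open();
        final ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress(12345)); // illustrative port
        server.configureBlocking(false);
        server.register(selector, SelectionKey.OP_ACCEPT);

        while (true)
        {
            selector.select(); // blocks until at least one channel is ready

            final Iterator<SelectionKey> i = selector.selectedKeys().iterator();
            while (i.hasNext())
            {
                final SelectionKey key = i.next();

                if (key.isAcceptable())
                {
                    final SocketChannel client = server.accept();
                    if (client != null) // may be null in non-blocking mode
                    {
                        client.configureBlocking(false);
                        client.register(selector, SelectionKey.OP_READ);
                    }
                }

                if (key.isValid() && key.isReadable())
                {
                    final SocketChannel client = (SocketChannel) key.channel();
                    final ByteBuffer buffer = ByteBuffer.allocate(1024);
                    if (client.read(buffer) == -1)
                    {
                        client.close(); // peer closed; the key is cancelled
                    }
                    else
                    {
                        buffer.flip();
                        client.write(buffer); // best-effort echo
                    }
                }

                i.remove(); // the key has been handled
            }
        }
    }
}
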
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/remote/RemoteCacheNoWait.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/remote/RemoteCacheNoWait.java
index e0d77337..c8c998e9 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/remote/RemoteCacheNoWait.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/remote/RemoteCacheNoWait.java
@@ -1,516 +1,516 @@
-package org.apache.commons.jcs3.auxiliary.remote;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.io.IOException;
-import java.rmi.UnmarshalException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.commons.jcs3.auxiliary.AbstractAuxiliaryCache;
-import org.apache.commons.jcs3.auxiliary.AuxiliaryCacheAttributes;
-import org.apache.commons.jcs3.auxiliary.remote.behavior.IRemoteCacheClient;
-import org.apache.commons.jcs3.engine.CacheAdaptor;
-import org.apache.commons.jcs3.engine.CacheEventQueueFactory;
-import org.apache.commons.jcs3.engine.CacheStatus;
-import org.apache.commons.jcs3.engine.behavior.ICacheElement;
-import org.apache.commons.jcs3.engine.behavior.ICacheEventQueue;
-import org.apache.commons.jcs3.engine.behavior.ICacheServiceNonLocal;
-import org.apache.commons.jcs3.engine.stats.StatElement;
-import org.apache.commons.jcs3.engine.stats.Stats;
-import org.apache.commons.jcs3.engine.stats.behavior.IStatElement;
-import org.apache.commons.jcs3.engine.stats.behavior.IStats;
-import org.apache.commons.jcs3.log.Log;
-import org.apache.commons.jcs3.log.LogManager;
-
-/**
- * The RemoteCacheNoWait wraps the RemoteCacheClient. The client holds a handle on the
- * RemoteCacheService.
- * <p>
- * Used to queue up update requests to the underlying cache. These requests will be processed in
- * their order of arrival via the cache event queue processor.
- * <p>
- * Typically errors will be handled downstream. We only need to kill the queue if an error makes it
- * to this level from the queue. That can only happen if the queue is damaged, since the events are
- * processed asynchronously.
- * <p>
- * There is no reason to create a queue on startup if the remote is not healthy.
- * <p>
- * If the remote cache encounters an error it will zombie--create a balking facade for the service.
- * The Zombie will queue up items until the connection is restored. An alternative way to accomplish
- * the same thing would be to stop, not destroy the queue at this level. That way items would be
- * added to the queue and then when the connection is restored, we could start the worker threads
- * again. This is a better long term solution, but it requires some significant changes to the
- * complicated worker queues.
- */
-public class RemoteCacheNoWait<K, V>
-    extends AbstractAuxiliaryCache<K, V>
-{
-    /** log instance */
-    private static final Log log = LogManager.getLog( RemoteCacheNoWait.class );
-
-    /** The remote cache client */
-    private final IRemoteCacheClient<K, V> remoteCacheClient;
-
-    /** Event queue for queuing up calls like put and remove. */
-    private ICacheEventQueue<K, V> cacheEventQueue;
-
-    /** how many times get has been called. */
-    private int getCount;
-
-    /** how many times getMatching has been called. */
-    private int getMatchingCount;
-
-    /** how many times getMultiple has been called. */
-    private int getMultipleCount;
-
-    /** how many times remove has been called. */
-    private int removeCount;
-
-    /** how many times put has been called. */
-    private int putCount;
-
-    /**
-     * Constructs with the given remote cache, and fires up an event queue for asynchronous
-     * processing.
-     * <p>
-     * @param cache
-     */
-    public RemoteCacheNoWait( final IRemoteCacheClient<K, V> cache )
-    {
-        this.remoteCacheClient = cache;
-        this.cacheEventQueue = createCacheEventQueue(cache);
-
-        if ( remoteCacheClient.getStatus() == CacheStatus.ERROR )
-        {
-            cacheEventQueue.destroy();
-        }
-    }
-
-    /**
-     * Create a cache event queue from the parameters of the remote client
-     * @param client the remote client
-     * @return the new cache event queue
-     */
-    private ICacheEventQueue<K, V> createCacheEventQueue( final IRemoteCacheClient<K, V> client )
-    {
-        final CacheEventQueueFactory<K, V> factory = new CacheEventQueueFactory<>();
-        return factory.createCacheEventQueue(
-            new CacheAdaptor<>( client ),
-            client.getListenerId(),
-            client.getCacheName(),
-            client.getAuxiliaryCacheAttributes().getEventQueuePoolName(),
-            client.getAuxiliaryCacheAttributes().getEventQueueType() );
-    }
-
-    /**
-     * Adds a put event to the queue.
-     * <p>
-     * @param element
-     * @throws IOException
-     */
-    @Override
-    public void update( final ICacheElement<K, V> element )
-        throws IOException
-    {
-        putCount++;
-        try
-        {
-            cacheEventQueue.addPutEvent( element );
-        }
-        catch ( final IOException e )
-        {
-            log.error( "Problem adding putEvent to queue.", e );
-            cacheEventQueue.destroy();
-            throw e;
-        }
-    }
-
-    /**
-     * Synchronously reads from the remote cache.
-     * <p>
-     * @param key
-     * @return element from the remote cache, or null if not present
-     * @throws IOException
-     */
-    @Override
-    public ICacheElement<K, V> get( final K key )
-        throws IOException
-    {
-        getCount++;
-        try
-        {
-            return remoteCacheClient.get( key );
-        }
-        catch ( final UnmarshalException ue )
-        {
-            log.debug( "Retrying the get owing to UnmarshalException." );
-
-            try
-            {
-                return remoteCacheClient.get( key );
-            }
-            catch ( final IOException ex )
-            {
-                log.info( "Failed in retrying the get for the second time. ", ex );
-            }
-        }
-        catch ( final IOException ex )
-        {
-            // We don't want to destroy the queue on a get failure.
-            // The RemoteCache will Zombie and queue.
-            // Since get does not use the queue, I don't want to kill the queue.
-            throw ex;
-        }
-
-        return null;
-    }
-
-    /**
-     * Synchronously reads matching items from the remote cache.
-     * <p>
-     * @param pattern
-     * @return a map of keys to matching elements
-     * @throws IOException
-     */
-    @Override
-    public Map<K, ICacheElement<K, V>> getMatching( final String pattern )
-        throws IOException
-    {
-        getMatchingCount++;
-        try
-        {
-            return remoteCacheClient.getMatching( pattern );
-        }
-        catch ( final UnmarshalException ue )
-        {
-            log.debug( "Retrying the getMatching owing to UnmarshalException." );
-
-            try
-            {
-                return remoteCacheClient.getMatching( pattern );
-            }
-            catch ( final IOException ex )
-            {
-                log.info( "Failed in retrying the getMatching for the second time.", ex );
-            }
-        }
-        catch ( final IOException ex )
-        {
-            // We don't want to destroy the queue on a get failure.
-            // The RemoteCache will Zombie and queue.
-            // Since get does not use the queue, I don't want to kill the queue.
-            throw ex;
-        }
-
-        return Collections.emptyMap();
-    }
-
-    /**
-     * Gets multiple items from the cache based on the given set of keys. Sends the getMultiple
-     * request on to the server rather than looping through the requested keys.
-     * <p>
-     * @param keys
-     * @return a map of K key to ICacheElement&lt;K, V&gt; element, or an empty map if there is no
-     *         data in cache for any of these keys
-     * @throws IOException
-     */
-    @Override
-    public Map<K, ICacheElement<K, V>> getMultiple( final Set<K> keys )
-        throws IOException
-    {
-        getMultipleCount++;
-        try
-        {
-            return remoteCacheClient.getMultiple( keys );
-        }
-        catch ( final UnmarshalException ue )
-        {
-            log.debug( "Retrying the getMultiple owing to UnmarshalException..." );
-
-            try
-            {
-                return remoteCacheClient.getMultiple( keys );
-            }
-            catch ( final IOException ex )
-            {
-                log.info( "Failed in retrying the getMultiple for the second time.", ex );
-            }
-        }
-        catch ( final IOException ex )
-        {
-            // We don't want to destroy the queue on a get failure.
-            // The RemoteCache will Zombie and queue.
-            // Since get does not use the queue, I don't want to kill the queue.
-            throw ex;
-        }
-
-        return new HashMap<>();
-    }
-
-    /**
-     * Return the keys in this cache.
-     * <p>
-     * @see org.apache.commons.jcs3.auxiliary.AuxiliaryCache#getKeySet()
-     */
-    @Override
-    public Set<K> getKeySet() throws IOException
-    {
-        return remoteCacheClient.getKeySet();
-    }
-
-    /**
-     * Adds a remove request to the remote cache.
-     * <p>
-     * @param key
-     * @return if this was successful
-     * @throws IOException
-     */
-    @Override
-    public boolean remove( final K key )
-        throws IOException
-    {
-        removeCount++;
-        try
-        {
-            cacheEventQueue.addRemoveEvent( key );
-        }
-        catch ( final IOException e )
-        {
-            log.error( "Problem adding RemoveEvent to queue.", e );
-            cacheEventQueue.destroy();
-            throw e;
-        }
-        return false;
-    }
-
-    /**
-     * Adds a removeAll request to the remote cache.
-     * <p>
-     * @throws IOException
-     */
-    @Override
-    public void removeAll()
-        throws IOException
-    {
-        try
-        {
-            cacheEventQueue.addRemoveAllEvent();
-        }
-        catch ( final IOException e )
-        {
-            log.error( "Problem adding RemoveAllEvent to queue.", e );
-            cacheEventQueue.destroy();
-            throw e;
-        }
-    }
-
-    /** Adds a dispose request to the remote cache. */
-    @Override
-    public void dispose()
-    {
-        try
-        {
-            cacheEventQueue.addDisposeEvent();
-        }
-        catch ( final IOException e )
-        {
-            log.error( "Problem adding DisposeEvent to queue.", e );
-            cacheEventQueue.destroy();
-        }
-    }
-
-    /**
-     * No remote invocation.
-     * <p>
-     * @return The size value
-     */
-    @Override
-    public int getSize()
-    {
-        return remoteCacheClient.getSize();
-    }
-
-    /**
-     * No remote invocation.
-     * <p>
-     * @return The cacheType value
-     */
-    @Override
-    public CacheType getCacheType()
-    {
-        return CacheType.REMOTE_CACHE;
-    }
-
-    /**
-     * Returns the async cache status. An error status indicates either the remote connection is not
-     * available, or the async queue has been unexpectedly destroyed. No remote invocation.
-     * <p>
-     * @return The status value
-     */
-    @Override
-    public CacheStatus getStatus()
-    {
-        return cacheEventQueue.isWorking() ? remoteCacheClient.getStatus() : CacheStatus.ERROR;
-    }
-
-    /**
-     * Gets the cacheName attribute of the RemoteCacheNoWait object
-     * <p>
-     * @return The cacheName value
-     */
-    @Override
-    public String getCacheName()
-    {
-        return remoteCacheClient.getCacheName();
-    }
-
-    /**
-     * Replaces the remote cache service handle with the given handle and resets the event queue by
-     * starting up a new instance.
-     * <p>
-     * @param remote
-     */
-    public void fixCache( final ICacheServiceNonLocal<?, ?> remote )
-    {
-        remoteCacheClient.fixCache( remote );
-        resetEventQ();
-    }
-
-    /**
-     * Resets the event queue by first destroying the existing one and starting up a new one.
-     * <p>
-     * There may be no good reason to kill the existing queue. We will sometimes need to set a new
-     * listener id, so we should create a new queue. We should let the old queue drain. If we were
-     * connected to the failover, it would be best to finish sending items.
-     */
-    public void resetEventQ()
-    {
-        final ICacheEventQueue<K, V> previousQueue = cacheEventQueue;
-
-        this.cacheEventQueue = createCacheEventQueue(this.remoteCacheClient);
-
-        if ( previousQueue.isWorking() )
-        {
-            // we don't expect anything, it would have all gone to the zombie
-            log.info( "resetEventQ, previous queue has [{0}] items queued up.",
-                    previousQueue::size);
-            previousQueue.destroy();
-        }
-    }
-
-    /**
-     * This is temporary. It allows the manager to get the listener.
-     * <p>
-     * @return the instance of the remote cache client used by this object
-     */
-    protected IRemoteCacheClient<K, V> getRemoteCache()
-    {
-        return remoteCacheClient;
-    }
-
-    /**
-     * @return Returns the AuxiliaryCacheAttributes.
-     */
-    @Override
-    public AuxiliaryCacheAttributes getAuxiliaryCacheAttributes()
-    {
-        return remoteCacheClient.getAuxiliaryCacheAttributes();
-    }
-
-    /**
-     * This is for testing only. It allows you to take a look at the event queue.
-     * <p>
-     * @return ICacheEventQueue
-     */
-    protected ICacheEventQueue<K, V> getCacheEventQueue()
-    {
-        return this.cacheEventQueue;
-    }
-
-    /**
-     * Returns the stats and the cache.toString().
-     * <p>
-     * @see java.lang.Object#toString()
-     */
-    @Override
-    public String toString()
-    {
-        return getStats() + "\n" + remoteCacheClient.toString();
-    }
-
-    /**
-     * Returns the statistics in String form.
-     * <p>
-     * @return String
-     */
-    @Override
-    public String getStats()
-    {
-        return getStatistics().toString();
-    }
-
-    /**
-     * @return statistics about this communication
-     */
-    @Override
-    public IStats getStatistics()
-    {
-        final IStats stats = new Stats();
-        stats.setTypeName( "Remote Cache No Wait" );
-
-        final ArrayList<IStatElement<?>> elems = new ArrayList<>();
-
-        elems.add(new StatElement<>( "Status", getStatus() ) );
-
-        // get the stats from the cache queue too
-        final IStats cStats = this.remoteCacheClient.getStatistics();
-        if ( cStats != null )
-        {
-            elems.addAll(cStats.getStatElements());
-        }
-
-        // get the stats from the event queue too
-        final IStats eqStats = this.cacheEventQueue.getStatistics();
-        elems.addAll(eqStats.getStatElements());
-
-        elems.add(new StatElement<>( "Get Count", Integer.valueOf(this.getCount) ) );
-        elems.add(new StatElement<>( "GetMatching Count", Integer.valueOf(this.getMatchingCount) ) );
-        elems.add(new StatElement<>( "GetMultiple Count", Integer.valueOf(this.getMultipleCount) ) );
-        elems.add(new StatElement<>( "Remove Count", Integer.valueOf(this.removeCount) ) );
-        elems.add(new StatElement<>( "Put Count", Integer.valueOf(this.putCount) ) );
-
-        stats.setStatElements( elems );
-
-        return stats;
-    }
-
-    /**
-     * This won't be called since we don't do ICache logging here.
-     * <p>
-     * @return String
-     */
-    @Override
-    public String getEventLoggingExtraInfo()
-    {
-        return "Remote Cache No Wait";
-    }
-}
+package org.apache.commons.jcs3.auxiliary.remote;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.rmi.UnmarshalException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.jcs3.auxiliary.AbstractAuxiliaryCache;
+import org.apache.commons.jcs3.auxiliary.AuxiliaryCacheAttributes;
+import org.apache.commons.jcs3.auxiliary.remote.behavior.IRemoteCacheClient;
+import org.apache.commons.jcs3.engine.CacheAdaptor;
+import org.apache.commons.jcs3.engine.CacheEventQueueFactory;
+import org.apache.commons.jcs3.engine.CacheStatus;
+import org.apache.commons.jcs3.engine.behavior.ICacheElement;
+import org.apache.commons.jcs3.engine.behavior.ICacheEventQueue;
+import org.apache.commons.jcs3.engine.behavior.ICacheServiceNonLocal;
+import org.apache.commons.jcs3.engine.stats.StatElement;
+import org.apache.commons.jcs3.engine.stats.Stats;
+import org.apache.commons.jcs3.engine.stats.behavior.IStatElement;
+import org.apache.commons.jcs3.engine.stats.behavior.IStats;
+import org.apache.commons.jcs3.log.Log;
+import org.apache.commons.jcs3.log.LogManager;
+
+/**
+ * The RemoteCacheNoWait wraps the RemoteCacheClient. The client holds a handle on the
+ * RemoteCacheService.
+ * <p>
+ * Used to queue up update requests to the underlying cache. These requests will be processed in
+ * their order of arrival via the cache event queue processor.
+ * <p>
+ * Typically errors will be handled downstream. We only need to kill the queue if an error makes it
+ * to this level from the queue. That can only happen if the queue is damaged, since the events are
+ * processed asynchronously.
+ * <p>
+ * There is no reason to create a queue on startup if the remote is not healthy.
+ * <p>
+ * If the remote cache encounters an error it will zombie--create a balking facade for the service.
+ * The Zombie will queue up items until the connection is restored. An alternative way to accomplish
+ * the same thing would be to stop, not destroy the queue at this level. That way items would be
+ * added to the queue and then when the connection is restored, we could start the worker threads
+ * again. This is a better long term solution, but it requires some significant changes to the
+ * complicated worker queues.
+ */
+public class RemoteCacheNoWait<K, V>
+    extends AbstractAuxiliaryCache<K, V>
+{
+    /** log instance */
+    private static final Log log = LogManager.getLog( RemoteCacheNoWait.class );
+
+    /** The remote cache client */
+    private final IRemoteCacheClient<K, V> remoteCacheClient;
+
+    /** Event queue for queuing up calls like put and remove. */
+    private ICacheEventQueue<K, V> cacheEventQueue;
+
+    /** how many times get has been called. */
+    private int getCount;
+
+    /** how many times getMatching has been called. */
+    private int getMatchingCount;
+
+    /** how many times getMultiple has been called. */
+    private int getMultipleCount;
+
+    /** how many times remove has been called. */
+    private int removeCount;
+
+    /** how many times put has been called. */
+    private int putCount;
+
+    /**
+     * Constructs with the given remote cache, and fires up an event queue for asynchronous
+     * processing.
+     * <p>
+     * @param cache
+     */
+    public RemoteCacheNoWait( final IRemoteCacheClient<K, V> cache )
+    {
+        this.remoteCacheClient = cache;
+        this.cacheEventQueue = createCacheEventQueue(cache);
+
+        if ( remoteCacheClient.getStatus() == CacheStatus.ERROR )
+        {
+            cacheEventQueue.destroy();
+        }
+    }
+
+    /**
+     * Create a cache event queue from the parameters of the remote client
+     * @param client the remote client
+     * @return the new cache event queue
+     */
+    private ICacheEventQueue<K, V> createCacheEventQueue( final IRemoteCacheClient<K, V> client )
+    {
+        final CacheEventQueueFactory<K, V> factory = new CacheEventQueueFactory<>();
+        return factory.createCacheEventQueue(
+            new CacheAdaptor<>( client ),
+            client.getListenerId(),
+            client.getCacheName(),
+            client.getAuxiliaryCacheAttributes().getEventQueuePoolName(),
+            client.getAuxiliaryCacheAttributes().getEventQueueType() );
+    }
+
+    /**
+     * Adds a put event to the queue.
+     * <p>
+     * @param element
+     * @throws IOException
+     */
+    @Override
+    public void update( final ICacheElement<K, V> element )
+        throws IOException
+    {
+        putCount++;
+        try
+        {
+            cacheEventQueue.addPutEvent( element );
+        }
+        catch ( final IOException e )
+        {
+            log.error( "Problem adding putEvent to queue.", e );
+            cacheEventQueue.destroy();
+            throw e;
+        }
+    }
+
+    /**
+     * Synchronously reads from the remote cache.
+     * <p>
+     * @param key
+     * @return element from the remote cache, or null if not present
+     * @throws IOException
+     */
+    @Override
+    public ICacheElement<K, V> get( final K key )
+        throws IOException
+    {
+        getCount++;
+        try
+        {
+            return remoteCacheClient.get( key );
+        }
+        catch ( final UnmarshalException ue )
+        {
+            log.debug( "Retrying the get owing to UnmarshalException." );
+
+            try
+            {
+                return remoteCacheClient.get( key );
+            }
+            catch ( final IOException ex )
+            {
+                log.info( "Failed in retrying the get for the second time. ", ex );
+            }
+        }
+        catch ( final IOException ex )
+        {
+            // We don't want to destroy the queue on a get failure.
+            // The RemoteCache will Zombie and queue.
+            // Since get does not use the queue, I don't want to kill the queue.
+            throw ex;
+        }
+
+        return null;
+    }
+
+    /**
+     * Synchronously reads matching items from the remote cache.
+     * <p>
+     * @param pattern
+     * @return a map of keys to matching elements
+     * @throws IOException
+     */
+    @Override
+    public Map<K, ICacheElement<K, V>> getMatching( final String pattern )
+        throws IOException
+    {
+        getMatchingCount++;
+        try
+        {
+            return remoteCacheClient.getMatching( pattern );
+        }
+        catch ( final UnmarshalException ue )
+        {
+            log.debug( "Retrying the getMatching owing to UnmarshalException." );
+
+            try
+            {
+                return remoteCacheClient.getMatching( pattern );
+            }
+            catch ( final IOException ex )
+            {
+                log.info( "Failed in retrying the getMatching for the second time.", ex );
+            }
+        }
+        catch ( final IOException ex )
+        {
+            // We don't want to destroy the queue on a get failure.
+            // The RemoteCache will Zombie and queue.
+            // Since get does not use the queue, I don't want to kill the queue.
+            throw ex;
+        }
+
+        return Collections.emptyMap();
+    }
+
+    /**
+     * Gets multiple items from the cache based on the given set of keys. Sends the getMultiple
+     * request on to the server rather than looping through the requested keys.
+     * <p>
+     * @param keys
+     * @return a map of K key to ICacheElement&lt;K, V&gt; element, or an empty map if there is no
+     *         data in cache for any of these keys
+     * @throws IOException
+     */
+    @Override
+    public Map<K, ICacheElement<K, V>> getMultiple( final Set<K> keys )
+        throws IOException
+    {
+        getMultipleCount++;
+        try
+        {
+            return remoteCacheClient.getMultiple( keys );
+        }
+        catch ( final UnmarshalException ue )
+        {
+            log.debug( "Retrying the getMultiple owing to UnmarshalException..." );
+
+            try
+            {
+                return remoteCacheClient.getMultiple( keys );
+            }
+            catch ( final IOException ex )
+            {
+                log.info( "Failed in retrying the getMultiple for the second time.", ex );
+            }
+        }
+        catch ( final IOException ex )
+        {
+            // We don't want to destroy the queue on a get failure.
+            // The RemoteCache will Zombie and queue.
+            // Since get does not use the queue, I don't want to kill the queue.
+            throw ex;
+        }
+
+        return new HashMap<>();
+    }
+
+    /**
+     * Return the keys in this cache.
+     * <p>
+     * @see org.apache.commons.jcs3.auxiliary.AuxiliaryCache#getKeySet()
+     */
+    @Override
+    public Set<K> getKeySet() throws IOException
+    {
+        return remoteCacheClient.getKeySet();
+    }
+
+    /**
+     * Adds a remove request to the remote cache.
+     * <p>
+     * @param key
+     * @return if this was successful
+     * @throws IOException
+     */
+    @Override
+    public boolean remove( final K key )
+        throws IOException
+    {
+        removeCount++;
+        try
+        {
+            cacheEventQueue.addRemoveEvent( key );
+        }
+        catch ( final IOException e )
+        {
+            log.error( "Problem adding RemoveEvent to queue.", e );
+            cacheEventQueue.destroy();
+            throw e;
+        }
+        return false;
+    }
+
+    /**
+     * Adds a removeAll request to the remote cache.
+     * <p>
+     * @throws IOException
+     */
+    @Override
+    public void removeAll()
+        throws IOException
+    {
+        try
+        {
+            cacheEventQueue.addRemoveAllEvent();
+        }
+        catch ( final IOException e )
+        {
+            log.error( "Problem adding RemoveAllEvent to queue.", e );
+            cacheEventQueue.destroy();
+            throw e;
+        }
+    }
+
+    /** Adds a dispose request to the remote cache. */
+    @Override
+    public void dispose()
+    {
+        try
+        {
+            cacheEventQueue.addDisposeEvent();
+        }
+        catch ( final IOException e )
+        {
+            log.error( "Problem adding DisposeEvent to queue.", e );
+            cacheEventQueue.destroy();
+        }
+    }
+
+    /**
+     * No remote invocation.
+     * <p>
+     * @return The size value
+     */
+    @Override
+    public int getSize()
+    {
+        return remoteCacheClient.getSize();
+    }
+
+    /**
+     * No remote invocation.
+     * <p>
+     * @return The cacheType value
+     */
+    @Override
+    public CacheType getCacheType()
+    {
+        return CacheType.REMOTE_CACHE;
+    }
+
+    /**
+     * Returns the async cache status. An error status indicates either the remote connection is not
+     * available, or the async queue has been unexpectedly destroyed. No remote invocation.
+     * <p>
+     * @return The status value
+     */
+    @Override
+    public CacheStatus getStatus()
+    {
+        return cacheEventQueue.isWorking() ? remoteCacheClient.getStatus() : CacheStatus.ERROR;
+    }
+
+    /**
+     * Gets the cacheName attribute of the RemoteCacheNoWait object
+     * <p>
+     * @return The cacheName value
+     */
+    @Override
+    public String getCacheName()
+    {
+        return remoteCacheClient.getCacheName();
+    }
+
+    /**
+     * Replaces the remote cache service handle with the given handle and resets the event queue by
+     * starting up a new instance.
+     * <p>
+     * @param remote
+     */
+    public void fixCache( final ICacheServiceNonLocal<?, ?> remote )
+    {
+        remoteCacheClient.fixCache( remote );
+        resetEventQ();
+    }
+
+    /**
+     * Resets the event queue by first destroying the existing one and starting up a new one.
+     * <p>
+     * There may be no good reason to kill the existing queue. We will sometimes need to set a new
+     * listener id, so we should create a new queue. We should let the old queue drain. If we were
+     * connected to the failover, it would be best to finish sending items.
+     */
+    public void resetEventQ()
+    {
+        final ICacheEventQueue<K, V> previousQueue = cacheEventQueue;
+
+        this.cacheEventQueue = createCacheEventQueue(this.remoteCacheClient);
+
+        if ( previousQueue.isWorking() )
+        {
+            // we don't expect anything, it would have all gone to the zombie
+            log.info( "resetEventQ, previous queue has [{0}] items queued up.",
+                    previousQueue::size);
+            previousQueue.destroy();
+        }
+    }
+
+    /**
+     * This is temporary. It allows the manager to get the listener.
+     * <p>
+     * @return the instance of the remote cache client used by this object
+     */
+    protected IRemoteCacheClient<K, V> getRemoteCache()
+    {
+        return remoteCacheClient;
+    }
+
+    /**
+     * @return Returns the AuxiliaryCacheAttributes.
+     */
+    @Override
+    public AuxiliaryCacheAttributes getAuxiliaryCacheAttributes()
+    {
+        return remoteCacheClient.getAuxiliaryCacheAttributes();
+    }
+
+    /**
+     * This is for testing only. It allows you to take a look at the event queue.
+     * <p>
+     * @return ICacheEventQueue
+     */
+    protected ICacheEventQueue<K, V> getCacheEventQueue()
+    {
+        return this.cacheEventQueue;
+    }
+
+    /**
+     * Returns the stats and the cache.toString().
+     * <p>
+     * @see Object#toString()
+     */
+    @Override
+    public String toString()
+    {
+        return getStats() + "\n" + remoteCacheClient.toString();
+    }
+
+    /**
+     * Returns the statistics in String form.
+     * <p>
+     * @return String
+     */
+    @Override
+    public String getStats()
+    {
+        return getStatistics().toString();
+    }
+
+    /**
+     * @return statistics about this communication
+     */
+    @Override
+    public IStats getStatistics()
+    {
+        final IStats stats = new Stats();
+        stats.setTypeName( "Remote Cache No Wait" );
+
+        final ArrayList<IStatElement<?>> elems = new ArrayList<>();
+
+        elems.add(new StatElement<>( "Status", getStatus() ) );
+
+        // get the stats from the cache queue too
+        final IStats cStats = this.remoteCacheClient.getStatistics();
+        if ( cStats != null )
+        {
+            elems.addAll(cStats.getStatElements());
+        }
+
+        // get the stats from the event queue too
+        final IStats eqStats = this.cacheEventQueue.getStatistics();
+        elems.addAll(eqStats.getStatElements());
+
+        elems.add(new StatElement<>( "Get Count", Integer.valueOf(this.getCount) ) );
+        elems.add(new StatElement<>( "GetMatching Count", Integer.valueOf(this.getMatchingCount) ) );
+        elems.add(new StatElement<>( "GetMultiple Count", Integer.valueOf(this.getMultipleCount) ) );
+        elems.add(new StatElement<>( "Remove Count", Integer.valueOf(this.removeCount) ) );
+        elems.add(new StatElement<>( "Put Count", Integer.valueOf(this.putCount) ) );
+
+        stats.setStatElements( elems );
+
+        return stats;
+    }
+
+    /**
+     * This won't be called since we don't do ICache logging here.
+     * <p>
+     * @return String
+     */
+    @Override
+    public String getEventLoggingExtraInfo()
+    {
+        return "Remote Cache No Wait";
+    }
+}
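
The class comment above pins down the essential behavior: update and remove
requests are queued and applied in arrival order by the event queue processor,
reads call the remote client synchronously, and a queue that throws is
destroyed rather than repaired. Here is a minimal sketch of that "no wait"
shape, assuming a single-threaded executor stands in for the JCS cache event
queue; all names below are illustrative, not JCS API.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class NoWaitSketch<K, V>
{
    private final Map<K, V> backingStore = new ConcurrentHashMap<>();

    // A single-threaded executor preserves the arrival order of write events,
    // as the cache event queue does for RemoteCacheNoWait.
    private final ExecutorService eventQueue = Executors.newSingleThreadExecutor();

    /** Queues a put; the caller does not wait for it to reach the store. */
    public void update(final K key, final V value)
    {
        eventQueue.execute(() -> backingStore.put(key, value));
    }

    /** Queues a remove. */
    public void remove(final K key)
    {
        eventQueue.execute(() -> backingStore.remove(key));
    }

    /** Reads bypass the queue and hit the store synchronously. */
    public V get(final K key)
    {
        return backingStore.get(key);
    }

    /** Stops accepting write events, letting queued ones drain. */
    public void dispose()
    {
        eventQueue.shutdown();
    }
}
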
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/remote/RemoteLocation.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/remote/RemoteLocation.java
index 7e6f9f79..52c69fa1 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/remote/RemoteLocation.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/remote/RemoteLocation.java
@@ -103,7 +103,7 @@ public final class RemoteLocation
     }
 
     /**
-     * @see java.lang.Object#toString()
+     * @see Object#toString()
      */
     @Override
     public String toString()
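
The one-line hunk above shows the pattern applied throughout this file set:
java.lang is imported implicitly, so an @see tag can reference its classes by
simple name. A hypothetical snippet (not from the repository) with the two
equivalent forms side by side:

/** Illustrative only: both tags resolve to the same java.lang.Object method. */
interface SeeTagExample
{
    /** @see java.lang.Object#toString() */
    String verboseForm(); // the FQCN works but adds noise

    /** @see Object#toString() */
    String simpleForm(); // classes in java.lang resolve by simple name
}
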
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/engine/CompositeCacheAttributes.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/engine/CompositeCacheAttributes.java
index f98df335..f304a86a 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/engine/CompositeCacheAttributes.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/engine/CompositeCacheAttributes.java
@@ -1,442 +1,442 @@
-package org.apache.commons.jcs3.engine;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.commons.jcs3.engine.behavior.ICompositeCacheAttributes;
-
-/**
- * The CompositeCacheAttributes defines the general cache region settings. If a region is not
- * explicitly defined in the cache.ccf then it inherits the cache default settings.
- * <p>
- * If any of the default attributes are not defined in the default region definition in the cache.ccf,
- * the hard-coded defaults will be used.
- */
-public class CompositeCacheAttributes
-    implements ICompositeCacheAttributes
-{
-    /** Don't change */
-    private static final long serialVersionUID = 6754049978134196787L;
-
-    /** default lateral switch */
-    private static final boolean DEFAULT_USE_LATERAL = true;
-
-    /** default remote switch */
-    private static final boolean DEFAULT_USE_REMOTE = true;
-
-    /** default disk switch */
-    private static final boolean DEFAULT_USE_DISK = true;
-
-    /** default shrinker setting */
-    private static final boolean DEFAULT_USE_SHRINKER = false;
-
-    /** default max objects value */
-    private static final int DEFAULT_MAX_OBJECTS = 100;
-
-    /** default */
-    private static final int DEFAULT_MAX_MEMORY_IDLE_TIME_SECONDS = 60 * 120;
-
-    /** default interval to run the shrinker */
-    private static final int DEFAULT_SHRINKER_INTERVAL_SECONDS = 30;
-
-    /** default */
-    private static final int DEFAULT_MAX_SPOOL_PER_RUN = -1;
-
-    /** default */
-    private static final String DEFAULT_MEMORY_CACHE_NAME = "org.apache.commons.jcs3.engine.memory.lru.LRUMemoryCache";
-
-    /** Default number to send to disk at a time when memory fills. */
-    private static final int DEFAULT_CHUNK_SIZE = 2;
-
-    /** allow lateral caches */
-    private boolean useLateral = DEFAULT_USE_LATERAL;
-
-    /** allow remote caches */
-    private boolean useRemote = DEFAULT_USE_REMOTE;
-
-    /** Whether we should use a disk cache if it is configured. */
-    private boolean useDisk = DEFAULT_USE_DISK;
-
-    /** Whether or not we should run the memory shrinker thread. */
-    private boolean useMemoryShrinker = DEFAULT_USE_SHRINKER;
-
-    /** The maximum objects that the memory cache will be allowed to hold. */
-    private int maxObjs = DEFAULT_MAX_OBJECTS;
-
-    /** maxMemoryIdleTimeSeconds */
-    private long maxMemoryIdleTimeSeconds = DEFAULT_MAX_MEMORY_IDLE_TIME_SECONDS;
-
-    /** shrinkerIntervalSeconds */
-    private long shrinkerIntervalSeconds = DEFAULT_SHRINKER_INTERVAL_SECONDS;
-
-    /** The maximum number the shrinker will spool to disk per run. */
-    private int maxSpoolPerRun = DEFAULT_MAX_SPOOL_PER_RUN;
-
-    /** The name of this cache region. */
-    private String cacheName;
-
-    /** The name of the memory cache implementation class. */
-    private String memoryCacheName;
-
-    /** Set via DISK_USAGE_PATTERN_NAME */
-    private DiskUsagePattern diskUsagePattern = DiskUsagePattern.SWAP;
-
-    /** How many to spool to disk at a time. */
-    private int spoolChunkSize = DEFAULT_CHUNK_SIZE;
-
-    /**
-     * Constructor for the CompositeCacheAttributes object
-     */
-    public CompositeCacheAttributes()
-    {
-        // set this as the default so the configuration is a bit simpler
-        memoryCacheName = DEFAULT_MEMORY_CACHE_NAME;
-    }
-
-    /**
-     * Sets the maxObjects attribute of the CompositeCacheAttributes object
-     * <p>
-     * @param maxObjs The new maxObjects value
-     */
-    @Override
-    public void setMaxObjects( final int maxObjs )
-    {
-        this.maxObjs = maxObjs;
-    }
-
-    /**
-     * Gets the maxObjects attribute of the CompositeCacheAttributes object
-     * <p>
-     * @return The maxObjects value
-     */
-    @Override
-    public int getMaxObjects()
-    {
-        return this.maxObjs;
-    }
-
-    /**
-     * Sets the useDisk attribute of the CompositeCacheAttributes object
-     * <p>
-     * @param useDisk The new useDisk value
-     */
-    @Override
-    public void setUseDisk( final boolean useDisk )
-    {
-        this.useDisk = useDisk;
-    }
-
-    /**
-     * Gets the useDisk attribute of the CompositeCacheAttributes object
-     * <p>
-     * @return The useDisk value
-     */
-    @Override
-    public boolean isUseDisk()
-    {
-        return useDisk;
-    }
-
-    /**
-     * Sets the useLateral attribute of the CompositeCacheAttributes object
-     * <p>
-     * @param b The new useLateral value
-     */
-    @Override
-    public void setUseLateral( final boolean b )
-    {
-        this.useLateral = b;
-    }
-
-    /**
-     * Gets the useLateral attribute of the CompositeCacheAttributes object
-     * <p>
-     * @return The useLateral value
-     */
-    @Override
-    public boolean isUseLateral()
-    {
-        return this.useLateral;
-    }
-
-    /**
-     * Sets the useRemote attribute of the CompositeCacheAttributes object
-     * <p>
-     * @param useRemote The new useRemote value
-     */
-    @Override
-    public void setUseRemote( final boolean useRemote )
-    {
-        this.useRemote = useRemote;
-    }
-
-    /**
-     * Gets the useRemote attribute of the CompositeCacheAttributes object
-     * <p>
-     * @return The useRemote value
-     */
-    @Override
-    public boolean isUseRemote()
-    {
-        return this.useRemote;
-    }
-
-    /**
-     * Sets the cacheName attribute of the CompositeCacheAttributes object
-     * <p>
-     * @param s The new cacheName value
-     */
-    @Override
-    public void setCacheName( final String s )
-    {
-        this.cacheName = s;
-    }
-
-    /**
-     * Gets the cacheName attribute of the CompositeCacheAttributes object
-     * <p>
-     * @return The cacheName value
-     */
-    @Override
-    public String getCacheName()
-    {
-        return this.cacheName;
-    }
-
-    /**
-     * Sets the memoryCacheName attribute of the CompositeCacheAttributes object
-     * <p>
-     * @param s The new memoryCacheName value
-     */
-    @Override
-    public void setMemoryCacheName( final String s )
-    {
-        this.memoryCacheName = s;
-    }
-
-    /**
-     * Gets the memoryCacheName attribute of the CompositeCacheAttributes object
-     * <p>
-     * @return The memoryCacheName value
-     */
-    @Override
-    public String getMemoryCacheName()
-    {
-        return this.memoryCacheName;
-    }
-
-    /**
-     * Whether the memory cache should perform background memory shrinkage.
-     * <p>
-     * @param useShrinker The new UseMemoryShrinker value
-     */
-    @Override
-    public void setUseMemoryShrinker( final boolean useShrinker )
-    {
-        this.useMemoryShrinker = useShrinker;
-    }
-
-    /**
-     * Whether the memory cache should perform background memory shrinkage.
-     * <p>
-     * @return The UseMemoryShrinker value
-     */
-    @Override
-    public boolean isUseMemoryShrinker()
-    {
-        return this.useMemoryShrinker;
-    }
-
-    /**
-     * If UseMemoryShrinker is true the memory cache should auto-expire elements to reclaim space.
-     * <p>
-     * @param seconds The new MaxMemoryIdleTimeSeconds value
-     */
-    @Override
-    public void setMaxMemoryIdleTimeSeconds( final long seconds )
-    {
-        this.maxMemoryIdleTimeSeconds = seconds;
-    }
-
-    /**
-     * If UseMemoryShrinker is true the memory cache should auto-expire elements to reclaim space.
-     * <p>
-     * @return The MaxMemoryIdleTimeSeconds value
-     */
-    @Override
-    public long getMaxMemoryIdleTimeSeconds()
-    {
-        return this.maxMemoryIdleTimeSeconds;
-    }
-
-    /**
-     * If UseMemoryShrinker is true the memory cache should auto-expire elements to reclaim space.
-     * This sets the shrinker interval.
-     * <p>
-     * @param seconds The new ShrinkerIntervalSeconds value
-     */
-    @Override
-    public void setShrinkerIntervalSeconds( final long seconds )
-    {
-        this.shrinkerIntervalSeconds = seconds;
-    }
-
-    /**
-     * If UseMemoryShrinker is true the memory cache should auto-expire elements to reclaim space.
-     * This gets the shrinker interval.
-     * <p>
-     * @return The ShrinkerIntervalSeconds value
-     */
-    @Override
-    public long getShrinkerIntervalSeconds()
-    {
-        return this.shrinkerIntervalSeconds;
-    }
-
-    /**
-     * If UseMemoryShrinker is true the memory cache should auto-expire elements to reclaim space.
-     * This sets the maximum number of items to spool per run.
-     * <p>
-     * If the value is -1, then there is no limit to the number of items to be spooled.
-     * <p>
-     * @param maxSpoolPerRun The new maxSpoolPerRun value
-     */
-    @Override
-    public void setMaxSpoolPerRun( final int maxSpoolPerRun )
-    {
-        this.maxSpoolPerRun = maxSpoolPerRun;
-    }
-
-    /**
-     * If UseMemoryShrinker is true the memory cache should auto-expire elements to reclaim space.
-     * This gets the maximum number of items to spool per run.
-     * <p>
-     * @return The maxSpoolPerRun value
-     */
-    @Override
-    public int getMaxSpoolPerRun()
-    {
-        return this.maxSpoolPerRun;
-    }
-
-    /**
-     * By default this is SWAP.
-     * <p>
-     * @param diskUsagePattern The diskUsagePattern to set.
-     */
-    @Override
-    public void setDiskUsagePattern( final DiskUsagePattern diskUsagePattern )
-    {
-        this.diskUsagePattern = diskUsagePattern;
-    }
-
-    /**
-     * Translates the name to the disk usage pattern short value.
-     * <p>
-     * The allowed values are SWAP and UPDATE.
-     * <p>
-     * @param diskUsagePatternName The diskUsagePattern to set.
-     */
-    @Override
-    public void setDiskUsagePatternName( final String diskUsagePatternName )
-    {
-        if ( diskUsagePatternName != null )
-        {
-            final String name = diskUsagePatternName.toUpperCase().trim();
-            if ( name.startsWith( "SWAP" ) )
-            {
-                this.setDiskUsagePattern( DiskUsagePattern.SWAP );
-            }
-            else if ( name.startsWith( "UPDATE" ) )
-            {
-                this.setDiskUsagePattern( DiskUsagePattern.UPDATE );
-            }
-        }
-    }
-
-    /**
-     * Number to send to disk at a time when memory is full.
-     * <p>
-     * @return int
-     */
-    @Override
-    public int getSpoolChunkSize()
-    {
-        return spoolChunkSize;
-    }
-
-    /**
-     * Number to send to disk at a time.
-     * <p>
-     * @param spoolChunkSize
-     */
-    @Override
-    public void setSpoolChunkSize( final int spoolChunkSize )
-    {
-        this.spoolChunkSize = spoolChunkSize;
-    }
-
-    /**
-     * @return Returns the diskUsagePattern.
-     */
-    @Override
-    public DiskUsagePattern getDiskUsagePattern()
-    {
-        return diskUsagePattern;
-    }
-
-    /**
-     * Dumps the core attributes.
-     * <p>
-     * @return For debugging.
-     */
-    @Override
-    public String toString()
-    {
-        final StringBuilder dump = new StringBuilder();
-
-        dump.append( "[ " );
-        dump.append( "useLateral = " ).append( useLateral );
-        dump.append( ", useRemote = " ).append( useRemote );
-        dump.append( ", useDisk = " ).append( useDisk );
-        dump.append( ", maxObjs = " ).append( maxObjs );
-        dump.append( ", maxSpoolPerRun = " ).append( maxSpoolPerRun );
-        dump.append( ", diskUsagePattern = " ).append( diskUsagePattern );
-        dump.append( ", spoolChunkSize = " ).append( spoolChunkSize );
-        dump.append( " ]" );
-
-        return dump.toString();
-    }
-
-    /**
-     * @see java.lang.Object#clone()
-     */
-    @Override
-    public ICompositeCacheAttributes clone()
-    {
-        try
-        {
-            return (ICompositeCacheAttributes)super.clone();
-        }
-        catch (final CloneNotSupportedException e)
-        {
-            throw new RuntimeException("Clone not supported. This should never happen.", e);
-        }
-    }
-}
+package org.apache.commons.jcs3.engine;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.commons.jcs3.engine.behavior.ICompositeCacheAttributes;
+
+/**
+ * The CompositeCacheAttributes defines the general cache region settings. If a region is not
+ * explicitly defined in the cache.ccf then it inherits the cache default settings.
+ * <p>
+ * If any of the default attributes are not defined in the default region definition in the cache.ccf,
+ * the hard-coded defaults will be used.
+ */
+public class CompositeCacheAttributes
+    implements ICompositeCacheAttributes
+{
+    /** Don't change */
+    private static final long serialVersionUID = 6754049978134196787L;
+
+    /** default lateral switch */
+    private static final boolean DEFAULT_USE_LATERAL = true;
+
+    /** default remote switch */
+    private static final boolean DEFAULT_USE_REMOTE = true;
+
+    /** default disk switch */
+    private static final boolean DEFAULT_USE_DISK = true;
+
+    /** default shrinker setting */
+    private static final boolean DEFAULT_USE_SHRINKER = false;
+
+    /** default max objects value */
+    private static final int DEFAULT_MAX_OBJECTS = 100;
+
+    /** default */
+    private static final int DEFAULT_MAX_MEMORY_IDLE_TIME_SECONDS = 60 * 120;
+
+    /** default interval to run the shrinker */
+    private static final int DEFAULT_SHRINKER_INTERVAL_SECONDS = 30;
+
+    /** default */
+    private static final int DEFAULT_MAX_SPOOL_PER_RUN = -1;
+
+    /** default */
+    private static final String DEFAULT_MEMORY_CACHE_NAME = "org.apache.commons.jcs3.engine.memory.lru.LRUMemoryCache";
+
+    /** Default number to send to disk at a time when memory fills. */
+    private static final int DEFAULT_CHUNK_SIZE = 2;
+
+    /** allow lateral caches */
+    private boolean useLateral = DEFAULT_USE_LATERAL;
+
+    /** allow remote caches */
+    private boolean useRemote = DEFAULT_USE_REMOTE;
+
+    /** Whether we should use a disk cache if it is configured. */
+    private boolean useDisk = DEFAULT_USE_DISK;
+
+    /** Whether or not we should run the memory shrinker thread. */
+    private boolean useMemoryShrinker = DEFAULT_USE_SHRINKER;
+
+    /** The maximum objects that the memory cache will be allowed to hold. */
+    private int maxObjs = DEFAULT_MAX_OBJECTS;
+
+    /** maxMemoryIdleTimeSeconds */
+    private long maxMemoryIdleTimeSeconds = DEFAULT_MAX_MEMORY_IDLE_TIME_SECONDS;
+
+    /** shrinkerIntervalSeconds */
+    private long shrinkerIntervalSeconds = DEFAULT_SHRINKER_INTERVAL_SECONDS;
+
+    /** The maximum number the shrinker will spool to disk per run. */
+    private int maxSpoolPerRun = DEFAULT_MAX_SPOOL_PER_RUN;
+
+    /** The name of this cache region. */
+    private String cacheName;
+
+    /** The name of the memory cache implementation class. */
+    private String memoryCacheName;
+
+    /** Set via DISK_USAGE_PATTERN_NAME */
+    private DiskUsagePattern diskUsagePattern = DiskUsagePattern.SWAP;
+
+    /** How many to spool to disk at a time. */
+    private int spoolChunkSize = DEFAULT_CHUNK_SIZE;
+
+    /**
+     * Constructor for the CompositeCacheAttributes object
+     */
+    public CompositeCacheAttributes()
+    {
+        // set this as the default so the configuration is a bit simpler
+        memoryCacheName = DEFAULT_MEMORY_CACHE_NAME;
+    }
+
+    /**
+     * Sets the maxObjects attribute of the CompositeCacheAttributes object
+     * <p>
+     * @param maxObjs The new maxObjects value
+     */
+    @Override
+    public void setMaxObjects( final int maxObjs )
+    {
+        this.maxObjs = maxObjs;
+    }
+
+    /**
+     * Gets the maxObjects attribute of the CompositeCacheAttributes object
+     * <p>
+     * @return The maxObjects value
+     */
+    @Override
+    public int getMaxObjects()
+    {
+        return this.maxObjs;
+    }
+
+    /**
+     * Sets the useDisk attribute of the CompositeCacheAttributes object
+     * <p>
+     * @param useDisk The new useDisk value
+     */
+    @Override
+    public void setUseDisk( final boolean useDisk )
+    {
+        this.useDisk = useDisk;
+    }
+
+    /**
+     * Gets the useDisk attribute of the CompositeCacheAttributes object
+     * <p>
+     * @return The useDisk value
+     */
+    @Override
+    public boolean isUseDisk()
+    {
+        return useDisk;
+    }
+
+    /**
+     * Sets the useLateral attribute of the CompositeCacheAttributes object
+     * <p>
+     * @param b The new useLateral value
+     */
+    @Override
+    public void setUseLateral( final boolean b )
+    {
+        this.useLateral = b;
+    }
+
+    /**
+     * Gets the useLateral attribute of the CompositeCacheAttributes object
+     * <p>
+     * @return The useLateral value
+     */
+    @Override
+    public boolean isUseLateral()
+    {
+        return this.useLateral;
+    }
+
+    /**
+     * Sets the useRemote attribute of the CompositeCacheAttributes object
+     * <p>
+     * @param useRemote The new useRemote value
+     */
+    @Override
+    public void setUseRemote( final boolean useRemote )
+    {
+        this.useRemote = useRemote;
+    }
+
+    /**
+     * Gets the useRemote attribute of the CompositeCacheAttributes object
+     * <p>
+     * @return The useRemote value
+     */
+    @Override
+    public boolean isUseRemote()
+    {
+        return this.useRemote;
+    }
+
+    /**
+     * Sets the cacheName attribute of the CompositeCacheAttributes object
+     * <p>
+     * @param s The new cacheName value
+     */
+    @Override
+    public void setCacheName( final String s )
+    {
+        this.cacheName = s;
+    }
+
+    /**
+     * Gets the cacheName attribute of the CompositeCacheAttributes object
+     * <p>
+     * @return The cacheName value
+     */
+    @Override
+    public String getCacheName()
+    {
+        return this.cacheName;
+    }
+
+    /**
+     * Sets the memoryCacheName attribute of the CompositeCacheAttributes object
+     * <p>
+     * @param s The new memoryCacheName value
+     */
+    @Override
+    public void setMemoryCacheName( final String s )
+    {
+        this.memoryCacheName = s;
+    }
+
+    /**
+     * Gets the memoryCacheName attribute of the CompositeCacheAttributes object
+     * <p>
+     * @return The memoryCacheName value
+     */
+    @Override
+    public String getMemoryCacheName()
+    {
+        return this.memoryCacheName;
+    }
+
+    /**
+     * Whether the memory cache should perform background memory shrinkage.
+     * <p>
+     * @param useShrinker The new UseMemoryShrinker value
+     */
+    @Override
+    public void setUseMemoryShrinker( final boolean useShrinker )
+    {
+        this.useMemoryShrinker = useShrinker;
+    }
+
+    /**
+     * Whether the memory cache should perform background memory shrinkage.
+     * <p>
+     * @return The UseMemoryShrinker value
+     */
+    @Override
+    public boolean isUseMemoryShrinker()
+    {
+        return this.useMemoryShrinker;
+    }
+
+    /**
+     * If UseMemoryShrinker is true the memory cache should auto-expire elements to reclaim space.
+     * <p>
+     * @param seconds The new MaxMemoryIdleTimeSeconds value
+     */
+    @Override
+    public void setMaxMemoryIdleTimeSeconds( final long seconds )
+    {
+        this.maxMemoryIdleTimeSeconds = seconds;
+    }
+
+    /**
+     * If UseMemoryShrinker is true the memory cache should auto-expire elements to reclaim space.
+     * <p>
+     * @return The MaxMemoryIdleTimeSeconds value
+     */
+    @Override
+    public long getMaxMemoryIdleTimeSeconds()
+    {
+        return this.maxMemoryIdleTimeSeconds;
+    }
+
+    /**
+     * If UseMemoryShrinker is true the memory cache should auto-expire elements to reclaim space.
+     * This sets the shrinker interval.
+     * <p>
+     * @param seconds The new ShrinkerIntervalSeconds value
+     */
+    @Override
+    public void setShrinkerIntervalSeconds( final long seconds )
+    {
+        this.shrinkerIntervalSeconds = seconds;
+    }
+
+    /**
+     * If UseMemoryShrinker is true the memory cache should auto-expire elements to reclaim space.
+     * This gets the shrinker interval.
+     * <p>
+     * @return The ShrinkerIntervalSeconds value
+     */
+    @Override
+    public long getShrinkerIntervalSeconds()
+    {
+        return this.shrinkerIntervalSeconds;
+    }
+
+    /**
+     * If UseMemoryShrinker is true the memory cache should auto-expire elements to reclaim space.
+     * This sets the maximum number of items to spool per run.
+     * <p>
+     * If the value is -1, then there is no limit to the number of items to be spooled.
+     * <p>
+     * @param maxSpoolPerRun The new maxSpoolPerRun value
+     */
+    @Override
+    public void setMaxSpoolPerRun( final int maxSpoolPerRun )
+    {
+        this.maxSpoolPerRun = maxSpoolPerRun;
+    }
+
+    /**
+     * If UseMemoryShrinker is true the memory cache should auto-expire elements to reclaim space.
+     * This gets the maximum number of items to spool per run.
+     * <p>
+     * @return The maxSpoolPerRun value
+     */
+    @Override
+    public int getMaxSpoolPerRun()
+    {
+        return this.maxSpoolPerRun;
+    }
+
+    /**
+     * By default this is SWAP.
+     * <p>
+     * @param diskUsagePattern The diskUsagePattern to set.
+     */
+    @Override
+    public void setDiskUsagePattern( final DiskUsagePattern diskUsagePattern )
+    {
+        this.diskUsagePattern = diskUsagePattern;
+    }
+
+    /**
+     * Translates the given name to a disk usage pattern value.
+     * <p>
+     * The allowed values are SWAP and UPDATE.
+     * <p>
+     * @param diskUsagePatternName The diskUsagePattern to set.
+     */
+    @Override
+    public void setDiskUsagePatternName( final String diskUsagePatternName )
+    {
+        if ( diskUsagePatternName != null )
+        {
+            final String name = diskUsagePatternName.toUpperCase().trim();
+            if ( name.startsWith( "SWAP" ) )
+            {
+                this.setDiskUsagePattern( DiskUsagePattern.SWAP );
+            }
+            else if ( name.startsWith( "UPDATE" ) )
+            {
+                this.setDiskUsagePattern( DiskUsagePattern.UPDATE );
+            }
+        }
+    }
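
The matching above is prefix-based and case-insensitive after trimming, so a minimal sketch (hypothetical values, relying only on the method shown here) behaves as follows:

    CompositeCacheAttributes cattr = new CompositeCacheAttributes();
    cattr.setDiskUsagePatternName( "swap" );     // trimmed and upper-cased, matches "SWAP"
    cattr.setDiskUsagePatternName( " update " ); // matches "UPDATE"
    cattr.setDiskUsagePatternName( "bogus" );    // no match: the current pattern is left unchanged
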
+
+    /**
+     * Number to send to disk at a time when memory is full.
+     * <p>
+     * @return int
+     */
+    @Override
+    public int getSpoolChunkSize()
+    {
+        return spoolChunkSize;
+    }
+
+    /**
+     * Number to send to disk at a time.
+     * <p>
+     * @param spoolChunkSize The number of elements to send to disk at a time
+     */
+    @Override
+    public void setSpoolChunkSize( final int spoolChunkSize )
+    {
+        this.spoolChunkSize = spoolChunkSize;
+    }
+
+    /**
+     * @return Returns the diskUsagePattern.
+     */
+    @Override
+    public DiskUsagePattern getDiskUsagePattern()
+    {
+        return diskUsagePattern;
+    }
+
+    /**
+     * Dumps the core attributes.
+     * <p>
+     * @return For debugging.
+     */
+    @Override
+    public String toString()
+    {
+        final StringBuilder dump = new StringBuilder();
+
+        dump.append( "[ " );
+        dump.append( "useLateral = " ).append( useLateral );
+        dump.append( ", useRemote = " ).append( useRemote );
+        dump.append( ", useDisk = " ).append( useDisk );
+        dump.append( ", maxObjs = " ).append( maxObjs );
+        dump.append( ", maxSpoolPerRun = " ).append( maxSpoolPerRun );
+        dump.append( ", diskUsagePattern = " ).append( diskUsagePattern );
+        dump.append( ", spoolChunkSize = " ).append( spoolChunkSize );
+        dump.append( " ]" );
+
+        return dump.toString();
+    }
+
+    /**
+     * @see Object#clone()
+     */
+    @Override
+    public ICompositeCacheAttributes clone()
+    {
+        try
+        {
+            return (ICompositeCacheAttributes)super.clone();
+        }
+        catch (final CloneNotSupportedException e)
+        {
+            throw new RuntimeException("Clone not supported. This should never happen.", e);
+        }
+    }
+}
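
Putting the setters together, a minimal programmatic configuration might look like the sketch below; the region name and the numeric values are illustrative assumptions, and only methods defined in this class are used:

    CompositeCacheAttributes cattr = new CompositeCacheAttributes();
    cattr.setCacheName( "testRegion" );        // hypothetical region name
    cattr.setMaxObjects( 1000 );               // memory cache holds at most 1000 elements
    cattr.setUseDisk( true );                  // spool to a disk auxiliary if one is configured
    cattr.setSpoolChunkSize( 5 );              // send 5 elements to disk per spool when memory fills
    cattr.setUseMemoryShrinker( true );        // run the background shrinker thread
    cattr.setShrinkerIntervalSeconds( 60 );    // the shrinker wakes up once a minute
    cattr.setMaxMemoryIdleTimeSeconds( 3600 ); // the shrinker expires elements idle for an hour
    cattr.setMaxSpoolPerRun( 100 );            // the shrinker spools at most 100 elements per run
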
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/engine/ElementAttributes.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/engine/ElementAttributes.java
index e35a2349..cb5150a3 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/engine/ElementAttributes.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/engine/ElementAttributes.java
@@ -1,459 +1,459 @@
-package org.apache.commons.jcs3.engine;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.jcs3.engine.behavior.IElementAttributes;
-import org.apache.commons.jcs3.engine.control.event.behavior.IElementEventHandler;
-
-/**
- * This is the element attribute descriptor class. Each element in the cache has an ElementAttribute
- * object associated with it. An ElementAttributes object can be associated with an element in 3
- * ways:
- * <ol>
- * <li>When the item is put into the cache, you can associate an element attributes object.</li>
- * <li>If no attributes object is included when the element is put into the cache, then the default
- * attributes for the region will be used.</li>
- * <li>The element attributes can be reset. This effectively results in a retrieval followed by a
- * put. Hence, this is the same as 1.</li>
- * </ol>
- */
-public class ElementAttributes
-    implements IElementAttributes
-{
-    /** Don't change. */
-    private static final long serialVersionUID = 7814990748035017441L;
-
-    /** Can this item be flushed to disk */
-    private boolean IS_SPOOL = true;
-
-    /** Is this item laterally distributable */
-    private boolean IS_LATERAL = true;
-
-    /** Can this item be sent to the remote cache */
-    private boolean IS_REMOTE = true;
-
-    /**
-     * You can turn off expiration by setting this to true. This causes the cache to bypass both max
-     * life and idle time expiration.
-     */
-    private boolean IS_ETERNAL = true;
-
-    /** Max life seconds */
-    private long maxLife = -1;
-
-    /**
-     * The maximum time an entry can be idle. Setting this to -1 causes the idle time check to be
-     * ignored.
-     */
-    private long maxIdleTime = -1;
-
-    /** The byte size of the field. Must be manually set. */
-    private int size;
-
-    /** The creation time. This is used to enforce the max life. */
-    private long createTime;
-
-    /** The last access time. This is used to enforce the max idle time. */
-    private long lastAccessTime;
-
-    /**
-     * The list of Event handlers to use. This is transient, since the event handlers cannot usually
-     * be serialized. This means that you cannot attach a post serialization event to an item.
-     * <p>
-     * TODO we need to check that when an item is passed to a non-local cache that if the local
-     * cache had a copy with event handlers, that those handlers are used.
-     */
-    private transient ArrayList<IElementEventHandler> eventHandlers;
-
-    private long timeFactor = 1000;
-
-    /**
-     * Constructor for the IElementAttributes object
-     */
-    public ElementAttributes()
-    {
-        this.createTime = System.currentTimeMillis();
-        this.lastAccessTime = this.createTime;
-    }
-
-    /**
-     * Constructor for the IElementAttributes object
-     * <p>
-     * @param attr
-     */
-    protected ElementAttributes( final ElementAttributes attr )
-    {
-        IS_ETERNAL = attr.IS_ETERNAL;
-
-        // waterfall onto disk, for pure disk set memory to 0
-        IS_SPOOL = attr.IS_SPOOL;
-
-        // lateral
-        IS_LATERAL = attr.IS_LATERAL;
-
-        // central rmi store
-        IS_REMOTE = attr.IS_REMOTE;
-
-        maxLife = attr.maxLife;
-        // time-to-live
-        maxIdleTime = attr.maxIdleTime;
-        size = attr.size;
-    }
-
-    /**
-     * Sets the maxLife attribute of the IAttributes object.
-     * <p>
-     * @param mls The new MaxLifeSeconds value
-     */
-    @Override
-    public void setMaxLife(final long mls)
-    {
-        this.maxLife = mls;
-    }
-
-    /**
-     * Gets the maxLife attribute of the IAttributes object: how many seconds the element can live
-     * after creation.
-     * <p>
-     * If this is exceeded the element will not be returned, instead it will be removed. It will be
-     * removed on retrieval, or removed actively if the memory shrinker is turned on.
-     * @return The MaxLifeSeconds value
-     */
-    @Override
-    public long getMaxLife()
-    {
-        return this.maxLife;
-    }
-
-    /**
-     * Sets the idleTime attribute of the IAttributes object. This is the maximum time the item can
-     * be idle in the cache, that is, not accessed.
-     * <p>
-     * If this is exceeded the element will not be returned, instead it will be removed. It will be
-     * removed on retrieval, or removed actively if the memory shrinker is turned on.
-     * @param idle The new idleTime value
-     */
-    @Override
-    public void setIdleTime( final long idle )
-    {
-        this.maxIdleTime = idle;
-    }
-
-    /**
-     * Size in bytes. This is not used except in the admin pages. It will be 0 by default
-     * and is only updated when the element is serialized.
-     * <p>
-     * @param size The new size value
-     */
-    @Override
-    public void setSize( final int size )
-    {
-        this.size = size;
-    }
-
-    /**
-     * Gets the size attribute of the IAttributes object
-     * <p>
-     * @return The size value
-     */
-    @Override
-    public int getSize()
-    {
-        return size;
-    }
-
-    /**
-     * Gets the createTime attribute of the IAttributes object.
-     * <p>
-     * This should be the current time in milliseconds returned by the system call when the element
-     * is put in the cache.
-     * <p>
-     * Putting an item in the cache overrides any existing items.
-     * @return The createTime value
-     */
-    @Override
-    public long getCreateTime()
-    {
-        return createTime;
-    }
-
-    /**
-     * Sets the createTime attribute of the IElementAttributes object
-     */
-    public void setCreateTime()
-    {
-        createTime = System.currentTimeMillis();
-    }
-
-    /**
-     * Gets the idleTime attribute of the IAttributes object.
-     * <p>
-     * @return The idleTime value
-     */
-    @Override
-    public long getIdleTime()
-    {
-        return this.maxIdleTime;
-    }
-
-    /**
-     * Gets the time left to live of the IAttributes object.
-     * <p>
-     * This is the (max life + create time) - current time.
-     * @return The TimeToLiveSeconds value
-     */
-    @Override
-    public long getTimeToLiveSeconds()
-    {
-        final long now = System.currentTimeMillis();
-        final long timeFactorForMilliseconds = getTimeFactorForMilliseconds();
-        return ( this.getCreateTime() + this.getMaxLife() * timeFactorForMilliseconds - now ) / 1000;
-    }
-
-    /**
-     * Gets the LastAccess attribute of the IAttributes object.
-     * <p>
-     * @return The LastAccess value.
-     */
-    @Override
-    public long getLastAccessTime()
-    {
-        return this.lastAccessTime;
-    }
-
-    /**
-     * Sets the LastAccessTime of the IElementAttributes object to now.
-     */
-    @Override
-    public void setLastAccessTimeNow()
-    {
-        this.lastAccessTime = System.currentTimeMillis();
-    }
-
-    /**
-     * only for use from test code
-     */
-    public void setLastAccessTime(final long time)
-    {
-        this.lastAccessTime = time;
-    }
-
-    /**
-     * Can this item be spooled to disk
-     * <p>
-     * By default this is true.
-     * @return The spoolable value
-     */
-    @Override
-    public boolean getIsSpool()
-    {
-        return this.IS_SPOOL;
-    }
-
-    /**
-     * Sets the isSpool attribute of the IElementAttributes object
-     * <p>
-     * By default this is true.
-     * @param val The new isSpool value
-     */
-    @Override
-    public void setIsSpool( final boolean val )
-    {
-        this.IS_SPOOL = val;
-    }
-
-    /**
-     * Is this item laterally distributable. Can it be sent to auxiliaries of type lateral.
-     * <p>
-     * By default this is true.
-     * @return The isLateral value
-     */
-    @Override
-    public boolean getIsLateral()
-    {
-        return this.IS_LATERAL;
-    }
-
-    /**
-     * Sets the isLateral attribute of the IElementAttributes object
-     * <p>
-     * By default this is true.
-     * @param val The new isLateral value
-     */
-    @Override
-    public void setIsLateral( final boolean val )
-    {
-        this.IS_LATERAL = val;
-    }
-
-    /**
-     * Can this item be sent to the remote cache
-     * @return true if the item can be sent to a remote auxiliary
-     */
-    @Override
-    public boolean getIsRemote()
-    {
-        return this.IS_REMOTE;
-    }
-
-    /**
-     * Sets the isRemote attribute of the ElementAttributes object
-     * @param val The new isRemote value
-     */
-    @Override
-    public void setIsRemote( final boolean val )
-    {
-        this.IS_REMOTE = val;
-    }
-
-    /**
-     * You can turn off expiration by setting this to true. The max life value will be ignored.
-     * <p>
-     * @return true if the item cannot expire.
-     */
-    @Override
-    public boolean getIsEternal()
-    {
-        return this.IS_ETERNAL;
-    }
-
-    /**
-     * Sets the isEternal attribute of the ElementAttributes object. True means that the item should
-     * never expire. It can still be removed if it is the least recently used and you are using the
-     * LRUMemoryCache; it just will not be filtered for expiration by the cache hub.
-     * <p>
-     * @param val The new isEternal value
-     */
-    @Override
-    public void setIsEternal( final boolean val )
-    {
-        this.IS_ETERNAL = val;
-    }
-
-    /**
-     * Adds an ElementEventHandler. Handlers can be registered for multiple events. A registered
-     * handler will be called at every recognized event.
-     * <p>
-     * The alternative would be to register handlers for each event. Or maybe the handler interface
-     * should have a method to return whether it cares about certain events.
-     * <p>
-     * @param eventHandler The ElementEventHandler to be added to the list.
-     */
-    @Override
-    public void addElementEventHandler( final IElementEventHandler eventHandler )
-    {
-        // lazy here, no concurrency problems expected
-        if ( this.eventHandlers == null )
-        {
-            this.eventHandlers = new ArrayList<>();
-        }
-        this.eventHandlers.add( eventHandler );
-    }
-
-    /**
-     * Sets the eventHandlers of the IElementAttributes object.
-     * <p>
-     * This adds the references to the local list. Subsequent changes in the caller's list will not
-     * be reflected.
-     * <p>
-     * @param eventHandlers List of IElementEventHandler objects
-     */
-    @Override
-    public void addElementEventHandlers( final List<IElementEventHandler> eventHandlers )
-    {
-        if ( eventHandlers == null )
-        {
-            return;
-        }
-
-        for (final IElementEventHandler handler : eventHandlers)
-        {
-            addElementEventHandler(handler);
-        }
-    }
-
-    @Override
-    public long getTimeFactorForMilliseconds()
-    {
-        return timeFactor;
-    }
-
-    @Override
-    public void setTimeFactorForMilliseconds(final long factor)
-    {
-        this.timeFactor = factor;
-    }
-
-    /**
-     * Gets the elementEventHandlers. Returns null if none exist. Makes checking easy.
-     * <p>
-     * @return The elementEventHandlers List of IElementEventHandler objects
-     */
-    @Override
-    public ArrayList<IElementEventHandler> getElementEventHandlers()
-    {
-        return this.eventHandlers;
-    }
-
-    /**
-     * For logging and debugging the element IElementAttributes.
-     * <p>
-     * @return String info about the values.
-     */
-    @Override
-    public String toString()
-    {
-        final StringBuilder dump = new StringBuilder();
-
-        dump.append( "[ IS_LATERAL = " ).append( IS_LATERAL );
-        dump.append( ", IS_SPOOL = " ).append( IS_SPOOL );
-        dump.append( ", IS_REMOTE = " ).append( IS_REMOTE );
-        dump.append( ", IS_ETERNAL = " ).append( IS_ETERNAL );
-        dump.append( ", MaxLifeSeconds = " ).append( this.getMaxLife() );
-        dump.append( ", IdleTime = " ).append( this.getIdleTime() );
-        dump.append( ", CreateTime = " ).append( this.getCreateTime() );
-        dump.append( ", LastAccessTime = " ).append( this.getLastAccessTime() );
-        dump.append( ", getTimeToLiveSeconds() = " ).append( String.valueOf( getTimeToLiveSeconds() ) );
-        dump.append( ", createTime = " ).append( String.valueOf( createTime ) ).append( " ]" );
-
-        return dump.toString();
-    }
-
-    /**
-     * @see java.lang.Object#clone()
-     */
-    @Override
-    public IElementAttributes clone()
-    {
-        try
-        {
-        	final ElementAttributes c = (ElementAttributes) super.clone();
-        	c.setCreateTime();
-            return c;
-        }
-        catch (final CloneNotSupportedException e)
-        {
-            throw new RuntimeException("Clone not supported. This should never happen.", e);
-        }
-    }
-}
+package org.apache.commons.jcs3.engine;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.jcs3.engine.behavior.IElementAttributes;
+import org.apache.commons.jcs3.engine.control.event.behavior.IElementEventHandler;
+
+/**
+ * This is the element attribute descriptor class. Each element in the cache has an ElementAttribute
+ * object associated with it. An ElementAttributes object can be associated with an element in 3
+ * ways:
+ * <ol>
+ * <li>When the item is put into the cache, you can associate an element attributes object.</li>
+ * <li>If no attributes object is included when the element is put into the cache, then the default
+ * attributes for the region will be used.</li>
+ * <li>The element attributes can be reset. This effectively results in a retrieval followed by a
+ * put. Hence, this is the same as 1.</li>
+ * </ol>
+ */
+public class ElementAttributes
+    implements IElementAttributes
+{
+    /** Don't change. */
+    private static final long serialVersionUID = 7814990748035017441L;
+
+    /** Can this item be flushed to disk */
+    private boolean IS_SPOOL = true;
+
+    /** Is this item laterally distributable */
+    private boolean IS_LATERAL = true;
+
+    /** Can this item be sent to the remote cache */
+    private boolean IS_REMOTE = true;
+
+    /**
+     * You can turn off expiration by setting this to true. This causes the cache to bypass both max
+     * life and idle time expiration.
+     */
+    private boolean IS_ETERNAL = true;
+
+    /** Max life seconds */
+    private long maxLife = -1;
+
+    /**
+     * The maximum time an entry can be idle. Setting this to -1 causes the idle time check to be
+     * ignored.
+     */
+    private long maxIdleTime = -1;
+
+    /** The byte size of the field. Must be manually set. */
+    private int size;
+
+    /** The creation time. This is used to enforce the max life. */
+    private long createTime;
+
+    /** The last access time. This is used to enforce the max idle time. */
+    private long lastAccessTime;
+
+    /**
+     * The list of Event handlers to use. This is transient, since the event handlers cannot usually
+     * be serialized. This means that you cannot attach a post serialization event to an item.
+     * <p>
+     * TODO we need to check that when an item is passed to a non-local cache that if the local
+     * cache had a copy with event handlers, that those handlers are used.
+     */
+    private transient ArrayList<IElementEventHandler> eventHandlers;
+
+    private long timeFactor = 1000;
+
+    /**
+     * Constructor for the IElementAttributes object
+     */
+    public ElementAttributes()
+    {
+        this.createTime = System.currentTimeMillis();
+        this.lastAccessTime = this.createTime;
+    }
+
+    /**
+     * Constructor for the IElementAttributes object
+     * <p>
+     * @param attr
+     */
+    protected ElementAttributes( final ElementAttributes attr )
+    {
+        IS_ETERNAL = attr.IS_ETERNAL;
+
+        // waterfall onto disk, for pure disk set memory to 0
+        IS_SPOOL = attr.IS_SPOOL;
+
+        // lateral
+        IS_LATERAL = attr.IS_LATERAL;
+
+        // central rmi store
+        IS_REMOTE = attr.IS_REMOTE;
+
+        maxLife = attr.maxLife;
+        // time-to-live
+        maxIdleTime = attr.maxIdleTime;
+        size = attr.size;
+    }
+
+    /**
+     * Sets the maxLife attribute of the IAttributes object.
+     * <p>
+     * @param mls The new MaxLifeSeconds value
+     */
+    @Override
+    public void setMaxLife(final long mls)
+    {
+        this.maxLife = mls;
+    }
+
+    /**
+     * Gets the maxLife attribute of the IAttributes object: how many seconds the element can live
+     * after creation.
+     * <p>
+     * If this is exceeded the element will not be returned, instead it will be removed. It will be
+     * removed on retrieval, or removed actively if the memory shrinker is turned on.
+     * @return The MaxLifeSeconds value
+     */
+    @Override
+    public long getMaxLife()
+    {
+        return this.maxLife;
+    }
+
+    /**
+     * Sets the idleTime attribute of the IAttributes object. This is the maximum time the item can
+     * be idle in the cache, that is, not accessed.
+     * <p>
+     * If this is exceeded the element will not be returned, instead it will be removed. It will be
+     * removed on retrieval, or removed actively if the memory shrinker is turned on.
+     * @param idle The new idleTime value
+     */
+    @Override
+    public void setIdleTime( final long idle )
+    {
+        this.maxIdleTime = idle;
+    }
+
+    /**
+     * Size in bytes. This is not used except in the admin pages. It will be 0 by default
+     * and is only updated when the element is serialized.
+     * <p>
+     * @param size The new size value
+     */
+    @Override
+    public void setSize( final int size )
+    {
+        this.size = size;
+    }
+
+    /**
+     * Gets the size attribute of the IAttributes object
+     * <p>
+     * @return The size value
+     */
+    @Override
+    public int getSize()
+    {
+        return size;
+    }
+
+    /**
+     * Gets the createTime attribute of the IAttributes object.
+     * <p>
+     * This should be the current time in milliseconds returned by the system call when the element
+     * is put in the cache.
+     * <p>
+     * Putting an item in the cache overrides any existing items.
+     * @return The createTime value
+     */
+    @Override
+    public long getCreateTime()
+    {
+        return createTime;
+    }
+
+    /**
+     * Sets the createTime attribute of the IElementAttributes object
+     */
+    public void setCreateTime()
+    {
+        createTime = System.currentTimeMillis();
+    }
+
+    /**
+     * Gets the idleTime attribute of the IAttributes object.
+     * <p>
+     * @return The idleTime value
+     */
+    @Override
+    public long getIdleTime()
+    {
+        return this.maxIdleTime;
+    }
+
+    /**
+     * Gets the time left to live of the IAttributes object.
+     * <p>
+     * This is the (max life + create time) - current time.
+     * @return The TimeToLiveSeconds value
+     */
+    @Override
+    public long getTimeToLiveSeconds()
+    {
+        final long now = System.currentTimeMillis();
+        final long timeFactorForMilliseconds = getTimeFactorForMilliseconds();
+        return ( this.getCreateTime() + this.getMaxLife() * timeFactorForMilliseconds - now ) / 1000;
+    }
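
As a worked example of the formula above: with getMaxLife() returning 60, the default time factor of 1000 (max life given in seconds, converted to milliseconds), and an element created 45 seconds ago, the result is (createTime + 60 * 1000 - now) / 1000 = (60000 - 45000) / 1000 = 15 seconds left to live.
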
+
+    /**
+     * Gets the LastAccess attribute of the IAttributes object.
+     * <p>
+     * @return The LastAccess value.
+     */
+    @Override
+    public long getLastAccessTime()
+    {
+        return this.lastAccessTime;
+    }
+
+    /**
+     * Sets the LastAccessTime of the IElementAttributes object to now.
+     */
+    @Override
+    public void setLastAccessTimeNow()
+    {
+        this.lastAccessTime = System.currentTimeMillis();
+    }
+
+    /**
+     * only for use from test code
+     */
+    public void setLastAccessTime(final long time)
+    {
+        this.lastAccessTime = time;
+    }
+
+    /**
+     * Can this item be spooled to disk
+     * <p>
+     * By default this is true.
+     * @return The spoolable value
+     */
+    @Override
+    public boolean getIsSpool()
+    {
+        return this.IS_SPOOL;
+    }
+
+    /**
+     * Sets the isSpool attribute of the IElementAttributes object
+     * <p>
+     * By default this is true.
+     * @param val The new isSpool value
+     */
+    @Override
+    public void setIsSpool( final boolean val )
+    {
+        this.IS_SPOOL = val;
+    }
+
+    /**
+     * Is this item laterally distributable. Can it be sent to auxiliaries of type lateral.
+     * <p>
+     * By default this is true.
+     * @return The isLateral value
+     */
+    @Override
+    public boolean getIsLateral()
+    {
+        return this.IS_LATERAL;
+    }
+
+    /**
+     * Sets the isLateral attribute of the IElementAttributes object
+     * <p>
+     * By default this is true.
+     * @param val The new isLateral value
+     */
+    @Override
+    public void setIsLateral( final boolean val )
+    {
+        this.IS_LATERAL = val;
+    }
+
+    /**
+     * Can this item be sent to the remote cache
+     * @return true if the item can be sent to a remote auxiliary
+     */
+    @Override
+    public boolean getIsRemote()
+    {
+        return this.IS_REMOTE;
+    }
+
+    /**
+     * Sets the isRemote attribute of the ElementAttributes object
+     * @param val The new isRemote value
+     */
+    @Override
+    public void setIsRemote( final boolean val )
+    {
+        this.IS_REMOTE = val;
+    }
+
+    /**
+     * You can turn off expiration by setting this to true. The max life value will be ignored.
+     * <p>
+     * @return true if the item cannot expire.
+     */
+    @Override
+    public boolean getIsEternal()
+    {
+        return this.IS_ETERNAL;
+    }
+
+    /**
+     * Sets the isEternal attribute of the ElementAttributes object. True means that the item should
+     * never expire. It can still be removed if it is the least recently used and you are using the
+     * LRUMemoryCache; it just will not be filtered for expiration by the cache hub.
+     * <p>
+     * @param val The new isEternal value
+     */
+    @Override
+    public void setIsEternal( final boolean val )
+    {
+        this.IS_ETERNAL = val;
+    }
+
+    /**
+     * Adds an ElementEventHandler. Handlers can be registered for multiple events. A registered
+     * handler will be called at every recognized event.
+     * <p>
+     * The alternative would be to register handlers for each event. Or maybe the handler interface
+     * should have a method to return whether it cares about certain events.
+     * <p>
+     * @param eventHandler The ElementEventHandler to be added to the list.
+     */
+    @Override
+    public void addElementEventHandler( final IElementEventHandler eventHandler )
+    {
+        // lazy here, no concurrency problems expected
+        if ( this.eventHandlers == null )
+        {
+            this.eventHandlers = new ArrayList<>();
+        }
+        this.eventHandlers.add( eventHandler );
+    }
+
+    /**
+     * Sets the eventHandlers of the IElementAttributes object.
+     * <p>
+     * This adds the references to the local list. Subsequent changes in the caller's list will not
+     * be reflected.
+     * <p>
+     * @param eventHandlers List of IElementEventHandler objects
+     */
+    @Override
+    public void addElementEventHandlers( final List<IElementEventHandler> eventHandlers )
+    {
+        if ( eventHandlers == null )
+        {
+            return;
+        }
+
+        for (final IElementEventHandler handler : eventHandlers)
+        {
+            addElementEventHandler(handler);
+        }
+    }
+
+    @Override
+    public long getTimeFactorForMilliseconds()
+    {
+        return timeFactor;
+    }
+
+    @Override
+    public void setTimeFactorForMilliseconds(final long factor)
+    {
+        this.timeFactor = factor;
+    }
+
+    /**
+     * Gets the elementEventHandlers. Returns null if none exist. Makes checking easy.
+     * <p>
+     * @return The elementEventHandlers List of IElementEventHandler objects
+     */
+    @Override
+    public ArrayList<IElementEventHandler> getElementEventHandlers()
+    {
+        return this.eventHandlers;
+    }
+
+    /**
+     * For logging and debugging the element IElementAttributes.
+     * <p>
+     * @return String info about the values.
+     */
+    @Override
+    public String toString()
+    {
+        final StringBuilder dump = new StringBuilder();
+
+        dump.append( "[ IS_LATERAL = " ).append( IS_LATERAL );
+        dump.append( ", IS_SPOOL = " ).append( IS_SPOOL );
+        dump.append( ", IS_REMOTE = " ).append( IS_REMOTE );
+        dump.append( ", IS_ETERNAL = " ).append( IS_ETERNAL );
+        dump.append( ", MaxLifeSeconds = " ).append( this.getMaxLife() );
+        dump.append( ", IdleTime = " ).append( this.getIdleTime() );
+        dump.append( ", CreateTime = " ).append( this.getCreateTime() );
+        dump.append( ", LastAccessTime = " ).append( this.getLastAccessTime() );
+        dump.append( ", getTimeToLiveSeconds() = " ).append( String.valueOf( getTimeToLiveSeconds() ) );
+        dump.append( ", createTime = " ).append( String.valueOf( createTime ) ).append( " ]" );
+
+        return dump.toString();
+    }
+
+    /**
+     * @see Object#clone()
+     */
+    @Override
+    public IElementAttributes clone()
+    {
+        try
+        {
+        	final ElementAttributes c = (ElementAttributes) super.clone();
+        	c.setCreateTime();
+            return c;
+        }
+        catch (final CloneNotSupportedException e)
+        {
+            throw new RuntimeException("Clone not supported. This should never happen.", e);
+        }
+    }
+}
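
A short usage sketch of the class; the values are illustrative and only setters shown above are used:

    ElementAttributes attributes = new ElementAttributes();
    attributes.setIsEternal( false ); // enable max life and idle time expiration
    attributes.setMaxLife( 3600 );    // live at most one hour after creation
    attributes.setIdleTime( 600 );    // expire if not accessed for ten minutes
    attributes.setIsRemote( false );  // keep this element out of remote auxiliaries
    // Pass the attributes object along with the element when putting it into a
    // region, as in way 1 of the class description above.
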
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/engine/memory/AbstractDoubleLinkedListMemoryCache.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/engine/memory/AbstractDoubleLinkedListMemoryCache.java
index 92bd24b4..ea23ce7f 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/engine/memory/AbstractDoubleLinkedListMemoryCache.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/engine/memory/AbstractDoubleLinkedListMemoryCache.java
@@ -1,510 +1,510 @@
-package org.apache.commons.jcs3.engine.memory;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import org.apache.commons.jcs3.engine.behavior.ICacheElement;
-import org.apache.commons.jcs3.engine.control.CompositeCache;
-import org.apache.commons.jcs3.engine.control.group.GroupAttrName;
-import org.apache.commons.jcs3.engine.memory.util.MemoryElementDescriptor;
-import org.apache.commons.jcs3.engine.stats.StatElement;
-import org.apache.commons.jcs3.engine.stats.behavior.IStatElement;
-import org.apache.commons.jcs3.engine.stats.behavior.IStats;
-import org.apache.commons.jcs3.log.Log;
-import org.apache.commons.jcs3.log.LogManager;
-import org.apache.commons.jcs3.utils.struct.DoubleLinkedList;
-
-/**
- * This class contains methods that are common to memory caches using the double linked list, such
- * as the LRU, MRU, FIFO, and LIFO caches.
- * <p>
- * Children can control the expiration algorithm by controlling the update and get. The last item in the list will be the one
- * removed when the list fills. For instance, LRU should move items to the front as they are used. FIFO should simply add new items
- * to the front of the list.
- */
-public abstract class AbstractDoubleLinkedListMemoryCache<K, V> extends AbstractMemoryCache<K, V>
-{
-    /** The logger. */
-    private static final Log log = LogManager.getLog(AbstractDoubleLinkedListMemoryCache.class);
-
-    /** thread-safe double linked list for lru */
-    protected DoubleLinkedList<MemoryElementDescriptor<K, V>> list; // TODO privatise
-
-    /**
-     * For initialization after the instance is created via reflection.
-     * <p>
-     *
-     * @param hub
-     */
-    @Override
-    public void initialize(final CompositeCache<K, V> hub)
-    {
-        super.initialize(hub);
-        list = new DoubleLinkedList<>();
-        log.info("initialized MemoryCache for {0}", this::getCacheName);
-    }
-
-    /**
-     * This is called by super initialize.
-     *
-     * NOTE: should return a thread safe map
-     *
-     * <p>
-     *
-     * @return new ConcurrentHashMap()
-     */
-    @Override
-    public ConcurrentMap<K, MemoryElementDescriptor<K, V>> createMap()
-    {
-        return new ConcurrentHashMap<>();
-    }
-
-    /**
-     * Calls the abstract method adjustListForUpdate.
-     * <p>
-     * If the max size is reached, an element will be put to disk.
-     * <p>
-     *
-     * @param ce
-     *            The cache element, or entry wrapper
-     * @throws IOException
-     */
-    @Override
-    public final void update(final ICacheElement<K, V> ce) throws IOException
-    {
-        putCnt.incrementAndGet();
-
-        lock.lock();
-        try
-        {
-            final MemoryElementDescriptor<K, V> newNode = adjustListForUpdate(ce);
-
-            // this should be synchronized if we were not using a ConcurrentHashMap
-            final K key = newNode.getCacheElement().getKey();
-            final MemoryElementDescriptor<K, V> oldNode = map.put(key, newNode);
-
-            // If we replaced an existing node for this key, remove the old node from the list.
-            if (oldNode != null && key.equals(oldNode.getCacheElement().getKey()))
-            {
-                list.remove(oldNode);
-            }
-        }
-        finally
-        {
-            lock.unlock();
-        }
-
-        // If we are over the max spool some
-        spoolIfNeeded();
-    }
-
-    /**
-     * Children implement this to control the cache expiration algorithm
-     * <p>
-     *
-     * @param ce
-     * @return MemoryElementDescriptor the new node
-     * @throws IOException
-     */
-    protected abstract MemoryElementDescriptor<K, V> adjustListForUpdate(ICacheElement<K, V> ce) throws IOException;
-
-    /**
-     * If the max size has been reached, spool.
-     * <p>
-     *
-     * @throws Error
-     */
-    private void spoolIfNeeded() throws Error
-    {
-        final int size = map.size();
-        // If the element limit is reached, we need to spool
-
-        if (size <= this.getCacheAttributes().getMaxObjects())
-        {
-            return;
-        }
-
-        log.debug("In memory limit reached, spooling");
-
-        // Write the last 'chunkSize' items to disk.
-        final int chunkSizeCorrected = Math.min(size, chunkSize);
-
-        log.debug("About to spool to disk cache, map size: {0}, max objects: {1}, "
-                + "maximum items to spool: {2}", () -> size,
-                this.getCacheAttributes()::getMaxObjects,
-                () -> chunkSizeCorrected);
-
-        // The spool will put them in a disk event queue, so there is no
-        // need to pre-queue the queuing. This would be a bit wasteful
-        // and wouldn't save much time in this synchronous call.
-        lock.lock();
-
-        try
-        {
-            freeElements(chunkSizeCorrected);
-
-            // If this is out of the sync block it can detect a mismatch
-            // where there is none.
-            if (log.isDebugEnabled() && map.size() != list.size())
-            {
-                log.debug("update: After spool, size mismatch: map.size() = {0}, "
-                        + "linked list size = {1}", map.size(), list.size());
-            }
-        }
-        finally
-        {
-            lock.unlock();
-        }
-
-        log.debug("update: After spool map size: {0} linked list size = {1}",
-                () -> map.size(), () -> list.size());
-    }
-
-    /**
-     * This instructs the memory cache to remove the <i>numberToFree</i> according to its eviction
-     * policy. For example, the LRUMemoryCache will remove the <i>numberToFree</i> least recently
-     * used items. These will be spooled to disk if a disk auxiliary is available.
-     * <p>
-     *
-     * @param numberToFree
-     * @return the number that were removed. If you ask to free 5, but there are only 3, you will
-     *         get 3.
-     */
-    @Override
-    public int freeElements(final int numberToFree)
-    {
-        int freed = 0;
-
-        lock.lock();
-
-        try
-        {
-            for (; freed < numberToFree; freed++)
-            {
-                final ICacheElement<K, V> element = spoolLastElement();
-                if (element == null)
-                {
-                    break;
-                }
-            }
-        }
-        finally
-        {
-            lock.unlock();
-        }
-
-        return freed;
-    }
-
-    /**
-     * This spools the last element in the LRU, if one exists.
-     * The method is called while the lock is held.
-     * <p>
-     *
-     * @return ICacheElement&lt;K, V&gt; if there was a last element, else null.
-     * @throws Error
-     */
-    private ICacheElement<K, V> spoolLastElement() throws Error
-    {
-        ICacheElement<K, V> toSpool = null;
-
-        final MemoryElementDescriptor<K, V> last = list.getLast();
-        if (last != null)
-        {
-            toSpool = last.getCacheElement();
-            if (toSpool == null)
-            {
-                throw new Error("update: last.ce is null!");
-            }
-            getCompositeCache().spoolToDisk(toSpool);
-            if (map.remove(toSpool.getKey()) == null)
-            {
-                log.warn("update: remove failed for key: {0}", toSpool.getKey());
-
-                if (log.isTraceEnabled())
-                {
-                    verifyCache();
-                }
-            }
-
-            list.remove(last);
-        }
-
-        return toSpool;
-    }
-
-    /**
-     * @see org.apache.commons.jcs3.engine.memory.AbstractMemoryCache#get(java.lang.Object)
-     */
-    @Override
-    public ICacheElement<K, V> get(final K key) throws IOException
-    {
-        final ICacheElement<K, V> ce = super.get(key);
-
-        if (log.isTraceEnabled())
-        {
-            verifyCache();
-        }
-
-        return ce;
-    }
-
-    /**
-     * Adjust the list as needed for a get. This allows children to control the algorithm
-     * <p>
-     *
-     * @param me
-     */
-    protected abstract void adjustListForGet(MemoryElementDescriptor<K, V> me);
-
-    /**
-     * Update control structures after get
-     * (guarded by the lock)
-     *
-     * @param me the memory element descriptor
-     */
-    @Override
-    protected void lockedGetElement(final MemoryElementDescriptor<K, V> me)
-    {
-        adjustListForGet(me);
-    }
-
-    /**
-     * Remove element from control structure
-     * (guarded by the lock)
-     *
-     * @param me the memory element descriptor
-     */
-    @Override
-    protected void lockedRemoveElement(final MemoryElementDescriptor<K, V> me)
-    {
-        list.remove(me);
-    }
-
-    /**
-     * Removes all cached items from the cache control structures.
-     * (guarded by the lock)
-     */
-    @Override
-    protected void lockedRemoveAll()
-    {
-        list.removeAll();
-    }
-
-    // --------------------------- internal methods (linked list implementation)
-    /**
-     * Adds a new node to the start of the linked list.
-     * <p>
-     *
-     * @param ce
-     *            The cache element to be added first
-     * @return MemoryElementDescriptor
-     */
-    protected MemoryElementDescriptor<K, V> addFirst(final ICacheElement<K, V> ce)
-    {
-        lock.lock();
-        try
-        {
-            final MemoryElementDescriptor<K, V> me = new MemoryElementDescriptor<>(ce);
-            list.addFirst(me);
-            if ( log.isTraceEnabled() )
-            {
-                verifyCache(ce.getKey());
-            }
-            return me;
-        }
-        finally
-        {
-            lock.unlock();
-        }
-    }
-
-    /**
-     * Adds a new node to the end of the linked list.
-     * <p>
-     *
-     * @param ce
-     *            The cache element to be added last
-     * @return MemoryElementDescriptor
-     */
-    protected MemoryElementDescriptor<K, V> addLast(final ICacheElement<K, V> ce)
-    {
-        lock.lock();
-        try
-        {
-            final MemoryElementDescriptor<K, V> me = new MemoryElementDescriptor<>(ce);
-            list.addLast(me);
-            if ( log.isTraceEnabled() )
-            {
-                verifyCache(ce.getKey());
-            }
-            return me;
-        }
-        finally
-        {
-            lock.unlock();
-        }
-    }
-
-    // ---------------------------------------------------------- debug methods
-
-    /**
-     * Dump the cache entries from first to last for debugging.
-     */
-    private void dumpCacheEntries()
-    {
-        log.trace("dumpingCacheEntries");
-        for (MemoryElementDescriptor<K, V> me = list.getFirst(); me != null; me = (MemoryElementDescriptor<K, V>) me.next)
-        {
-            log.trace("dumpCacheEntries> key={0}, val={1}",
-                    me.getCacheElement().getKey(), me.getCacheElement().getVal());
-        }
-    }
-
-    /**
-     * Checks to see if all the items that should be in the cache are. Checks consistency between
-     * List and map.
-     */
-    private void verifyCache()
-    {
-        boolean found = false;
-        log.trace("verifycache[{0}]: map contains {1} elements, linked list "
-                + "contains {2} elements", getCacheName(), map.size(),
-                list.size());
-        log.trace("verifycache: checking linked list by key ");
-        for (MemoryElementDescriptor<K, V> li = list.getFirst(); li != null; li = (MemoryElementDescriptor<K, V>) li.next)
-        {
-            final K key = li.getCacheElement().getKey();
-            if (!map.containsKey(key))
-            {
-                log.error("verifycache[{0}]: map does not contain key : {1}",
-                        getCacheName(), key);
-                log.error("key class={0}", key.getClass());
-                log.error("key hashcode={0}", key.hashCode());
-                log.error("key toString={0}", key.toString());
-                if (key instanceof GroupAttrName)
-                {
-                    final GroupAttrName<?> name = (GroupAttrName<?>) key;
-                    log.error("GroupID hashcode={0}", name.groupId.hashCode());
-                    log.error("GroupID.class={0}", name.groupId.getClass());
-                    log.error("AttrName hashcode={0}", name.attrName.hashCode());
-                    log.error("AttrName.class={0}", name.attrName.getClass());
-                }
-                dumpMap();
-            }
-            else if (map.get(key) == null)
-            {
-                log.error("verifycache[{0}]: linked list retrieval returned "
-                        + "null for key: {1}", getCacheName(), key);
-            }
-        }
-
-        log.trace("verifycache: checking linked list by value ");
-        for (MemoryElementDescriptor<K, V> li = list.getFirst(); li != null; li = (MemoryElementDescriptor<K, V>) li.next)
-        {
-            if (!map.containsValue(li))
-            {
-                log.error("verifycache[{0}]: map does not contain value: {1}",
-                        getCacheName(), li);
-                dumpMap();
-            }
-        }
-
-        log.trace("verifycache: checking via keysets!");
-        for (final Object val : map.keySet())
-        {
-            found = false;
-
-            for (MemoryElementDescriptor<K, V> li = list.getFirst(); li != null; li = (MemoryElementDescriptor<K, V>) li.next)
-            {
-                if (val.equals(li.getCacheElement().getKey()))
-                {
-                    found = true;
-                    break;
-                }
-            }
-            if (!found)
-            {
-                log.error("verifycache[{0}]: key not found in list : {1}",
-                        getCacheName(), val);
-                dumpCacheEntries();
-                if (map.containsKey(val))
-                {
-                    log.error("verifycache: map contains key");
-                }
-                else
-                {
-                    log.error("verifycache: map does NOT contain key, what the HECK!");
-                }
-            }
-        }
-    }
-
-    /**
-     * Logs an error if an element that should be in the cache is not.
-     * <p>
-     *
-     * @param key
-     */
-    private void verifyCache(final K key)
-    {
-        boolean found = false;
-
-        // go through the linked list looking for the key
-        for (MemoryElementDescriptor<K, V> li = list.getFirst(); li != null; li = (MemoryElementDescriptor<K, V>) li.next)
-        {
-            if (li.getCacheElement().getKey() == key)
-            {
-                found = true;
-                log.trace("verifycache(key) key match: {0}", key);
-                break;
-            }
-        }
-        if (!found)
-        {
-            log.error("verifycache(key)[{0}], couldn't find key! : {1}",
-                    getCacheName(), key);
-        }
-    }
-
-    /**
-     * This returns semi-structured information on the memory cache, such as the size, put count,
-     * hit count, and miss count.
-     * <p>
-     *
-     * @see org.apache.commons.jcs3.engine.memory.behavior.IMemoryCache#getStatistics()
-     */
-    @Override
-    public IStats getStatistics()
-    {
-        final IStats stats = super.getStatistics();
-        stats.setTypeName( /* add algorithm name */"Memory Cache");
-
-        final List<IStatElement<?>> elems = stats.getStatElements();
-
-        elems.add(new StatElement<>("List Size", Integer.valueOf(list.size())));
-
-        return stats;
-    }
-}
+package org.apache.commons.jcs3.engine.memory;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.commons.jcs3.engine.behavior.ICacheElement;
+import org.apache.commons.jcs3.engine.control.CompositeCache;
+import org.apache.commons.jcs3.engine.control.group.GroupAttrName;
+import org.apache.commons.jcs3.engine.memory.util.MemoryElementDescriptor;
+import org.apache.commons.jcs3.engine.stats.StatElement;
+import org.apache.commons.jcs3.engine.stats.behavior.IStatElement;
+import org.apache.commons.jcs3.engine.stats.behavior.IStats;
+import org.apache.commons.jcs3.log.Log;
+import org.apache.commons.jcs3.log.LogManager;
+import org.apache.commons.jcs3.utils.struct.DoubleLinkedList;
+
+/**
+ * This class contains methods that are common to memory caches using the double linked list, such
+ * as the LRU, MRU, FIFO, and LIFO caches.
+ * <p>
+ * Children can control the expiration algorithm by controlling the update and get. The last item in the list will be the one
+ * removed when the list fills. For instance, LRU should move items to the front as they are used. FIFO should simply add new items
+ * to the front of the list.
+ */
+public abstract class AbstractDoubleLinkedListMemoryCache<K, V> extends AbstractMemoryCache<K, V>
+{
+    /** The logger. */
+    private static final Log log = LogManager.getLog(AbstractDoubleLinkedListMemoryCache.class);
+
+    /** thread-safe double linked list for lru */
+    protected DoubleLinkedList<MemoryElementDescriptor<K, V>> list; // TODO privatise
+
+    /**
+     * For initialization after the instance is created via reflection.
+     * <p>
+     *
+     * @param hub
+     */
+    @Override
+    public void initialize(final CompositeCache<K, V> hub)
+    {
+        super.initialize(hub);
+        list = new DoubleLinkedList<>();
+        log.info("initialized MemoryCache for {0}", this::getCacheName);
+    }
+
+    /**
+     * This is called by super initialize.
+     *
+     * NOTE: should return a thread safe map
+     *
+     * <p>
+     *
+     * @return new ConcurrentHashMap()
+     */
+    @Override
+    public ConcurrentMap<K, MemoryElementDescriptor<K, V>> createMap()
+    {
+        return new ConcurrentHashMap<>();
+    }
+
+    /**
+     * Calls the abstract method adjustListForUpdate.
+     * <p>
+     * If the max size is reached, an element will be put to disk.
+     * <p>
+     *
+     * @param ce
+     *            The cache element, or entry wrapper
+     * @throws IOException
+     */
+    @Override
+    public final void update(final ICacheElement<K, V> ce) throws IOException
+    {
+        putCnt.incrementAndGet();
+
+        lock.lock();
+        try
+        {
+            final MemoryElementDescriptor<K, V> newNode = adjustListForUpdate(ce);
+
+            // this should be synchronized if we were not using a ConcurrentHashMap
+            final K key = newNode.getCacheElement().getKey();
+            final MemoryElementDescriptor<K, V> oldNode = map.put(key, newNode);
+
+            // If we replaced an existing node for this key, remove the old node from the list.
+            if (oldNode != null && key.equals(oldNode.getCacheElement().getKey()))
+            {
+                list.remove(oldNode);
+            }
+        }
+        finally
+        {
+            lock.unlock();
+        }
+
+        // If we are over the max spool some
+        spoolIfNeeded();
+    }
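
As a sketch of how a child class plugs into this template, an LRU-style subclass could implement the two abstract hooks roughly as follows. This is illustrative only; the real subclasses live elsewhere in the repository, and makeFirst is assumed to be the DoubleLinkedList promotion operation:

    // New and updated elements go to the front, so the least recently
    // used element drifts toward the back and is spooled first.
    @Override
    protected MemoryElementDescriptor<K, V> adjustListForUpdate( final ICacheElement<K, V> ce )
    {
        return addFirst( ce );
    }

    @Override
    protected void adjustListForGet( final MemoryElementDescriptor<K, V> me )
    {
        list.makeFirst( me ); // promote on access (assumed DoubleLinkedList operation)
    }
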
+
+    /**
+     * Children implement this to control the cache expiration algorithm
+     * <p>
+     *
+     * @param ce
+     * @return MemoryElementDescriptor the new node
+     * @throws IOException
+     */
+    protected abstract MemoryElementDescriptor<K, V> adjustListForUpdate(ICacheElement<K, V> ce) throws IOException;
+
+    /**
+     * If the max size has been reached, spool.
+     * <p>
+     *
+     * @throws Error
+     */
+    private void spoolIfNeeded() throws Error
+    {
+        final int size = map.size();
+        // If the element limit is reached, we need to spool
+
+        if (size <= this.getCacheAttributes().getMaxObjects())
+        {
+            return;
+        }
+
+        log.debug("In memory limit reached, spooling");
+
+        // Write the last 'chunkSize' items to disk.
+        final int chunkSizeCorrected = Math.min(size, chunkSize);
+
+        log.debug("About to spool to disk cache, map size: {0}, max objects: {1}, "
+                + "maximum items to spool: {2}", () -> size,
+                this.getCacheAttributes()::getMaxObjects,
+                () -> chunkSizeCorrected);
+
+        // The spool will put them in a disk event queue, so there is no
+        // need to queue the work again here. That would be a bit wasteful
+        // and wouldn't save much time in this synchronous call.
+        lock.lock();
+
+        try
+        {
+            freeElements(chunkSizeCorrected);
+
+            // If this is out of the sync block it can detect a mismatch
+            // where there is none.
+            if (log.isDebugEnabled() && map.size() != list.size())
+            {
+                log.debug("update: After spool, size mismatch: map.size() = {0}, "
+                        + "linked list size = {1}", map.size(), list.size());
+            }
+        }
+        finally
+        {
+            lock.unlock();
+        }
+
+        log.debug("update: After spool map size: {0} linked list size = {1}",
+                () -> map.size(), () -> list.size());
+    }
+
+    /**
+     * This instructs the memory cache to remove the <i>numberToFree</i> according to its eviction
+     * policy. For example, the LRUMemoryCache will remove the <i>numberToFree</i> least recently
+     * used items. These will be spooled to disk if a disk auxiliary is available.
+     * <p>
+     *
+     * @param numberToFree
+     * @return the number that were removed; if you ask to free 5 but there are only 3, you will
+     *         get 3.
+     */
+    @Override
+    public int freeElements(final int numberToFree)
+    {
+        int freed = 0;
+
+        lock.lock();
+
+        try
+        {
+            for (; freed < numberToFree; freed++)
+            {
+                final ICacheElement<K, V> element = spoolLastElement();
+                if (element == null)
+                {
+                    break;
+                }
+            }
+        }
+        finally
+        {
+            lock.unlock();
+        }
+
+        return freed;
+    }
+
+    /**
+     * This spools the last element in the list, if one exists.
+     * The caller must hold the lock.
+     * <p>
+     *
+     * @return ICacheElement&lt;K, V&gt; if there was a last element, else null.
+     * @throws Error
+     */
+    private ICacheElement<K, V> spoolLastElement() throws Error
+    {
+        ICacheElement<K, V> toSpool = null;
+
+        final MemoryElementDescriptor<K, V> last = list.getLast();
+        if (last != null)
+        {
+            toSpool = last.getCacheElement();
+            if (toSpool == null)
+            {
+                throw new Error("update: last.ce is null!");
+            }
+            getCompositeCache().spoolToDisk(toSpool);
+            if (map.remove(toSpool.getKey()) == null)
+            {
+                log.warn("update: remove failed for key: {0}", toSpool.getKey());
+
+                if (log.isTraceEnabled())
+                {
+                    verifyCache();
+                }
+            }
+
+            list.remove(last);
+        }
+
+        return toSpool;
+    }
+
+    /**
+     * @see org.apache.commons.jcs3.engine.memory.AbstractMemoryCache#get(Object)
+     */
+    @Override
+    public ICacheElement<K, V> get(final K key) throws IOException
+    {
+        final ICacheElement<K, V> ce = super.get(key);
+
+        if (log.isTraceEnabled())
+        {
+            verifyCache();
+        }
+
+        return ce;
+    }
+
+    /**
+     * Adjust the list as needed for a get. This allows children to control the algorithm.
+     * <p>
+     *
+     * @param me
+     */
+    protected abstract void adjustListForGet(MemoryElementDescriptor<K, V> me);
+
+    /**
+     * Update control structures after get
+     * (guarded by the lock)
+     *
+     * @param me the memory element descriptor
+     */
+    @Override
+    protected void lockedGetElement(final MemoryElementDescriptor<K, V> me)
+    {
+        adjustListForGet(me);
+    }
+
+    /**
+     * Remove element from control structure
+     * (guarded by the lock)
+     *
+     * @param me the memory element descriptor
+     */
+    @Override
+    protected void lockedRemoveElement(final MemoryElementDescriptor<K, V> me)
+    {
+        list.remove(me);
+    }
+
+    /**
+     * Removes all cached items from the cache control structures.
+     * (guarded by the lock)
+     */
+    @Override
+    protected void lockedRemoveAll()
+    {
+        list.removeAll();
+    }
+
+    // --------------------------- internal methods (linked list implementation)
+    /**
+     * Adds a new node to the start of the linked list.
+     * <p>
+     *
+     * @param ce
+     *            The cache element to be added to the front
+     * @return MemoryElementDescriptor
+     */
+    protected MemoryElementDescriptor<K, V> addFirst(final ICacheElement<K, V> ce)
+    {
+        lock.lock();
+        try
+        {
+            final MemoryElementDescriptor<K, V> me = new MemoryElementDescriptor<>(ce);
+            list.addFirst(me);
+            if ( log.isTraceEnabled() )
+            {
+                verifyCache(ce.getKey());
+            }
+            return me;
+        }
+        finally
+        {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Adds a new node to the end of the linked list.
+     * <p>
+     *
+     * @param ce
+     *            The cache element to be added to the end
+     * @return MemoryElementDescriptor
+     */
+    protected MemoryElementDescriptor<K, V> addLast(final ICacheElement<K, V> ce)
+    {
+        lock.lock();
+        try
+        {
+            final MemoryElementDescriptor<K, V> me = new MemoryElementDescriptor<>(ce);
+            list.addLast(me);
+            if ( log.isTraceEnabled() )
+            {
+                verifyCache(ce.getKey());
+            }
+            return me;
+        }
+        finally
+        {
+            lock.unlock();
+        }
+    }
+
+    // ---------------------------------------------------------- debug methods
+
+    /**
+     * Dump the cache entries from first to last for debugging.
+     */
+    private void dumpCacheEntries()
+    {
+        log.trace("dumpingCacheEntries");
+        for (MemoryElementDescriptor<K, V> me = list.getFirst(); me != null; me = (MemoryElementDescriptor<K, V>) me.next)
+        {
+            log.trace("dumpCacheEntries> key={0}, val={1}",
+                    me.getCacheElement().getKey(), me.getCacheElement().getVal());
+        }
+    }
+
+    /**
+     * Checks to see if all the items that should be in the cache are present. Checks consistency
+     * between the linked list and the map.
+     */
+    private void verifyCache()
+    {
+        boolean found = false;
+        log.trace("verifycache[{0}]: map contains {1} elements, linked list "
+                + "contains {2} elements", getCacheName(), map.size(),
+                list.size());
+        log.trace("verifycache: checking linked list by key ");
+        for (MemoryElementDescriptor<K, V> li = list.getFirst(); li != null; li = (MemoryElementDescriptor<K, V>) li.next)
+        {
+            final K key = li.getCacheElement().getKey();
+            if (!map.containsKey(key))
+            {
+                log.error("verifycache[{0}]: map does not contain key : {1}",
+                        getCacheName(), key);
+                log.error("key class={0}", key.getClass());
+                log.error("key hashcode={0}", key.hashCode());
+                log.error("key toString={0}", key.toString());
+                if (key instanceof GroupAttrName)
+                {
+                    final GroupAttrName<?> name = (GroupAttrName<?>) key;
+                    log.error("GroupID hashcode={0}", name.groupId.hashCode());
+                    log.error("GroupID.class={0}", name.groupId.getClass());
+                    log.error("AttrName hashcode={0}", name.attrName.hashCode());
+                    log.error("AttrName.class={0}", name.attrName.getClass());
+                }
+                dumpMap();
+            }
+            else if (map.get(key) == null)
+            {
+                log.error("verifycache[{0}]: linked list retrieval returned "
+                        + "null for key: {1}", getCacheName(), key);
+            }
+        }
+
+        log.trace("verifycache: checking linked list by value ");
+        for (MemoryElementDescriptor<K, V> li = list.getFirst(); li != null; li = (MemoryElementDescriptor<K, V>) li.next)
+        {
+            if (!map.containsValue(li))
+            {
+                log.error("verifycache[{0}]: map does not contain value: {1}",
+                        getCacheName(), li);
+                dumpMap();
+            }
+        }
+
+        log.trace("verifycache: checking via keysets!");
+        for (final Object val : map.keySet())
+        {
+            found = false;
+
+            for (MemoryElementDescriptor<K, V> li = list.getFirst(); li != null; li = (MemoryElementDescriptor<K, V>) li.next)
+            {
+                if (val.equals(li.getCacheElement().getKey()))
+                {
+                    found = true;
+                    break;
+                }
+            }
+            if (!found)
+            {
+                log.error("verifycache[{0}]: key not found in list : {1}",
+                        getCacheName(), val);
+                dumpCacheEntries();
+                if (map.containsKey(val))
+                {
+                    log.error("verifycache: map contains key");
+                }
+                else
+                {
+                    log.error("verifycache: map does NOT contain key, what the HECK!");
+                }
+            }
+        }
+    }
+
+    /**
+     * Logs an error if an element that should be in the cache is not.
+     * <p>
+     *
+     * @param key
+     */
+    private void verifyCache(final K key)
+    {
+        boolean found = false;
+
+        // go through the linked list looking for the key
+        for (MemoryElementDescriptor<K, V> li = list.getFirst(); li != null; li = (MemoryElementDescriptor<K, V>) li.next)
+        {
+            if (li.getCacheElement().getKey().equals(key))
+            {
+                found = true;
+                log.trace("verifycache(key) key match: {0}", key);
+                break;
+            }
+        }
+        if (!found)
+        {
+            log.error("verifycache(key)[{0}], couldn't find key! : {1}",
+                    getCacheName(), key);
+        }
+    }
+
+    /**
+     * This returns semi-structured information on the memory cache, such as the size, put count,
+     * hit count, and miss count.
+     * <p>
+     *
+     * @see org.apache.commons.jcs3.engine.memory.behavior.IMemoryCache#getStatistics()
+     */
+    @Override
+    public IStats getStatistics()
+    {
+        final IStats stats = super.getStatistics();
+        stats.setTypeName( /* add algorithm name */"Memory Cache");
+
+        final List<IStatElement<?>> elems = stats.getStatElements();
+
+        elems.add(new StatElement<>("List Size", Integer.valueOf(list.size())));
+
+        return stats;
+    }
+}
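
For readers skimming the hunk above: a child class supplies the eviction policy by
implementing the two abstract hooks, adjustListForUpdate and adjustListForGet. A minimal
LRU-style sketch follows; the class name SketchLRUMemoryCache is invented for illustration
(it is not the shipped LRUMemoryCache), and the import locations are inferred from the
paths in this diff:

    import java.io.IOException;

    import org.apache.commons.jcs3.engine.behavior.ICacheElement;
    import org.apache.commons.jcs3.engine.memory.AbstractDoubleLinkedListMemoryCache;
    import org.apache.commons.jcs3.engine.memory.util.MemoryElementDescriptor;

    public class SketchLRUMemoryCache<K, V> extends AbstractDoubleLinkedListMemoryCache<K, V>
    {
        /** A new or updated element becomes the most recently used: put it at the front. */
        @Override
        protected MemoryElementDescriptor<K, V> adjustListForUpdate(final ICacheElement<K, V> ce)
            throws IOException
        {
            return addFirst(ce);
        }

        /**
         * A get also counts as use: move the touched node to the front, so the tail
         * stays the least recently used element that spoolLastElement() evicts.
         */
        @Override
        protected void adjustListForGet(final MemoryElementDescriptor<K, V> me)
        {
            list.makeFirst(me);
        }
    }

A FIFO variant would simply leave adjustListForGet empty, so insertion order alone
determines the eviction victim.
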
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPCleanupRunner.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPCleanupRunner.java
index d741d421..5fa2c904 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPCleanupRunner.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPCleanupRunner.java
@@ -1,56 +1,56 @@
-package org.apache.commons.jcs3.utils.discovery;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * This class periodically check the lastHeardFrom time on the services.
- * <p>
- * If they exceed the configurable limit, it removes them from the set.
- * <p>
- * @author Aaron Smuts
- * @deprecated Functionality moved to UDPDiscoveryService
- */
-@Deprecated
-public class UDPCleanupRunner
-    implements Runnable
-{
-    /** UDP discovery service */
-    private final UDPDiscoveryService discoveryService;
-
-    /**
-     * @param service UDPDiscoveryService
-     */
-    public UDPCleanupRunner( final UDPDiscoveryService service )
-    {
-        this.discoveryService = service;
-    }
-
-    /**
-     * This goes through the list of services and removes those that we haven't heard from in longer
-     * than the max idle time.
-     * <p>
-     * @see java.lang.Runnable#run()
-     */
-    @Override
-    public void run()
-    {
-        discoveryService.cleanup();
-    }
-}
+package org.apache.commons.jcs3.utils.discovery;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * This class periodically checks the lastHeardFrom time on the services.
+ * <p>
+ * If a service has not been heard from within the configurable limit, it is removed from the set.
+ * <p>
+ * @author Aaron Smuts
+ * @deprecated Functionality moved to UDPDiscoveryService
+ */
+@Deprecated
+public class UDPCleanupRunner
+    implements Runnable
+{
+    /** UDP discovery service */
+    private final UDPDiscoveryService discoveryService;
+
+    /**
+     * @param service UDPDiscoveryService
+     */
+    public UDPCleanupRunner( final UDPDiscoveryService service )
+    {
+        this.discoveryService = service;
+    }
+
+    /**
+     * This goes through the list of services and removes those that we haven't heard from in longer
+     * than the max idle time.
+     * <p>
+     * @see Runnable#run()
+     */
+    @Override
+    public void run()
+    {
+        discoveryService.cleanup();
+    }
+}
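
Although deprecated, the runner shows the intended usage pattern: hand it to a scheduler
so cleanup() runs at a fixed interval. A minimal sketch, in which the 30-second period and
the discoveryService variable are illustrative assumptions rather than values taken from
the JCS configuration:

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    // discoveryService is assumed to be an already-configured UDPDiscoveryService.
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    // Sweep the set of known services every 30 seconds; entries whose
    // lastHeardFrom time is older than the max idle time are dropped.
    scheduler.scheduleAtFixedRate(
        new UDPCleanupRunner(discoveryService), 30, 30, TimeUnit.SECONDS);

Per the @deprecated note, new code should not schedule this class at all; the cleanup now
lives in UDPDiscoveryService itself.
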
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/struct/AbstractLRUMap.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/struct/AbstractLRUMap.java
index b6c9ad29..fbf1268f 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/struct/AbstractLRUMap.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/struct/AbstractLRUMap.java
@@ -1,535 +1,535 @@
-package org.apache.commons.jcs3.utils.struct;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.AbstractMap;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.stream.Collectors;
-
-import org.apache.commons.jcs3.engine.control.group.GroupAttrName;
-import org.apache.commons.jcs3.engine.stats.StatElement;
-import org.apache.commons.jcs3.engine.stats.Stats;
-import org.apache.commons.jcs3.engine.stats.behavior.IStatElement;
-import org.apache.commons.jcs3.engine.stats.behavior.IStats;
-import org.apache.commons.jcs3.log.Log;
-import org.apache.commons.jcs3.log.LogManager;
-
-/**
- * This is a simple LRUMap. It implements most of the map methods. It is not recommended that you
- * use any but put, get, remove, and clear.
- * <p>
- * Children can implement the processRemovedLRU method if they want to handle the removal of the
- * least recently used item.
- * <p>
- * This class was abstracted out of the LRU memory cache. Put, remove, and get should be
- * thread-safe. It uses a ConcurrentHashMap and our own double linked list.
- * <p>
- * Locking is done on the instance.
- * <p>
- * @author aaron smuts
- */
-public abstract class AbstractLRUMap<K, V>
-    implements Map<K, V>
-{
-    /** The logger */
-    private static final Log log = LogManager.getLog( AbstractLRUMap.class );
-
-    /** double linked list for lru */
-    private final DoubleLinkedList<LRUElementDescriptor<K, V>> list;
-
-    /** Map where items are stored by key. */
-    private final Map<K, LRUElementDescriptor<K, V>> map;
-
-    /** lock to keep map and list synchronous */
-    private final Lock lock = new ReentrantLock();
-
-    /** stats */
-    private long hitCnt;
-
-    /** stats */
-    private long missCnt;
-
-    /** stats */
-    private long putCnt;
-
-    /**
-     * This creates an unbounded version. Setting the max objects will result in spooling on
-     * subsequent puts.
-     */
-    public AbstractLRUMap()
-    {
-        list = new DoubleLinkedList<>();
-
-        // normal hashtable is faster for
-        // sequential keys.
-        map = new ConcurrentHashMap<>();
-    }
-
-
-    /**
-     * This simply returns the number of elements in the map.
-     * <p>
-     * @see java.util.Map#size()
-     */
-    @Override
-    public int size()
-    {
-        return map.size();
-    }
-
-    /**
-     * This removes all the items. It clears the map and the double linked list.
-     * <p>
-     * @see java.util.Map#clear()
-     */
-    @Override
-    public void clear()
-    {
-        lock.lock();
-        try
-        {
-            map.clear();
-            list.removeAll();
-        }
-        finally
-        {
-            lock.unlock();
-        }
-    }
-
-    /**
-     * Returns true if the map is empty.
-     * <p>
-     * @see java.util.Map#isEmpty()
-     */
-    @Override
-    public boolean isEmpty()
-    {
-        return map.isEmpty();
-    }
-
-    /**
-     * Returns true if the map contains an element for the supplied key.
-     * <p>
-     * @see java.util.Map#containsKey(java.lang.Object)
-     */
-    @Override
-    public boolean containsKey( final Object key )
-    {
-        return map.containsKey( key );
-    }
-
-    /**
-     * This is an expensive operation that determines if the object supplied is mapped to any key.
-     * <p>
-     * @see java.util.Map#containsValue(java.lang.Object)
-     */
-    @Override
-    public boolean containsValue( final Object value )
-    {
-        return map.containsValue( value );
-    }
-
-    /**
-     * @return a collection of the entry payload values
-     */
-    @Override
-    public Collection<V> values()
-    {
-        return map.values().stream()
-                .map(LRUElementDescriptor::getPayload)
-                .collect(Collectors.toList());
-    }
-
-    /**
-     * @param source
-     */
-    @Override
-    public void putAll( final Map<? extends K, ? extends V> source )
-    {
-        if ( source != null )
-        {
-            source.forEach(this::put);
-        }
-    }
-
-    /**
-     * @param key
-     * @return Object
-     */
-    @Override
-    public V get( final Object key )
-    {
-        final V retVal;
-
-        log.debug( "getting item  for key {0}", key );
-
-        final LRUElementDescriptor<K, V> me = map.get( key );
-
-        if ( me == null )
-        {
-            missCnt++;
-            retVal = null;
-        }
-        else
-        {
-            hitCnt++;
-            retVal = me.getPayload();
-            list.makeFirst( me );
-        }
-
-        if ( me == null )
-        {
-            log.debug( "LRUMap miss for {0}", key );
-        }
-        else
-        {
-            log.debug( "LRUMap hit for {0}", key );
-        }
-
-        // verifyCache();
-        return retVal;
-    }
-
-    /**
-     * This gets an element out of the map without adjusting its position in the LRU. In other
-     * words, this does not count as being used. If the element is the last item in the list, it
-     * will still be the last item in the list.
-     * <p>
-     * @param key
-     * @return Object
-     */
-    public V getQuiet( final Object key )
-    {
-        V ce = null;
-        final LRUElementDescriptor<K, V> me = map.get( key );
-
-        if ( me != null )
-        {
-            ce = me.getPayload();
-        }
-
-        if ( me == null )
-        {
-            log.debug( "LRUMap quiet miss for {0}", key );
-        }
-        else
-        {
-            log.debug( "LRUMap quiet hit for {0}", key );
-        }
-
-        return ce;
-    }
-
-    /**
-     * @param key
-     * @return Object removed
-     */
-    @Override
-    public V remove( final Object key )
-    {
-        log.debug( "removing item for key: {0}", key );
-
-        // remove single item.
-        lock.lock();
-        try
-        {
-            final LRUElementDescriptor<K, V> me = map.remove(key);
-
-            if (me != null)
-            {
-                list.remove(me);
-                return me.getPayload();
-            }
-        }
-        finally
-        {
-            lock.unlock();
-        }
-
-        return null;
-    }
-
-    /**
-     * @param key
-     * @param value
-     * @return Object
-     */
-    @Override
-    public V put(final K key, final V value)
-    {
-        putCnt++;
-
-        LRUElementDescriptor<K, V> old = null;
-        final LRUElementDescriptor<K, V> me = new LRUElementDescriptor<>(key, value);
-
-        lock.lock();
-        try
-        {
-            list.addFirst( me );
-            old = map.put(key, me);
-
-            // If the node was the same as an existing node, remove it.
-            if ( old != null && key.equals(old.getKey()))
-            {
-                list.remove( old );
-            }
-        }
-        finally
-        {
-            lock.unlock();
-        }
-
-        // If the element limit is reached, we need to spool
-        if (shouldRemove())
-        {
-            log.debug( "In memory limit reached, removing least recently used." );
-
-            // The spool will put them in a disk event queue, so there is no
-            // need to queue the work again here. That would be a bit wasteful
-            // and wouldn't save much time in this synchronous call.
-            while (shouldRemove())
-            {
-                lock.lock();
-                try
-                {
-                    final LRUElementDescriptor<K, V> last = list.getLast();
-                    if (last == null) {
-                        verifyCache();
-                        throw new Error("update: last is null!");
-                    }
-                    processRemovedLRU(last.getKey(), last.getPayload());
-                    if (map.remove(last.getKey()) == null)
-                    {
-                        log.warn("update: remove failed for key: {0}",
-                                last::getKey);
-                        verifyCache();
-                    }
-                    list.removeLast();
-                }
-                finally
-                {
-                    lock.unlock();
-                }
-            }
-
-            log.debug( "update: After spool map size: {0}", map::size);
-            if ( map.size() != list.size() )
-            {
-                log.error("update: After spool, size mismatch: map.size() = {0}, "
-                        + "linked list size = {1}",
-                        map::size, list::size);
-            }
-        }
-
-        if ( old != null )
-        {
-            return old.getPayload();
-        }
-        return null;
-    }
-
-    protected abstract boolean shouldRemove();
-
-    /**
-     * Dump the cache entries from first to last for debugging.
-     */
-    @SuppressWarnings("unchecked") // No generics for public fields
-    public void dumpCacheEntries()
-    {
-        if (log.isTraceEnabled())
-        {
-            log.trace("dumpingCacheEntries");
-            for (LRUElementDescriptor<K, V> me = list.getFirst(); me != null; me = (LRUElementDescriptor<K, V>) me.next)
-            {
-                log.trace("dumpCacheEntries> key={0}, val={1}", me.getKey(), me.getPayload());
-            }
-        }
-    }
-
-    /**
-     * Dump the cache map for debugging.
-     */
-    public void dumpMap()
-    {
-        if (log.isTraceEnabled())
-        {
-            log.trace("dumpingMap");
-            map.forEach((key, value) -> log.trace("dumpMap> key={0}, val={1}", key, value.getPayload()));
-        }
-    }
-
-    /**
-     * Checks to see if all the items that should be in the cache are present. Checks consistency
-     * between the linked list and the map.
-     */
-    @SuppressWarnings("unchecked") // No generics for public fields
-    protected void verifyCache()
-    {
-        if ( !log.isTraceEnabled() )
-        {
-            return;
-        }
-
-        log.trace( "verifycache: mapContains {0} elements, linked list "
-                + "contains {1} elements", map.size(), list.size() );
-        log.trace( "verifycache: checking linked list by key" );
-        for (LRUElementDescriptor<K, V> li = list.getFirst(); li != null; li = (LRUElementDescriptor<K, V>) li.next )
-        {
-            final K key = li.getKey();
-            if ( !map.containsKey( key ) )
-            {
-                log.error( "verifycache: map does not contain key : {0}", li.getKey() );
-                log.error( "li.hashcode={0}", li.getKey().hashCode() );
-                log.error( "key class={0}", key.getClass() );
-                log.error( "key hashcode={0}", key.hashCode() );
-                log.error( "key toString={0}", key.toString() );
-                if ( key instanceof GroupAttrName )
-                {
-                    final GroupAttrName<?> name = (GroupAttrName<?>) key;
-                    log.error( "GroupID hashcode={0}", name.groupId.hashCode() );
-                    log.error( "GroupID.class={0}", name.groupId.getClass() );
-                    log.error( "AttrName hashcode={0}", name.attrName.hashCode() );
-                    log.error( "AttrName.class={0}", name.attrName.getClass() );
-                }
-                dumpMap();
-            }
-            else if ( map.get( li.getKey() ) == null )
-            {
-                log.error( "verifycache: linked list retrieval returned null for key: {0}",
-                        li.getKey() );
-            }
-        }
-
-        log.trace( "verifycache: checking linked list by value " );
-        for (LRUElementDescriptor<K, V> li3 = list.getFirst(); li3 != null; li3 = (LRUElementDescriptor<K, V>) li3.next )
-        {
-            if (!map.containsValue(li3))
-            {
-                log.error( "verifycache: map does not contain value : {0}", li3 );
-                dumpMap();
-            }
-        }
-
-        log.trace( "verifycache: checking via keysets!" );
-        map.keySet().stream()
-            .filter(key -> {
-                for (LRUElementDescriptor<K, V> li2 = list.getFirst(); li2 != null; li2 = (LRUElementDescriptor<K, V>) li2.next )
-                {
-                    if ( key.equals( li2.getKey() ) )
-                    {
-                        return true;
-                    }
-                }
-
-                log.error( "verifycache: key not found in list : {0}", key );
-                dumpCacheEntries();
-                if ( map.containsKey( key ) )
-                {
-                    log.error( "verifycache: map contains key" );
-                }
-                else
-                {
-                    log.error( "verifycache: map does NOT contain key, what the HECK!" );
-                }
-
-                return false;
-            })
-            .findFirst();
-    }
-
-    /**
-     * This is called when an item is removed from the LRU. We just log some information.
-     * <p>
-     * Children can implement this method for special behavior.
-     * @param key
-     * @param value
-     */
-    protected void processRemovedLRU(final K key, final V value )
-    {
-        log.debug( "Removing key: [{0}] from LRUMap store, value = [{1}]", key, value );
-        log.debug( "LRUMap store size: \"{0}\".", this.size() );
-    }
-
-    /**
-     * @return IStats
-     */
-    public IStats getStatistics()
-    {
-        final IStats stats = new Stats();
-        stats.setTypeName( "LRUMap" );
-
-        final ArrayList<IStatElement<?>> elems = new ArrayList<>();
-
-        elems.add(new StatElement<>( "List Size", Integer.valueOf(list.size()) ) );
-        elems.add(new StatElement<>( "Map Size", Integer.valueOf(map.size()) ) );
-        elems.add(new StatElement<>( "Put Count", Long.valueOf(putCnt) ) );
-        elems.add(new StatElement<>( "Hit Count", Long.valueOf(hitCnt) ) );
-        elems.add(new StatElement<>( "Miss Count", Long.valueOf(missCnt) ) );
-
-        stats.setStatElements( elems );
-
-        return stats;
-    }
-
-    /**
-     * This returns a set of entries. Our LRUMapEntry is used since the value stored in the
-     * underlying map is a node in the double linked list. We wouldn't want to return this to the
-     * client, so we construct a new entry with the payload of the node.
-     * <p>
-     * TODO we should return our own set wrapper, so we can avoid the extra object creation if it
-     * isn't necessary.
-     * <p>
-     * @see java.util.Map#entrySet()
-     */
-    @Override
-    public Set<Map.Entry<K, V>> entrySet()
-    {
-        lock.lock();
-        try
-        {
-            return map.entrySet().stream()
-                    .map(entry -> new AbstractMap.SimpleEntry<>(
-                            entry.getKey(), entry.getValue().getPayload()))
-                    .collect(Collectors.toSet());
-        }
-        finally
-        {
-            lock.unlock();
-        }
-    }
-
-    /**
-     * @return the set of keys in the map
-     */
-    @Override
-    public Set<K> keySet()
-    {
-        return map.values().stream()
-                .map(LRUElementDescriptor::getKey)
-                .collect(Collectors.toSet());
-    }
-}
+package org.apache.commons.jcs3.utils.struct;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
... 694 lines suppressed ...
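
The mailer suppressed the re-added half of this hunk. As a reading aid for the class
itself: a concrete subclass only needs to supply shouldRemove() to turn AbstractLRUMap
into a bounded LRU, because put() keeps evicting the list tail while that predicate holds.
A minimal sketch, where BoundedLRUMap and its maxObjects field are hypothetical names
rather than the shipped LRUMap:

    import org.apache.commons.jcs3.utils.struct.AbstractLRUMap;

    public class BoundedLRUMap<K, V> extends AbstractLRUMap<K, V>
    {
        /** Hard cap on the number of entries; eviction starts above this. */
        private final int maxObjects;

        public BoundedLRUMap(final int maxObjects)
        {
            this.maxObjects = maxObjects;
        }

        /**
         * put() loops on this after inserting, removing the least recently used
         * entry (and calling processRemovedLRU) until it returns false.
         */
        @Override
        protected boolean shouldRemove()
        {
            return maxObjects > 0 && size() > maxObjects;
        }
    }

Because put() re-checks the predicate after each eviction, the map settles back to at
most maxObjects entries on every insert.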