You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@commons.apache.org by tv...@apache.org on 2019/05/28 13:48:56 UTC

[commons-jcs] 08/09: Use lambdas for Runnables

This is an automated email from the ASF dual-hosted git repository.

tv pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/commons-jcs.git

commit b8486dccdc7bec2314a9d7e0b3d445c34e5169ad
Author: Thomas Vandahl <tv...@apache.org>
AuthorDate: Tue May 28 15:46:27 2019 +0200

    Use lambdas for Runnables
---
 .../jcs/auxiliary/disk/AbstractDiskCache.java      | 37 +++++-----
 .../jcs/auxiliary/disk/block/BlockDiskCache.java   | 29 +-------
 .../auxiliary/disk/indexed/IndexedDiskCache.java   | 78 ++++++++--------------
 .../jcs/utils/discovery/UDPDiscoveryReceiver.java  | 36 ++++------
 .../jcs/utils/discovery/UDPDiscoveryService.java   |  4 +-
 5 files changed, 62 insertions(+), 122 deletions(-)

diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs/auxiliary/disk/AbstractDiskCache.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs/auxiliary/disk/AbstractDiskCache.java
index 6f42f20..a536876 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs/auxiliary/disk/AbstractDiskCache.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs/auxiliary/disk/AbstractDiskCache.java
@@ -437,32 +437,27 @@ public abstract class AbstractDiskCache<K, V>
     public final void dispose()
         throws IOException
     {
-        Runnable disR = new Runnable()
+        Thread t = new Thread(() ->
         {
-            @Override
-            public void run()
+            boolean keepGoing = true;
+            // long total = 0;
+            long interval = 100;
+            while ( keepGoing )
             {
-                boolean keepGoing = true;
-                // long total = 0;
-                long interval = 100;
-                while ( keepGoing )
+                keepGoing = !cacheEventQueue.isEmpty();
+                try
                 {
-                    keepGoing = !cacheEventQueue.isEmpty();
-                    try
-                    {
-                        Thread.sleep( interval );
-                        // total += interval;
-                        // log.info( "total = " + total );
-                    }
-                    catch ( InterruptedException e )
-                    {
-                        break;
-                    }
+                    Thread.sleep( interval );
+                    // total += interval;
+                    // log.info( "total = " + total );
+                }
+                catch ( InterruptedException e )
+                {
+                    break;
                 }
-                log.info( "No longer waiting for event queue to finish: " + cacheEventQueue.getStatistics() );
             }
-        };
-        Thread t = new Thread( disR );
+            log.info( "No longer waiting for event queue to finish: " + cacheEventQueue.getStatistics() );
+        });
         t.start();
         // wait up to 60 seconds for dispose and then quit if not done.
         try
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs/auxiliary/disk/block/BlockDiskCache.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs/auxiliary/disk/block/BlockDiskCache.java
index 2eae22b..06f89ba 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs/auxiliary/disk/block/BlockDiskCache.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs/auxiliary/disk/block/BlockDiskCache.java
@@ -171,15 +171,7 @@ public class BlockDiskCache<K, V>
         // TODO we might need to stagger this a bit.
         if ( this.blockDiskCacheAttributes.getKeyPersistenceIntervalSeconds() > 0 )
         {
-            future = scheduledExecutor.scheduleAtFixedRate(
-                    new Runnable()
-                    {
-                        @Override
-                        public void run()
-                        {
-                            keyStore.saveKeys();
-                        }
-                    },
+            future = scheduledExecutor.scheduleAtFixedRate(keyStore::saveKeys,
                     this.blockDiskCacheAttributes.getKeyPersistenceIntervalSeconds(),
                     this.blockDiskCacheAttributes.getKeyPersistenceIntervalSeconds(),
                     TimeUnit.SECONDS);
@@ -574,22 +566,7 @@ public class BlockDiskCache<K, V>
     @Override
     public void processDispose()
     {
-        Runnable disR = new Runnable()
-        {
-            @Override
-            public void run()
-            {
-                try
-                {
-                    disposeInternal();
-                }
-                catch ( InterruptedException e )
-                {
-                    log.warn( "Interrupted while diposing." );
-                }
-            }
-        };
-        Thread t = new Thread( disR, "BlockDiskCache-DisposalThread" );
+        Thread t = new Thread(this::disposeInternal, "BlockDiskCache-DisposalThread" );
         t.start();
         // wait up to 60 seconds for dispose and then quit if not done.
         try
@@ -604,10 +581,8 @@ public class BlockDiskCache<K, V>
 
     /**
      * Internal method that handles the disposal.
-     * @throws InterruptedException
      */
     protected void disposeInternal()
-        throws InterruptedException
     {
         if ( !isAlive() )
         {
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs/auxiliary/disk/indexed/IndexedDiskCache.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs/auxiliary/disk/indexed/IndexedDiskCache.java
index 825376e..c288784 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs/auxiliary/disk/indexed/IndexedDiskCache.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs/auxiliary/disk/indexed/IndexedDiskCache.java
@@ -79,7 +79,7 @@ public class IndexedDiskCache<K, V> extends AbstractDiskCache<K, V>
     private IndexedDisk keyFile;
 
     /** Map containing the keys and disk offsets. */
-    private Map<K, IndexedDiskElementDescriptor> keyHash;
+    private final Map<K, IndexedDiskElementDescriptor> keyHash;
 
     /** The maximum number of keys that we will keep in memory. */
     private final int maxKeySize;
@@ -112,11 +112,10 @@ public class IndexedDiskCache<K, V> extends AbstractDiskCache<K, V>
     private boolean queueInput = false;
 
     /** list where puts made during optimization are made */
-    private final ConcurrentSkipListSet<IndexedDiskElementDescriptor> queuedPutList =
-            new ConcurrentSkipListSet<IndexedDiskElementDescriptor>(new PositionComparator());
+    private final ConcurrentSkipListSet<IndexedDiskElementDescriptor> queuedPutList;
 
     /** RECYLCE BIN -- array of empty spots */
-    private ConcurrentSkipListSet<IndexedDiskElementDescriptor> recycle;
+    private final ConcurrentSkipListSet<IndexedDiskElementDescriptor> recycle;
 
     /** User configurable parameters */
     private final IndexedDiskCacheAttributes cattr;
@@ -174,14 +173,15 @@ public class IndexedDiskCache<K, V> extends AbstractDiskCache<K, V>
         this.diskLimitType = cattr.getDiskLimitType();
         // Make a clean file name
         this.fileName = getCacheName().replaceAll("[^a-zA-Z0-9-_\\.]", "_");
+        this.keyHash = createInitialKeyMap();
+        this.queuedPutList = new ConcurrentSkipListSet<>(new PositionComparator());
+        this.recycle = new ConcurrentSkipListSet<>();
 
         try
         {
-            initializeRecycleBin();
             initializeFileSystem(cattr);
             initializeKeysAndData(cattr);
 
-
             // Initialization finished successfully, so set alive to true.
             setAlive(true);
             if (log.isInfoEnabled())
@@ -263,7 +263,7 @@ public class IndexedDiskCache<K, V> extends AbstractDiskCache<K, V>
      */
     private void initializeEmptyStore() throws IOException
     {
-        initializeKeyMap();
+        this.keyHash.clear();
 
         if (dataFile.length() > 0)
         {
@@ -321,8 +321,8 @@ public class IndexedDiskCache<K, V> extends AbstractDiskCache<K, V>
 
         try
         {
-            // create a key map to use.
-            initializeKeyMap();
+            // clear a key map to use.
+            keyHash.clear();
 
             HashMap<K, IndexedDiskElementDescriptor> keys = keyFile.readObject(
                 new IndexedDiskElementDescriptor(0, (int) keyFile.length() - IndexedDisk.HEADER_SIZE_BYTES));
@@ -961,13 +961,12 @@ public class IndexedDiskCache<K, V> extends AbstractDiskCache<K, V>
             dataFile = new IndexedDisk(new File(rafDir, fileName + ".data"), getElementSerializer());
             keyFile = new IndexedDisk(new File(rafDir, fileName + ".key"), getElementSerializer());
 
-            initializeRecycleBin();
-
-            initializeKeyMap();
+            this.recycle.clear();
+            this.keyHash.clear();
         }
         catch (IOException e)
         {
-            log.error(logCacheName + "Failure reseting state", e);
+            log.error(logCacheName + "Failure resetting state", e);
         }
         finally
         {
@@ -976,29 +975,22 @@ public class IndexedDiskCache<K, V> extends AbstractDiskCache<K, V>
     }
 
     /**
-     * If the maxKeySize is < 0, use 5000, no way to have an unlimited recycle bin right now, or one
-     * less than the mazKeySize.
-     */
-    private void initializeRecycleBin()
-    {
-        recycle = new ConcurrentSkipListSet<IndexedDiskElementDescriptor>();
-    }
-
-    /**
      * Create the map for keys that contain the index position on disk.
+     * 
+     * @return a new empty Map for keys and IndexedDiskElementDescriptors
      */
-    private void initializeKeyMap()
+    private Map<K, IndexedDiskElementDescriptor> createInitialKeyMap()
     {
-        keyHash = null;
+        Map<K, IndexedDiskElementDescriptor> keyMap = null;
         if (maxKeySize >= 0)
         {
             if (this.diskLimitType == DiskLimitType.COUNT)
             {
-                keyHash = new LRUMapCountLimited(maxKeySize);
+                keyMap = new LRUMapCountLimited(maxKeySize);
             }
             else
             {
-                keyHash = new LRUMapSizeLimited(maxKeySize);
+                keyMap = new LRUMapSizeLimited(maxKeySize);
             }
 
             if (log.isInfoEnabled())
@@ -1009,13 +1001,15 @@ public class IndexedDiskCache<K, V> extends AbstractDiskCache<K, V>
         else
         {
             // If no max size, use a plain map for memory and processing efficiency.
-            keyHash = new HashMap<K, IndexedDiskElementDescriptor>();
+            keyMap = new HashMap<>();
             // keyHash = Collections.synchronizedMap( new HashMap() );
             if (log.isInfoEnabled())
             {
                 log.info(logCacheName + "Set maxKeySize to unlimited'");
             }
         }
+        
+        return keyMap;
     }
 
     /**
@@ -1030,15 +1024,7 @@ public class IndexedDiskCache<K, V> extends AbstractDiskCache<K, V>
         ICacheEvent<String> cacheEvent = createICacheEvent(getCacheName(), "none", ICacheEventLogger.DISPOSE_EVENT);
         try
         {
-            Runnable disR = new Runnable()
-            {
-                @Override
-                public void run()
-                {
-                    disposeInternal();
-                }
-            };
-            Thread t = new Thread(disR, "IndexedDiskCache-DisposalThread");
+            Thread t = new Thread(this::disposeInternal, "IndexedDiskCache-DisposalThread");
             t.start();
             // wait up to 60 seconds for dispose and then quit if not done.
             try
@@ -1076,7 +1062,7 @@ public class IndexedDiskCache<K, V> extends AbstractDiskCache<K, V>
             // Join with the current optimization thread.
             if (log.isDebugEnabled())
             {
-                log.debug(logCacheName + "In dispose, optimization already " + "in progress; waiting for completion.");
+                log.debug(logCacheName + "In dispose, optimization already in progress; waiting for completion.");
             }
             try
             {
@@ -1136,14 +1122,14 @@ public class IndexedDiskCache<K, V> extends AbstractDiskCache<K, V>
 
             try
             {
-                this.adjustBytesFree(ded, true);
+                adjustBytesFree(ded, true);
 
                 if (doRecycle)
                 {
                     recycle.add(ded);
                     if (log.isDebugEnabled())
                     {
-                        log.debug(logCacheName + "recycled ded" + ded);
+                        log.debug(logCacheName + "recycled ded " + ded);
                     }
 
                 }
@@ -1178,15 +1164,9 @@ public class IndexedDiskCache<K, V> extends AbstractDiskCache<K, V>
                 {
                     if (currentOptimizationThread == null)
                     {
-                        currentOptimizationThread = new Thread(new Runnable()
-                        {
-                            @Override
-                            public void run()
-                            {
-                                optimizeFile();
-
-                                currentOptimizationThread = null;
-                            }
+                        currentOptimizationThread = new Thread(() -> {
+                            optimizeFile();
+                            currentOptimizationThread = null;
                         }, "IndexedDiskCache-OptimizationThread");
                     }
                 }
@@ -1279,7 +1259,7 @@ public class IndexedDiskCache<K, V> extends AbstractDiskCache<K, V>
             // RESTORE NORMAL OPERATION
             removeCount = 0;
             resetBytesFree();
-            initializeRecycleBin();
+            this.recycle.clear();
             queuedPutList.clear();
             queueInput = false;
             // turn recycle back on.
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs/utils/discovery/UDPDiscoveryReceiver.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs/utils/discovery/UDPDiscoveryReceiver.java
index 0f5f510..b519159 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs/utils/discovery/UDPDiscoveryReceiver.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs/utils/discovery/UDPDiscoveryReceiver.java
@@ -26,6 +26,7 @@ import java.net.DatagramPacket;
 import java.net.InetAddress;
 import java.net.MulticastSocket;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.jcs.engine.CacheInfo;
 import org.apache.commons.jcs.engine.behavior.IShutdownObserver;
@@ -57,19 +58,19 @@ public class UDPDiscoveryReceiver
     private static final int maxPoolSize = 2;
 
     /** The processor */
-    private ExecutorService pooledExecutor = null;
+    private final ExecutorService pooledExecutor;
 
     /** number of messages received. For debugging and testing. */
-    private int cnt = 0;
+    private AtomicInteger cnt = new AtomicInteger(0);
 
     /** Service to get cache names and handle request broadcasts */
-    private UDPDiscoveryService service = null;
+    private final UDPDiscoveryService service;
 
     /** Address */
-    private String multicastAddressString = "";
+    private final String multicastAddressString;
 
     /** The port */
-    private int multicastPort = 0;
+    private final int multicastPort;
 
     /** Is it shutdown. */
     private boolean shutdown = false;
@@ -92,7 +93,7 @@ public class UDPDiscoveryReceiver
         this.multicastPort = multicastPort;
 
         // create a small thread pool to handle a barrage
-        pooledExecutor = ThreadPoolManager.getInstance().createPool(
+        this.pooledExecutor = ThreadPoolManager.getInstance().createPool(
         		new PoolConfiguration(false, 0, maxPoolSize, maxPoolSize, 0, WhenBlockedPolicy.DISCARDOLDEST, maxPoolSize),
         		"JCS-UDPDiscoveryReceiver-", Thread.MIN_PRIORITY);
 
@@ -101,16 +102,7 @@ public class UDPDiscoveryReceiver
             log.info( "Constructing listener, [" + this.multicastAddressString + ":" + this.multicastPort + "]" );
         }
 
-        try
-        {
-            createSocket( this.multicastAddressString, this.multicastPort );
-        }
-        catch ( IOException ioe )
-        {
-            // consider eating this so we can go on, or constructing the socket
-            // later
-            throw ioe;
-        }
+        createSocket( this.multicastAddressString, this.multicastPort );
     }
 
     /**
@@ -165,9 +157,8 @@ public class UDPDiscoveryReceiver
                 log.debug( "Received packet from address [" + packet.getSocketAddress() + "]" );
             }
 
-            final ByteArrayInputStream byteStream = new ByteArrayInputStream( mBuffer, 0, packet.getLength() );
-
-            try (ObjectInputStream objectStream = new ObjectInputStreamClassLoaderAware( byteStream, null ))
+            try (ByteArrayInputStream byteStream = new ByteArrayInputStream(mBuffer, 0, packet.getLength());
+                 ObjectInputStream objectStream = new ObjectInputStreamClassLoaderAware(byteStream, null))
             {
                 obj = objectStream.readObject();
             }
@@ -204,8 +195,7 @@ public class UDPDiscoveryReceiver
             {
                 Object obj = waitForMessage();
 
-                // not thread safe, but just for debugging
-                cnt++;
+                cnt.incrementAndGet();
 
                 if ( log.isDebugEnabled() )
                 {
@@ -261,7 +251,7 @@ public class UDPDiscoveryReceiver
      */
     public void setCnt( int cnt )
     {
-        this.cnt = cnt;
+        this.cnt.set(cnt);
     }
 
     /**
@@ -269,7 +259,7 @@ public class UDPDiscoveryReceiver
      */
     public int getCnt()
     {
-        return cnt;
+        return cnt.get();
     }
 
     /**
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs/utils/discovery/UDPDiscoveryService.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs/utils/discovery/UDPDiscoveryService.java
index b2047d7..15b635f 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs/utils/discovery/UDPDiscoveryService.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs/utils/discovery/UDPDiscoveryService.java
@@ -300,8 +300,8 @@ public class UDPDiscoveryService
      */
     public void startup()
     {
-        udpReceiverThread = new Thread( receiver );
-        udpReceiverThread.setDaemon( true );
+        udpReceiverThread = new Thread(receiver);
+        udpReceiverThread.setDaemon(true);
         // udpReceiverThread.setName( t.getName() + "--UDPReceiver" );
         udpReceiverThread.start();
     }