You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@commons.apache.org by tv...@apache.org on 2019/05/28 13:48:56 UTC
[commons-jcs] 08/09: Use lambdas for Runnables
This is an automated email from the ASF dual-hosted git repository.
tv pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/commons-jcs.git
commit b8486dccdc7bec2314a9d7e0b3d445c34e5169ad
Author: Thomas Vandahl <tv...@apache.org>
AuthorDate: Tue May 28 15:46:27 2019 +0200
Use lambdas for Runnables
---
.../jcs/auxiliary/disk/AbstractDiskCache.java | 37 +++++-----
.../jcs/auxiliary/disk/block/BlockDiskCache.java | 29 +-------
.../auxiliary/disk/indexed/IndexedDiskCache.java | 78 ++++++++--------------
.../jcs/utils/discovery/UDPDiscoveryReceiver.java | 36 ++++------
.../jcs/utils/discovery/UDPDiscoveryService.java | 4 +-
5 files changed, 62 insertions(+), 122 deletions(-)
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs/auxiliary/disk/AbstractDiskCache.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs/auxiliary/disk/AbstractDiskCache.java
index 6f42f20..a536876 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs/auxiliary/disk/AbstractDiskCache.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs/auxiliary/disk/AbstractDiskCache.java
@@ -437,32 +437,27 @@ public abstract class AbstractDiskCache<K, V>
public final void dispose()
throws IOException
{
- Runnable disR = new Runnable()
+ Thread t = new Thread(() ->
{
- @Override
- public void run()
+ boolean keepGoing = true;
+ // long total = 0;
+ long interval = 100;
+ while ( keepGoing )
{
- boolean keepGoing = true;
- // long total = 0;
- long interval = 100;
- while ( keepGoing )
+ keepGoing = !cacheEventQueue.isEmpty();
+ try
{
- keepGoing = !cacheEventQueue.isEmpty();
- try
- {
- Thread.sleep( interval );
- // total += interval;
- // log.info( "total = " + total );
- }
- catch ( InterruptedException e )
- {
- break;
- }
+ Thread.sleep( interval );
+ // total += interval;
+ // log.info( "total = " + total );
+ }
+ catch ( InterruptedException e )
+ {
+ break;
}
- log.info( "No longer waiting for event queue to finish: " + cacheEventQueue.getStatistics() );
}
- };
- Thread t = new Thread( disR );
+ log.info( "No longer waiting for event queue to finish: " + cacheEventQueue.getStatistics() );
+ });
t.start();
// wait up to 60 seconds for dispose and then quit if not done.
try
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs/auxiliary/disk/block/BlockDiskCache.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs/auxiliary/disk/block/BlockDiskCache.java
index 2eae22b..06f89ba 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs/auxiliary/disk/block/BlockDiskCache.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs/auxiliary/disk/block/BlockDiskCache.java
@@ -171,15 +171,7 @@ public class BlockDiskCache<K, V>
// TODO we might need to stagger this a bit.
if ( this.blockDiskCacheAttributes.getKeyPersistenceIntervalSeconds() > 0 )
{
- future = scheduledExecutor.scheduleAtFixedRate(
- new Runnable()
- {
- @Override
- public void run()
- {
- keyStore.saveKeys();
- }
- },
+ future = scheduledExecutor.scheduleAtFixedRate(keyStore::saveKeys,
this.blockDiskCacheAttributes.getKeyPersistenceIntervalSeconds(),
this.blockDiskCacheAttributes.getKeyPersistenceIntervalSeconds(),
TimeUnit.SECONDS);
@@ -574,22 +566,7 @@ public class BlockDiskCache<K, V>
@Override
public void processDispose()
{
- Runnable disR = new Runnable()
- {
- @Override
- public void run()
- {
- try
- {
- disposeInternal();
- }
- catch ( InterruptedException e )
- {
- log.warn( "Interrupted while diposing." );
- }
- }
- };
- Thread t = new Thread( disR, "BlockDiskCache-DisposalThread" );
+ Thread t = new Thread(this::disposeInternal, "BlockDiskCache-DisposalThread" );
t.start();
// wait up to 60 seconds for dispose and then quit if not done.
try
@@ -604,10 +581,8 @@ public class BlockDiskCache<K, V>
/**
* Internal method that handles the disposal.
- * @throws InterruptedException
*/
protected void disposeInternal()
- throws InterruptedException
{
if ( !isAlive() )
{
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs/auxiliary/disk/indexed/IndexedDiskCache.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs/auxiliary/disk/indexed/IndexedDiskCache.java
index 825376e..c288784 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs/auxiliary/disk/indexed/IndexedDiskCache.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs/auxiliary/disk/indexed/IndexedDiskCache.java
@@ -79,7 +79,7 @@ public class IndexedDiskCache<K, V> extends AbstractDiskCache<K, V>
private IndexedDisk keyFile;
/** Map containing the keys and disk offsets. */
- private Map<K, IndexedDiskElementDescriptor> keyHash;
+ private final Map<K, IndexedDiskElementDescriptor> keyHash;
/** The maximum number of keys that we will keep in memory. */
private final int maxKeySize;
@@ -112,11 +112,10 @@ public class IndexedDiskCache<K, V> extends AbstractDiskCache<K, V>
private boolean queueInput = false;
/** list where puts made during optimization are made */
- private final ConcurrentSkipListSet<IndexedDiskElementDescriptor> queuedPutList =
- new ConcurrentSkipListSet<IndexedDiskElementDescriptor>(new PositionComparator());
+ private final ConcurrentSkipListSet<IndexedDiskElementDescriptor> queuedPutList;
/** RECYLCE BIN -- array of empty spots */
- private ConcurrentSkipListSet<IndexedDiskElementDescriptor> recycle;
+ private final ConcurrentSkipListSet<IndexedDiskElementDescriptor> recycle;
/** User configurable parameters */
private final IndexedDiskCacheAttributes cattr;
@@ -174,14 +173,15 @@ public class IndexedDiskCache<K, V> extends AbstractDiskCache<K, V>
this.diskLimitType = cattr.getDiskLimitType();
// Make a clean file name
this.fileName = getCacheName().replaceAll("[^a-zA-Z0-9-_\\.]", "_");
+ this.keyHash = createInitialKeyMap();
+ this.queuedPutList = new ConcurrentSkipListSet<>(new PositionComparator());
+ this.recycle = new ConcurrentSkipListSet<>();
try
{
- initializeRecycleBin();
initializeFileSystem(cattr);
initializeKeysAndData(cattr);
-
// Initialization finished successfully, so set alive to true.
setAlive(true);
if (log.isInfoEnabled())
@@ -263,7 +263,7 @@ public class IndexedDiskCache<K, V> extends AbstractDiskCache<K, V>
*/
private void initializeEmptyStore() throws IOException
{
- initializeKeyMap();
+ this.keyHash.clear();
if (dataFile.length() > 0)
{
@@ -321,8 +321,8 @@ public class IndexedDiskCache<K, V> extends AbstractDiskCache<K, V>
try
{
- // create a key map to use.
- initializeKeyMap();
+ // clear a key map to use.
+ keyHash.clear();
HashMap<K, IndexedDiskElementDescriptor> keys = keyFile.readObject(
new IndexedDiskElementDescriptor(0, (int) keyFile.length() - IndexedDisk.HEADER_SIZE_BYTES));
@@ -961,13 +961,12 @@ public class IndexedDiskCache<K, V> extends AbstractDiskCache<K, V>
dataFile = new IndexedDisk(new File(rafDir, fileName + ".data"), getElementSerializer());
keyFile = new IndexedDisk(new File(rafDir, fileName + ".key"), getElementSerializer());
- initializeRecycleBin();
-
- initializeKeyMap();
+ this.recycle.clear();
+ this.keyHash.clear();
}
catch (IOException e)
{
- log.error(logCacheName + "Failure reseting state", e);
+ log.error(logCacheName + "Failure resetting state", e);
}
finally
{
@@ -976,29 +975,22 @@ public class IndexedDiskCache<K, V> extends AbstractDiskCache<K, V>
}
/**
- * If the maxKeySize is < 0, use 5000, no way to have an unlimited recycle bin right now, or one
- * less than the mazKeySize.
- */
- private void initializeRecycleBin()
- {
- recycle = new ConcurrentSkipListSet<IndexedDiskElementDescriptor>();
- }
-
- /**
* Create the map for keys that contain the index position on disk.
+ *
+ * @return a new empty Map for keys and IndexedDiskElementDescriptors
*/
- private void initializeKeyMap()
+ private Map<K, IndexedDiskElementDescriptor> createInitialKeyMap()
{
- keyHash = null;
+ Map<K, IndexedDiskElementDescriptor> keyMap = null;
if (maxKeySize >= 0)
{
if (this.diskLimitType == DiskLimitType.COUNT)
{
- keyHash = new LRUMapCountLimited(maxKeySize);
+ keyMap = new LRUMapCountLimited(maxKeySize);
}
else
{
- keyHash = new LRUMapSizeLimited(maxKeySize);
+ keyMap = new LRUMapSizeLimited(maxKeySize);
}
if (log.isInfoEnabled())
@@ -1009,13 +1001,15 @@ public class IndexedDiskCache<K, V> extends AbstractDiskCache<K, V>
else
{
// If no max size, use a plain map for memory and processing efficiency.
- keyHash = new HashMap<K, IndexedDiskElementDescriptor>();
+ keyMap = new HashMap<>();
// keyHash = Collections.synchronizedMap( new HashMap() );
if (log.isInfoEnabled())
{
log.info(logCacheName + "Set maxKeySize to unlimited'");
}
}
+
+ return keyMap;
}
/**
@@ -1030,15 +1024,7 @@ public class IndexedDiskCache<K, V> extends AbstractDiskCache<K, V>
ICacheEvent<String> cacheEvent = createICacheEvent(getCacheName(), "none", ICacheEventLogger.DISPOSE_EVENT);
try
{
- Runnable disR = new Runnable()
- {
- @Override
- public void run()
- {
- disposeInternal();
- }
- };
- Thread t = new Thread(disR, "IndexedDiskCache-DisposalThread");
+ Thread t = new Thread(this::disposeInternal, "IndexedDiskCache-DisposalThread");
t.start();
// wait up to 60 seconds for dispose and then quit if not done.
try
@@ -1076,7 +1062,7 @@ public class IndexedDiskCache<K, V> extends AbstractDiskCache<K, V>
// Join with the current optimization thread.
if (log.isDebugEnabled())
{
- log.debug(logCacheName + "In dispose, optimization already " + "in progress; waiting for completion.");
+ log.debug(logCacheName + "In dispose, optimization already in progress; waiting for completion.");
}
try
{
@@ -1136,14 +1122,14 @@ public class IndexedDiskCache<K, V> extends AbstractDiskCache<K, V>
try
{
- this.adjustBytesFree(ded, true);
+ adjustBytesFree(ded, true);
if (doRecycle)
{
recycle.add(ded);
if (log.isDebugEnabled())
{
- log.debug(logCacheName + "recycled ded" + ded);
+ log.debug(logCacheName + "recycled ded " + ded);
}
}
@@ -1178,15 +1164,9 @@ public class IndexedDiskCache<K, V> extends AbstractDiskCache<K, V>
{
if (currentOptimizationThread == null)
{
- currentOptimizationThread = new Thread(new Runnable()
- {
- @Override
- public void run()
- {
- optimizeFile();
-
- currentOptimizationThread = null;
- }
+ currentOptimizationThread = new Thread(() -> {
+ optimizeFile();
+ currentOptimizationThread = null;
}, "IndexedDiskCache-OptimizationThread");
}
}
@@ -1279,7 +1259,7 @@ public class IndexedDiskCache<K, V> extends AbstractDiskCache<K, V>
// RESTORE NORMAL OPERATION
removeCount = 0;
resetBytesFree();
- initializeRecycleBin();
+ this.recycle.clear();
queuedPutList.clear();
queueInput = false;
// turn recycle back on.
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs/utils/discovery/UDPDiscoveryReceiver.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs/utils/discovery/UDPDiscoveryReceiver.java
index 0f5f510..b519159 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs/utils/discovery/UDPDiscoveryReceiver.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs/utils/discovery/UDPDiscoveryReceiver.java
@@ -26,6 +26,7 @@ import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.MulticastSocket;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.jcs.engine.CacheInfo;
import org.apache.commons.jcs.engine.behavior.IShutdownObserver;
@@ -57,19 +58,19 @@ public class UDPDiscoveryReceiver
private static final int maxPoolSize = 2;
/** The processor */
- private ExecutorService pooledExecutor = null;
+ private final ExecutorService pooledExecutor;
/** number of messages received. For debugging and testing. */
- private int cnt = 0;
+ private AtomicInteger cnt = new AtomicInteger(0);
/** Service to get cache names and handle request broadcasts */
- private UDPDiscoveryService service = null;
+ private final UDPDiscoveryService service;
/** Address */
- private String multicastAddressString = "";
+ private final String multicastAddressString;
/** The port */
- private int multicastPort = 0;
+ private final int multicastPort;
/** Is it shutdown. */
private boolean shutdown = false;
@@ -92,7 +93,7 @@ public class UDPDiscoveryReceiver
this.multicastPort = multicastPort;
// create a small thread pool to handle a barrage
- pooledExecutor = ThreadPoolManager.getInstance().createPool(
+ this.pooledExecutor = ThreadPoolManager.getInstance().createPool(
new PoolConfiguration(false, 0, maxPoolSize, maxPoolSize, 0, WhenBlockedPolicy.DISCARDOLDEST, maxPoolSize),
"JCS-UDPDiscoveryReceiver-", Thread.MIN_PRIORITY);
@@ -101,16 +102,7 @@ public class UDPDiscoveryReceiver
log.info( "Constructing listener, [" + this.multicastAddressString + ":" + this.multicastPort + "]" );
}
- try
- {
- createSocket( this.multicastAddressString, this.multicastPort );
- }
- catch ( IOException ioe )
- {
- // consider eating this so we can go on, or constructing the socket
- // later
- throw ioe;
- }
+ createSocket( this.multicastAddressString, this.multicastPort );
}
/**
@@ -165,9 +157,8 @@ public class UDPDiscoveryReceiver
log.debug( "Received packet from address [" + packet.getSocketAddress() + "]" );
}
- final ByteArrayInputStream byteStream = new ByteArrayInputStream( mBuffer, 0, packet.getLength() );
-
- try (ObjectInputStream objectStream = new ObjectInputStreamClassLoaderAware( byteStream, null ))
+ try (ByteArrayInputStream byteStream = new ByteArrayInputStream(mBuffer, 0, packet.getLength());
+ ObjectInputStream objectStream = new ObjectInputStreamClassLoaderAware(byteStream, null))
{
obj = objectStream.readObject();
}
@@ -204,8 +195,7 @@ public class UDPDiscoveryReceiver
{
Object obj = waitForMessage();
- // not thread safe, but just for debugging
- cnt++;
+ cnt.incrementAndGet();
if ( log.isDebugEnabled() )
{
@@ -261,7 +251,7 @@ public class UDPDiscoveryReceiver
*/
public void setCnt( int cnt )
{
- this.cnt = cnt;
+ this.cnt.set(cnt);
}
/**
@@ -269,7 +259,7 @@ public class UDPDiscoveryReceiver
*/
public int getCnt()
{
- return cnt;
+ return cnt.get();
}
/**
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs/utils/discovery/UDPDiscoveryService.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs/utils/discovery/UDPDiscoveryService.java
index b2047d7..15b635f 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs/utils/discovery/UDPDiscoveryService.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs/utils/discovery/UDPDiscoveryService.java
@@ -300,8 +300,8 @@ public class UDPDiscoveryService
*/
public void startup()
{
- udpReceiverThread = new Thread( receiver );
- udpReceiverThread.setDaemon( true );
+ udpReceiverThread = new Thread(receiver);
+ udpReceiverThread.setDaemon(true);
// udpReceiverThread.setName( t.getName() + "--UDPReceiver" );
udpReceiverThread.start();
}