You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@commons.apache.org by tv...@apache.org on 2021/01/24 18:27:55 UTC
[commons-jcs] branch master updated: Use Lambdas instead of
Runnables, deprecate old inner classes
This is an automated email from the ASF dual-hosted git repository.
tv pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/commons-jcs.git
The following commit(s) were added to refs/heads/master by this push:
new 3ef95e5 Use Lambdas instead of Runnables, deprecate old inner classes
3ef95e5 is described below
commit 3ef95e540e4708b816aed38940eba8cec08a2a23
Author: Thomas Vandahl <tv...@apache.org>
AuthorDate: Sun Jan 24 19:26:39 2021 +0100
Use Lambdas instead of Runnables, deprecate old inner classes
---
.../lateral/socket/tcp/LateralTCPListener.java | 283 ++++++++++-----------
.../jcs3/engine/AbstractCacheEventQueue.java | 37 ++-
.../engine/control/event/ElementEventQueue.java | 68 +----
.../jcs3/utils/discovery/UDPCleanupRunner.java | 34 +--
.../jcs3/utils/discovery/UDPDiscoveryReceiver.java | 104 ++++----
5 files changed, 232 insertions(+), 294 deletions(-)
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPListener.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPListener.java
index 3045253..8f457f8 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPListener.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPListener.java
@@ -19,7 +19,6 @@ package org.apache.commons.jcs3.auxiliary.lateral.socket.tcp;
* under the License.
*/
-import java.io.EOFException;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
@@ -29,7 +28,6 @@ import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;
-import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.Map;
import java.util.Set;
@@ -72,9 +70,6 @@ public class LateralTCPListener<K, V>
private static final ConcurrentHashMap<String, ILateralCacheListener<?, ?>> instances =
new ConcurrentHashMap<>();
- /** The socket listener */
- private ListenerThread receiver;
-
/** Configuration attributes */
private ITCPLateralCacheAttributes tcpLateralCacheAttributes;
@@ -122,8 +117,7 @@ public class LateralTCPListener<K, V>
newIns.init();
newIns.setCacheManager( cacheMgr );
- log.info( "Created new listener {0}",
- () -> ilca.getTcpListenerPort() );
+ log.info("Created new listener {0}", () -> ilca.getTcpListenerPort());
return newIns;
});
@@ -176,9 +170,7 @@ public class LateralTCPListener<K, V>
}
serverSocket.setSoTimeout( acceptTimeOut );
- receiver = new ListenerThread(serverSocket);
- receiver.setDaemon( true );
- receiver.start();
+ pooledExecutor.execute(() -> runListener(serverSocket));
}
catch ( final IOException ex )
{
@@ -444,7 +436,9 @@ public class LateralTCPListener<K, V>
/**
* Processes commands from the server socket. There should be one listener for each configured
* TCP lateral.
+ * @deprecated No longer used
*/
+ @Deprecated
public class ListenerThread
extends Thread
{
@@ -462,60 +456,62 @@ public class LateralTCPListener<K, V>
}
/** Main processing method for the ListenerThread object */
- @SuppressWarnings("synthetic-access")
@Override
public void run()
{
- try (ServerSocket ssck = serverSocket)
+ runListener(serverSocket);
+ }
+ }
+
+ /**
+ * Processes commands from the server socket. There should be one listener for each configured
+ * TCP lateral.
+ */
+ private void runListener(final ServerSocket serverSocket)
+ {
+ try
+ {
+ while ( true )
{
- ConnectionHandler handler;
+ log.debug( "Waiting for clients to connect " );
- outer: while ( true )
+ // Check to see if we've been asked to exit, and exit
+ if (terminated.get())
{
- log.debug( "Waiting for clients to connect " );
-
- Socket socket = null;
- inner: while (true)
- {
- // Check to see if we've been asked to exit, and exit
- if (terminated.get())
- {
- log.debug("Thread terminated, exiting gracefully");
- break outer;
- }
+ log.debug("Thread terminated, exiting gracefully");
+ break;
+ }
- try
- {
- socket = ssck.accept();
- break inner;
- }
- catch (final SocketTimeoutException e)
- {
- // No problem! We loop back up!
- continue inner;
- }
- }
+ try
+ {
+ final Socket socket = serverSocket.accept();
- if ( socket != null && log.isDebugEnabled() )
+ if (socket != null)
{
- final InetAddress inetAddress = socket.getInetAddress();
- log.debug( "Connected to client at {0}", inetAddress );
+ log.debug("Connected to client at {0}", () -> socket.getInetAddress());
}
- handler = new ConnectionHandler( socket );
- pooledExecutor.execute( handler );
+ pooledExecutor.execute(() -> handleConnection(socket));
+ }
+ catch (final SocketTimeoutException e)
+ {
+ // No problem! We loop back up!
}
}
- catch ( final IOException e )
- {
- log.error( "Exception caught in TCP listener", e );
- }
+
+ serverSocket.close();
+ }
+ catch ( final IOException e )
+ {
+ log.error( "Exception caught in TCP listener", e );
}
}
/**
* A Separate thread that runs when a command comes into the LateralTCPReceiver.
+ * @deprecated No longer used
*/
+ @Deprecated
public class ConnectionHandler
implements Runnable
{
@@ -535,125 +531,127 @@ public class LateralTCPListener<K, V>
* Main processing method for the LateralTCPReceiverConnection object
*/
@Override
- @SuppressWarnings({"unchecked", // Need to cast from Object
- "synthetic-access" })
public void run()
{
- try (ObjectInputStream ois =
- new ObjectInputStreamClassLoaderAware( socket.getInputStream(), null ))
+ handleConnection(socket);
+ }
+ }
+
+ /**
+ * A Separate thread that runs when a command comes into the LateralTCPReceiver.
+ */
+ private void handleConnection(final Socket socket)
+ {
+ try (ObjectInputStream ois =
+ new ObjectInputStreamClassLoaderAware( socket.getInputStream(), null ))
+ {
+ while ( true )
{
- while ( true )
- {
- final LateralElementDescriptor<K, V> led =
- (LateralElementDescriptor<K, V>) ois.readObject();
+ @SuppressWarnings("unchecked") // Need to cast from Object
+ final LateralElementDescriptor<K, V> led =
+ (LateralElementDescriptor<K, V>) ois.readObject();
- if ( led == null )
- {
- log.debug( "LateralElementDescriptor is null" );
- continue;
- }
- if ( led.requesterId == getListenerId() )
- {
- log.debug( "from self" );
- }
- else
- {
- log.debug( "receiving LateralElementDescriptor from another led = {0}",
- led );
+ if ( led == null )
+ {
+ log.debug( "LateralElementDescriptor is null" );
+ continue;
+ }
+ if ( led.requesterId == getListenerId() )
+ {
+ log.debug( "from self" );
+ }
+ else
+ {
+ log.debug( "receiving LateralElementDescriptor from another led = {0}",
+ led );
- handle( led );
- }
+ handleElement(led, socket);
}
}
- catch ( final EOFException e )
- {
- log.info( "Caught EOFException, closing connection.", e );
- }
- catch ( final SocketException e )
- {
- log.info( "Caught SocketException, closing connection.", e );
- }
- catch ( final Exception e )
- {
- log.error( "Unexpected exception.", e );
- }
}
-
- /**
- * This calls the appropriate method, based on the command sent in the Lateral element
- * descriptor.
- * <p>
- * @param led
- * @throws IOException
- */
- @SuppressWarnings("synthetic-access")
- private void handle( final LateralElementDescriptor<K, V> led )
- throws IOException
+ catch (final IOException e)
{
- final String cacheName = led.ce.getCacheName();
- final K key = led.ce.getKey();
- Serializable obj = null;
+ log.info("Caught {0}, closing connection.", e.getClass().getSimpleName(), e);
+ }
+ catch (final ClassNotFoundException e)
+ {
+ log.error( "Deserialization failed reading from socket", e );
+ }
+ }
- switch (led.command)
- {
- case UPDATE:
- handlePut( led.ce );
- break;
+ /**
+ * This calls the appropriate method, based on the command sent in the Lateral element
+ * descriptor.
+ * <p>
+ * @param led the lateral element
+ * @param socket the socket
+ * @throws IOException
+ */
+ private void handleElement(final LateralElementDescriptor<K, V> led, Socket socket) throws IOException
+ {
+ final String cacheName = led.ce.getCacheName();
+ final K key = led.ce.getKey();
+ Serializable obj = null;
- case REMOVE:
- // if a hashcode was given and filtering is on
- // check to see if they are the same
- // if so, then don't remove, otherwise issue a remove
- if ( led.valHashCode != -1 )
+ switch (led.command)
+ {
+ case UPDATE:
+ handlePut( led.ce );
+ break;
+
+ case REMOVE:
+ // if a hashcode was given and filtering is on
+ // check to see if they are the same
+ // if so, then don't remove, otherwise issue a remove
+ if ( led.valHashCode != -1 )
+ {
+ if ( getTcpLateralCacheAttributes().isFilterRemoveByHashCode() )
{
- if ( getTcpLateralCacheAttributes().isFilterRemoveByHashCode() )
+ final ICacheElement<K, V> test = getCache( cacheName ).localGet( key );
+ if ( test != null )
{
- final ICacheElement<K, V> test = getCache( cacheName ).localGet( key );
- if ( test != null )
+ if ( test.getVal().hashCode() == led.valHashCode )
+ {
+ log.debug( "Filtering detected identical hashCode [{0}], "
+ + "not issuing a remove for led {1}",
+ led.valHashCode, led );
+ return;
+ }
+ else
{
- if ( test.getVal().hashCode() == led.valHashCode )
- {
- log.debug( "Filtering detected identical hashCode [{0}], "
- + "not issuing a remove for led {1}",
- led.valHashCode, led );
- return;
- }
- else
- {
- log.debug( "Different hashcodes, in cache [{0}] sent [{1}]",
- test.getVal().hashCode(), led.valHashCode );
- }
+ log.debug( "Different hashcodes, in cache [{0}] sent [{1}]",
+ test.getVal().hashCode(), led.valHashCode );
}
}
}
- handleRemove( cacheName, key );
- break;
+ }
+ handleRemove( cacheName, key );
+ break;
- case REMOVEALL:
- handleRemoveAll( cacheName );
- break;
+ case REMOVEALL:
+ handleRemoveAll( cacheName );
+ break;
- case GET:
- obj = handleGet( cacheName, key );
- break;
+ case GET:
+ obj = handleGet( cacheName, key );
+ break;
- case GET_MATCHING:
- obj = (Serializable) handleGetMatching( cacheName, (String) key );
- break;
+ case GET_MATCHING:
+ obj = (Serializable) handleGetMatching( cacheName, (String) key );
+ break;
- case GET_KEYSET:
- obj = (Serializable) handleGetKeySet(cacheName);
- break;
+ case GET_KEYSET:
+ obj = (Serializable) handleGetKeySet(cacheName);
+ break;
- default: break;
- }
+ default: break;
+ }
- if (obj != null)
- {
- final ObjectOutputStream oos = new ObjectOutputStream( socket.getOutputStream() );
- oos.writeObject( obj );
- oos.flush();
- }
+ if (obj != null)
+ {
+ final ObjectOutputStream oos = new ObjectOutputStream( socket.getOutputStream() );
+ oos.writeObject( obj );
+ oos.flush();
}
}
@@ -666,8 +664,7 @@ public class LateralTCPListener<K, V>
if ( shutdown.compareAndSet(false, true) )
{
log.info( "Shutting down TCP Lateral receiver." );
-
- receiver.interrupt();
+ pooledExecutor.shutdownNow();
}
else
{
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/engine/AbstractCacheEventQueue.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/engine/AbstractCacheEventQueue.java
index 6055a82..f131313 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/engine/AbstractCacheEventQueue.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/engine/AbstractCacheEventQueue.java
@@ -205,45 +205,40 @@ public abstract class AbstractCacheEventQueue<K, V>
*/
protected abstract class AbstractCacheEvent implements Runnable
{
- /** Number of failures encountered processing this event. */
- int failures;
-
/**
* Main processing method for the AbstractCacheEvent object
*/
@Override
- @SuppressWarnings("synthetic-access")
public void run()
{
- try
- {
- doRun();
- }
- catch ( final IOException e )
+ for (int failures = 0; failures < maxFailure; failures++)
{
- log.warn( e );
- if ( ++failures >= maxFailure )
+ try
{
- log.warn( "Error while running event from Queue: {0}. "
- + "Dropping Event and marking Event Queue as "
- + "non-functional.", this );
- destroy();
+ doRun();
return;
}
- log.info( "Error while running event from Queue: {0}. "
- + "Retrying...", this );
+ catch (final IOException e)
+ {
+ log.warn("Error while running event from Queue: {0}. "
+ + "Retrying...", this, e);
+ }
+
try
{
Thread.sleep( waitBeforeRetry );
- run();
}
catch ( final InterruptedException ie )
{
- log.warn( "Interrupted while sleeping for retry on event "
- + "{0}.", this );
- destroy();
+ log.warn("Interrupted while sleeping for retry on event "
+ + "{0}.", this, ie);
+ break;
}
}
+
+ log.warn( "Dropping Event and marking Event Queue {0} as "
+ + "non-functional.", this );
+ destroy();
}
/**
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/engine/control/event/ElementEventQueue.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/engine/control/event/ElementEventQueue.java
index e59c611..fd85be2 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/engine/control/event/ElementEventQueue.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/engine/control/event/ElementEventQueue.java
@@ -21,6 +21,7 @@ package org.apache.commons.jcs3.engine.control.event;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.jcs3.engine.control.event.behavior.IElementEvent;
import org.apache.commons.jcs3.engine.control.event.behavior.IElementEventHandler;
@@ -28,8 +29,8 @@ import org.apache.commons.jcs3.engine.control.event.behavior.IElementEventQueue;
import org.apache.commons.jcs3.log.Log;
import org.apache.commons.jcs3.log.LogManager;
import org.apache.commons.jcs3.utils.threadpool.PoolConfiguration;
-import org.apache.commons.jcs3.utils.threadpool.ThreadPoolManager;
import org.apache.commons.jcs3.utils.threadpool.PoolConfiguration.WhenBlockedPolicy;
+import org.apache.commons.jcs3.utils.threadpool.ThreadPoolManager;
/**
* An event queue is used to propagate ordered cache events to one and only one target listener.
@@ -43,7 +44,7 @@ public class ElementEventQueue
private static final Log log = LogManager.getLog( ElementEventQueue.class );
/** shutdown or not */
- private boolean destroyed;
+ private AtomicBoolean destroyed = new AtomicBoolean(false);
/** The worker thread pool. */
private ExecutorService queueProcessor;
@@ -65,15 +66,10 @@ public class ElementEventQueue
@Override
public void dispose()
{
- if ( !destroyed )
+ if (destroyed.compareAndSet(false, true))
{
- destroyed = true;
-
- // synchronize on queue so the thread will not wait forever,
- // and then interrupt the QueueProcessor
+ // shut down the QueueProcessor
queueProcessor.shutdownNow();
- queueProcessor = null;
-
log.info( "Element event queue destroyed: {0}", this );
}
}
@@ -89,19 +85,15 @@ public class ElementEventQueue
throws IOException
{
- log.debug( "Adding Event Handler to QUEUE, !destroyed = {0}", !destroyed );
+ log.debug("Adding Event Handler to QUEUE, !destroyed = {0}", !destroyed.get());
- if (destroyed)
+ if (destroyed.get())
{
log.warn("Event submitted to disposed element event queue {0}", event);
}
else
{
- final ElementEventRunner runner = new ElementEventRunner( hand, event );
-
- log.debug( "runner = {0}", runner );
-
- queueProcessor.execute(runner);
+ queueProcessor.execute(() -> hand.handleElementEvent(event));
}
}
@@ -109,14 +101,15 @@ public class ElementEventQueue
/**
* Retries before declaring failure.
+ * @deprecated No longer used
*/
+ @Deprecated
protected abstract class AbstractElementEventRunner
implements Runnable
{
/**
* Main processing method for the AbstractElementEvent object
*/
- @SuppressWarnings("synthetic-access")
@Override
public void run()
{
@@ -140,45 +133,4 @@ public class ElementEventQueue
protected abstract void doRun()
throws IOException;
}
-
- /**
- * ElementEventRunner.
- */
- private class ElementEventRunner
- extends AbstractElementEventRunner
- {
- /** the handler */
- private final IElementEventHandler hand;
-
- /** event */
- private final IElementEvent<?> event;
-
- /**
- * Constructor for the PutEvent object.
- * <p>
- * @param hand
- * @param event
- * @throws IOException
- */
- @SuppressWarnings("synthetic-access")
- ElementEventRunner( final IElementEventHandler hand, final IElementEvent<?> event )
- throws IOException
- {
- log.debug( "Constructing {0}", this );
- this.hand = hand;
- this.event = event;
- }
-
- /**
- * Tells the handler to handle the event.
- * <p>
- * @throws IOException
- */
- @Override
- protected void doRun()
- throws IOException
- {
- hand.handleElementEvent( event );
- }
- }
}
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPCleanupRunner.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPCleanupRunner.java
index e94f5ac..647dc2d 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPCleanupRunner.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPCleanupRunner.java
@@ -19,9 +19,6 @@ package org.apache.commons.jcs3.utils.discovery;
* under the License.
*/
-import java.util.HashSet;
-import java.util.Set;
-
import org.apache.commons.jcs3.log.Log;
import org.apache.commons.jcs3.log.LogManager;
@@ -66,27 +63,20 @@ public class UDPCleanupRunner
{
final long now = System.currentTimeMillis();
- // iterate through the set
- // it is thread safe
- // TODO this should get a copy. you can't simply remove from this.
// the listeners need to be notified.
- final Set<DiscoveredService> toRemove = new HashSet<>();
- // can't remove via the iterator. must remove directly
- for (final DiscoveredService service : discoveryService.getDiscoveredServices())
- {
- if ( ( now - service.getLastHearFromTime() ) > ( maxIdleTimeSeconds * 1000 ) )
- {
- log.info( "Removing service, since we haven't heard from it in "
- + "{0} seconds. service = {1}", maxIdleTimeSeconds, service );
- toRemove.add( service );
- }
- }
+ discoveryService.getDiscoveredServices().stream()
+ .filter(service -> {
+ if (now - service.getLastHearFromTime() > maxIdleTimeSeconds * 1000)
+ {
+ log.info( "Removing service, since we haven't heard from it in "
+ + "{0} seconds. service = {1}", maxIdleTimeSeconds, service );
+ return true;
+ }
- // remove the bad ones
- for (final DiscoveredService service : toRemove)
- {
+ return false;
+ })
+ // remove the bad ones
// call this so the listeners get notified
- discoveryService.removeDiscoveredService( service );
- }
+ .forEach(service -> discoveryService.removeDiscoveredService(service));
}
}
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryReceiver.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryReceiver.java
index 3f03ca5..5e71d63 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryReceiver.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryReceiver.java
@@ -37,8 +37,8 @@ import org.apache.commons.jcs3.log.LogManager;
import org.apache.commons.jcs3.utils.discovery.UDPDiscoveryMessage.BroadcastType;
import org.apache.commons.jcs3.utils.net.HostNameUtil;
import org.apache.commons.jcs3.utils.threadpool.PoolConfiguration;
-import org.apache.commons.jcs3.utils.threadpool.ThreadPoolManager;
import org.apache.commons.jcs3.utils.threadpool.PoolConfiguration.WhenBlockedPolicy;
+import org.apache.commons.jcs3.utils.threadpool.ThreadPoolManager;
/** Receives UDP Discovery messages. */
public class UDPDiscoveryReceiver
@@ -210,18 +210,13 @@ public class UDPDiscoveryReceiver
log.debug( "{0} messages received.", this::getCnt );
- UDPDiscoveryMessage message = null;
-
try
{
- message = (UDPDiscoveryMessage) obj;
+ UDPDiscoveryMessage message = (UDPDiscoveryMessage) obj;
// check for null
if ( message != null )
{
- final MessageHandler handler = new MessageHandler( message );
-
- pooledExecutor.execute( handler );
-
+ pooledExecutor.execute(() -> handleMessage(message));
log.debug( "Passed handler to executor." );
}
else
@@ -269,7 +264,9 @@ public class UDPDiscoveryReceiver
/**
* Separate thread run when a command comes into the UDPDiscoveryReceiver.
+ * @deprectaed No longer used
*/
+ @Deprecated
public class MessageHandler
implements Runnable
{
@@ -287,62 +284,69 @@ public class UDPDiscoveryReceiver
/**
* Process the message.
*/
- @SuppressWarnings("synthetic-access")
@Override
public void run()
{
- // consider comparing ports here instead.
- if ( message.getRequesterId() == CacheInfo.listenerId )
- {
- log.debug( "Ignoring message sent from self" );
- }
- else
- {
- log.debug( "Process message sent from another" );
- log.debug( "Message = {0}", message );
-
- if ( message.getHost() == null || message.getCacheNames() == null || message.getCacheNames().isEmpty() )
- {
- log.debug( "Ignoring invalid message: {0}", message );
- }
- else
- {
- processMessage();
- }
- }
+ handleMessage(message);
}
- /**
- * Process the incoming message.
- */
- @SuppressWarnings("synthetic-access")
- private void processMessage()
+ }
+
+ /**
+ * Separate thread run when a command comes into the UDPDiscoveryReceiver.
+ */
+ private void handleMessage(UDPDiscoveryMessage message)
+ {
+ // consider comparing ports here instead.
+ if ( message.getRequesterId() == CacheInfo.listenerId )
{
- final DiscoveredService discoveredService = new DiscoveredService();
- discoveredService.setServiceAddress( message.getHost() );
- discoveredService.setCacheNames( message.getCacheNames() );
- discoveredService.setServicePort( message.getPort() );
- discoveredService.setLastHearFromTime( System.currentTimeMillis() );
-
- // if this is a request message, have the service handle it and
- // return
- if ( message.getMessageType() == BroadcastType.REQUEST )
- {
- log.debug( "Message is a Request Broadcast, will have the service handle it." );
- service.serviceRequestBroadcast();
- }
- else if ( message.getMessageType() == BroadcastType.REMOVE )
+ log.debug( "Ignoring message sent from self" );
+ }
+ else
+ {
+ log.debug( "Process message sent from another" );
+ log.debug( "Message = {0}", message );
+
+ if ( message.getHost() == null || message.getCacheNames() == null || message.getCacheNames().isEmpty() )
{
- log.debug( "Removing service from set {0}", discoveredService );
- service.removeDiscoveredService( discoveredService );
+ log.debug( "Ignoring invalid message: {0}", message );
}
else
{
- service.addOrUpdateService( discoveredService );
+ processMessage(message);
}
}
}
+ /**
+ * Process the incoming message.
+ */
+ private void processMessage(UDPDiscoveryMessage message)
+ {
+ final DiscoveredService discoveredService = new DiscoveredService();
+ discoveredService.setServiceAddress( message.getHost() );
+ discoveredService.setCacheNames( message.getCacheNames() );
+ discoveredService.setServicePort( message.getPort() );
+ discoveredService.setLastHearFromTime( System.currentTimeMillis() );
+
+ // if this is a request message, have the service handle it and
+ // return
+ if ( message.getMessageType() == BroadcastType.REQUEST )
+ {
+ log.debug( "Message is a Request Broadcast, will have the service handle it." );
+ service.serviceRequestBroadcast();
+ }
+ else if ( message.getMessageType() == BroadcastType.REMOVE )
+ {
+ log.debug( "Removing service from set {0}", discoveredService );
+ service.removeDiscoveredService( discoveredService );
+ }
+ else
+ {
+ service.addOrUpdateService( discoveredService );
+ }
+ }
+
/** Shuts down the socket. */
@Override
public void shutdown()