You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@commons.apache.org by tv...@apache.org on 2021/11/22 20:17:14 UTC

[commons-jcs] 02/02: Introduce serializer support throughout LateralTCPCache

This is an automated email from the ASF dual-hosted git repository.

tv pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/commons-jcs.git

commit e3d8fb571f092ab0d1a2f7552c2719c362b4ec3e
Author: Thomas Vandahl <tv...@apache.org>
AuthorDate: Mon Nov 22 21:16:58 2021 +0100

    Introduce serializer support throughout LateralTCPCache
---
 .../auxiliary/lateral/LateralCacheMonitor.java     |  13 +-
 .../jcs3/auxiliary/lateral/LateralCacheNoWait.java |   2 +
 .../lateral/socket/tcp/LateralTCPCacheFactory.java | 139 +++++++++++++--------
 .../socket/tcp/LateralTCPDiscoveryListener.java    |   1 +
 .../lateral/socket/tcp/LateralTCPListener.java     |  20 +--
 .../lateral/socket/tcp/LateralTCPSender.java       |  19 ++-
 .../lateral/socket/tcp/LateralTCPService.java      |  62 +++++----
 7 files changed, 167 insertions(+), 89 deletions(-)

diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/LateralCacheMonitor.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/LateralCacheMonitor.java
index 44ca757..c6e6e43 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/LateralCacheMonitor.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/LateralCacheMonitor.java
@@ -112,16 +112,19 @@ public class LateralCacheMonitor extends AbstractAuxiliaryCacheMonitor
             final String cacheName = entry.getKey();
 
             @SuppressWarnings("unchecked") // Downcast to match service
-            final
-            LateralCacheNoWait<Object, Object> c = (LateralCacheNoWait<Object, Object>) entry.getValue();
-            if ( c.getStatus() == CacheStatus.ERROR )
+            final LateralCacheNoWait<Object, Object> c =
+                (LateralCacheNoWait<Object, Object>) entry.getValue();
+
+            if (c.getStatus() == CacheStatus.ERROR)
             {
                 log.info( "Found LateralCacheNoWait in error, " + cacheName );
 
-                final ITCPLateralCacheAttributes lca = (ITCPLateralCacheAttributes)c.getAuxiliaryCacheAttributes();
+                final ITCPLateralCacheAttributes lca =
+                        (ITCPLateralCacheAttributes) c.getAuxiliaryCacheAttributes();
 
                 // Get service instance
-                final ICacheServiceNonLocal<Object, Object> cacheService = factory.getCSNLInstance(lca);
+                final ICacheServiceNonLocal<Object, Object> cacheService =
+                        factory.getCSNLInstance(lca, c.getElementSerializer());
 
                 // If we can't fix them, just skip and re-try in the
                 // next round.
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/LateralCacheNoWait.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/LateralCacheNoWait.java
index 26e1e68..7424c86 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/LateralCacheNoWait.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/LateralCacheNoWait.java
@@ -83,6 +83,8 @@ public class LateralCacheNoWait<K, V>
     {
         this.cache = cache;
         this.identityKey = cache.getCacheName();
+        this.setCacheEventLogger(cache.getCacheEventLogger());
+        this.setElementSerializer(cache.getElementSerializer());
 
         log.debug( "Constructing LateralCacheNoWait, LateralCache = [{0}]", cache );
 
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPCacheFactory.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPCacheFactory.java
index baf7025..bc6d864 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPCacheFactory.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPCacheFactory.java
@@ -44,6 +44,7 @@ import org.apache.commons.jcs3.log.Log;
 import org.apache.commons.jcs3.log.LogManager;
 import org.apache.commons.jcs3.utils.discovery.UDPDiscoveryManager;
 import org.apache.commons.jcs3.utils.discovery.UDPDiscoveryService;
+import org.apache.commons.jcs3.utils.serialization.StandardSerializer;
 
 /**
  * Constructs a LateralCacheNoWaitFacade for the given configuration. Each lateral service / local
@@ -76,11 +77,13 @@ public class LateralTCPCacheFactory
     /**
      * Creates a TCP lateral.
      * <p>
-     * @param iaca
-     * @param cacheMgr
-     * @param cacheEventLogger
-     * @param elementSerializer
-     * @return LateralCacheNoWaitFacade
+     * @param <K> cache key type
+     * @param <V> cache value type
+     * @param iaca the cache configuration object
+     * @param cacheMgr the cache manager
+     * @param cacheEventLogger the event logger
+     * @param elementSerializer the serializer to use when sending or receiving
+     * @return a LateralCacheNoWaitFacade
      */
     @Override
     public <K, V> LateralCacheNoWaitFacade<K, V> createCache(
@@ -101,13 +104,13 @@ public class LateralTCPCacheFactory
             for (final String server : servers)
             {
                 log.debug( "tcp server = {0}", server );
-                final ITCPLateralCacheAttributes lacC = (ITCPLateralCacheAttributes) lac.clone();
-                lacC.setTcpServer( server );
+                final ITCPLateralCacheAttributes lacClone = (ITCPLateralCacheAttributes) lac.clone();
+                lacClone.setTcpServer( server );
 
-                final LateralCacheNoWait<K, V> lateralNoWait = createCacheNoWait(lacC, cacheEventLogger, elementSerializer);
+                final LateralCacheNoWait<K, V> lateralNoWait = createCacheNoWait(lacClone, cacheEventLogger, elementSerializer);
 
-                addListenerIfNeeded( lacC, cacheMgr );
-                monitor.addCache(lateralNoWait);
+                addListenerIfNeeded( lacClone, cacheMgr );
+                monitorCache(lateralNoWait);
                 noWaits.add( lateralNoWait );
             }
         }
@@ -124,10 +127,20 @@ public class LateralTCPCacheFactory
         return lcnwf;
     }
 
+    /**
+     * Create a LateralCacheNoWait for the server configured in lca
+     *
+     * @param <K> cache key type
+     * @param <V> cache value type
+     * @param lca the cache configuration object
+     * @param cacheEventLogger the event logger
+     * @param elementSerializer the serializer to use when sending or receiving
+     * @return a LateralCacheNoWait
+     */
     public <K, V> LateralCacheNoWait<K, V> createCacheNoWait( final ITCPLateralCacheAttributes lca,
             final ICacheEventLogger cacheEventLogger, final IElementSerializer elementSerializer )
     {
-        final ICacheServiceNonLocal<K, V> lateralService = getCSNLInstance(lca);
+        final ICacheServiceNonLocal<K, V> lateralService = getCSNLInstance(lca, elementSerializer);
 
         final LateralCache<K, V> cache = new LateralCache<>( lca, lateralService, this.monitor );
         cache.setCacheEventLogger( cacheEventLogger );
@@ -137,8 +150,6 @@ public class LateralTCPCacheFactory
 
         final LateralCacheNoWait<K, V> lateralNoWait = new LateralCacheNoWait<>( cache );
         lateralNoWait.setIdentityKey(lca.getTcpServer());
-        lateralNoWait.setCacheEventLogger( cacheEventLogger );
-        lateralNoWait.setElementSerializer( elementSerializer );
 
         log.info( "Created LateralCacheNoWait for [{0}] LateralCacheNoWait = [{1}]",
                 lca, lateralNoWait );
@@ -205,55 +216,86 @@ public class LateralTCPCacheFactory
     /**
      * Returns an instance of the cache service.
      * <p>
+     * @param <K> cache key type
+     * @param <V> cache value type
      * @param lca configuration for the creation of a new service instance
      *
      * @return ICacheServiceNonLocal&lt;K, V&gt;
+     *
+     * @deprecated Specify serializer
+     */
+    @Deprecated
+    public <K, V> ICacheServiceNonLocal<K, V> getCSNLInstance( final ITCPLateralCacheAttributes lca )
+    {
+        return getCSNLInstance(lca, new StandardSerializer());
+    }
+
+    /**
+     * Returns an instance of the cache service.
+     * <p>
+     * @param <K> cache key type
+     * @param <V> cache value type
+     * @param lca configuration for the creation of a new service instance
+     * @param elementSerializer the serializer to use when sending or receiving
+     *
+     * @return ICacheServiceNonLocal&lt;K, V&gt;
      */
     // Need to cast because of common map for all cache services
     @SuppressWarnings("unchecked")
-    public <K, V> ICacheServiceNonLocal<K, V> getCSNLInstance( final ITCPLateralCacheAttributes lca )
+    public <K, V> ICacheServiceNonLocal<K, V> getCSNLInstance(final ITCPLateralCacheAttributes lca,
+            final IElementSerializer elementSerializer)
     {
         final String key = lca.getTcpServer();
 
-        csnlInstances.computeIfPresent(key, (name, service) -> {
+        return (ICacheServiceNonLocal<K, V>) csnlInstances.compute(key, (name, service) -> {
+
+            ICacheServiceNonLocal<?, ?> newService = service;
+
             // If service creation did not succeed last time, force retry
             if (service instanceof ZombieCacheServiceNonLocal)
             {
                 log.info("Disposing of zombie service instance for [{0}]", name);
-                return null;
+                newService = null;
             }
 
-            return service;
-        });
-
-        return (ICacheServiceNonLocal<K, V>) csnlInstances.computeIfAbsent(key, name -> {
-
-                    log.info( "Instance for [{0}] is null, creating", name );
-
-                    // Create the service
-                    try
-                    {
-                        log.info( "Creating TCP service, lca = {0}", lca );
-
-                        return new LateralTCPService<>( lca );
-                    }
-                    catch ( final IOException ex )
-                    {
-                        // Failed to connect to the lateral server.
-                        // Configure this LateralCacheManager instance to use the
-                        // "zombie" services.
-                        log.error( "Failure, lateral instance will use zombie service", ex );
-
-                        final ICacheServiceNonLocal<K, V> zombieService =
-                                new ZombieCacheServiceNonLocal<>( lca.getZombieQueueMaxSize() );
+            if (newService == null)
+            {
+                log.info( "Instance for [{0}] is null, creating", name );
+
+                // Create the service
+                try
+                {
+                    log.info( "Creating TCP service, lca = {0}", lca );
+
+                    newService = new LateralTCPService<>(lca, elementSerializer);
+                }
+                catch ( final IOException ex )
+                {
+                    // Failed to connect to the lateral server.
+                    // Configure this LateralCacheManager instance to use the
+                    // "zombie" services.
+                    log.error( "Failure, lateral instance will use zombie service", ex );
+
+                    newService = new ZombieCacheServiceNonLocal<>(lca.getZombieQueueMaxSize());
+
+                    // Notify the cache monitor about the error, and kick off
+                    // the recovery process.
+                    monitor.notifyError();
+                }
+            }
 
-                        // Notify the cache monitor about the error, and kick off
-                        // the recovery process.
-                        monitor.notifyError();
+            return newService;
+        });
+    }
 
-                        return zombieService;
-                    }
-                });
+    /**
+     * Add cache instance to monitor
+     *
+     * @param cache the cache instance
+     */
+    public void monitorCache(final LateralCacheNoWait<?, ?> cache)
+    {
+        monitor.addCache(cache);
     }
 
     /**
@@ -294,8 +336,7 @@ public class LateralTCPCacheFactory
         {
             try
             {
-                addLateralCacheListener( iaca.getCacheName(),
-                        LateralTCPListener.getInstance( iaca, cacheMgr ) );
+                addLateralCacheListener(iaca.getCacheName(), createListener(iaca, cacheMgr));
             }
             catch ( final IOException ioe )
             {
@@ -331,11 +372,11 @@ public class LateralTCPCacheFactory
      * This should be called by create cache.
      * <p>
      * @param attr  ITCPLateralCacheAttributes
-     * @param cacheMgr
+     * @param cacheMgr the composite cache manager
      *
      * @return the listener if created, else null
      */
-    private <K, V> ILateralCacheListener<K, V> createListener( final ITCPLateralCacheAttributes attr,
+    private static <K, V> ILateralCacheListener<K, V> createListener( final ITCPLateralCacheAttributes attr,
             final ICompositeCacheManager cacheMgr )
     {
         ILateralCacheListener<K, V> listener = null;
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPDiscoveryListener.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPDiscoveryListener.java
index c453b60..e24a684 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPDiscoveryListener.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPDiscoveryListener.java
@@ -305,6 +305,7 @@ public class LateralTCPDiscoveryListener
 
                     LateralCacheNoWait<?, ?> noWait =
                             factory.createCacheNoWait(lca, cacheEventLogger, elementSerializer);
+                    factory.monitorCache(noWait);
 
                     if (addNoWait(noWait))
                     {
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPListener.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPListener.java
index babe75e..fc92109 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPListener.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPListener.java
@@ -597,7 +597,7 @@ public class LateralTCPListener<K, V>
                         log.debug( "LateralElementDescriptor is null" );
                         continue;
                     }
-                    if ( led.requesterId == getListenerId() )
+                    if ( led.getRequesterId() == getListenerId() )
                     {
                         log.debug( "from self" );
                     }
@@ -645,7 +645,7 @@ public class LateralTCPListener<K, V>
                 return;
             }
 
-            if ( led.requesterId == getListenerId() )
+            if ( led.getRequesterId() == getListenerId() )
             {
                 log.debug( "from self" );
             }
@@ -689,35 +689,35 @@ public class LateralTCPListener<K, V>
      */
     private Object handleElement(final LateralElementDescriptor<K, V> led) throws IOException
     {
-        final String cacheName = led.ce.getCacheName();
-        final K key = led.ce.getKey();
+        final String cacheName = led.getPayload().getCacheName();
+        final K key = led.getPayload().getKey();
         Object obj = null;
 
-        switch (led.command)
+        switch (led.getCommand())
         {
             case UPDATE:
-                handlePut( led.ce );
+                handlePut(led.getPayload());
                 break;
 
             case REMOVE:
                 // if a hashcode was given and filtering is on
                 // check to see if they are the same
                 // if so, then don't remove, otherwise issue a remove
-                if ( (led.valHashCode != -1) &&
+                if ( (led.getValHashCode() != -1) &&
                         getTcpLateralCacheAttributes().isFilterRemoveByHashCode() )
                 {
                     final ICacheElement<K, V> test = getCache( cacheName ).localGet( key );
                     if ( test != null )
                     {
-                        if ( test.getVal().hashCode() == led.valHashCode )
+                        if ( test.getVal().hashCode() == led.getValHashCode() )
                         {
                             log.debug( "Filtering detected identical hashCode [{0}], "
                                     + "not issuing a remove for led {1}",
-                                    led.valHashCode, led );
+                                    led.getValHashCode(), led );
                             return null;
                         }
                         log.debug( "Different hashcodes, in cache [{0}] sent [{1}]",
-                                test.getVal().hashCode(), led.valHashCode );
+                                test.getVal()::hashCode, led::getValHashCode );
                     }
                 }
                 handleRemove( cacheName, key );
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPSender.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPSender.java
index 8fdb6d1..0696723 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPSender.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPSender.java
@@ -66,14 +66,29 @@ public class LateralTCPSender
      * <p>
      * @param lca
      * @throws IOException
+     * @deprecated Specify serializer
      */
+    @Deprecated
     public LateralTCPSender( final ITCPLateralCacheAttributes lca )
         throws IOException
     {
+        this(lca, new StandardSerializer());
+    }
+
+    /**
+     * Constructor for the LateralTCPSender object.
+     * <p>
+     * @param lca the configuration object
+     * @param serializer the serializer to use when sending
+     * @throws IOException
+     */
+    public LateralTCPSender( final ITCPLateralCacheAttributes lca, final IElementSerializer serializer )
+        throws IOException
+    {
         this.socketOpenTimeOut = lca.getOpenTimeOut();
         this.socketSoTimeOut = lca.getSocketTimeOut();
 
-        this.serializer = new StandardSerializer();
+        this.serializer = serializer;
 
         final String p1 = lca.getTcpServer();
         if ( p1 == null )
@@ -124,8 +139,6 @@ public class LateralTCPSender
         {
             throw new IOException( "Cannot connect to " + host + ":" + port, ioe );
         }
-
-        // socket.setSoTimeout( socketSoTimeOut );
     }
 
     /**
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPService.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPService.java
index e29e182..b26c98f 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPService.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPService.java
@@ -35,8 +35,10 @@ import org.apache.commons.jcs3.engine.CacheElement;
 import org.apache.commons.jcs3.engine.CacheInfo;
 import org.apache.commons.jcs3.engine.behavior.ICacheElement;
 import org.apache.commons.jcs3.engine.behavior.ICacheServiceNonLocal;
+import org.apache.commons.jcs3.engine.behavior.IElementSerializer;
 import org.apache.commons.jcs3.log.Log;
 import org.apache.commons.jcs3.log.LogManager;
+import org.apache.commons.jcs3.utils.serialization.StandardSerializer;
 
 /**
  * A lateral cache service implementation. Does not implement getGroupKey
@@ -62,19 +64,35 @@ public class LateralTCPService<K, V>
     /**
      * Constructor for the LateralTCPService object
      * <p>
-     * @param lca ITCPLateralCacheAttributes
+     * @param lca ITCPLateralCacheAttributes the configuration object
      * @throws IOException
+     *
+     * @deprecated Specify serializer
      */
+    @Deprecated
     public LateralTCPService( final ITCPLateralCacheAttributes lca )
         throws IOException
     {
+        this(lca, new StandardSerializer());
+    }
+
+    /**
+     * Constructor for the LateralTCPService object
+     * <p>
+     * @param lca ITCPLateralCacheAttributes the configuration object
+     * @param serializer the serializer to use when sending
+     * @throws IOException
+     */
+    public LateralTCPService( final ITCPLateralCacheAttributes lca, final IElementSerializer serializer )
+        throws IOException
+    {
         this.allowGet = lca.isAllowGet();
         this.allowPut = lca.isAllowPut();
         this.issueRemoveOnPut = lca.isIssueRemoveOnPut();
 
         try
         {
-            sender = new LateralTCPSender( lca );
+            sender = new LateralTCPSender( lca, serializer );
 
             log.debug( "Created sender to [{0}]", lca::getTcpServer);
         }
@@ -83,7 +101,7 @@ public class LateralTCPService<K, V>
             // log.error( "Could not create sender", e );
             // This gets thrown over and over in recovery mode.
             // The stack trace isn't useful here.
-            log.error( "Could not create sender to [{0}] -- {1}", lca.getTcpServer(), e.getMessage());
+            log.error( "Could not create sender to [{0}] -- {1}", lca::getTcpServer, e::getMessage);
             throw e;
         }
     }
@@ -122,9 +140,8 @@ public class LateralTCPService<K, V>
         // if we shouldn't remove on put, then put
         if ( !this.issueRemoveOnPut )
         {
-            final LateralElementDescriptor<K, V> led = new LateralElementDescriptor<>( item );
-            led.requesterId = requesterId;
-            led.command = LateralCommand.UPDATE;
+            final LateralElementDescriptor<K, V> led =
+                    new LateralElementDescriptor<>(item, LateralCommand.UPDATE, requesterId);
             sender.send( led );
         }
         // else issue a remove with the hashcode for remove check on
@@ -135,9 +152,8 @@ public class LateralTCPService<K, V>
 
             // set the value to null so we don't send the item
             final CacheElement<K, V> ce = new CacheElement<>( item.getCacheName(), item.getKey(), null );
-            final LateralElementDescriptor<K, V> led = new LateralElementDescriptor<>( ce );
-            led.requesterId = requesterId;
-            led.command = LateralCommand.REMOVE;
+            final LateralElementDescriptor<K, V> led =
+                    new LateralElementDescriptor<>(ce, LateralCommand.REMOVE, requesterId);
             led.valHashCode = item.getVal().hashCode();
             sender.send( led );
         }
@@ -165,9 +181,8 @@ public class LateralTCPService<K, V>
         throws IOException
     {
         final CacheElement<K, V> ce = new CacheElement<>( cacheName, key, null );
-        final LateralElementDescriptor<K, V> led = new LateralElementDescriptor<>( ce );
-        led.requesterId = requesterId;
-        led.command = LateralCommand.REMOVE;
+        final LateralElementDescriptor<K, V> led =
+                new LateralElementDescriptor<>(ce, LateralCommand.REMOVE, requesterId);
         sender.send( led );
     }
 
@@ -226,9 +241,9 @@ public class LateralTCPService<K, V>
         if ( this.allowGet )
         {
             final CacheElement<K, V> ce = new CacheElement<>( cacheName, key, null );
-            final LateralElementDescriptor<K, V> led = new LateralElementDescriptor<>( ce );
+            final LateralElementDescriptor<K, V> led =
+                    new LateralElementDescriptor<>(ce, LateralCommand.GET);
             // led.requesterId = requesterId; // later
-            led.command = LateralCommand.GET;
             @SuppressWarnings("unchecked") // Need to cast from Object
             final
             ICacheElement<K, V> response = (ICacheElement<K, V>)sender.sendAndReceive( led );
@@ -275,9 +290,9 @@ public class LateralTCPService<K, V>
             return null;
         }
         final CacheElement<String, String> ce = new CacheElement<>( cacheName, pattern, null );
-        final LateralElementDescriptor<String, String> led = new LateralElementDescriptor<>( ce );
+        final LateralElementDescriptor<String, String> led =
+                new LateralElementDescriptor<>(ce, LateralCommand.GET_MATCHING);
         // led.requesterId = requesterId; // later
-        led.command = LateralCommand.GET_MATCHING;
 
         final Object response = sender.sendAndReceive( led );
         if ( response != null )
@@ -325,7 +340,7 @@ public class LateralTCPService<K, V>
         {
             for (final K key : keys)
             {
-                final ICacheElement<K, V> element = get( cacheName, key );
+                final ICacheElement<K, V> element = get( cacheName, key, requesterId );
 
                 if ( element != null )
                 {
@@ -347,9 +362,9 @@ public class LateralTCPService<K, V>
     public Set<K> getKeySet(final String cacheName) throws IOException
     {
         final CacheElement<String, String> ce = new CacheElement<>(cacheName, null, null);
-        final LateralElementDescriptor<String, String> led = new LateralElementDescriptor<>(ce);
+        final LateralElementDescriptor<String, String> led =
+                new LateralElementDescriptor<>(ce, LateralCommand.GET_KEYSET);
         // led.requesterId = requesterId; // later
-        led.command = LateralCommand.GET_KEYSET;
         final Object response = sender.sendAndReceive(led);
         if (response != null)
         {
@@ -380,15 +395,18 @@ public class LateralTCPService<K, V>
         throws IOException
     {
         final CacheElement<String, String> ce = new CacheElement<>( cacheName, "ALL", null );
-        final LateralElementDescriptor<String, String> led = new LateralElementDescriptor<>( ce );
-        led.requesterId = requesterId;
-        led.command = LateralCommand.REMOVEALL;
+        final LateralElementDescriptor<String, String> led =
+                new LateralElementDescriptor<>(ce, LateralCommand.REMOVEALL, requesterId);
         sender.send( led );
     }
 
     /**
+     * Test
      * @param args
+     *
+     * @deprecated Use unit tests
      */
+    @Deprecated
     public static void main( final String args[] )
     {
         try