You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@commons.apache.org by tv...@apache.org on 2021/11/22 20:17:12 UTC

[commons-jcs] branch master updated (4d77eeb -> e3d8fb5)

This is an automated email from the ASF dual-hosted git repository.

tv pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/commons-jcs.git.


    from 4d77eeb  Minimize deprecation references
     new c773351  Remove dead code
     new e3d8fb5  Introduce serializer support throughout LateralTCPCache

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../auxiliary/lateral/LateralCacheMonitor.java     |  13 +-
 .../jcs3/auxiliary/lateral/LateralCacheNoWait.java |   2 +
 .../lateral/socket/tcp/LateralTCPCacheFactory.java | 139 +++++++++++++--------
 .../socket/tcp/LateralTCPDiscoveryListener.java    |   1 +
 .../lateral/socket/tcp/LateralTCPListener.java     |  20 +--
 .../lateral/socket/tcp/LateralTCPSender.java       |  19 ++-
 .../lateral/socket/tcp/LateralTCPService.java      |  62 +++++----
 .../tcp/LateralTCPIssueRemoveOnPutUnitTest.java    |  14 +--
 8 files changed, 171 insertions(+), 99 deletions(-)

[commons-jcs] 01/02: Remove dead code

Posted by tv...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tv pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/commons-jcs.git

commit c773351f849d38cde874d8fc051638a7a3524522
Author: Thomas Vandahl <tv...@apache.org>
AuthorDate: Mon Nov 22 21:15:59 2021 +0100

    Remove dead code
---
 .../socket/tcp/LateralTCPIssueRemoveOnPutUnitTest.java     | 14 ++++----------
 1 file changed, 4 insertions(+), 10 deletions(-)

diff --git a/commons-jcs-core/src/test/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPIssueRemoveOnPutUnitTest.java b/commons-jcs-core/src/test/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPIssueRemoveOnPutUnitTest.java
index 6ff8ee6..1910e89 100644
--- a/commons-jcs-core/src/test/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPIssueRemoveOnPutUnitTest.java
+++ b/commons-jcs-core/src/test/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPIssueRemoveOnPutUnitTest.java
@@ -129,8 +129,6 @@ public class LateralTCPIssueRemoveOnPutUnitTest
     public void runTestForRegion( final String region, final int range, final int numOps, final int testNum )
         throws Exception
     {
-        final boolean show = false;
-
         final CacheAccess<String, String> cache = JCS.getInstance( region );
 
         Thread.sleep( 100 );
@@ -169,23 +167,19 @@ public class LateralTCPIssueRemoveOnPutUnitTest
                 final ICacheElement<String, String> element = new CacheElement<>( region, key, region + ":data" + i
                     + " junk asdfffffffadfasdfasf " + kn + ":" + n );
                 service.update( element );
-                if ( show )
-                {
-                    p( "put " + key );
-                }
+                p("put " + key);
 
-                if (show && i % 100 == 0 )
+                if (i % 100 == 0)
                 {
-                    System.out.println( cache.getStats() );
+                    p(cache.getStats());
                 }
 
             }
-            p( "Finished cycle of " + numOps );
+            p("Finished cycle of " + numOps);
         }
         catch ( final Exception e )
         {
             p( e.toString() );
-            e.printStackTrace( System.out );
             throw e;
         }
 

[commons-jcs] 02/02: Introduce serializer support throughout LateralTCPCache

Posted by tv...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tv pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/commons-jcs.git

commit e3d8fb571f092ab0d1a2f7552c2719c362b4ec3e
Author: Thomas Vandahl <tv...@apache.org>
AuthorDate: Mon Nov 22 21:16:58 2021 +0100

    Introduce serializer support throughout LateralTCPCache
---
 .../auxiliary/lateral/LateralCacheMonitor.java     |  13 +-
 .../jcs3/auxiliary/lateral/LateralCacheNoWait.java |   2 +
 .../lateral/socket/tcp/LateralTCPCacheFactory.java | 139 +++++++++++++--------
 .../socket/tcp/LateralTCPDiscoveryListener.java    |   1 +
 .../lateral/socket/tcp/LateralTCPListener.java     |  20 +--
 .../lateral/socket/tcp/LateralTCPSender.java       |  19 ++-
 .../lateral/socket/tcp/LateralTCPService.java      |  62 +++++----
 7 files changed, 167 insertions(+), 89 deletions(-)

diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/LateralCacheMonitor.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/LateralCacheMonitor.java
index 44ca757..c6e6e43 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/LateralCacheMonitor.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/LateralCacheMonitor.java
@@ -112,16 +112,19 @@ public class LateralCacheMonitor extends AbstractAuxiliaryCacheMonitor
             final String cacheName = entry.getKey();
 
             @SuppressWarnings("unchecked") // Downcast to match service
-            final
-            LateralCacheNoWait<Object, Object> c = (LateralCacheNoWait<Object, Object>) entry.getValue();
-            if ( c.getStatus() == CacheStatus.ERROR )
+            final LateralCacheNoWait<Object, Object> c =
+                (LateralCacheNoWait<Object, Object>) entry.getValue();
+
+            if (c.getStatus() == CacheStatus.ERROR)
             {
                 log.info( "Found LateralCacheNoWait in error, " + cacheName );
 
-                final ITCPLateralCacheAttributes lca = (ITCPLateralCacheAttributes)c.getAuxiliaryCacheAttributes();
+                final ITCPLateralCacheAttributes lca =
+                        (ITCPLateralCacheAttributes) c.getAuxiliaryCacheAttributes();
 
                 // Get service instance
-                final ICacheServiceNonLocal<Object, Object> cacheService = factory.getCSNLInstance(lca);
+                final ICacheServiceNonLocal<Object, Object> cacheService =
+                        factory.getCSNLInstance(lca, c.getElementSerializer());
 
                 // If we can't fix them, just skip and re-try in the
                 // next round.
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/LateralCacheNoWait.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/LateralCacheNoWait.java
index 26e1e68..7424c86 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/LateralCacheNoWait.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/LateralCacheNoWait.java
@@ -83,6 +83,8 @@ public class LateralCacheNoWait<K, V>
     {
         this.cache = cache;
         this.identityKey = cache.getCacheName();
+        this.setCacheEventLogger(cache.getCacheEventLogger());
+        this.setElementSerializer(cache.getElementSerializer());
 
         log.debug( "Constructing LateralCacheNoWait, LateralCache = [{0}]", cache );
 
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPCacheFactory.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPCacheFactory.java
index baf7025..bc6d864 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPCacheFactory.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPCacheFactory.java
@@ -44,6 +44,7 @@ import org.apache.commons.jcs3.log.Log;
 import org.apache.commons.jcs3.log.LogManager;
 import org.apache.commons.jcs3.utils.discovery.UDPDiscoveryManager;
 import org.apache.commons.jcs3.utils.discovery.UDPDiscoveryService;
+import org.apache.commons.jcs3.utils.serialization.StandardSerializer;
 
 /**
  * Constructs a LateralCacheNoWaitFacade for the given configuration. Each lateral service / local
@@ -76,11 +77,13 @@ public class LateralTCPCacheFactory
     /**
      * Creates a TCP lateral.
      * <p>
-     * @param iaca
-     * @param cacheMgr
-     * @param cacheEventLogger
-     * @param elementSerializer
-     * @return LateralCacheNoWaitFacade
+     * @param <K> cache key type
+     * @param <V> cache value type
+     * @param iaca the cache configuration object
+     * @param cacheMgr the cache manager
+     * @param cacheEventLogger the event logger
+     * @param elementSerializer the serializer to use when sending or receiving
+     * @return a LateralCacheNoWaitFacade
      */
     @Override
     public <K, V> LateralCacheNoWaitFacade<K, V> createCache(
@@ -101,13 +104,13 @@ public class LateralTCPCacheFactory
             for (final String server : servers)
             {
                 log.debug( "tcp server = {0}", server );
-                final ITCPLateralCacheAttributes lacC = (ITCPLateralCacheAttributes) lac.clone();
-                lacC.setTcpServer( server );
+                final ITCPLateralCacheAttributes lacClone = (ITCPLateralCacheAttributes) lac.clone();
+                lacClone.setTcpServer( server );
 
-                final LateralCacheNoWait<K, V> lateralNoWait = createCacheNoWait(lacC, cacheEventLogger, elementSerializer);
+                final LateralCacheNoWait<K, V> lateralNoWait = createCacheNoWait(lacClone, cacheEventLogger, elementSerializer);
 
-                addListenerIfNeeded( lacC, cacheMgr );
-                monitor.addCache(lateralNoWait);
+                addListenerIfNeeded( lacClone, cacheMgr );
+                monitorCache(lateralNoWait);
                 noWaits.add( lateralNoWait );
             }
         }
@@ -124,10 +127,20 @@ public class LateralTCPCacheFactory
         return lcnwf;
     }
 
+    /**
+     * Create a LateralCacheNoWait for the server configured in lca
+     *
+     * @param <K> cache key type
+     * @param <V> cache value type
+     * @param lca the cache configuration object
+     * @param cacheEventLogger the event logger
+     * @param elementSerializer the serializer to use when sending or receiving
+     * @return a LateralCacheNoWait
+     */
     public <K, V> LateralCacheNoWait<K, V> createCacheNoWait( final ITCPLateralCacheAttributes lca,
             final ICacheEventLogger cacheEventLogger, final IElementSerializer elementSerializer )
     {
-        final ICacheServiceNonLocal<K, V> lateralService = getCSNLInstance(lca);
+        final ICacheServiceNonLocal<K, V> lateralService = getCSNLInstance(lca, elementSerializer);
 
         final LateralCache<K, V> cache = new LateralCache<>( lca, lateralService, this.monitor );
         cache.setCacheEventLogger( cacheEventLogger );
@@ -137,8 +150,6 @@ public class LateralTCPCacheFactory
 
         final LateralCacheNoWait<K, V> lateralNoWait = new LateralCacheNoWait<>( cache );
         lateralNoWait.setIdentityKey(lca.getTcpServer());
-        lateralNoWait.setCacheEventLogger( cacheEventLogger );
-        lateralNoWait.setElementSerializer( elementSerializer );
 
         log.info( "Created LateralCacheNoWait for [{0}] LateralCacheNoWait = [{1}]",
                 lca, lateralNoWait );
@@ -205,55 +216,86 @@ public class LateralTCPCacheFactory
     /**
      * Returns an instance of the cache service.
      * <p>
+     * @param <K> cache key type
+     * @param <V> cache value type
      * @param lca configuration for the creation of a new service instance
      *
      * @return ICacheServiceNonLocal&lt;K, V&gt;
+     *
+     * @deprecated Specify serializer
+     */
+    @Deprecated
+    public <K, V> ICacheServiceNonLocal<K, V> getCSNLInstance( final ITCPLateralCacheAttributes lca )
+    {
+        return getCSNLInstance(lca, new StandardSerializer());
+    }
+
+    /**
+     * Returns an instance of the cache service.
+     * <p>
+     * @param <K> cache key type
+     * @param <V> cache value type
+     * @param lca configuration for the creation of a new service instance
+     * @param elementSerializer the serializer to use when sending or receiving
+     *
+     * @return ICacheServiceNonLocal&lt;K, V&gt;
      */
     // Need to cast because of common map for all cache services
     @SuppressWarnings("unchecked")
-    public <K, V> ICacheServiceNonLocal<K, V> getCSNLInstance( final ITCPLateralCacheAttributes lca )
+    public <K, V> ICacheServiceNonLocal<K, V> getCSNLInstance(final ITCPLateralCacheAttributes lca,
+            final IElementSerializer elementSerializer)
     {
         final String key = lca.getTcpServer();
 
-        csnlInstances.computeIfPresent(key, (name, service) -> {
+        return (ICacheServiceNonLocal<K, V>) csnlInstances.compute(key, (name, service) -> {
+
+            ICacheServiceNonLocal<?, ?> newService = service;
+
             // If service creation did not succeed last time, force retry
             if (service instanceof ZombieCacheServiceNonLocal)
             {
                 log.info("Disposing of zombie service instance for [{0}]", name);
-                return null;
+                newService = null;
             }
 
-            return service;
-        });
-
-        return (ICacheServiceNonLocal<K, V>) csnlInstances.computeIfAbsent(key, name -> {
-
-                    log.info( "Instance for [{0}] is null, creating", name );
-
-                    // Create the service
-                    try
-                    {
-                        log.info( "Creating TCP service, lca = {0}", lca );
-
-                        return new LateralTCPService<>( lca );
-                    }
-                    catch ( final IOException ex )
-                    {
-                        // Failed to connect to the lateral server.
-                        // Configure this LateralCacheManager instance to use the
-                        // "zombie" services.
-                        log.error( "Failure, lateral instance will use zombie service", ex );
-
-                        final ICacheServiceNonLocal<K, V> zombieService =
-                                new ZombieCacheServiceNonLocal<>( lca.getZombieQueueMaxSize() );
+            if (newService == null)
+            {
+                log.info( "Instance for [{0}] is null, creating", name );
+
+                // Create the service
+                try
+                {
+                    log.info( "Creating TCP service, lca = {0}", lca );
+
+                    newService = new LateralTCPService<>(lca, elementSerializer);
+                }
+                catch ( final IOException ex )
+                {
+                    // Failed to connect to the lateral server.
+                    // Configure this LateralCacheManager instance to use the
+                    // "zombie" services.
+                    log.error( "Failure, lateral instance will use zombie service", ex );
+
+                    newService = new ZombieCacheServiceNonLocal<>(lca.getZombieQueueMaxSize());
+
+                    // Notify the cache monitor about the error, and kick off
+                    // the recovery process.
+                    monitor.notifyError();
+                }
+            }
 
-                        // Notify the cache monitor about the error, and kick off
-                        // the recovery process.
-                        monitor.notifyError();
+            return newService;
+        });
+    }
 
-                        return zombieService;
-                    }
-                });
+    /**
+     * Add cache instance to monitor
+     *
+     * @param cache the cache instance
+     */
+    public void monitorCache(final LateralCacheNoWait<?, ?> cache)
+    {
+        monitor.addCache(cache);
     }
 
     /**
@@ -294,8 +336,7 @@ public class LateralTCPCacheFactory
         {
             try
             {
-                addLateralCacheListener( iaca.getCacheName(),
-                        LateralTCPListener.getInstance( iaca, cacheMgr ) );
+                addLateralCacheListener(iaca.getCacheName(), createListener(iaca, cacheMgr));
             }
             catch ( final IOException ioe )
             {
@@ -331,11 +372,11 @@ public class LateralTCPCacheFactory
      * This should be called by create cache.
      * <p>
      * @param attr  ITCPLateralCacheAttributes
-     * @param cacheMgr
+     * @param cacheMgr the composite cache manager
      *
      * @return the listener if created, else null
      */
-    private <K, V> ILateralCacheListener<K, V> createListener( final ITCPLateralCacheAttributes attr,
+    private static <K, V> ILateralCacheListener<K, V> createListener( final ITCPLateralCacheAttributes attr,
             final ICompositeCacheManager cacheMgr )
     {
         ILateralCacheListener<K, V> listener = null;
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPDiscoveryListener.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPDiscoveryListener.java
index c453b60..e24a684 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPDiscoveryListener.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPDiscoveryListener.java
@@ -305,6 +305,7 @@ public class LateralTCPDiscoveryListener
 
                     LateralCacheNoWait<?, ?> noWait =
                             factory.createCacheNoWait(lca, cacheEventLogger, elementSerializer);
+                    factory.monitorCache(noWait);
 
                     if (addNoWait(noWait))
                     {
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPListener.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPListener.java
index babe75e..fc92109 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPListener.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPListener.java
@@ -597,7 +597,7 @@ public class LateralTCPListener<K, V>
                         log.debug( "LateralElementDescriptor is null" );
                         continue;
                     }
-                    if ( led.requesterId == getListenerId() )
+                    if ( led.getRequesterId() == getListenerId() )
                     {
                         log.debug( "from self" );
                     }
@@ -645,7 +645,7 @@ public class LateralTCPListener<K, V>
                 return;
             }
 
-            if ( led.requesterId == getListenerId() )
+            if ( led.getRequesterId() == getListenerId() )
             {
                 log.debug( "from self" );
             }
@@ -689,35 +689,35 @@ public class LateralTCPListener<K, V>
      */
     private Object handleElement(final LateralElementDescriptor<K, V> led) throws IOException
     {
-        final String cacheName = led.ce.getCacheName();
-        final K key = led.ce.getKey();
+        final String cacheName = led.getPayload().getCacheName();
+        final K key = led.getPayload().getKey();
         Object obj = null;
 
-        switch (led.command)
+        switch (led.getCommand())
         {
             case UPDATE:
-                handlePut( led.ce );
+                handlePut(led.getPayload());
                 break;
 
             case REMOVE:
                 // if a hashcode was given and filtering is on
                 // check to see if they are the same
                 // if so, then don't remove, otherwise issue a remove
-                if ( (led.valHashCode != -1) &&
+                if ( (led.getValHashCode() != -1) &&
                         getTcpLateralCacheAttributes().isFilterRemoveByHashCode() )
                 {
                     final ICacheElement<K, V> test = getCache( cacheName ).localGet( key );
                     if ( test != null )
                     {
-                        if ( test.getVal().hashCode() == led.valHashCode )
+                        if ( test.getVal().hashCode() == led.getValHashCode() )
                         {
                             log.debug( "Filtering detected identical hashCode [{0}], "
                                     + "not issuing a remove for led {1}",
-                                    led.valHashCode, led );
+                                    led.getValHashCode(), led );
                             return null;
                         }
                         log.debug( "Different hashcodes, in cache [{0}] sent [{1}]",
-                                test.getVal().hashCode(), led.valHashCode );
+                                test.getVal()::hashCode, led::getValHashCode );
                     }
                 }
                 handleRemove( cacheName, key );
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPSender.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPSender.java
index 8fdb6d1..0696723 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPSender.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPSender.java
@@ -66,14 +66,29 @@ public class LateralTCPSender
      * <p>
      * @param lca
      * @throws IOException
+     * @deprecated Specify serializer
      */
+    @Deprecated
     public LateralTCPSender( final ITCPLateralCacheAttributes lca )
         throws IOException
     {
+        this(lca, new StandardSerializer());
+    }
+
+    /**
+     * Constructor for the LateralTCPSender object.
+     * <p>
+     * @param lca the configuration object
+     * @param serializer the serializer to use when sending
+     * @throws IOException
+     */
+    public LateralTCPSender( final ITCPLateralCacheAttributes lca, final IElementSerializer serializer )
+        throws IOException
+    {
         this.socketOpenTimeOut = lca.getOpenTimeOut();
         this.socketSoTimeOut = lca.getSocketTimeOut();
 
-        this.serializer = new StandardSerializer();
+        this.serializer = serializer;
 
         final String p1 = lca.getTcpServer();
         if ( p1 == null )
@@ -124,8 +139,6 @@ public class LateralTCPSender
         {
             throw new IOException( "Cannot connect to " + host + ":" + port, ioe );
         }
-
-        // socket.setSoTimeout( socketSoTimeOut );
     }
 
     /**
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPService.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPService.java
index e29e182..b26c98f 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPService.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPService.java
@@ -35,8 +35,10 @@ import org.apache.commons.jcs3.engine.CacheElement;
 import org.apache.commons.jcs3.engine.CacheInfo;
 import org.apache.commons.jcs3.engine.behavior.ICacheElement;
 import org.apache.commons.jcs3.engine.behavior.ICacheServiceNonLocal;
+import org.apache.commons.jcs3.engine.behavior.IElementSerializer;
 import org.apache.commons.jcs3.log.Log;
 import org.apache.commons.jcs3.log.LogManager;
+import org.apache.commons.jcs3.utils.serialization.StandardSerializer;
 
 /**
  * A lateral cache service implementation. Does not implement getGroupKey
@@ -62,19 +64,35 @@ public class LateralTCPService<K, V>
     /**
      * Constructor for the LateralTCPService object
      * <p>
-     * @param lca ITCPLateralCacheAttributes
+     * @param lca ITCPLateralCacheAttributes the configuration object
      * @throws IOException
+     *
+     * @deprecated Specify serializer
      */
+    @Deprecated
     public LateralTCPService( final ITCPLateralCacheAttributes lca )
         throws IOException
     {
+        this(lca, new StandardSerializer());
+    }
+
+    /**
+     * Constructor for the LateralTCPService object
+     * <p>
+     * @param lca ITCPLateralCacheAttributes the configuration object
+     * @param serializer the serializer to use when sending
+     * @throws IOException
+     */
+    public LateralTCPService( final ITCPLateralCacheAttributes lca, final IElementSerializer serializer )
+        throws IOException
+    {
         this.allowGet = lca.isAllowGet();
         this.allowPut = lca.isAllowPut();
         this.issueRemoveOnPut = lca.isIssueRemoveOnPut();
 
         try
         {
-            sender = new LateralTCPSender( lca );
+            sender = new LateralTCPSender( lca, serializer );
 
             log.debug( "Created sender to [{0}]", lca::getTcpServer);
         }
@@ -83,7 +101,7 @@ public class LateralTCPService<K, V>
             // log.error( "Could not create sender", e );
             // This gets thrown over and over in recovery mode.
             // The stack trace isn't useful here.
-            log.error( "Could not create sender to [{0}] -- {1}", lca.getTcpServer(), e.getMessage());
+            log.error( "Could not create sender to [{0}] -- {1}", lca::getTcpServer, e::getMessage);
             throw e;
         }
     }
@@ -122,9 +140,8 @@ public class LateralTCPService<K, V>
         // if we shouldn't remove on put, then put
         if ( !this.issueRemoveOnPut )
         {
-            final LateralElementDescriptor<K, V> led = new LateralElementDescriptor<>( item );
-            led.requesterId = requesterId;
-            led.command = LateralCommand.UPDATE;
+            final LateralElementDescriptor<K, V> led =
+                    new LateralElementDescriptor<>(item, LateralCommand.UPDATE, requesterId);
             sender.send( led );
         }
         // else issue a remove with the hashcode for remove check on
@@ -135,9 +152,8 @@ public class LateralTCPService<K, V>
 
             // set the value to null so we don't send the item
             final CacheElement<K, V> ce = new CacheElement<>( item.getCacheName(), item.getKey(), null );
-            final LateralElementDescriptor<K, V> led = new LateralElementDescriptor<>( ce );
-            led.requesterId = requesterId;
-            led.command = LateralCommand.REMOVE;
+            final LateralElementDescriptor<K, V> led =
+                    new LateralElementDescriptor<>(ce, LateralCommand.REMOVE, requesterId);
             led.valHashCode = item.getVal().hashCode();
             sender.send( led );
         }
@@ -165,9 +181,8 @@ public class LateralTCPService<K, V>
         throws IOException
     {
         final CacheElement<K, V> ce = new CacheElement<>( cacheName, key, null );
-        final LateralElementDescriptor<K, V> led = new LateralElementDescriptor<>( ce );
-        led.requesterId = requesterId;
-        led.command = LateralCommand.REMOVE;
+        final LateralElementDescriptor<K, V> led =
+                new LateralElementDescriptor<>(ce, LateralCommand.REMOVE, requesterId);
         sender.send( led );
     }
 
@@ -226,9 +241,9 @@ public class LateralTCPService<K, V>
         if ( this.allowGet )
         {
             final CacheElement<K, V> ce = new CacheElement<>( cacheName, key, null );
-            final LateralElementDescriptor<K, V> led = new LateralElementDescriptor<>( ce );
+            final LateralElementDescriptor<K, V> led =
+                    new LateralElementDescriptor<>(ce, LateralCommand.GET);
             // led.requesterId = requesterId; // later
-            led.command = LateralCommand.GET;
             @SuppressWarnings("unchecked") // Need to cast from Object
             final
             ICacheElement<K, V> response = (ICacheElement<K, V>)sender.sendAndReceive( led );
@@ -275,9 +290,9 @@ public class LateralTCPService<K, V>
             return null;
         }
         final CacheElement<String, String> ce = new CacheElement<>( cacheName, pattern, null );
-        final LateralElementDescriptor<String, String> led = new LateralElementDescriptor<>( ce );
+        final LateralElementDescriptor<String, String> led =
+                new LateralElementDescriptor<>(ce, LateralCommand.GET_MATCHING);
         // led.requesterId = requesterId; // later
-        led.command = LateralCommand.GET_MATCHING;
 
         final Object response = sender.sendAndReceive( led );
         if ( response != null )
@@ -325,7 +340,7 @@ public class LateralTCPService<K, V>
         {
             for (final K key : keys)
             {
-                final ICacheElement<K, V> element = get( cacheName, key );
+                final ICacheElement<K, V> element = get( cacheName, key, requesterId );
 
                 if ( element != null )
                 {
@@ -347,9 +362,9 @@ public class LateralTCPService<K, V>
     public Set<K> getKeySet(final String cacheName) throws IOException
     {
         final CacheElement<String, String> ce = new CacheElement<>(cacheName, null, null);
-        final LateralElementDescriptor<String, String> led = new LateralElementDescriptor<>(ce);
+        final LateralElementDescriptor<String, String> led =
+                new LateralElementDescriptor<>(ce, LateralCommand.GET_KEYSET);
         // led.requesterId = requesterId; // later
-        led.command = LateralCommand.GET_KEYSET;
         final Object response = sender.sendAndReceive(led);
         if (response != null)
         {
@@ -380,15 +395,18 @@ public class LateralTCPService<K, V>
         throws IOException
     {
         final CacheElement<String, String> ce = new CacheElement<>( cacheName, "ALL", null );
-        final LateralElementDescriptor<String, String> led = new LateralElementDescriptor<>( ce );
-        led.requesterId = requesterId;
-        led.command = LateralCommand.REMOVEALL;
+        final LateralElementDescriptor<String, String> led =
+                new LateralElementDescriptor<>(ce, LateralCommand.REMOVEALL, requesterId);
         sender.send( led );
     }
 
     /**
+     * Test
      * @param args
+     *
+     * @deprecated Use unit tests
      */
+    @Deprecated
     public static void main( final String args[] )
     {
         try