You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2016/06/29 12:45:37 UTC

[27/38] usergrid git commit: Hook ReservationCache in via ClusterClients

Hook ReservationCache in via ClusterClients


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/2fb3ab32
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/2fb3ab32
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/2fb3ab32

Branch: refs/heads/usergrid-1268-akka-211
Commit: 2fb3ab32bca20be4c9133e340cb97145ca452a3c
Parents: d12307b
Author: Dave Johnson <sn...@apache.org>
Authored: Fri Jun 24 11:34:53 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Fri Jun 24 11:34:53 2016 -0400

----------------------------------------------------------------------
 .../corepersistence/CpEntityManagerFactory.java |  2 +-
 .../actorsystem/ActorSystemManager.java         | 36 ++++++++++++++-
 .../actorsystem/ActorSystemManagerImpl.java     | 24 +++++++++-
 .../uniquevalues/ReservationCacheActor.java     |  4 +-
 .../uniquevalues/UniqueValueActor.java          | 27 ++++++-----
 .../uniquevalues/UniqueValuesServiceImpl.java   | 47 ++++++++------------
 .../collection/AbstractUniqueValueTest.java     |  2 +-
 7 files changed, 96 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/2fb3ab32/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index 060ec18..eca5927 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -155,7 +155,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
                 actorSystemManager.registerMessageType( UniqueValueActor.Cancellation.class, "/user/uvProxy" );
                 actorSystemManager.registerMessageType( UniqueValueActor.Confirmation.class, "/user/uvProxy" );
                 actorSystemManager.start();
-                actorSystemManager.waitForClientActors();
+                actorSystemManager.waitForClientActor();
 
             } catch (Throwable t) {
                 logger.error("Error starting Akka", t);

http://git-wip-us.apache.org/repos/asf/usergrid/blob/2fb3ab32/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java
index c45ccac..cdb6caf 100644
--- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java
+++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java
@@ -23,21 +23,55 @@ import akka.actor.ActorRef;
 
 public interface ActorSystemManager {
 
+    /**
+     * Create and start all Akka Actors, ClusterClients, Routers, etc.
+     */
     void start();
 
+    /**
+     * Start method used in JUnit tests.
+     */
     void start(String hostname, Integer port, String currentRegion);
 
-    void waitForClientActors();
+    /**
+     * Wait until ClientActor has seen some nodes and is ready for use.
+     */
+    void waitForClientActor();
 
+    /**
+     * True if ActorSystem and ClientActor are ready to be used.
+     */
     boolean isReady();
 
+    /**
+     * MUST be called before start() to register any router producers to be configured.
+     */
     void registerRouterProducer( RouterProducer routerProducer );
 
+    /**
+     * MUST be called before start() to register any messages to be sent.
+     * @param messageType Class of message.
+     * @param routerPath Router-path to which such messages are to be sent.
+     */
     void registerMessageType( Class messageType, String routerPath );
 
+    /**
+     * Local client for ActorSystem, send all local messages here for routing.
+     */
     ActorRef getClientActor();
 
+    /**
+     * Get ClusterClient for specified remote region.
+     */
     ActorRef getClusterClient(String region );
 
+    /**
+     * Get name of this, the current region.
+     */
     String getCurrentRegion();
+
+    /**
+     * Publish message to all topic subscribers in all regions.
+     */
+    void publishToAllRegions( String topic, Object message, ActorRef sender );
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/2fb3ab32/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
index 5e23c20..89980bc 100644
--- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
+++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
@@ -23,6 +23,8 @@ import akka.actor.*;
 import akka.cluster.client.ClusterClient;
 import akka.cluster.client.ClusterClientReceptionist;
 import akka.cluster.client.ClusterClientSettings;
+import akka.cluster.pubsub.DistributedPubSub;
+import akka.cluster.pubsub.DistributedPubSubMediator;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
 import com.google.common.collect.ArrayListMultimap;
@@ -56,6 +58,8 @@ public class ActorSystemManagerImpl implements ActorSystemManager {
     private final Map<Class, String>    routersByMessageType = new HashMap<>();
     private final Map<String, ActorRef> clusterClientsByRegion = new HashMap<String, ActorRef>(20);
 
+    private ActorRef                    mediator;
+
     private ActorRef                    clientActor;
 
     private ListMultimap<String, String> seedsByRegion;
@@ -78,7 +82,7 @@ public class ActorSystemManagerImpl implements ActorSystemManager {
         this.port = null;
 
         initAkka();
-        waitForClientActors();
+        waitForClientActor();
     }
 
 
@@ -132,6 +136,20 @@ public class ActorSystemManagerImpl implements ActorSystemManager {
     }
 
 
+    @Override
+    public void publishToAllRegions( String topic, Object message, ActorRef sender  ) {
+
+        // send to local subscribers to topic
+        mediator.tell( new DistributedPubSubMediator.Publish( topic, message ), sender );
+
+        // send to each ClusterClient
+        for ( ActorRef clusterClient : clusterClientsByRegion.values() ) {
+            clusterClient.tell( new ClusterClient.Publish( topic, message ), sender );
+        }
+
+    }
+
+
     private void initAkka() {
         logger.info("Initializing Akka");
 
@@ -171,6 +189,8 @@ public class ActorSystemManagerImpl implements ActorSystemManager {
         for ( RouterProducer routerProducer : routerProducers ) {
             routerProducer.createLocalSystemActors( localSystem );
         }
+
+        mediator = DistributedPubSub.get( localSystem ).mediator();
     }
 
 
@@ -359,7 +379,7 @@ public class ActorSystemManagerImpl implements ActorSystemManager {
 
 
     @Override
-    public void waitForClientActors() {
+    public void waitForClientActor() {
 
         waitForClientActor( clientActor );
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/2fb3ab32/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCacheActor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCacheActor.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCacheActor.java
index 2912c7d..51f5c8c 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCacheActor.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCacheActor.java
@@ -49,7 +49,7 @@ public class ReservationCacheActor extends UntypedActor {
             ReservationCache.getInstance().cacheReservation( res );
 
             if ( ++reservationCount % 10 == 0 ) {
-                logger.debug("Received {} reservations cache size {}",
+                logger.info("Received {} reservations cache size {}",
                         reservationCount, ReservationCache.getInstance().getSize());
             }
 
@@ -58,7 +58,7 @@ public class ReservationCacheActor extends UntypedActor {
             ReservationCache.getInstance().cancelReservation( can );
 
             if ( ++cancellationCount % 10 == 0 ) {
-                logger.debug("Received {} cancellations", cancellationCount);
+                logger.info("Received {} cancellations", cancellationCount);
             }
 
         } else if (msg instanceof DistributedPubSubMediator.SubscribeAck) {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/2fb3ab32/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java
index e53710c..a14c63e 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java
@@ -21,6 +21,7 @@ import akka.actor.UntypedActor;
 import akka.cluster.pubsub.DistributedPubSub;
 import akka.cluster.pubsub.DistributedPubSubMediator;
 import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.usergrid.persistence.actorsystem.ActorSystemManager;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.field.Field;
@@ -30,6 +31,7 @@ import org.slf4j.LoggerFactory;
 import java.io.Serializable;
 import java.util.UUID;
 
+
 public class UniqueValueActor extends UntypedActor {
     private static final Logger logger = LoggerFactory.getLogger( UniqueValueActor.class );
 
@@ -37,7 +39,7 @@ public class UniqueValueActor extends UntypedActor {
 
     //private MetricsService metricsService;
 
-    final private ActorRef mediator = DistributedPubSub.get(getContext().system()).mediator();
+    private final ActorSystemManager actorSystemManager;
 
     private final UniqueValuesTable table;
 
@@ -48,6 +50,8 @@ public class UniqueValueActor extends UntypedActor {
 
         // TODO: is there a way to avoid this ugly kludge? see also: ClusterSingletonRouter
         this.table = UniqueValuesServiceImpl.injector.getInstance( UniqueValuesTable.class );
+        this.actorSystemManager = UniqueValuesServiceImpl.injector.getInstance( ActorSystemManager.class );
+
         //logger.info("UniqueValueActor {} is live with table {}", name, table);
     }
 
@@ -66,7 +70,7 @@ public class UniqueValueActor extends UntypedActor {
         if ( message instanceof Reservation ) {
             Reservation res = (Reservation) message;
 
-//            final Timer.Context context = metricsService.getReservationTimer().time();
+            // final Timer.Context context = metricsService.getReservationTimer().time();
 
             try {
                 Id owner = table.lookupOwner( res.getApplicationScope(), res.getOwner().getType(), res.getField() );
@@ -86,8 +90,7 @@ public class UniqueValueActor extends UntypedActor {
 
                 getSender().tell( new Response( Response.Status.IS_UNIQUE ), getSender() );
 
-                mediator.tell( new DistributedPubSubMediator.Publish( "content",
-                        new Reservation( res ) ), getSelf() );
+                actorSystemManager.publishToAllRegions( "content", new Reservation( res ), getSelf() );
 
             } catch (Throwable t) {
 
@@ -96,13 +99,13 @@ public class UniqueValueActor extends UntypedActor {
 
 
             } finally {
-//                context.stop();
+                // context.stop();
             }
 
         } else if ( message instanceof Confirmation) {
             Confirmation con = (Confirmation) message;
 
-//            final Timer.Context context = metricsService.getCommitmentTimer().time();
+            // final Timer.Context context = metricsService.getCommitmentTimer().time();
 
             try {
                 Id owner = table.lookupOwner( con.getApplicationScope(), con.getOwner().getType(), con.getField() );
@@ -122,15 +125,14 @@ public class UniqueValueActor extends UntypedActor {
 
                 getSender().tell( new Response( Response.Status.IS_UNIQUE ), getSender() );
 
-                mediator.tell( new DistributedPubSubMediator.Publish( "content",
-                        new Reservation( con ) ), getSelf() );
+                actorSystemManager.publishToAllRegions( "content", new Reservation( con ), getSelf() );
 
             } catch (Throwable t) {
                 getSender().tell( new Response( Response.Status.ERROR ), getSender() );
                 logger.error( "Error processing request", t );
 
             } finally {
-//                context.stop();
+                // context.stop();
             }
 
 
@@ -155,8 +157,7 @@ public class UniqueValueActor extends UntypedActor {
 
                 getSender().tell( new Response( Response.Status.SUCCESS ), getSender() );
 
-                mediator.tell( new DistributedPubSubMediator.Publish( "content",
-                        new Reservation( can ) ), getSelf() );
+                actorSystemManager.publishToAllRegions( "content", new Reservation( can ), getSelf() );
 
             } catch (Throwable t) {
                 getSender().tell( new Response( Response.Status.ERROR ), getSender() );
@@ -180,10 +181,12 @@ public class UniqueValueActor extends UntypedActor {
         final String consistentHashKey;
 
         public Request( ApplicationScope applicationScope, Id owner, UUID ownerVersion, Field field ) {
+
             this.applicationScope = applicationScope;
             this.owner = owner;
             this.ownerVersion = ownerVersion;
             this.field = field;
+
             StringBuilder sb = new StringBuilder();
             sb.append( applicationScope.getApplication() );
             sb.append(":");
@@ -195,10 +198,12 @@ public class UniqueValueActor extends UntypedActor {
             this.consistentHashKey = sb.toString();
         }
         public Request( Request req ) {
+
             this.applicationScope = req.applicationScope;
             this.owner = req.owner;
             this.ownerVersion = req.ownerVersion;
             this.field = req.field;
+
             StringBuilder sb = new StringBuilder();
             sb.append( req.applicationScope.getApplication() );
             sb.append(":");

http://git-wip-us.apache.org/repos/asf/usergrid/blob/2fb3ab32/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java
index b888b1f..352c2e5 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java
@@ -82,17 +82,10 @@ public class UniqueValuesServiceImpl implements UniqueValuesService {
     }
 
 
-    // TODO: restore reservation cache
-//    private void subscribeToReservations( ActorSystem localSystem, Map<String, ActorSystem> systemMap ) {
-//
-//        for ( String region : systemMap.keySet() ) {
-//            ActorSystem actorSystem = systemMap.get( region );
-//            if ( !actorSystem.equals( localSystem ) ) {
-//                logger.info("Starting ReservationCacheUpdater for {}", region );
-//                actorSystem.actorOf( Props.create( ReservationCacheActor.class ), "subscriber");
-//            }
-//        }
-//    }
+    private void subscribeToReservations( ActorSystem localSystem ) {
+        logger.info("Starting ReservationCacheUpdater");
+        localSystem.actorOf( Props.create( ReservationCacheActor.class ), "subscriber");
+    }
 
 
     @Override
@@ -166,9 +159,9 @@ public class UniqueValuesServiceImpl implements UniqueValuesService {
             scope, entity.getId(), version, field );
 
         UniqueValueActor.Reservation res = reservationCache.get( request.getConsistentHashKey() );
-//        if ( res != null ) {
-//            getCacheCounter().inc();
-//        }
+        // if ( res != null ) {
+        //    getCacheCounter().inc();
+        // }
         if ( res != null && !res.getOwner().equals( request.getOwner() )) {
             throw new UniqueValueException( "Error property not unique (cache)", field);
         }
@@ -194,10 +187,14 @@ public class UniqueValuesServiceImpl implements UniqueValuesService {
             scope, entity.getId(), version, field );
 
         if ( actorSystemManager.getCurrentRegion().equals( region ) ) {
+
+            // sending to current region, use local clientActor
             ActorRef clientActor = actorSystemManager.getClientActor();
             clientActor.tell( request, null );
 
         } else {
+
+            // sending to remote region, send via cluster client for that region
             ActorRef clusterClient = actorSystemManager.getClusterClient( region );
             clusterClient.tell( new ClusterClient.Send("/user/clientActor", request), null );
         }
@@ -205,16 +202,6 @@ public class UniqueValuesServiceImpl implements UniqueValuesService {
     }
 
 
-//    private ActorRef lookupRequestActorForType( String type ) {
-//        final String region = getRegionsByType().get( type );
-//        ActorRef requestActor = getRequestActorsByRegion().get( region == null ? currentRegion : region );
-//        if ( requestActor == null ) {
-//            throw new RuntimeException( "No request actor available for region: " + region );
-//        }
-//        return requestActor;
-//    }
-
-
     private void sendUniqueValueRequest(
         Entity entity, String region, UniqueValueActor.Request request ) throws UniqueValueException {
 
@@ -226,19 +213,22 @@ public class UniqueValuesServiceImpl implements UniqueValuesService {
             try {
                 Timeout t = new Timeout( 1, TimeUnit.SECONDS );
 
-                // ask RequestActor and wait (up to timeout) for response
-
                 Future<Object> fut;
 
                 if ( actorSystemManager.getCurrentRegion().equals( region ) ) {
+
+                    // sending to current region, use local clientActor
                     ActorRef clientActor = actorSystemManager.getClientActor();
                     fut = Patterns.ask( clientActor, request, t );
 
                 } else {
+
+                    // sending to remote region, send via cluster client for that region
                     ActorRef clusterClient = actorSystemManager.getClusterClient( region );
                     fut = Patterns.ask( clusterClient, new ClusterClient.Send("/user/clientActor", request), t );
                 }
 
+                // wait (up to timeout) for response
                 response = (UniqueValueActor.Response) Await.result( fut, t.duration() );
 
                 if ( response != null && (
@@ -306,8 +296,7 @@ public class UniqueValuesServiceImpl implements UniqueValuesService {
 
     @Override
     public void createLocalSystemActors( ActorSystem localSystem ) {
-        // TODO: restore reservation cache
-        //subscribeToReservations( localSystem );
+        subscribeToReservations( localSystem );
     }
 
     @Override
@@ -317,6 +306,8 @@ public class UniqueValuesServiceImpl implements UniqueValuesService {
 
         Map<String, Object> akka = (Map<String, Object>)configMap.get("akka");
 
+        // TODO: replace this configuration stuff with equivalent Java code in the above "create" methods
+
         akka.put( "actor", new HashMap<String, Object>() {{
             put( "deployment", new HashMap<String, Object>() {{
                 put( "/uvRouter/singleton/router", new HashMap<String, Object>() {{

http://git-wip-us.apache.org/repos/asf/usergrid/blob/2fb3ab32/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/AbstractUniqueValueTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/AbstractUniqueValueTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/AbstractUniqueValueTest.java
index 041835b..e7b4450 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/AbstractUniqueValueTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/AbstractUniqueValueTest.java
@@ -41,7 +41,7 @@ public class AbstractUniqueValueTest {
             actorSystemManager.registerMessageType( UniqueValueActor.Cancellation.class, "/user/uvProxy" );
             actorSystemManager.registerMessageType( UniqueValueActor.Confirmation.class, "/user/uvProxy" );
             actorSystemManager.start( "127.0.0.1", port, "us-east" );
-            actorSystemManager.waitForRequestActors();
+            actorSystemManager.waitForClientActor();
 
             startedAkka.put( port, true );
         }