You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2016/06/29 12:45:37 UTC
[27/38] usergrid git commit: Hook ReservationCache in via
ClusterClients
Hook ReservationCache in via ClusterClients
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/2fb3ab32
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/2fb3ab32
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/2fb3ab32
Branch: refs/heads/usergrid-1268-akka-211
Commit: 2fb3ab32bca20be4c9133e340cb97145ca452a3c
Parents: d12307b
Author: Dave Johnson <sn...@apache.org>
Authored: Fri Jun 24 11:34:53 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Fri Jun 24 11:34:53 2016 -0400
----------------------------------------------------------------------
.../corepersistence/CpEntityManagerFactory.java | 2 +-
.../actorsystem/ActorSystemManager.java | 36 ++++++++++++++-
.../actorsystem/ActorSystemManagerImpl.java | 24 +++++++++-
.../uniquevalues/ReservationCacheActor.java | 4 +-
.../uniquevalues/UniqueValueActor.java | 27 ++++++-----
.../uniquevalues/UniqueValuesServiceImpl.java | 47 ++++++++------------
.../collection/AbstractUniqueValueTest.java | 2 +-
7 files changed, 96 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/2fb3ab32/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index 060ec18..eca5927 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -155,7 +155,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
actorSystemManager.registerMessageType( UniqueValueActor.Cancellation.class, "/user/uvProxy" );
actorSystemManager.registerMessageType( UniqueValueActor.Confirmation.class, "/user/uvProxy" );
actorSystemManager.start();
- actorSystemManager.waitForClientActors();
+ actorSystemManager.waitForClientActor();
} catch (Throwable t) {
logger.error("Error starting Akka", t);
http://git-wip-us.apache.org/repos/asf/usergrid/blob/2fb3ab32/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java
index c45ccac..cdb6caf 100644
--- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java
+++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java
@@ -23,21 +23,55 @@ import akka.actor.ActorRef;
public interface ActorSystemManager {
+ /**
+ * Create and start all Akka Actors, ClusterClients, Routers, etc.
+ */
void start();
+ /**
+ * Start method used in JUnit tests.
+ */
void start(String hostname, Integer port, String currentRegion);
- void waitForClientActors();
+ /**
+ * Wait until ClientActor has seen some nodes and is ready for use.
+ */
+ void waitForClientActor();
+ /**
+ * True if ActorSystem and ClientActor are ready to be used.
+ */
boolean isReady();
+ /**
+ * MUST be called before start() to register any router producers to be configured.
+ */
void registerRouterProducer( RouterProducer routerProducer );
+ /**
+ * MUST be called before start() to register any messages to be sent.
+ * @param messageType Class of message.
+ * @param routerPath Router-path to which such messages are to be sent.
+ */
void registerMessageType( Class messageType, String routerPath );
+ /**
+ * Local client for ActorSystem, send all local messages here for routing.
+ */
ActorRef getClientActor();
+ /**
+ * Get ClusterClient for specified remote region.
+ */
ActorRef getClusterClient(String region );
+ /**
+ * Get the name of this, the current region.
+ */
String getCurrentRegion();
+
+ /**
+ * Publish message to all topic subscribers in all regions.
+ */
+ void publishToAllRegions( String topic, Object message, ActorRef sender );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/2fb3ab32/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
index 5e23c20..89980bc 100644
--- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
+++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
@@ -23,6 +23,8 @@ import akka.actor.*;
import akka.cluster.client.ClusterClient;
import akka.cluster.client.ClusterClientReceptionist;
import akka.cluster.client.ClusterClientSettings;
+import akka.cluster.pubsub.DistributedPubSub;
+import akka.cluster.pubsub.DistributedPubSubMediator;
import akka.pattern.Patterns;
import akka.util.Timeout;
import com.google.common.collect.ArrayListMultimap;
@@ -56,6 +58,8 @@ public class ActorSystemManagerImpl implements ActorSystemManager {
private final Map<Class, String> routersByMessageType = new HashMap<>();
private final Map<String, ActorRef> clusterClientsByRegion = new HashMap<String, ActorRef>(20);
+ private ActorRef mediator;
+
private ActorRef clientActor;
private ListMultimap<String, String> seedsByRegion;
@@ -78,7 +82,7 @@ public class ActorSystemManagerImpl implements ActorSystemManager {
this.port = null;
initAkka();
- waitForClientActors();
+ waitForClientActor();
}
@@ -132,6 +136,20 @@ public class ActorSystemManagerImpl implements ActorSystemManager {
}
+ @Override
+ public void publishToAllRegions( String topic, Object message, ActorRef sender ) {
+
+ // send to local subscribers to topic
+ mediator.tell( new DistributedPubSubMediator.Publish( topic, message ), sender );
+
+ // send to each ClusterClient
+ for ( ActorRef clusterClient : clusterClientsByRegion.values() ) {
+ clusterClient.tell( new ClusterClient.Publish( topic, message ), sender );
+ }
+
+ }
+
+
private void initAkka() {
logger.info("Initializing Akka");
@@ -171,6 +189,8 @@ public class ActorSystemManagerImpl implements ActorSystemManager {
for ( RouterProducer routerProducer : routerProducers ) {
routerProducer.createLocalSystemActors( localSystem );
}
+
+ mediator = DistributedPubSub.get( localSystem ).mediator();
}
@@ -359,7 +379,7 @@ public class ActorSystemManagerImpl implements ActorSystemManager {
@Override
- public void waitForClientActors() {
+ public void waitForClientActor() {
waitForClientActor( clientActor );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/2fb3ab32/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCacheActor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCacheActor.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCacheActor.java
index 2912c7d..51f5c8c 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCacheActor.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCacheActor.java
@@ -49,7 +49,7 @@ public class ReservationCacheActor extends UntypedActor {
ReservationCache.getInstance().cacheReservation( res );
if ( ++reservationCount % 10 == 0 ) {
- logger.debug("Received {} reservations cache size {}",
+ logger.info("Received {} reservations cache size {}",
reservationCount, ReservationCache.getInstance().getSize());
}
@@ -58,7 +58,7 @@ public class ReservationCacheActor extends UntypedActor {
ReservationCache.getInstance().cancelReservation( can );
if ( ++cancellationCount % 10 == 0 ) {
- logger.debug("Received {} cancellations", cancellationCount);
+ logger.info("Received {} cancellations", cancellationCount);
}
} else if (msg instanceof DistributedPubSubMediator.SubscribeAck) {
http://git-wip-us.apache.org/repos/asf/usergrid/blob/2fb3ab32/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java
index e53710c..a14c63e 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java
@@ -21,6 +21,7 @@ import akka.actor.UntypedActor;
import akka.cluster.pubsub.DistributedPubSub;
import akka.cluster.pubsub.DistributedPubSubMediator;
import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.usergrid.persistence.actorsystem.ActorSystemManager;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.field.Field;
@@ -30,6 +31,7 @@ import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.util.UUID;
+
public class UniqueValueActor extends UntypedActor {
private static final Logger logger = LoggerFactory.getLogger( UniqueValueActor.class );
@@ -37,7 +39,7 @@ public class UniqueValueActor extends UntypedActor {
//private MetricsService metricsService;
- final private ActorRef mediator = DistributedPubSub.get(getContext().system()).mediator();
+ private final ActorSystemManager actorSystemManager;
private final UniqueValuesTable table;
@@ -48,6 +50,8 @@ public class UniqueValueActor extends UntypedActor {
// TODO: is there a way to avoid this ugly kludge? see also: ClusterSingletonRouter
this.table = UniqueValuesServiceImpl.injector.getInstance( UniqueValuesTable.class );
+ this.actorSystemManager = UniqueValuesServiceImpl.injector.getInstance( ActorSystemManager.class );
+
//logger.info("UniqueValueActor {} is live with table {}", name, table);
}
@@ -66,7 +70,7 @@ public class UniqueValueActor extends UntypedActor {
if ( message instanceof Reservation ) {
Reservation res = (Reservation) message;
-// final Timer.Context context = metricsService.getReservationTimer().time();
+ // final Timer.Context context = metricsService.getReservationTimer().time();
try {
Id owner = table.lookupOwner( res.getApplicationScope(), res.getOwner().getType(), res.getField() );
@@ -86,8 +90,7 @@ public class UniqueValueActor extends UntypedActor {
getSender().tell( new Response( Response.Status.IS_UNIQUE ), getSender() );
- mediator.tell( new DistributedPubSubMediator.Publish( "content",
- new Reservation( res ) ), getSelf() );
+ actorSystemManager.publishToAllRegions( "content", new Reservation( res ), getSelf() );
} catch (Throwable t) {
@@ -96,13 +99,13 @@ public class UniqueValueActor extends UntypedActor {
} finally {
-// context.stop();
+ // context.stop();
}
} else if ( message instanceof Confirmation) {
Confirmation con = (Confirmation) message;
-// final Timer.Context context = metricsService.getCommitmentTimer().time();
+ // final Timer.Context context = metricsService.getCommitmentTimer().time();
try {
Id owner = table.lookupOwner( con.getApplicationScope(), con.getOwner().getType(), con.getField() );
@@ -122,15 +125,14 @@ public class UniqueValueActor extends UntypedActor {
getSender().tell( new Response( Response.Status.IS_UNIQUE ), getSender() );
- mediator.tell( new DistributedPubSubMediator.Publish( "content",
- new Reservation( con ) ), getSelf() );
+ actorSystemManager.publishToAllRegions( "content", new Reservation( con ), getSelf() );
} catch (Throwable t) {
getSender().tell( new Response( Response.Status.ERROR ), getSender() );
logger.error( "Error processing request", t );
} finally {
-// context.stop();
+ // context.stop();
}
@@ -155,8 +157,7 @@ public class UniqueValueActor extends UntypedActor {
getSender().tell( new Response( Response.Status.SUCCESS ), getSender() );
- mediator.tell( new DistributedPubSubMediator.Publish( "content",
- new Reservation( can ) ), getSelf() );
+ actorSystemManager.publishToAllRegions( "content", new Reservation( can ), getSelf() );
} catch (Throwable t) {
getSender().tell( new Response( Response.Status.ERROR ), getSender() );
@@ -180,10 +181,12 @@ public class UniqueValueActor extends UntypedActor {
final String consistentHashKey;
public Request( ApplicationScope applicationScope, Id owner, UUID ownerVersion, Field field ) {
+
this.applicationScope = applicationScope;
this.owner = owner;
this.ownerVersion = ownerVersion;
this.field = field;
+
StringBuilder sb = new StringBuilder();
sb.append( applicationScope.getApplication() );
sb.append(":");
@@ -195,10 +198,12 @@ public class UniqueValueActor extends UntypedActor {
this.consistentHashKey = sb.toString();
}
public Request( Request req ) {
+
this.applicationScope = req.applicationScope;
this.owner = req.owner;
this.ownerVersion = req.ownerVersion;
this.field = req.field;
+
StringBuilder sb = new StringBuilder();
sb.append( req.applicationScope.getApplication() );
sb.append(":");
http://git-wip-us.apache.org/repos/asf/usergrid/blob/2fb3ab32/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java
index b888b1f..352c2e5 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java
@@ -82,17 +82,10 @@ public class UniqueValuesServiceImpl implements UniqueValuesService {
}
- // TODO: restore reservation cache
-// private void subscribeToReservations( ActorSystem localSystem, Map<String, ActorSystem> systemMap ) {
-//
-// for ( String region : systemMap.keySet() ) {
-// ActorSystem actorSystem = systemMap.get( region );
-// if ( !actorSystem.equals( localSystem ) ) {
-// logger.info("Starting ReservationCacheUpdater for {}", region );
-// actorSystem.actorOf( Props.create( ReservationCacheActor.class ), "subscriber");
-// }
-// }
-// }
+ private void subscribeToReservations( ActorSystem localSystem ) {
+ logger.info("Starting ReservationCacheUpdater");
+ localSystem.actorOf( Props.create( ReservationCacheActor.class ), "subscriber");
+ }
@Override
@@ -166,9 +159,9 @@ public class UniqueValuesServiceImpl implements UniqueValuesService {
scope, entity.getId(), version, field );
UniqueValueActor.Reservation res = reservationCache.get( request.getConsistentHashKey() );
-// if ( res != null ) {
-// getCacheCounter().inc();
-// }
+ // if ( res != null ) {
+ // getCacheCounter().inc();
+ // }
if ( res != null && !res.getOwner().equals( request.getOwner() )) {
throw new UniqueValueException( "Error property not unique (cache)", field);
}
@@ -194,10 +187,14 @@ public class UniqueValuesServiceImpl implements UniqueValuesService {
scope, entity.getId(), version, field );
if ( actorSystemManager.getCurrentRegion().equals( region ) ) {
+
+ // sending to current region, use local clientActor
ActorRef clientActor = actorSystemManager.getClientActor();
clientActor.tell( request, null );
} else {
+
+ // sending to remote region, send via cluster client for that region
ActorRef clusterClient = actorSystemManager.getClusterClient( region );
clusterClient.tell( new ClusterClient.Send("/user/clientActor", request), null );
}
@@ -205,16 +202,6 @@ public class UniqueValuesServiceImpl implements UniqueValuesService {
}
-// private ActorRef lookupRequestActorForType( String type ) {
-// final String region = getRegionsByType().get( type );
-// ActorRef requestActor = getRequestActorsByRegion().get( region == null ? currentRegion : region );
-// if ( requestActor == null ) {
-// throw new RuntimeException( "No request actor available for region: " + region );
-// }
-// return requestActor;
-// }
-
-
private void sendUniqueValueRequest(
Entity entity, String region, UniqueValueActor.Request request ) throws UniqueValueException {
@@ -226,19 +213,22 @@ public class UniqueValuesServiceImpl implements UniqueValuesService {
try {
Timeout t = new Timeout( 1, TimeUnit.SECONDS );
- // ask RequestActor and wait (up to timeout) for response
-
Future<Object> fut;
if ( actorSystemManager.getCurrentRegion().equals( region ) ) {
+
+ // sending to current region, use local clientActor
ActorRef clientActor = actorSystemManager.getClientActor();
fut = Patterns.ask( clientActor, request, t );
} else {
+
+ // sending to remote region, send via cluster client for that region
ActorRef clusterClient = actorSystemManager.getClusterClient( region );
fut = Patterns.ask( clusterClient, new ClusterClient.Send("/user/clientActor", request), t );
}
+ // wait (up to timeout) for response
response = (UniqueValueActor.Response) Await.result( fut, t.duration() );
if ( response != null && (
@@ -306,8 +296,7 @@ public class UniqueValuesServiceImpl implements UniqueValuesService {
@Override
public void createLocalSystemActors( ActorSystem localSystem ) {
- // TODO: restore reservation cache
- //subscribeToReservations( localSystem );
+ subscribeToReservations( localSystem );
}
@Override
@@ -317,6 +306,8 @@ public class UniqueValuesServiceImpl implements UniqueValuesService {
Map<String, Object> akka = (Map<String, Object>)configMap.get("akka");
+ // TODO: replace this configuration stuff with equivalent Java code in the above "create" methods
+
akka.put( "actor", new HashMap<String, Object>() {{
put( "deployment", new HashMap<String, Object>() {{
put( "/uvRouter/singleton/router", new HashMap<String, Object>() {{
http://git-wip-us.apache.org/repos/asf/usergrid/blob/2fb3ab32/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/AbstractUniqueValueTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/AbstractUniqueValueTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/AbstractUniqueValueTest.java
index 041835b..e7b4450 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/AbstractUniqueValueTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/AbstractUniqueValueTest.java
@@ -41,7 +41,7 @@ public class AbstractUniqueValueTest {
actorSystemManager.registerMessageType( UniqueValueActor.Cancellation.class, "/user/uvProxy" );
actorSystemManager.registerMessageType( UniqueValueActor.Confirmation.class, "/user/uvProxy" );
actorSystemManager.start( "127.0.0.1", port, "us-east" );
- actorSystemManager.waitForRequestActors();
+ actorSystemManager.waitForClientActor();
startedAkka.put( port, true );
}