You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2016/07/08 22:33:57 UTC

usergrid git commit: Fix REST test issues and implement a way to shutdown the Akka actor system, with hooks for JVM stop/servlet stop.

Repository: usergrid
Updated Branches:
  refs/heads/usergrid-1268-akka-211 31b20404d -> 5f39ee0af


Fix REST test issues and implement a way to shutdown the Akka actor system, with hooks for JVM stop/servlet stop.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/5f39ee0a
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/5f39ee0a
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/5f39ee0a

Branch: refs/heads/usergrid-1268-akka-211
Commit: 5f39ee0af119305ef1a63c7fb5b19e08db8a7890
Parents: 31b2040
Author: Michael Russo <mr...@apigee.com>
Authored: Fri Jul 8 15:33:26 2016 -0700
Committer: Michael Russo <mr...@apigee.com>
Committed: Fri Jul 8 15:33:26 2016 -0700

----------------------------------------------------------------------
 .../src/test/resources/usergrid-test.properties |  5 +-
 .../actorsystem/ActorSystemManager.java         |  2 +
 .../actorsystem/ActorSystemManagerImpl.java     | 81 +++++++++++++-------
 .../uniquevalues/ReservationCacheActor.java     | 25 ++++--
 .../uniquevalues/UniqueValueActor.java          |  4 +-
 .../apache/usergrid/rest/ShutdownListener.java  | 11 +++
 .../applications/ApplicationResourceIT.java     |  2 +-
 .../resources/usergrid-custom-test.properties   |  6 +-
 .../resources/usergrid-rest-deploy-context.xml  |  1 -
 9 files changed, 96 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/5f39ee0a/stack/config/src/test/resources/usergrid-test.properties
----------------------------------------------------------------------
diff --git a/stack/config/src/test/resources/usergrid-test.properties b/stack/config/src/test/resources/usergrid-test.properties
index 368c585..085a11d 100644
--- a/stack/config/src/test/resources/usergrid-test.properties
+++ b/stack/config/src/test/resources/usergrid-test.properties
@@ -30,6 +30,9 @@
 # This property is required to be set and cannot be defaulted anywhere
 usergrid.cluster_name=usergrid
 
+# Set this for testing purposesly only
+usergrid.test=true
+
 # Whether to user the remote Cassandra cluster or not
 cassandra.use_remote=false
 
@@ -153,7 +156,7 @@ groupid=counter_group
 autooffset.reset=smallest
 
 # set high batch size to minimize count overhead
-usergrid.counter.batch.size=10000
+usergrid.counter.batch.size=1
 
 usergrid.recaptcha.public=
 usergrid.recaptcha.private=

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5f39ee0a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java
index 893afca..c7322dd 100644
--- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java
+++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java
@@ -81,4 +81,6 @@ public interface ActorSystemManager {
      * Publish message to all topic subscribers in all regions.
      */
     void publishToAllRegions( String topic, Object message, ActorRef sender );
+
+    void shutdownAll();
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5f39ee0a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
index 8399979..d8d284f 100644
--- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
+++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
@@ -66,6 +66,8 @@ public class ActorSystemManagerImpl implements ActorSystemManager {
 
     private ListMultimap<String, String> seedsByRegion;
 
+    private ActorSystem clusterSystem = null;
+
 
 
     @Inject
@@ -173,15 +175,15 @@ public class ActorSystemManagerImpl implements ActorSystemManager {
         // Create one actor system with request actor for each region
 
         if ( StringUtils.isEmpty( currentRegion )) {
-            throw new RuntimeException( "No value specified for " + ActorSystemFig.CLUSTER_REGIONS_LOCAL );
+            throw new RuntimeException( "No value specified for: " + ActorSystemFig.CLUSTER_REGIONS_LOCAL );
         }
 
         if ( StringUtils.isEmpty( actorSystemFig.getRegionsList() )) {
-            throw new RuntimeException( "No value specified for " + ActorSystemFig.CLUSTER_REGIONS_LIST );
+            throw new RuntimeException( "No value specified for: " + ActorSystemFig.CLUSTER_REGIONS_LIST );
         }
 
         if ( StringUtils.isEmpty( actorSystemFig.getSeeds() )) {
-            throw new RuntimeException( "No value specified for " + ActorSystemFig.CLUSTER_SEEDS );
+            throw new RuntimeException( "No value specified for: " + ActorSystemFig.CLUSTER_SEEDS );
         }
 
         List regionList = Arrays.asList( actorSystemFig.getRegionsList().toLowerCase().split(",") );
@@ -191,15 +193,15 @@ public class ActorSystemManagerImpl implements ActorSystemManager {
 
         Config config = readClusterSystemConfig();
 
-        ActorSystem localSystem = createClusterSystemsFromConfigs( config );
+        clusterSystem = createClusterSystemsFromConfigs( config );
 
-        createClientActors( localSystem );
+        createClientActors( clusterSystem );
 
         for ( RouterProducer routerProducer : routerProducers ) {
-            routerProducer.createLocalSystemActors( localSystem );
+            routerProducer.createLocalSystemActors( clusterSystem );
         }
 
-        mediator = DistributedPubSub.get( localSystem ).mediator();
+        mediator = DistributedPubSub.get( clusterSystem ).mediator();
     }
 
 
@@ -214,7 +216,7 @@ public class ActorSystemManagerImpl implements ActorSystemManager {
 
             String[] regionSeeds = actorSystemFig.getSeeds().split( "," );
 
-            logger.info( "Found region {} seeds {}", regionSeeds.length, regionSeeds );
+            logger.info( "Found region [{}] seeds [{}]", regionSeeds.length, regionSeeds );
 
             try {
 
@@ -248,7 +250,7 @@ public class ActorSystemManagerImpl implements ActorSystemManager {
 
                         String seed = "akka.tcp://ClusterSystem" + "@" + hostname + ":" + regionPort;
 
-                        logger.info( "Adding seed {} for region {}", seed, region );
+                        logger.info( "Adding seed [{}] for region [{}]", seed, region );
 
                         seedsByRegion.put( region, seed );
                     }
@@ -283,7 +285,7 @@ public class ActorSystemManagerImpl implements ActorSystemManager {
 
             List<String> seeds = getSeedsByRegion().get( region );
 
-            logger.info( "Akka Config for region {} is:\n" + "   Hostname {}\n" + "   Seeds {}\n",
+            logger.info( "Akka Config for region [{}] is:\n" + "   Hostname [{}]\n" + "   Seeds [{}]\n",
                 region, hostname, seeds );
 
             int lastColon = seeds.get(0).lastIndexOf(":") + 1;
@@ -335,19 +337,38 @@ public class ActorSystemManagerImpl implements ActorSystemManager {
      */
     private ActorSystem createClusterSystemsFromConfigs( Config config ) {
 
-        ActorSystem system = ActorSystem.create( "ClusterSystem", config );
 
-        for ( RouterProducer routerProducer : routerProducers ) {
-            logger.info("Creating {} for region {}", routerProducer.getName(), currentRegion );
-            routerProducer.createClusterSingletonManager( system );
-        }
+        // there is only 1 akka system for a Usergrid cluster
+        final String clusterName = "ClusterSystem";
+
+
+        if( clusterSystem == null) {
+
+            logger.info("Class: {}. ActorSystem [{}] not initialized, creating...", this, clusterName);
+
+            clusterSystem = ActorSystem.create( clusterName, config );
+
+            for ( RouterProducer routerProducer : routerProducers ) {
+                logger.info("Creating router producer [{}] for region [{}]", routerProducer.getName(), currentRegion );
+                routerProducer.createClusterSingletonManager( clusterSystem );
+            }
+
+            for ( RouterProducer routerProducer : routerProducers ) {
+                logger.info("Creating [{}] proxy for region [{}] role 'io'", routerProducer.getName(), currentRegion);
+                routerProducer.createClusterSingletonProxy( clusterSystem, "io" );
+            }
+
+            //add a shutdown hook to clean all actor systems if the JVM exits without the servlet container knowing
+            Runtime.getRuntime().addShutdownHook(new Thread() {
+                @Override
+                public void run() {
+                    shutdownAll();
+                }
+            });
 
-        for ( RouterProducer routerProducer : routerProducers ) {
-            logger.info("Creating {} proxy for region {} role 'io'", routerProducer.getName(), currentRegion);
-            routerProducer.createClusterSingletonProxy( system, "io" );
         }
 
-        return system;
+        return clusterSystem;
     }
 
 
@@ -360,7 +381,7 @@ public class ActorSystemManagerImpl implements ActorSystemManager {
 
             if ( currentRegion.equals( region )) {
 
-                logger.info( "Creating clientActor for region {}", region );
+                logger.info( "Creating clientActor for region [{}]", region );
 
                 // Each clientActor needs to know path to ClusterSingletonProxy and region
                 clientActor = system.actorOf(
@@ -381,7 +402,6 @@ public class ActorSystemManagerImpl implements ActorSystemManager {
 
                 clusterClientsByRegion.put( region, clusterClient );
             }
-
         }
     }
 
@@ -394,7 +414,7 @@ public class ActorSystemManagerImpl implements ActorSystemManager {
 
     private void waitForClientActor( ActorRef ra ) {
 
-        logger.info( "Waiting on request actor {}...", ra.path() );
+        logger.info( "Waiting on RequestActor [{}]...", ra.path() );
 
         started = false;
 
@@ -411,20 +431,29 @@ public class ActorSystemManagerImpl implements ActorSystemManager {
                     started = true;
                     break;
                 }
-                logger.info( "Waiting for request actor {} region {} ({}s)", ra.path(), currentRegion, retries );
+                logger.info( "Waiting for RequestActor [{}] region [{}] for [{}s]", ra.path(), currentRegion, retries );
                 Thread.sleep( 1000 );
 
             } catch (Exception e) {
-                logger.error( "Error: Timeout waiting for requestActor" );
+                logger.error( "Error: Timeout waiting for RequestActor [{}]", ra.path() );
             }
             retries++;
         }
 
         if (started) {
-            logger.info( "RequestActor has started" );
+            logger.info( "RequestActor [{}] has started", ra.path() );
         } else {
-            throw new RuntimeException( "RequestActor did not start in time" );
+            throw new RuntimeException( "RequestActor ["+ra.path()+"] did not start in time" );
         }
     }
 
+    @Override
+    public void shutdownAll(){
+
+        logger.info("Shutting down Akka cluster: {}", clusterSystem.name());
+        clusterSystem.shutdown();
+
+
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5f39ee0a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCacheActor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCacheActor.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCacheActor.java
index 158b099..3998eb6 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCacheActor.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCacheActor.java
@@ -49,27 +49,36 @@ public class ReservationCacheActor extends UntypedActor {
             ReservationCache.getInstance().cacheReservation( res );
 
             if ( ++reservationCount % 10 == 0 ) {
-                logger.info("Received {} reservations cache size {}",
+                if(logger.isDebugEnabled()) {
+                    logger.debug("Received {} reservations cache size {}",
                         reservationCount, ReservationCache.getInstance().getSize());
+                }
             }
 
         } else if ( msg instanceof UniqueValueActor.Cancellation ) {
             UniqueValueActor.Cancellation can = (UniqueValueActor.Cancellation) msg;
-            ReservationCache.getInstance().cancelReservation( can );
-
-            if (++cancellationCount % 10 == 0) {
-                logger.info( "Received {} cancellations", cancellationCount );
+            ReservationCache.getInstance().cancelReservation(can);
+            if (logger.isDebugEnabled()) {
+                if (++cancellationCount % 10 == 0) {
+                    logger.debug("Received {} cancellations", cancellationCount);
+                } else {
+                    logger.debug("Removing cancelled {} from reservation cache", can.getConsistentHashKey());
+                }
             }
-            logger.debug("Removing cancelled {} from reservation cache", can.getConsistentHashKey());
+
 
         } else if ( msg instanceof UniqueValueActor.Response ) {
             UniqueValueActor.Response response = (UniqueValueActor.Response) msg;
             ReservationCache.getInstance().cancelReservation( response );
 
-            logger.info("Removing completed {} from reservation cache", response.getConsistentHashKey());
+            if(logger.isDebugEnabled()) {
+                logger.debug("Removing completed {} from reservation cache", response.getConsistentHashKey());
+            }
 
         } else if (msg instanceof DistributedPubSubMediator.SubscribeAck) {
-            logger.debug( "subscribing" );
+            if(logger.isDebugEnabled()) {
+                logger.debug("subscribing");
+            }
 
         } else {
             unhandled( msg );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5f39ee0a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java
index 501037f..74f45eb 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java
@@ -57,8 +57,8 @@ public class UniqueValueActor extends UntypedActor {
             Request req = (Request) message;
 
             count++;
-            if (count % 10 == 0) {
-                logger.info( "UniqueValueActor {} processed {} requests", name, count );
+            if (count % 10 == 0 && logger.isDebugEnabled()) {
+                logger.debug( "UniqueValueActor {} processed {} requests", name, count );
             }
         }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5f39ee0a/stack/rest/src/main/java/org/apache/usergrid/rest/ShutdownListener.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/ShutdownListener.java b/stack/rest/src/main/java/org/apache/usergrid/rest/ShutdownListener.java
index f9f5421..f9f1653 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/ShutdownListener.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/ShutdownListener.java
@@ -18,8 +18,10 @@
 package org.apache.usergrid.rest;
 
 
+import com.google.inject.Injector;
 import org.apache.usergrid.batch.service.JobSchedulerService;
 import org.apache.usergrid.batch.service.SchedulerService;
+import org.apache.usergrid.persistence.actorsystem.ActorSystemManager;
 import org.apache.usergrid.persistence.cassandra.CassandraService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -63,6 +65,15 @@ public class ShutdownListener implements ServletContextListener {
 
         logger.info("ShutdownListener invoked");
 
+        ApplicationContext ctx = WebApplicationContextUtils
+            .getWebApplicationContext(sce.getServletContext());
+
+        Injector injector = ctx.getBean(Injector.class);
+        ActorSystemManager actorSystemManager = injector.getInstance(ActorSystemManager.class);
+
+        // stop the Akka actor system
+        actorSystemManager.shutdownAll();
+
         boolean started = Boolean.parseBoolean(
             properties.getProperty(JobServiceBoostrap.START_SCHEDULER_PROP, "true"));
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5f39ee0a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/ApplicationResourceIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/ApplicationResourceIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/ApplicationResourceIT.java
index 06615df..9f4f8aa 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/ApplicationResourceIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/ApplicationResourceIT.java
@@ -279,7 +279,7 @@ public class ApplicationResourceIT extends AbstractRestIT {
             .get(ApiResponse.class);
 
         // assert that the response returns the correct URI
-        assertEquals(apiResponse.getUri(), String.format("http://sometestvalue/%s/%s", orgName, appName));
+        assertEquals(String.format("http://localhost:8080/%s/%s", orgName, appName), apiResponse.getUri());
 
         //unmarshal the application from the response
         Application application = new Application(apiResponse);

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5f39ee0a/stack/rest/src/test/resources/usergrid-custom-test.properties
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/resources/usergrid-custom-test.properties b/stack/rest/src/test/resources/usergrid-custom-test.properties
index f20f1e5..d845fcc 100644
--- a/stack/rest/src/test/resources/usergrid-custom-test.properties
+++ b/stack/rest/src/test/resources/usergrid-custom-test.properties
@@ -35,8 +35,6 @@ collection.stage.transient.timeout=5
 
 # other...
 usergrid.mongo.disable=true
-usergrid.counter.batch.size=1
-usergrid.api.url.base=http://sometestvalue
 
 usergrid.notifications.listener.run=false
 
@@ -63,5 +61,9 @@ usergrid.cluster.region.local=us-east
 usergrid.cluster.region.list=us-east
 usergrid.cluster.seeds=us-east\:localhost
 
+# Use random port here for REST tests run outside embedded tomcat because these will get an instance of Spring that
+# starts the Akka cluster, then the embedded tomcat will also try when it starts ( but using default props and port 2551)
+usergrid.cluster.port=2555
+
 collection.uniquevalues.actors=300
 collection.uniquevalues.authoritative.region=us-east

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5f39ee0a/stack/rest/src/test/resources/usergrid-rest-deploy-context.xml
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/resources/usergrid-rest-deploy-context.xml b/stack/rest/src/test/resources/usergrid-rest-deploy-context.xml
index 9cc5ea6..07215ab 100644
--- a/stack/rest/src/test/resources/usergrid-rest-deploy-context.xml
+++ b/stack/rest/src/test/resources/usergrid-rest-deploy-context.xml
@@ -29,7 +29,6 @@
             <list>
                 <value>classpath:/usergrid-default.properties</value>
                 <value>classpath:/usergrid-test.properties</value>
-                <value>classpath:/usergrid-custom-test-rest.properties</value>
             </list>
         </property>
     </bean>