You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2014/08/22 21:57:43 UTC

[1/5] git commit: Make sure Tomcat gets the ulimit.

Repository: incubator-usergrid
Updated Branches:
  refs/heads/two-dot-o-notifications-queue 5586b33f9 -> 5e30645d8


Make sure Tomcat gets the ulimit.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/d09bb712
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/d09bb712
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/d09bb712

Branch: refs/heads/two-dot-o-notifications-queue
Commit: d09bb71217e2f63defe848187055c16a96f1c663
Parents: ac0ca5d
Author: Dave Johnson <dm...@apigee.com>
Authored: Thu Aug 21 16:16:11 2014 -0400
Committer: Dave Johnson <dm...@apigee.com>
Committed: Thu Aug 21 16:16:11 2014 -0400

----------------------------------------------------------------------
 stack/awscluster/src/main/dist/init_instance/init_rest_server.sh | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d09bb712/stack/awscluster/src/main/dist/init_instance/init_rest_server.sh
----------------------------------------------------------------------
diff --git a/stack/awscluster/src/main/dist/init_instance/init_rest_server.sh b/stack/awscluster/src/main/dist/init_instance/init_rest_server.sh
index be95a51..7a8954e 100644
--- a/stack/awscluster/src/main/dist/init_instance/init_rest_server.sh
+++ b/stack/awscluster/src/main/dist/init_instance/init_rest_server.sh
@@ -80,7 +80,6 @@ case `(curl http://169.254.169.254/latest/meta-data/instance-type)` in
 'c3.4xlarge' )
     export TOMCAT_RAM=24G
     export TOMCAT_THREADS=4000
-    export NOFILE=200000
 esac
 
 export TOMCAT_CONNECTIONS=10000
@@ -88,6 +87,7 @@ sudo sed -i.bak "s/Xmx128m/Xmx${TOMCAT_RAM} -Xms${TOMCAT_RAM}/g" /etc/default/to
 sudo sed -i.bak "s/<Connector/<Connector maxThreads=\"${TOMCAT_THREADS}\" acceptCount=\"${TOMCAT_THREADS}\" maxConnections=\"${TOMCAT_CONNECTIONS}\"/g" /var/lib/tomcat7/conf/server.xml
 
 # set file limits
+sudo sed -i.bak "s/# \/etc\/init\.d\/tomcat7 -- startup script for the Tomcat 6 servlet engine/ulimit ${NOFILE}/" /etc/init.d/tomcat7
 sudo sed -i.bak "/@student/a *\t\thard\tnofile\t\t${NOFILE}\n*\t\tsoft\tnofile\t\t${NOFILE}" /etc/security/limits.conf
 echo "$NOFILE" | sudo tee > /proc/sys/fs/nr_open
 echo "$NOFILE" | sudo tee > /proc/sys/fs/file-max


[5/5] git commit: Merge branch 'two-dot-o' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into two-dot-o-notifications-queue

Posted by sf...@apache.org.
Merge branch 'two-dot-o' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into two-dot-o-notifications-queue


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/5e30645d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/5e30645d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/5e30645d

Branch: refs/heads/two-dot-o-notifications-queue
Commit: 5e30645d816372b4b492e8a97b9cd75abe9ae9ac
Parents: 8a40594 839ac29
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Aug 22 13:57:19 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Aug 22 13:57:19 2014 -0600

----------------------------------------------------------------------
 .../main/dist/init_instance/init_rest_server.sh |   6 +-
 stack/awscluster/ugcluster-cf.json              |  10 +-
 .../corepersistence/CpEntityManager.java        |  63 ++---------
 .../corepersistence/CpRelationManager.java      | 108 ++++++++++++++++---
 .../usergrid/services/ServiceInvocationIT.java  |  49 ++++-----
 5 files changed, 134 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5e30645d/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------


[2/5] git commit: Fix to "reindex on update" logic that allows the ServiceInvocationIT tests to pass.

Posted by sf...@apache.org.
Fix to "reindex on update" logic that allows the ServiceInvocationIT tests to pass.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/bc3ad2a3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/bc3ad2a3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/bc3ad2a3

Branch: refs/heads/two-dot-o-notifications-queue
Commit: bc3ad2a354cdb285ce1ec57d1df34a66c454eedf
Parents: d09bb71
Author: Dave Johnson <dm...@apigee.com>
Authored: Thu Aug 21 16:17:02 2014 -0400
Committer: Dave Johnson <dm...@apigee.com>
Committed: Thu Aug 21 16:17:02 2014 -0400

----------------------------------------------------------------------
 .../corepersistence/CpEntityManager.java        |  63 ++---------
 .../corepersistence/CpRelationManager.java      | 108 ++++++++++++++++---
 .../usergrid/services/ServiceInvocationIT.java  |  49 ++++-----
 3 files changed, 121 insertions(+), 99 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bc3ad2a3/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index b216294..2ce3d56 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -520,61 +520,9 @@ public class CpEntityManager implements EntityManager {
             }
         }
         
-        // update item in collection index
-        IndexScope indexScope = new IndexScopeImpl(
-            appScope.getApplication(), 
-            appScope.getApplication(), 
-            getCollectionScopeNameFromEntityType( entity.getType() ));
-        EntityIndex ei = managerCache.getEntityIndex( indexScope );
-        ei.index( cpEntity );
-
-        // update all items index
-        IndexScope allTypesIndexScope = new IndexScopeImpl( 
-            appScope.getApplication(), 
-            appScope.getApplication(), 
-            ALL_TYPES);
-        EntityIndex aei = managerCache.getEntityIndex( allTypesIndexScope );
-        aei.index( cpEntity );
-
-        // next, update entity in every collection and connection scope in which it is indexed 
-        updateEntityIndexes( entity, cpEntity );
-    }
-
-
-    private void updateEntityIndexes( 
-            Entity entity, org.apache.usergrid.persistence.model.entity.Entity cpEntity )
-            throws Exception {
-
-        RelationManager rm = getRelationManager( entity );
-        Map<String, Map<UUID, Set<String>>> owners = rm.getOwners();
-
-        logger.debug( "Updating indexes of all {} collections owning the entity", 
-                owners.keySet().size() );
-
-        for ( String ownerType : owners.keySet() ) {
-            Map<UUID, Set<String>> collectionsByUuid = owners.get( ownerType );
-
-            for ( UUID uuid : collectionsByUuid.keySet() ) {
-                Set<String> collections = collectionsByUuid.get( uuid );
-                for ( String coll : collections ) {
-
-                    if ( coll == null || coll.trim().isEmpty() ) {
-                        logger.warn( "Ignoring empty collection name for owner {}:{}", 
-                                uuid, ownerType );
-                        break;
-                    }
-
-                    IndexScope indexScope = new IndexScopeImpl( 
-                        appScope.getApplication(), 
-                        new SimpleId( uuid, ownerType ),
-                        getCollectionScopeNameFromCollectionName( coll ) );
-
-                    EntityIndex ei = managerCache.getEntityIndex( indexScope );
-
-                    ei.index( cpEntity );
-                }
-            }
-        }
+        // update in all containing collections and connection indexes
+        CpRelationManager rm = (CpRelationManager)getRelationManager( entity );
+        rm.updateContainingCollectionAndCollectionIndexes( entity, cpEntity );
     }
 
 
@@ -1056,8 +1004,9 @@ public class CpEntityManager implements EntityManager {
 
         ei.index( cpEntity );
 
-        // update entity in every collection and connection scope in which it is indexed
-        updateEntityIndexes( get( entityRef ), cpEntity );
+        // update in all containing collections and connection indexes
+        CpRelationManager rm = (CpRelationManager)getRelationManager( entityRef );
+        rm.updateContainingCollectionAndCollectionIndexes( get( entityRef ), cpEntity );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bc3ad2a3/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index ad6041d..d467682 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -271,16 +271,24 @@ public class CpRelationManager implements RelationManager {
     }
 
 
-    static boolean isConnectionEdgeType( String type )  {
+    static boolean isCollectionEdgeType( String type )  {
         return type.endsWith( EDGE_COLL_SUFFIX );
     }
-
+    
+    static boolean isConnectionEdgeType( String type )  {
+        return type.endsWith( EDGE_CONN_SUFFIX );
+    }
     
     public String getConnectionName( String edgeType ) {
         String[] parts = edgeType.split("\\|");
         return parts[0];
     }
 
+    public String getCollectionName( String edgeType ) {
+        String[] parts = edgeType.split("\\|");
+        return parts[0];
+    }
+
 
     @Override
     public Set<String> getCollectionIndexes( String collectionName ) throws Exception {
@@ -376,11 +384,13 @@ public class CpRelationManager implements RelationManager {
                 EntityRef eref = new SimpleEntityRef( 
                     edge.getSourceNode().getType(), edge.getSourceNode().getUuid() );
 
-                String connectionName = null;
+                String name = null;
                 if ( isConnectionEdgeType( edge.getType() )) {
-                    connectionName = getConnectionName( edge.getType() );
+                    name = getConnectionName( edge.getType() );
+                } else {
+                    name = getCollectionName( edge.getType() );
                 }
-                addMapSet( results, eref, connectionName );
+                addMapSet( results, eref, name );
             }
 
             if ( limit > 0 && results.keySet().size() >= limit ) {
@@ -388,16 +398,84 @@ public class CpRelationManager implements RelationManager {
             }
         }
 
-        // should not need to do this
-//        if ( connType == null ) {
-//
-//            EntityRef applicationRef = new SimpleEntityRef( TYPE_APPLICATION, applicationId );
-//            if ( !results.containsKey( applicationRef ) ) {
-//
-//                addMapSet( results, applicationRef, 
-//                    CpEntityManager.getCollectionScopeNameFromEntityType( headEntity.getType() ) );
-//            }
-//        }
+        return results;
+    }
+
+
+    public List<String> updateContainingCollectionAndCollectionIndexes( 
+        Entity entity, org.apache.usergrid.persistence.model.entity.Entity cpEntity ) {
+
+        List<String> results = new ArrayList<String>();
+
+        GraphManager gm = managerCache.getGraphManager(applicationScope);
+
+        Iterator<String> edgeTypesToTarget = gm.getEdgeTypesToTarget( new SimpleSearchEdgeType( 
+            cpHeadEntity.getId(), null, null) ).toBlockingObservable().getIterator();
+
+        logger.debug("updateContainingCollectionsAndCollections(): "
+                + "Searched for edges to target {}:{}\n   in scope {}\n   found: {}", 
+            new Object[] {
+                cpHeadEntity.getId().getType(), 
+                cpHeadEntity.getId().getUuid(), 
+                applicationScope.getApplication(),
+                edgeTypesToTarget.hasNext()
+        });
+
+        // loop through all types of edge to target
+        int count = 0;
+        while ( edgeTypesToTarget.hasNext() ) {
+
+            // get all edges of the type
+            String etype = edgeTypesToTarget.next();
+
+            Observable<Edge> edges = gm.loadEdgesToTarget( new SimpleSearchByEdgeType(
+                cpHeadEntity.getId(), etype, Long.MAX_VALUE, null ));
+
+            // loop through edges of that type
+            Iterator<Edge> iter = edges.toBlockingObservable().getIterator();
+            while ( iter.hasNext() ) {
+
+                Edge edge = iter.next();
+
+                EntityRef sourceEntity = new SimpleEntityRef( 
+                    edge.getSourceNode().getType(), edge.getSourceNode().getUuid() );
+
+                // reindex the entity in the source entity's collection or connection index
+
+                IndexScope indexScope;
+                if ( isCollectionEdgeType( edge.getType() )) {
+
+                    String collName = getCollectionName( edge.getType() ); 
+                    indexScope = new IndexScopeImpl(
+                        applicationScope.getApplication(),
+                        new SimpleId(sourceEntity.getUuid(), sourceEntity.getType()),
+                        CpEntityManager.getCollectionScopeNameFromCollectionName( collName ));
+
+                } else {
+
+                    String connName = getCollectionName( edge.getType() ); 
+                    indexScope = new IndexScopeImpl(
+                        applicationScope.getApplication(),
+                        new SimpleId(sourceEntity.getUuid(), sourceEntity.getType()),
+                        CpEntityManager.getConnectionScopeName( cpHeadEntity.getId().getType(), connName ));
+                } 
+           
+                EntityIndex ei = managerCache.getEntityIndex(indexScope);
+                ei.index(cpEntity);
+
+                // reindex the entity in the source entity's all-types index
+                
+                indexScope = new IndexScopeImpl(
+                    applicationScope.getApplication(),
+                    new SimpleId(sourceEntity.getUuid(), sourceEntity.getType()),
+                    ALL_TYPES);
+                ei = managerCache.getEntityIndex(indexScope);
+                ei.index(cpEntity);
+
+                count++;
+            }
+        }
+        logger.debug("updateContainingCollectionsAndCollections() updated {} indexes", count);
         return results;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bc3ad2a3/stack/services/src/test/java/org/apache/usergrid/services/ServiceInvocationIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/ServiceInvocationIT.java b/stack/services/src/test/java/org/apache/usergrid/services/ServiceInvocationIT.java
index 5d1bf25..bde23a5 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/ServiceInvocationIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/ServiceInvocationIT.java
@@ -22,15 +22,13 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
-
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.usergrid.persistence.Entity;
 import org.apache.usergrid.persistence.index.query.Query;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
-
 import static org.junit.Assert.assertNotNull;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 public class ServiceInvocationIT extends AbstractServiceIT {
@@ -102,28 +100,25 @@ public class ServiceInvocationIT extends AbstractServiceIT {
 
         app.testRequest( ServiceAction.PUT, 1, "users", "edanuff", "likes", cat.getUuid() );
 
-        // TODO: uncomment this code and fix whatever problem is causing it to fail
-        // see also: https://issues.apache.org/jira/browse/USERGRID-214
-
-//        app.put( "eats", "petfood" );
-//
-//        app.testRequest( ServiceAction.PUT, 1, "users", "edanuff", "likes", "cats", "dylan" );
-//
-//        app.put( "Todays special", "Coffee" );
-//
-//        app.testRequest( ServiceAction.PUT, 1, "users", "edanuff", "likes", "restaurants",
-//                Query.fromQL( "select * where name='Brickhouse'" ) );
-//
-//        app.testRequest( ServiceAction.DELETE, 1, null, "users", user.getUuid(), "connections", "likes",
-//                restaurant.getUuid() );
-//
-//        app.testRequest( ServiceAction.GET, 2, null, "users", "edanuff", "connections" );
-//
-//        app.testRequest( ServiceAction.GET, 1, null, "users", "edanuff", "likes", "restaurants" );
-//
-//        UUID uuid = UUIDGenerator.newTimeUUID();
-//        app.put( "visits", 5 );
-//        app.testRequest( ServiceAction.PUT, 1, "devices", uuid );
+        app.put( "eats", "petfood" );
+
+        app.testRequest( ServiceAction.PUT, 1, "users", "edanuff", "likes", "cats", "dylan" );
+
+        app.put( "Todays special", "Coffee" );
+
+        app.testRequest( ServiceAction.PUT, 1, "users", "edanuff", "likes", "restaurants",
+                Query.fromQL( "select * where name='Brickhouse'" ) );
+
+        app.testRequest( ServiceAction.DELETE, 1, null, "users", user.getUuid(), "connections", "likes",
+                restaurant.getUuid() );
+
+        app.testRequest( ServiceAction.GET, 2, null, "users", "edanuff", "connections" );
+
+        app.testRequest( ServiceAction.GET, 1, null, "users", "edanuff", "likes", "restaurants" );
+
+        UUID uuid = UUIDGenerator.newTimeUUID();
+        app.put( "visits", 5 );
+        app.testRequest( ServiceAction.PUT, 1, "devices", uuid );
     }
 
 


[4/5] git commit: move notifier matching logic up

Posted by sf...@apache.org.
move notifier matching logic up


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/8a40594d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/8a40594d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/8a40594d

Branch: refs/heads/two-dot-o-notifications-queue
Commit: 8a40594d8ec92dd1cd980614a339cc6b597fe674
Parents: 5586b33
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Aug 22 13:57:05 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Aug 22 13:57:05 2014 -0600

----------------------------------------------------------------------
 .../persistence/entities/Notification.java      |  11 ++
 .../NotificationsQueueManager.java              | 125 ++++++++++---------
 .../services/notifications/QueueMessage.java    |  31 ++++-
 .../apns/NotificationsServiceIT.java            |   1 -
 4 files changed, 106 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8a40594d/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java b/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java
index 7a643ac..f070ee6 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java
@@ -41,6 +41,8 @@ public class Notification extends TypedEntity {
 
     public static final String RECEIPTS_COLLECTION = "receipts";
 
+
+
     public static enum State {
         CREATED, FAILED, SCHEDULED, STARTED, FINISHED, CANCELED, EXPIRED
     }
@@ -49,6 +51,10 @@ public class Notification extends TypedEntity {
     @EntityProperty
     protected Map<String, Object> payloads;
 
+    /** Total count */
+    @EntityProperty
+    private int expectedCount;
+
     /** Time processed */
     @EntityProperty
     protected Long queued;
@@ -237,4 +243,9 @@ public class Notification extends TypedEntity {
     public void setQueued(Long queued) {
         this.queued = queued;
     }
+
+    public void setExpectedCount(int expectedCount) {  this.expectedCount = expectedCount;  }
+
+    @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+    public int getExpectedCount() {  return expectedCount;  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8a40594d/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsQueueManager.java
index 34d4571..93543bd 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsQueueManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsQueueManager.java
@@ -135,6 +135,8 @@ public class NotificationsQueueManager implements NotificationServiceProxy {
         final AtomicInteger batchCount = new AtomicInteger(); //count devices so you can make a judgement on batching
         final int numCurrentBatchesConfig = getNumConcurrentBatches();
         final ConcurrentLinkedQueue<String> errorMessages = new ConcurrentLinkedQueue<String>(); //build up list of issues
+        final HashMap<Object,Notifier> notifierMap =  getNotifierMap();
+        final Map<String,Object> payloads = notification.getPayloads();
 
         //get devices in querystring, and make sure you have access
         if (pathQuery != null) {
@@ -163,7 +165,26 @@ public class NotificationsQueueManager implements NotificationServiceProxy {
                                     }else {
                                         sketch.add(hash,1);
                                     }
-                                    QueueMessage message = new QueueMessage(em.getApplicationId(),notification.getUuid(),deviceRef.getUuid());
+                                    String notifierId = null;
+                                    String notifierName = null;
+
+                                    //find the device notifier info, match it to the payload
+                                    for (Map.Entry<String, Object> entry : payloads.entrySet()) {
+                                        Notifier notifier = notifierMap.get(entry.getKey());
+                                        String providerId = getProviderId(deviceRef, notifier);
+                                        if (providerId != null) {
+                                            notifierId = providerId;
+                                            notifierName = notifier.getName();
+                                            break;
+                                        }
+                                    }
+
+                                    if(notifierId == null){
+                                        LOG.debug("Notifier did not match for device {} ", deviceRef);
+                                        continue;
+                                    }
+
+                                    QueueMessage message = new QueueMessage(em.getApplicationId(),notification.getUuid(),deviceRef.getUuid(),notifierName,notifierId);
                                     qm.postToQueue(QUEUE_NAME, message);
                                     if(notification.getQueued() == null){
                                         // update queued time
@@ -202,10 +223,12 @@ public class NotificationsQueueManager implements NotificationServiceProxy {
 
         if(errorMessages.size()>0){
             properties.put("deliveryErrors", errorMessages.toArray());
-            if(notification.getErrorMessage()==null){
+            if (notification.getErrorMessage() == null) {
                 notification.setErrorMessage("There was a problem delivering all of your notifications. See deliveryErrors in properties");
             }
         }
+
+        notification.setExpectedCount(deviceCount.get());
         notification.addProperties(properties);
         em.update(notification);
 
@@ -254,7 +277,6 @@ public class NotificationsQueueManager implements NotificationServiceProxy {
                         map.put("notification",notification);
                         final Map<String, Object> payloads = notification.getPayloads();
                         final Map<String, Object> translatedPayloads = translatePayloads(payloads, notifierMap);
-                        map.put("payloads",payloads);
                         map.put("translatedPayloads",translatedPayloads);
                         LOG.info("sending batch of {} devices for Notification: {}", messages.size(), notification.getUuid());
                         return map;
@@ -274,7 +296,6 @@ public class NotificationsQueueManager implements NotificationServiceProxy {
                                     try {
                                         UUID deviceUUID = message.getUuid();
                                         HashMap<String, Object> notificationMap = notificationCache.get(message.getNotificationId());
-                                        Map<String, Object> payloads = (Map<String, Object>) notificationMap.get("payloads");
                                         Map<String, Object> translatedPayloads = (Map<String, Object>) notificationMap.get("translatedPayloads");
                                         TaskManager taskManager = (TaskManager) notificationMap.get("taskManager");
                                         Notification notification = (Notification) notificationMap.get("notification");
@@ -282,59 +303,35 @@ public class NotificationsQueueManager implements NotificationServiceProxy {
                                             return message;
                                         }
                                         boolean foundNotifier = false;
-                                        for (Map.Entry<String, Object> entry : payloads.entrySet()) {
-                                            try {
-                                                String payloadKey = entry.getKey();
-                                                Notifier notifier = notifierMap.get(payloadKey.toLowerCase());
-                                                EntityRef deviceRef = new SimpleEntityRef(Device.ENTITY_TYPE, deviceUUID);
-
-                                                String providerId;
+                                        try {
+                                            String notifierName = message.getNotifierName();
+                                            Notifier notifier = notifierMap.get(notifierName.toLowerCase());
+                                            Object payload = translatedPayloads.get(notifierName);
+                                            Receipt receipt = new Receipt(notification.getUuid(), message.getNotifierId(), payload, deviceUUID, message);
+                                            TaskTracker tracker = new TaskTracker(notifier, taskManager, receipt, deviceUUID);
+
+                                            if (payload == null) {
+                                                LOG.debug("selected device {} for notification {} doesn't have a valid payload. skipping.", deviceUUID, notification.getUuid());
                                                 try {
-                                                    providerId = getProviderId(deviceRef, notifier);
-                                                    if (providerId == null) {
-                                                        LOG.debug("Provider not found.{} {}", deviceRef, notifier.getName());
-                                                        continue;
-                                                    }
-                                                } catch (Exception providerException) {
-                                                    LOG.error("Exception getting provider.", providerException);
-                                                    continue;
-                                                }
-                                                Object payload = translatedPayloads.get(payloadKey);
-
-                                                Receipt receipt = new Receipt(notification.getUuid(), providerId, payload, deviceUUID,message);
-                                                TaskTracker tracker = new TaskTracker(notifier, taskManager, receipt, deviceUUID);
-                                                if (payload == null) {
-                                                    LOG.debug("selected device {} for notification {} doesn't have a valid payload. skipping.", deviceUUID, notification.getUuid());
-                                                    try {
-                                                        tracker.failed(0, "failed to match payload to " + payloadKey + " notifier");
-                                                    } catch (Exception e) {
-                                                        LOG.debug("failed to mark device failed" + e);
-                                                    }
-                                                    continue;
-                                                }
-
-                                                if (LOG.isDebugEnabled()) {
-                                                    StringBuilder sb = new StringBuilder();
-                                                    sb.append("sending notification ").append(notification.getUuid());
-                                                    sb.append(" to device ").append(deviceUUID);
-                                                    LOG.debug(sb.toString());
+                                                    tracker.failed(0, "failed to match payload to " + message.getNotifierId() + " notifier");
+                                                } catch (Exception e) {
+                                                    LOG.debug("failed to mark device failed" + e);
                                                 }
+                                            }
 
+                                            try {
+                                                ProviderAdapter providerAdapter = providerAdapters.get(notifier.getProvider());
+                                                providerAdapter.sendNotification(message.getNotifierId(), notifier, payload, notification, tracker);
+                                            } catch (Exception e) {
                                                 try {
-                                                    ProviderAdapter providerAdapter = providerAdapters.get(notifier.getProvider());
-                                                    providerAdapter.sendNotification(providerId, notifier, payload, notification, tracker);
-
-                                                } catch (Exception e) {
-                                                    try {
-                                                        tracker.failed(0, e.getMessage());
-                                                    } catch (Exception trackerException) {
-                                                        LOG.error("tracker failed", trackerException);
-                                                    }
+                                                    tracker.failed(0, e.getMessage());
+                                                } catch (Exception trackerException) {
+                                                    LOG.error("tracker failed", trackerException);
                                                 }
-                                                foundNotifier = true;
-                                            } finally {
-                                                sendMeter.mark();
                                             }
+                                            foundNotifier = true;
+                                        } finally {
+                                            sendMeter.mark();
                                         }
                                         if (!foundNotifier) {
                                             try {
@@ -344,14 +341,14 @@ public class NotificationsQueueManager implements NotificationServiceProxy {
                                             }
                                         }
                                     } catch (Exception x) {
-
+                                        LOG.error("Failure unknown",x);
                                     }
                                     return message;
                                 }
                             });
                         }
                     }, Schedulers.io())
-                    .buffer(1000)
+                    .buffer(QueueListener.BATCH_SIZE)
                     .map(new Func1<List<QueueMessage>, Object>() {
                         @Override
                         public Object call(List<QueueMessage> queueMessages) {
@@ -363,6 +360,19 @@ public class NotificationsQueueManager implements NotificationServiceProxy {
                                     LOG.error("providerAdapter.doneSendingNotifications: ", e);
                                 }
                             }
+                            //TODO: check if a notification is done and mark it
+                            HashMap<UUID, Notification> notifications = new HashMap<UUID, Notification>();
+                            for (QueueMessage message : queueMessages) {
+                                if (notifications.get(message.getNotificationId()) == null) {
+                                    try {
+                                        final Notification notification = em.get(message.getNotificationId(), Notification.class);
+                                        finishedBatch(notification, 0, 0);
+                                    } catch (Exception e) {
+                                        LOG.error("Failed to finish batch", e);
+                                    }
+                                }
+
+                            }
                             notificationCache.cleanUp();
                             return null;
                         }
@@ -386,15 +396,16 @@ public class NotificationsQueueManager implements NotificationServiceProxy {
         Map<String, Object> properties = new HashMap<String, Object>(4);
         properties.put("statistics", notification.getStatistics());
         properties.put("modified", notification.getModified());
-
+        long sent = notification.getStatistics().get("sent");
+        long errors = notification.getStatistics().get("errors");
         //none of this is known and should you ever do this
+        if(notification.getExpectedCount() == (errors  + sent )) {
             notification.setFinished(notification.getModified());
             properties.put("finished", notification.getModified());
             properties.put("state", notification.getState());
             long elapsed = notification.getFinished()
                     - notification.getStarted();
-            long sent = notification.getStatistics().get("sent");
-            long errors = notification.getStatistics().get("errors");
+
 
             if (LOG.isInfoEnabled()) {
                 StringBuilder sb = new StringBuilder();
@@ -403,7 +414,7 @@ public class NotificationsQueueManager implements NotificationServiceProxy {
                 sb.append(" devices in ").append(elapsed).append(" ms");
                 LOG.info(sb.toString());
             }
-
+        }
         LOG.info("notification finished batch: {}",
                 notification.getUuid());
         em.updateProperties(notification, properties);

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8a40594d/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueMessage.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueMessage.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueMessage.java
index e6301ed..b8b5c6f 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueMessage.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueMessage.java
@@ -25,25 +25,32 @@ public class QueueMessage extends Message {
 
     static final String MESSAGE_PROPERTY_DEVICE_UUID = "deviceUUID";
     static final String MESSAGE_PROPERTY_APPLICATION_UUID = "applicationUUID";
+    static final String MESSAGE_PROPERTY_NOTIFIER_ID = "notifierId";
+    static final String MESSAGE_PROPERTY_NOTIFICATION_ID = "notificationId";
+    static final String MESSAGE_PROPERTY_NOTIFIER_NAME = "notifierName";
 
 
     public QueueMessage() {
     }
 
-    public QueueMessage(UUID applicationId,UUID notificationId,UUID deviceId){
+    public QueueMessage(UUID applicationId,UUID notificationId,UUID deviceId,String notifierName,String notifierId){
         setApplicationId(applicationId);
         setDeviceId(deviceId);
+        setNotificationId(notificationId);
+        setNotifierName(notifierName);
+        setNotifierId(notifierId);
     }
 
 
 
     public static QueueMessage generate(Message message){
-        return new QueueMessage((UUID) message.getObjectProperty(MESSAGE_PROPERTY_APPLICATION_UUID),(UUID) message.getObjectProperty("notificationId"),(UUID) message.getObjectProperty(MESSAGE_PROPERTY_DEVICE_UUID));
+        return new QueueMessage((UUID) message.getObjectProperty(MESSAGE_PROPERTY_APPLICATION_UUID),(UUID) message.getObjectProperty(MESSAGE_PROPERTY_NOTIFICATION_ID),(UUID) message.getObjectProperty(MESSAGE_PROPERTY_DEVICE_UUID),message.getStringProperty(MESSAGE_PROPERTY_NOTIFIER_NAME),message.getStringProperty(MESSAGE_PROPERTY_NOTIFIER_ID));
     }
 
     public UUID getApplicationId() {
         return (UUID) this.getObjectProperty(MESSAGE_PROPERTY_APPLICATION_UUID);
     }
+
     public void setApplicationId(UUID applicationId){
         this.setProperty(MESSAGE_PROPERTY_APPLICATION_UUID,applicationId);
     }
@@ -56,10 +63,26 @@ public class QueueMessage extends Message {
     }
 
     public UUID getNotificationId(){
-        return (UUID) this.getObjectProperty("notificationId");
+        return (UUID) this.getObjectProperty(MESSAGE_PROPERTY_NOTIFICATION_ID);
     }
 
     public void setNotificationId(UUID notificationId){
-        this.setProperty("notificationdId",notificationId);
+        this.setProperty(MESSAGE_PROPERTY_NOTIFICATION_ID,notificationId);
+    }
+
+    public String getNotifierId() {
+        return  this.getStringProperty(MESSAGE_PROPERTY_NOTIFIER_ID);
     }
+    public void setNotifierId(String notifierId){
+        this.setProperty(MESSAGE_PROPERTY_NOTIFIER_ID,notifierId);
+    }
+
+    public String getNotifierName() {
+        return  this.getStringProperty(MESSAGE_PROPERTY_NOTIFIER_NAME);
+    }
+    public void setNotifierName(String name){
+        this.setProperty(MESSAGE_PROPERTY_NOTIFIER_NAME,name);
+    }
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8a40594d/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
index ed15eeb..456df69 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
@@ -77,7 +77,6 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
     public void before() throws Exception {
         super.before();
         // create apns notifier //
-        NotificationsQueueManager.IS_TEST = true;
 
         app.clear();
         app.put("name", "apns");


[3/5] git commit: Add support for c3.2xlarge to AWS Cluster template.

Posted by sf...@apache.org.
Add support for c3.2xlarge to AWS Cluster template.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/839ac295
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/839ac295
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/839ac295

Branch: refs/heads/two-dot-o-notifications-queue
Commit: 839ac295e20d8f214fbfa9536d3afa7e9f2b1e4b
Parents: bc3ad2a
Author: Dave Johnson <dm...@apigee.com>
Authored: Fri Aug 22 14:44:50 2014 -0400
Committer: Dave Johnson <dm...@apigee.com>
Committed: Fri Aug 22 14:44:50 2014 -0400

----------------------------------------------------------------------
 .../src/main/dist/init_instance/init_rest_server.sh       |  4 ++++
 stack/awscluster/ugcluster-cf.json                        | 10 ++++++++--
 2 files changed, 12 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/839ac295/stack/awscluster/src/main/dist/init_instance/init_rest_server.sh
----------------------------------------------------------------------
diff --git a/stack/awscluster/src/main/dist/init_instance/init_rest_server.sh b/stack/awscluster/src/main/dist/init_instance/init_rest_server.sh
index 7a8954e..d069d0d 100644
--- a/stack/awscluster/src/main/dist/init_instance/init_rest_server.sh
+++ b/stack/awscluster/src/main/dist/init_instance/init_rest_server.sh
@@ -77,6 +77,10 @@ case `(curl http://169.254.169.254/latest/meta-data/instance-type)` in
     export TOMCAT_RAM=6G
     export TOMCAT_THREADS=1600
 ;;
+'c3.2xlarge' )
+    export TOMCAT_RAM=12G
+    export TOMCAT_THREADS=2000
+;;
 'c3.4xlarge' )
     export TOMCAT_RAM=24G
     export TOMCAT_THREADS=4000

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/839ac295/stack/awscluster/ugcluster-cf.json
----------------------------------------------------------------------
diff --git a/stack/awscluster/ugcluster-cf.json b/stack/awscluster/ugcluster-cf.json
index 9c4885f..1de4e54 100644
--- a/stack/awscluster/ugcluster-cf.json
+++ b/stack/awscluster/ugcluster-cf.json
@@ -40,6 +40,7 @@
                 "m1.xlarge",
                 "m3.xlarge",
                 "m3.large",
+                "c3.2xlarge",
                 "c3.4xlarge"
             ],
             "ConstraintDescription": "must be valid instance type."
@@ -63,7 +64,7 @@
         "CassInstanceType": {
             "Description": "Instance type for Cass servers",
             "Type": "String",
-            "Default": "c3.4xlarge",
+            "Default": "c3.2xlarge",
             "AllowedValues": [
                 "m1.small",
                 "m1.medium",
@@ -71,6 +72,7 @@
                 "m1.xlarge",
                 "m3.xlarge",
                 "m3.2xlarge",
+                "c3.2xlarge",
                 "c3.4xlarge"
             ],
             "ConstraintDescription": "must be valid instance type."
@@ -151,6 +153,9 @@
             "c3.2xlarge": {
                 "Arch": "64"
             },
+            "c3.2xlarge": {
+                "Arch": "64"
+            },
             "c3.4xlarge": {
                 "Arch": "64"
             }
@@ -857,7 +862,8 @@
                     "UnhealthyThreshold": "8",
                     "Interval": "30",
                     "Timeout": "5"
-                }
+                },
+                "CrossZone": "true"
             }
         },
         "NotificationTopic": {