You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2014/08/22 21:57:43 UTC
[1/5] git commit: Make sure Tomcat gets the ulimit.
Repository: incubator-usergrid
Updated Branches:
refs/heads/two-dot-o-notifications-queue 5586b33f9 -> 5e30645d8
Make sure Tomcat gets the ulimit.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/d09bb712
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/d09bb712
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/d09bb712
Branch: refs/heads/two-dot-o-notifications-queue
Commit: d09bb71217e2f63defe848187055c16a96f1c663
Parents: ac0ca5d
Author: Dave Johnson <dm...@apigee.com>
Authored: Thu Aug 21 16:16:11 2014 -0400
Committer: Dave Johnson <dm...@apigee.com>
Committed: Thu Aug 21 16:16:11 2014 -0400
----------------------------------------------------------------------
stack/awscluster/src/main/dist/init_instance/init_rest_server.sh | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d09bb712/stack/awscluster/src/main/dist/init_instance/init_rest_server.sh
----------------------------------------------------------------------
diff --git a/stack/awscluster/src/main/dist/init_instance/init_rest_server.sh b/stack/awscluster/src/main/dist/init_instance/init_rest_server.sh
index be95a51..7a8954e 100644
--- a/stack/awscluster/src/main/dist/init_instance/init_rest_server.sh
+++ b/stack/awscluster/src/main/dist/init_instance/init_rest_server.sh
@@ -80,7 +80,6 @@ case `(curl http://169.254.169.254/latest/meta-data/instance-type)` in
'c3.4xlarge' )
export TOMCAT_RAM=24G
export TOMCAT_THREADS=4000
- export NOFILE=200000
esac
export TOMCAT_CONNECTIONS=10000
@@ -88,6 +87,7 @@ sudo sed -i.bak "s/Xmx128m/Xmx${TOMCAT_RAM} -Xms${TOMCAT_RAM}/g" /etc/default/to
sudo sed -i.bak "s/<Connector/<Connector maxThreads=\"${TOMCAT_THREADS}\" acceptCount=\"${TOMCAT_THREADS}\" maxConnections=\"${TOMCAT_CONNECTIONS}\"/g" /var/lib/tomcat7/conf/server.xml
# set file limits
+sudo sed -i.bak "s/# \/etc\/init\.d\/tomcat7 -- startup script for the Tomcat 6 servlet engine/ulimit ${NOFILE}/" /etc/init.d/tomcat7
sudo sed -i.bak "/@student/a *\t\thard\tnofile\t\t${NOFILE}\n*\t\tsoft\tnofile\t\t${NOFILE}" /etc/security/limits.conf
echo "$NOFILE" | sudo tee > /proc/sys/fs/nr_open
echo "$NOFILE" | sudo tee > /proc/sys/fs/file-max
[5/5] git commit: Merge branch 'two-dot-o' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into two-dot-o-notifications-queue
Posted by sf...@apache.org.
Merge branch 'two-dot-o' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into two-dot-o-notifications-queue
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/5e30645d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/5e30645d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/5e30645d
Branch: refs/heads/two-dot-o-notifications-queue
Commit: 5e30645d816372b4b492e8a97b9cd75abe9ae9ac
Parents: 8a40594 839ac29
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Aug 22 13:57:19 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Aug 22 13:57:19 2014 -0600
----------------------------------------------------------------------
.../main/dist/init_instance/init_rest_server.sh | 6 +-
stack/awscluster/ugcluster-cf.json | 10 +-
.../corepersistence/CpEntityManager.java | 63 ++---------
.../corepersistence/CpRelationManager.java | 108 ++++++++++++++++---
.../usergrid/services/ServiceInvocationIT.java | 49 ++++-----
5 files changed, 134 insertions(+), 102 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5e30645d/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
[2/5] git commit: Fix to "reindex on update" logic that allows the ServiceInvocationIT tests to pass.
Posted by sf...@apache.org.
Fix to "reindex on update" logic that allows the ServiceInvocationIT tests to pass.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/bc3ad2a3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/bc3ad2a3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/bc3ad2a3
Branch: refs/heads/two-dot-o-notifications-queue
Commit: bc3ad2a354cdb285ce1ec57d1df34a66c454eedf
Parents: d09bb71
Author: Dave Johnson <dm...@apigee.com>
Authored: Thu Aug 21 16:17:02 2014 -0400
Committer: Dave Johnson <dm...@apigee.com>
Committed: Thu Aug 21 16:17:02 2014 -0400
----------------------------------------------------------------------
.../corepersistence/CpEntityManager.java | 63 ++---------
.../corepersistence/CpRelationManager.java | 108 ++++++++++++++++---
.../usergrid/services/ServiceInvocationIT.java | 49 ++++-----
3 files changed, 121 insertions(+), 99 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bc3ad2a3/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index b216294..2ce3d56 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -520,61 +520,9 @@ public class CpEntityManager implements EntityManager {
}
}
- // update item in collection index
- IndexScope indexScope = new IndexScopeImpl(
- appScope.getApplication(),
- appScope.getApplication(),
- getCollectionScopeNameFromEntityType( entity.getType() ));
- EntityIndex ei = managerCache.getEntityIndex( indexScope );
- ei.index( cpEntity );
-
- // update all items index
- IndexScope allTypesIndexScope = new IndexScopeImpl(
- appScope.getApplication(),
- appScope.getApplication(),
- ALL_TYPES);
- EntityIndex aei = managerCache.getEntityIndex( allTypesIndexScope );
- aei.index( cpEntity );
-
- // next, update entity in every collection and connection scope in which it is indexed
- updateEntityIndexes( entity, cpEntity );
- }
-
-
- private void updateEntityIndexes(
- Entity entity, org.apache.usergrid.persistence.model.entity.Entity cpEntity )
- throws Exception {
-
- RelationManager rm = getRelationManager( entity );
- Map<String, Map<UUID, Set<String>>> owners = rm.getOwners();
-
- logger.debug( "Updating indexes of all {} collections owning the entity",
- owners.keySet().size() );
-
- for ( String ownerType : owners.keySet() ) {
- Map<UUID, Set<String>> collectionsByUuid = owners.get( ownerType );
-
- for ( UUID uuid : collectionsByUuid.keySet() ) {
- Set<String> collections = collectionsByUuid.get( uuid );
- for ( String coll : collections ) {
-
- if ( coll == null || coll.trim().isEmpty() ) {
- logger.warn( "Ignoring empty collection name for owner {}:{}",
- uuid, ownerType );
- break;
- }
-
- IndexScope indexScope = new IndexScopeImpl(
- appScope.getApplication(),
- new SimpleId( uuid, ownerType ),
- getCollectionScopeNameFromCollectionName( coll ) );
-
- EntityIndex ei = managerCache.getEntityIndex( indexScope );
-
- ei.index( cpEntity );
- }
- }
- }
+ // update in all containing collections and connection indexes
+ CpRelationManager rm = (CpRelationManager)getRelationManager( entity );
+ rm.updateContainingCollectionAndCollectionIndexes( entity, cpEntity );
}
@@ -1056,8 +1004,9 @@ public class CpEntityManager implements EntityManager {
ei.index( cpEntity );
- // update entity in every collection and connection scope in which it is indexed
- updateEntityIndexes( get( entityRef ), cpEntity );
+ // update in all containing collections and connection indexes
+ CpRelationManager rm = (CpRelationManager)getRelationManager( entityRef );
+ rm.updateContainingCollectionAndCollectionIndexes( get( entityRef ), cpEntity );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bc3ad2a3/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index ad6041d..d467682 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -271,16 +271,24 @@ public class CpRelationManager implements RelationManager {
}
- static boolean isConnectionEdgeType( String type ) {
+ static boolean isCollectionEdgeType( String type ) {
return type.endsWith( EDGE_COLL_SUFFIX );
}
-
+
+ static boolean isConnectionEdgeType( String type ) {
+ return type.endsWith( EDGE_CONN_SUFFIX );
+ }
public String getConnectionName( String edgeType ) {
String[] parts = edgeType.split("\\|");
return parts[0];
}
+ public String getCollectionName( String edgeType ) {
+ String[] parts = edgeType.split("\\|");
+ return parts[0];
+ }
+
@Override
public Set<String> getCollectionIndexes( String collectionName ) throws Exception {
@@ -376,11 +384,13 @@ public class CpRelationManager implements RelationManager {
EntityRef eref = new SimpleEntityRef(
edge.getSourceNode().getType(), edge.getSourceNode().getUuid() );
- String connectionName = null;
+ String name = null;
if ( isConnectionEdgeType( edge.getType() )) {
- connectionName = getConnectionName( edge.getType() );
+ name = getConnectionName( edge.getType() );
+ } else {
+ name = getCollectionName( edge.getType() );
}
- addMapSet( results, eref, connectionName );
+ addMapSet( results, eref, name );
}
if ( limit > 0 && results.keySet().size() >= limit ) {
@@ -388,16 +398,84 @@ public class CpRelationManager implements RelationManager {
}
}
- // should not need to do this
-// if ( connType == null ) {
-//
-// EntityRef applicationRef = new SimpleEntityRef( TYPE_APPLICATION, applicationId );
-// if ( !results.containsKey( applicationRef ) ) {
-//
-// addMapSet( results, applicationRef,
-// CpEntityManager.getCollectionScopeNameFromEntityType( headEntity.getType() ) );
-// }
-// }
+ return results;
+ }
+
+
+ public List<String> updateContainingCollectionAndCollectionIndexes(
+ Entity entity, org.apache.usergrid.persistence.model.entity.Entity cpEntity ) {
+
+ List<String> results = new ArrayList<String>();
+
+ GraphManager gm = managerCache.getGraphManager(applicationScope);
+
+ Iterator<String> edgeTypesToTarget = gm.getEdgeTypesToTarget( new SimpleSearchEdgeType(
+ cpHeadEntity.getId(), null, null) ).toBlockingObservable().getIterator();
+
+ logger.debug("updateContainingCollectionsAndCollections(): "
+ + "Searched for edges to target {}:{}\n in scope {}\n found: {}",
+ new Object[] {
+ cpHeadEntity.getId().getType(),
+ cpHeadEntity.getId().getUuid(),
+ applicationScope.getApplication(),
+ edgeTypesToTarget.hasNext()
+ });
+
+ // loop through all types of edge to target
+ int count = 0;
+ while ( edgeTypesToTarget.hasNext() ) {
+
+ // get all edges of the type
+ String etype = edgeTypesToTarget.next();
+
+ Observable<Edge> edges = gm.loadEdgesToTarget( new SimpleSearchByEdgeType(
+ cpHeadEntity.getId(), etype, Long.MAX_VALUE, null ));
+
+ // loop through edges of that type
+ Iterator<Edge> iter = edges.toBlockingObservable().getIterator();
+ while ( iter.hasNext() ) {
+
+ Edge edge = iter.next();
+
+ EntityRef sourceEntity = new SimpleEntityRef(
+ edge.getSourceNode().getType(), edge.getSourceNode().getUuid() );
+
+ // reindex the entity in the source entity's collection or connection index
+
+ IndexScope indexScope;
+ if ( isCollectionEdgeType( edge.getType() )) {
+
+ String collName = getCollectionName( edge.getType() );
+ indexScope = new IndexScopeImpl(
+ applicationScope.getApplication(),
+ new SimpleId(sourceEntity.getUuid(), sourceEntity.getType()),
+ CpEntityManager.getCollectionScopeNameFromCollectionName( collName ));
+
+ } else {
+
+ String connName = getCollectionName( edge.getType() );
+ indexScope = new IndexScopeImpl(
+ applicationScope.getApplication(),
+ new SimpleId(sourceEntity.getUuid(), sourceEntity.getType()),
+ CpEntityManager.getConnectionScopeName( cpHeadEntity.getId().getType(), connName ));
+ }
+
+ EntityIndex ei = managerCache.getEntityIndex(indexScope);
+ ei.index(cpEntity);
+
+ // reindex the entity in the source entity's all-types index
+
+ indexScope = new IndexScopeImpl(
+ applicationScope.getApplication(),
+ new SimpleId(sourceEntity.getUuid(), sourceEntity.getType()),
+ ALL_TYPES);
+ ei = managerCache.getEntityIndex(indexScope);
+ ei.index(cpEntity);
+
+ count++;
+ }
+ }
+ logger.debug("updateContainingCollectionsAndCollections() updated {} indexes", count);
return results;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bc3ad2a3/stack/services/src/test/java/org/apache/usergrid/services/ServiceInvocationIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/ServiceInvocationIT.java b/stack/services/src/test/java/org/apache/usergrid/services/ServiceInvocationIT.java
index 5d1bf25..bde23a5 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/ServiceInvocationIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/ServiceInvocationIT.java
@@ -22,15 +22,13 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
-
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.usergrid.persistence.Entity;
import org.apache.usergrid.persistence.index.query.Query;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
-
import static org.junit.Assert.assertNotNull;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class ServiceInvocationIT extends AbstractServiceIT {
@@ -102,28 +100,25 @@ public class ServiceInvocationIT extends AbstractServiceIT {
app.testRequest( ServiceAction.PUT, 1, "users", "edanuff", "likes", cat.getUuid() );
- // TODO: uncomment this code and fix whatever problem is causing it to fail
- // see also: https://issues.apache.org/jira/browse/USERGRID-214
-
-// app.put( "eats", "petfood" );
-//
-// app.testRequest( ServiceAction.PUT, 1, "users", "edanuff", "likes", "cats", "dylan" );
-//
-// app.put( "Todays special", "Coffee" );
-//
-// app.testRequest( ServiceAction.PUT, 1, "users", "edanuff", "likes", "restaurants",
-// Query.fromQL( "select * where name='Brickhouse'" ) );
-//
-// app.testRequest( ServiceAction.DELETE, 1, null, "users", user.getUuid(), "connections", "likes",
-// restaurant.getUuid() );
-//
-// app.testRequest( ServiceAction.GET, 2, null, "users", "edanuff", "connections" );
-//
-// app.testRequest( ServiceAction.GET, 1, null, "users", "edanuff", "likes", "restaurants" );
-//
-// UUID uuid = UUIDGenerator.newTimeUUID();
-// app.put( "visits", 5 );
-// app.testRequest( ServiceAction.PUT, 1, "devices", uuid );
+ app.put( "eats", "petfood" );
+
+ app.testRequest( ServiceAction.PUT, 1, "users", "edanuff", "likes", "cats", "dylan" );
+
+ app.put( "Todays special", "Coffee" );
+
+ app.testRequest( ServiceAction.PUT, 1, "users", "edanuff", "likes", "restaurants",
+ Query.fromQL( "select * where name='Brickhouse'" ) );
+
+ app.testRequest( ServiceAction.DELETE, 1, null, "users", user.getUuid(), "connections", "likes",
+ restaurant.getUuid() );
+
+ app.testRequest( ServiceAction.GET, 2, null, "users", "edanuff", "connections" );
+
+ app.testRequest( ServiceAction.GET, 1, null, "users", "edanuff", "likes", "restaurants" );
+
+ UUID uuid = UUIDGenerator.newTimeUUID();
+ app.put( "visits", 5 );
+ app.testRequest( ServiceAction.PUT, 1, "devices", uuid );
}
[4/5] git commit: move notifier matching logic up
Posted by sf...@apache.org.
move notifier matching logic up
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/8a40594d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/8a40594d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/8a40594d
Branch: refs/heads/two-dot-o-notifications-queue
Commit: 8a40594d8ec92dd1cd980614a339cc6b597fe674
Parents: 5586b33
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Aug 22 13:57:05 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Aug 22 13:57:05 2014 -0600
----------------------------------------------------------------------
.../persistence/entities/Notification.java | 11 ++
.../NotificationsQueueManager.java | 125 ++++++++++---------
.../services/notifications/QueueMessage.java | 31 ++++-
.../apns/NotificationsServiceIT.java | 1 -
4 files changed, 106 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8a40594d/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java b/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java
index 7a643ac..f070ee6 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java
@@ -41,6 +41,8 @@ public class Notification extends TypedEntity {
public static final String RECEIPTS_COLLECTION = "receipts";
+
+
public static enum State {
CREATED, FAILED, SCHEDULED, STARTED, FINISHED, CANCELED, EXPIRED
}
@@ -49,6 +51,10 @@ public class Notification extends TypedEntity {
@EntityProperty
protected Map<String, Object> payloads;
+ /** Total count */
+ @EntityProperty
+ private int expectedCount;
+
/** Time processed */
@EntityProperty
protected Long queued;
@@ -237,4 +243,9 @@ public class Notification extends TypedEntity {
public void setQueued(Long queued) {
this.queued = queued;
}
+
+ public void setExpectedCount(int expectedCount) { this.expectedCount = expectedCount; }
+
+ @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+ public int getExpectedCount() { return expectedCount; }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8a40594d/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsQueueManager.java
index 34d4571..93543bd 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsQueueManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsQueueManager.java
@@ -135,6 +135,8 @@ public class NotificationsQueueManager implements NotificationServiceProxy {
final AtomicInteger batchCount = new AtomicInteger(); //count devices so you can make a judgement on batching
final int numCurrentBatchesConfig = getNumConcurrentBatches();
final ConcurrentLinkedQueue<String> errorMessages = new ConcurrentLinkedQueue<String>(); //build up list of issues
+ final HashMap<Object,Notifier> notifierMap = getNotifierMap();
+ final Map<String,Object> payloads = notification.getPayloads();
//get devices in querystring, and make sure you have access
if (pathQuery != null) {
@@ -163,7 +165,26 @@ public class NotificationsQueueManager implements NotificationServiceProxy {
}else {
sketch.add(hash,1);
}
- QueueMessage message = new QueueMessage(em.getApplicationId(),notification.getUuid(),deviceRef.getUuid());
+ String notifierId = null;
+ String notifierName = null;
+
+ //find the device notifier info, match it to the payload
+ for (Map.Entry<String, Object> entry : payloads.entrySet()) {
+ Notifier notifier = notifierMap.get(entry.getKey());
+ String providerId = getProviderId(deviceRef, notifier);
+ if (providerId != null) {
+ notifierId = providerId;
+ notifierName = notifier.getName();
+ break;
+ }
+ }
+
+ if(notifierId == null){
+ LOG.debug("Notifier did not match for device {} ", deviceRef);
+ continue;
+ }
+
+ QueueMessage message = new QueueMessage(em.getApplicationId(),notification.getUuid(),deviceRef.getUuid(),notifierName,notifierId);
qm.postToQueue(QUEUE_NAME, message);
if(notification.getQueued() == null){
// update queued time
@@ -202,10 +223,12 @@ public class NotificationsQueueManager implements NotificationServiceProxy {
if(errorMessages.size()>0){
properties.put("deliveryErrors", errorMessages.toArray());
- if(notification.getErrorMessage()==null){
+ if (notification.getErrorMessage() == null) {
notification.setErrorMessage("There was a problem delivering all of your notifications. See deliveryErrors in properties");
}
}
+
+ notification.setExpectedCount(deviceCount.get());
notification.addProperties(properties);
em.update(notification);
@@ -254,7 +277,6 @@ public class NotificationsQueueManager implements NotificationServiceProxy {
map.put("notification",notification);
final Map<String, Object> payloads = notification.getPayloads();
final Map<String, Object> translatedPayloads = translatePayloads(payloads, notifierMap);
- map.put("payloads",payloads);
map.put("translatedPayloads",translatedPayloads);
LOG.info("sending batch of {} devices for Notification: {}", messages.size(), notification.getUuid());
return map;
@@ -274,7 +296,6 @@ public class NotificationsQueueManager implements NotificationServiceProxy {
try {
UUID deviceUUID = message.getUuid();
HashMap<String, Object> notificationMap = notificationCache.get(message.getNotificationId());
- Map<String, Object> payloads = (Map<String, Object>) notificationMap.get("payloads");
Map<String, Object> translatedPayloads = (Map<String, Object>) notificationMap.get("translatedPayloads");
TaskManager taskManager = (TaskManager) notificationMap.get("taskManager");
Notification notification = (Notification) notificationMap.get("notification");
@@ -282,59 +303,35 @@ public class NotificationsQueueManager implements NotificationServiceProxy {
return message;
}
boolean foundNotifier = false;
- for (Map.Entry<String, Object> entry : payloads.entrySet()) {
- try {
- String payloadKey = entry.getKey();
- Notifier notifier = notifierMap.get(payloadKey.toLowerCase());
- EntityRef deviceRef = new SimpleEntityRef(Device.ENTITY_TYPE, deviceUUID);
-
- String providerId;
+ try {
+ String notifierName = message.getNotifierName();
+ Notifier notifier = notifierMap.get(notifierName.toLowerCase());
+ Object payload = translatedPayloads.get(notifierName);
+ Receipt receipt = new Receipt(notification.getUuid(), message.getNotifierId(), payload, deviceUUID, message);
+ TaskTracker tracker = new TaskTracker(notifier, taskManager, receipt, deviceUUID);
+
+ if (payload == null) {
+ LOG.debug("selected device {} for notification {} doesn't have a valid payload. skipping.", deviceUUID, notification.getUuid());
try {
- providerId = getProviderId(deviceRef, notifier);
- if (providerId == null) {
- LOG.debug("Provider not found.{} {}", deviceRef, notifier.getName());
- continue;
- }
- } catch (Exception providerException) {
- LOG.error("Exception getting provider.", providerException);
- continue;
- }
- Object payload = translatedPayloads.get(payloadKey);
-
- Receipt receipt = new Receipt(notification.getUuid(), providerId, payload, deviceUUID,message);
- TaskTracker tracker = new TaskTracker(notifier, taskManager, receipt, deviceUUID);
- if (payload == null) {
- LOG.debug("selected device {} for notification {} doesn't have a valid payload. skipping.", deviceUUID, notification.getUuid());
- try {
- tracker.failed(0, "failed to match payload to " + payloadKey + " notifier");
- } catch (Exception e) {
- LOG.debug("failed to mark device failed" + e);
- }
- continue;
- }
-
- if (LOG.isDebugEnabled()) {
- StringBuilder sb = new StringBuilder();
- sb.append("sending notification ").append(notification.getUuid());
- sb.append(" to device ").append(deviceUUID);
- LOG.debug(sb.toString());
+ tracker.failed(0, "failed to match payload to " + message.getNotifierId() + " notifier");
+ } catch (Exception e) {
+ LOG.debug("failed to mark device failed" + e);
}
+ }
+ try {
+ ProviderAdapter providerAdapter = providerAdapters.get(notifier.getProvider());
+ providerAdapter.sendNotification(message.getNotifierId(), notifier, payload, notification, tracker);
+ } catch (Exception e) {
try {
- ProviderAdapter providerAdapter = providerAdapters.get(notifier.getProvider());
- providerAdapter.sendNotification(providerId, notifier, payload, notification, tracker);
-
- } catch (Exception e) {
- try {
- tracker.failed(0, e.getMessage());
- } catch (Exception trackerException) {
- LOG.error("tracker failed", trackerException);
- }
+ tracker.failed(0, e.getMessage());
+ } catch (Exception trackerException) {
+ LOG.error("tracker failed", trackerException);
}
- foundNotifier = true;
- } finally {
- sendMeter.mark();
}
+ foundNotifier = true;
+ } finally {
+ sendMeter.mark();
}
if (!foundNotifier) {
try {
@@ -344,14 +341,14 @@ public class NotificationsQueueManager implements NotificationServiceProxy {
}
}
} catch (Exception x) {
-
+ LOG.error("Failure unknown",x);
}
return message;
}
});
}
}, Schedulers.io())
- .buffer(1000)
+ .buffer(QueueListener.BATCH_SIZE)
.map(new Func1<List<QueueMessage>, Object>() {
@Override
public Object call(List<QueueMessage> queueMessages) {
@@ -363,6 +360,19 @@ public class NotificationsQueueManager implements NotificationServiceProxy {
LOG.error("providerAdapter.doneSendingNotifications: ", e);
}
}
+ //TODO: check if a notification is done and mark it
+ HashMap<UUID, Notification> notifications = new HashMap<UUID, Notification>();
+ for (QueueMessage message : queueMessages) {
+ if (notifications.get(message.getNotificationId()) == null) {
+ try {
+ final Notification notification = em.get(message.getNotificationId(), Notification.class);
+ finishedBatch(notification, 0, 0);
+ } catch (Exception e) {
+ LOG.error("Failed to finish batch", e);
+ }
+ }
+
+ }
notificationCache.cleanUp();
return null;
}
@@ -386,15 +396,16 @@ public class NotificationsQueueManager implements NotificationServiceProxy {
Map<String, Object> properties = new HashMap<String, Object>(4);
properties.put("statistics", notification.getStatistics());
properties.put("modified", notification.getModified());
-
+ long sent = notification.getStatistics().get("sent");
+ long errors = notification.getStatistics().get("errors");
//none of this is known and should you ever do this
+ if(notification.getExpectedCount() == (errors + sent )) {
notification.setFinished(notification.getModified());
properties.put("finished", notification.getModified());
properties.put("state", notification.getState());
long elapsed = notification.getFinished()
- notification.getStarted();
- long sent = notification.getStatistics().get("sent");
- long errors = notification.getStatistics().get("errors");
+
if (LOG.isInfoEnabled()) {
StringBuilder sb = new StringBuilder();
@@ -403,7 +414,7 @@ public class NotificationsQueueManager implements NotificationServiceProxy {
sb.append(" devices in ").append(elapsed).append(" ms");
LOG.info(sb.toString());
}
-
+ }
LOG.info("notification finished batch: {}",
notification.getUuid());
em.updateProperties(notification, properties);
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8a40594d/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueMessage.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueMessage.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueMessage.java
index e6301ed..b8b5c6f 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueMessage.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueMessage.java
@@ -25,25 +25,32 @@ public class QueueMessage extends Message {
static final String MESSAGE_PROPERTY_DEVICE_UUID = "deviceUUID";
static final String MESSAGE_PROPERTY_APPLICATION_UUID = "applicationUUID";
+ static final String MESSAGE_PROPERTY_NOTIFIER_ID = "notifierId";
+ static final String MESSAGE_PROPERTY_NOTIFICATION_ID = "notificationId";
+ static final String MESSAGE_PROPERTY_NOTIFIER_NAME = "notifierName";
public QueueMessage() {
}
- public QueueMessage(UUID applicationId,UUID notificationId,UUID deviceId){
+ public QueueMessage(UUID applicationId,UUID notificationId,UUID deviceId,String notifierName,String notifierId){
setApplicationId(applicationId);
setDeviceId(deviceId);
+ setNotificationId(notificationId);
+ setNotifierName(notifierName);
+ setNotifierId(notifierId);
}
public static QueueMessage generate(Message message){
- return new QueueMessage((UUID) message.getObjectProperty(MESSAGE_PROPERTY_APPLICATION_UUID),(UUID) message.getObjectProperty("notificationId"),(UUID) message.getObjectProperty(MESSAGE_PROPERTY_DEVICE_UUID));
+ return new QueueMessage((UUID) message.getObjectProperty(MESSAGE_PROPERTY_APPLICATION_UUID),(UUID) message.getObjectProperty(MESSAGE_PROPERTY_NOTIFICATION_ID),(UUID) message.getObjectProperty(MESSAGE_PROPERTY_DEVICE_UUID),message.getStringProperty(MESSAGE_PROPERTY_NOTIFIER_NAME),message.getStringProperty(MESSAGE_PROPERTY_NOTIFIER_ID));
}
public UUID getApplicationId() {
return (UUID) this.getObjectProperty(MESSAGE_PROPERTY_APPLICATION_UUID);
}
+
public void setApplicationId(UUID applicationId){
this.setProperty(MESSAGE_PROPERTY_APPLICATION_UUID,applicationId);
}
@@ -56,10 +63,26 @@ public class QueueMessage extends Message {
}
public UUID getNotificationId(){
- return (UUID) this.getObjectProperty("notificationId");
+ return (UUID) this.getObjectProperty(MESSAGE_PROPERTY_NOTIFICATION_ID);
}
public void setNotificationId(UUID notificationId){
- this.setProperty("notificationdId",notificationId);
+ this.setProperty(MESSAGE_PROPERTY_NOTIFICATION_ID,notificationId);
+ }
+
+ public String getNotifierId() {
+ return this.getStringProperty(MESSAGE_PROPERTY_NOTIFIER_ID);
}
+ public void setNotifierId(String notifierId){
+ this.setProperty(MESSAGE_PROPERTY_NOTIFIER_ID,notifierId);
+ }
+
+ public String getNotifierName() {
+ return this.getStringProperty(MESSAGE_PROPERTY_NOTIFIER_NAME);
+ }
+ public void setNotifierName(String name){
+ this.setProperty(MESSAGE_PROPERTY_NOTIFIER_NAME,name);
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8a40594d/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
index ed15eeb..456df69 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
@@ -77,7 +77,6 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
public void before() throws Exception {
super.before();
// create apns notifier //
- NotificationsQueueManager.IS_TEST = true;
app.clear();
app.put("name", "apns");
[3/5] git commit: Add support for c3.2xlarge to AWS Cluster template.
Posted by sf...@apache.org.
Add support for c3.2xlarge to AWS Cluster template.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/839ac295
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/839ac295
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/839ac295
Branch: refs/heads/two-dot-o-notifications-queue
Commit: 839ac295e20d8f214fbfa9536d3afa7e9f2b1e4b
Parents: bc3ad2a
Author: Dave Johnson <dm...@apigee.com>
Authored: Fri Aug 22 14:44:50 2014 -0400
Committer: Dave Johnson <dm...@apigee.com>
Committed: Fri Aug 22 14:44:50 2014 -0400
----------------------------------------------------------------------
.../src/main/dist/init_instance/init_rest_server.sh | 4 ++++
stack/awscluster/ugcluster-cf.json | 10 ++++++++--
2 files changed, 12 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/839ac295/stack/awscluster/src/main/dist/init_instance/init_rest_server.sh
----------------------------------------------------------------------
diff --git a/stack/awscluster/src/main/dist/init_instance/init_rest_server.sh b/stack/awscluster/src/main/dist/init_instance/init_rest_server.sh
index 7a8954e..d069d0d 100644
--- a/stack/awscluster/src/main/dist/init_instance/init_rest_server.sh
+++ b/stack/awscluster/src/main/dist/init_instance/init_rest_server.sh
@@ -77,6 +77,10 @@ case `(curl http://169.254.169.254/latest/meta-data/instance-type)` in
export TOMCAT_RAM=6G
export TOMCAT_THREADS=1600
;;
+'c3.2xlarge' )
+ export TOMCAT_RAM=12G
+ export TOMCAT_THREADS=2000
+;;
'c3.4xlarge' )
export TOMCAT_RAM=24G
export TOMCAT_THREADS=4000
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/839ac295/stack/awscluster/ugcluster-cf.json
----------------------------------------------------------------------
diff --git a/stack/awscluster/ugcluster-cf.json b/stack/awscluster/ugcluster-cf.json
index 9c4885f..1de4e54 100644
--- a/stack/awscluster/ugcluster-cf.json
+++ b/stack/awscluster/ugcluster-cf.json
@@ -40,6 +40,7 @@
"m1.xlarge",
"m3.xlarge",
"m3.large",
+ "c3.2xlarge",
"c3.4xlarge"
],
"ConstraintDescription": "must be valid instance type."
@@ -63,7 +64,7 @@
"CassInstanceType": {
"Description": "Instance type for Cass servers",
"Type": "String",
- "Default": "c3.4xlarge",
+ "Default": "c3.2xlarge",
"AllowedValues": [
"m1.small",
"m1.medium",
@@ -71,6 +72,7 @@
"m1.xlarge",
"m3.xlarge",
"m3.2xlarge",
+ "c3.2xlarge",
"c3.4xlarge"
],
"ConstraintDescription": "must be valid instance type."
@@ -151,6 +153,9 @@
"c3.2xlarge": {
"Arch": "64"
},
+ "c3.2xlarge": {
+ "Arch": "64"
+ },
"c3.4xlarge": {
"Arch": "64"
}
@@ -857,7 +862,8 @@
"UnhealthyThreshold": "8",
"Interval": "30",
"Timeout": "5"
- }
+ },
+ "CrossZone": "true"
}
},
"NotificationTopic": {