You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/09/05 23:11:10 UTC
[07/33] git commit: adding notificationsservice
adding notificationsservice
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/9d7901ae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/9d7901ae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/9d7901ae
Branch: refs/heads/two-dot-o
Commit: 9d7901ae388d80350439221f1a253a0846db7c71
Parents: 0a61101
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Aug 18 14:53:51 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Aug 18 14:53:51 2014 -0600
----------------------------------------------------------------------
.../index/exceptions/NoIndexException.java | 17 +-
.../index/query/tree/NotOperand.java | 17 +-
.../index/query/tree/WithinOperand.java | 17 +-
.../query/tree/StringLiteralTest.java | 17 +-
stack/services/pom.xml | 11 +
.../notifications/NotificationsService.java | 32 +
.../notifications/NotificationsService.java | 32 +
.../notifications/NotificationsService.java | 32 +
.../notifications/ConnectionException.java | 25 +
.../services/notifications/JobScheduler.java | 92 +++
.../notifications/NotificationBatchJob.java | 125 ++++
.../services/notifications/NotificationJob.java | 129 ++++
.../notifications/NotificationServiceProxy.java | 32 +
.../NotificationsQueueManager.java | 654 +++++++++++++++++++
.../notifications/NotificationsService.java | 327 ++++++++++
.../services/notifications/ProviderAdapter.java | 55 ++
.../services/notifications/QueueJob.java | 127 ++++
.../services/notifications/TaskManager.java | 196 ++++++
.../services/notifications/TaskTracker.java | 50 ++
.../services/notifications/TestAdapter.java | 106 +++
.../notifications/apns/APNsAdapter.java | 280 ++++++++
.../notifications/apns/APNsNotification.java | 101 +++
.../apns/FailedConnectionListener.java | 88 +++
.../apns/RejectedAPNsListener.java | 53 ++
.../notifications/apns/TestAPNsListener.java | 100 +++
.../apns/TestAPNsNotification.java | 124 ++++
.../services/notifications/gcm/GCMAdapter.java | 221 +++++++
.../services/notifiers/NotifiersService.java | 73 +++
.../notifications/NotificationsService.java | 32 +
.../notifications/NotificationsService.java | 32 +
30 files changed, 3165 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9d7901ae/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/exceptions/NoIndexException.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/exceptions/NoIndexException.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/exceptions/NoIndexException.java
index c505721..3043768 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/exceptions/NoIndexException.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/exceptions/NoIndexException.java
@@ -1,18 +1,19 @@
-/*******************************************************************************
- * Copyright 2012 Apigee Corporation
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- ******************************************************************************/
+ */
package org.apache.usergrid.persistence.index.exceptions;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9d7901ae/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/tree/NotOperand.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/tree/NotOperand.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/tree/NotOperand.java
index 8588d24..35b41f3 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/tree/NotOperand.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/tree/NotOperand.java
@@ -1,18 +1,19 @@
-/*******************************************************************************
- * Copyright 2012 Apigee Corporation
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- ******************************************************************************/
+ */
package org.apache.usergrid.persistence.index.query.tree;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9d7901ae/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/tree/WithinOperand.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/tree/WithinOperand.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/tree/WithinOperand.java
index 602fc34..9d913e3 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/tree/WithinOperand.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/tree/WithinOperand.java
@@ -1,18 +1,19 @@
-/*******************************************************************************
- * Copyright 2012 Apigee Corporation
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- ******************************************************************************/
+ */
package org.apache.usergrid.persistence.index.query.tree;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9d7901ae/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/query/tree/StringLiteralTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/query/tree/StringLiteralTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/query/tree/StringLiteralTest.java
index f67cb11..161430a 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/query/tree/StringLiteralTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/query/tree/StringLiteralTest.java
@@ -1,18 +1,19 @@
-/*******************************************************************************
- * Copyright 2012 Apigee Corporation
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- ******************************************************************************/
+ */
package org.apache.usergrid.persistence.query.tree;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9d7901ae/stack/services/pom.xml
----------------------------------------------------------------------
diff --git a/stack/services/pom.xml b/stack/services/pom.xml
index 3d81558..ace3c8d 100644
--- a/stack/services/pom.xml
+++ b/stack/services/pom.xml
@@ -549,6 +549,17 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.relayrides</groupId>
+ <artifactId>pushy</artifactId>
+ <!-- The sha in the version is the git commit used in this build. Check out the pushy source, then this commit to build the library locally -->
+ <version>0.4-apigee-63dec68314c97c6e3ef40c88590e1b196d3ec55b</version>
+ </dependency>
+ <dependency>
+ <groupId>com.ganyo</groupId>
+ <artifactId>gcm-server</artifactId>
+ <version>1.0.2</version>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9d7901ae/stack/services/src/main/java/org/apache/usergrid/services/devices/notifications/NotificationsService.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/devices/notifications/NotificationsService.java b/stack/services/src/main/java/org/apache/usergrid/services/devices/notifications/NotificationsService.java
new file mode 100644
index 0000000..c766c72
--- /dev/null
+++ b/stack/services/src/main/java/org/apache/usergrid/services/devices/notifications/NotificationsService.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.services.devices.notifications;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NotificationsService extends
+ org.apache.usergrid.services.notifications.NotificationsService {
+
+ private static final Logger logger = LoggerFactory
+ .getLogger(NotificationsService.class);
+
+ public NotificationsService() {
+ logger.info("/devices/*/notifications");
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9d7901ae/stack/services/src/main/java/org/apache/usergrid/services/groups/notifications/NotificationsService.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/groups/notifications/NotificationsService.java b/stack/services/src/main/java/org/apache/usergrid/services/groups/notifications/NotificationsService.java
new file mode 100644
index 0000000..9e965b5
--- /dev/null
+++ b/stack/services/src/main/java/org/apache/usergrid/services/groups/notifications/NotificationsService.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.services.groups.notifications;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NotificationsService extends
+ org.apache.usergrid.services.notifications.NotificationsService {
+
+ private static final Logger logger = LoggerFactory
+ .getLogger(NotificationsService.class);
+
+ public NotificationsService() {
+ logger.info("/groups/*/notifications");
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9d7901ae/stack/services/src/main/java/org/apache/usergrid/services/groups/users/devices/notifications/NotificationsService.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/groups/users/devices/notifications/NotificationsService.java b/stack/services/src/main/java/org/apache/usergrid/services/groups/users/devices/notifications/NotificationsService.java
new file mode 100644
index 0000000..7587bd0
--- /dev/null
+++ b/stack/services/src/main/java/org/apache/usergrid/services/groups/users/devices/notifications/NotificationsService.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.services.groups.users.devices.notifications;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NotificationsService extends
+ org.apache.usergrid.services.notifications.NotificationsService {
+
+ private static final Logger logger = LoggerFactory
+ .getLogger(NotificationsService.class);
+
+ public NotificationsService() {
+ logger.info("/groups/*/users/*/devices/*/notifications");
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9d7901ae/stack/services/src/main/java/org/apache/usergrid/services/notifications/ConnectionException.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ConnectionException.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ConnectionException.java
new file mode 100644
index 0000000..314834b
--- /dev/null
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ConnectionException.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.services.notifications;
+
+import org.springframework.core.NestedCheckedException;
+
+public class ConnectionException extends NestedCheckedException {
+ public ConnectionException(String msg, Throwable cause) {
+ super(msg, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9d7901ae/stack/services/src/main/java/org/apache/usergrid/services/notifications/JobScheduler.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/JobScheduler.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/JobScheduler.java
new file mode 100644
index 0000000..c366a86
--- /dev/null
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/JobScheduler.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.services.notifications;
+
+import org.apache.usergrid.batch.service.SchedulerService;
+import org.apache.usergrid.persistence.EntityManager;
+import org.apache.usergrid.persistence.Notification;
+import org.apache.usergrid.persistence.entities.JobData;
+import org.apache.usergrid.services.ServiceManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class JobScheduler{
+ public static final long SCHEDULER_GRACE_PERIOD = 250;
+ private final EntityManager em;
+
+ protected ServiceManager sm;
+ private final Logger LOG = LoggerFactory.getLogger(NotificationsService.class);
+
+ public JobScheduler(ServiceManager sm,EntityManager em){
+ this.sm=sm; this.em = em;
+ }
+ public void scheduleBatchJob(Notification notification, long delay) throws Exception {
+
+ JobData jobData = new JobData();
+ jobData.setProperty("applicationId", sm.getApplicationId());
+ jobData.setProperty("notificationId", notification.getUuid());
+ jobData.setProperty("deliver", notification.getDeliver());
+
+ long soonestPossible = System.currentTimeMillis() + SCHEDULER_GRACE_PERIOD + delay;
+
+ SchedulerService scheduler = getSchedulerService();
+ scheduler.createJob("notificationBatchJob", soonestPossible, jobData);
+
+ LOG.info("notification {} batch scheduled for delivery", notification.getUuid());
+ }
+ public boolean scheduleQueueJob(Notification notification) throws Exception {
+ return scheduleQueueJob(notification,false);
+ }
+
+ public boolean scheduleQueueJob(Notification notification, boolean forceSchedule) throws Exception {
+
+ boolean scheduleInFuture = notification.getDeliver() != null;
+ long scheduleAt = (notification.getDeliver() != null) ? notification.getDeliver() : 0;
+ long soonestPossible = System.currentTimeMillis() + SCHEDULER_GRACE_PERIOD;
+ if (scheduleAt < soonestPossible) {
+ scheduleAt = soonestPossible;
+ scheduleInFuture = false;
+ }
+
+ // update Notification properties
+ if (notification.getStarted() == null || notification.getStarted() == 0) {
+ notification.setStarted(System.currentTimeMillis());
+ Map<String, Object> properties = new HashMap<String, Object>(2);
+ properties.put("started", notification.getStarted());
+ properties.put("state", notification.getState());
+ em.updateProperties(notification, properties);
+ }
+ boolean scheduled = scheduleInFuture || forceSchedule;
+ if(scheduled) {
+ JobData jobData = new JobData();
+ jobData.setProperty("applicationId", sm.getApplicationId());
+ jobData.setProperty("notificationId", notification.getUuid());
+ jobData.setProperty("deliver", notification.getDeliver());
+ SchedulerService scheduler = getSchedulerService();
+ scheduler.createJob("queueJob", scheduleAt, jobData);
+ }
+ LOG.info("notification {} scheduled for queuing", notification.getUuid());
+ return scheduled;
+ }
+ private SchedulerService getSchedulerService() {
+ return sm.getSchedulerService();
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9d7901ae/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationBatchJob.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationBatchJob.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationBatchJob.java
new file mode 100644
index 0000000..81006ce
--- /dev/null
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationBatchJob.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.services.notifications;
+
+
+import java.util.UUID;
+
+import javax.annotation.PostConstruct;
+
+import org.apache.usergrid.persistence.Notification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import org.apache.usergrid.batch.Job;
+import org.apache.usergrid.batch.JobExecution;
+import org.apache.usergrid.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.EntityManager;
+import org.apache.usergrid.persistence.EntityManagerFactory;
+import org.apache.usergrid.persistence.entities.JobData;
+import org.apache.usergrid.services.ServiceManager;
+import org.apache.usergrid.services.ServiceManagerFactory;
+
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Timer;
+
+
+@Component( "notificationBatchJob" )
+public class NotificationBatchJob implements Job {
+
+ private static final Logger logger = LoggerFactory.getLogger( NotificationBatchJob.class );
+ private Meter postSendMeter;
+ private Timer timerMetric;
+ private Meter requestMeter;
+ private Histogram cycleMetric;
+
+ @Autowired
+ private MetricsFactory metricsService;
+
+ @Autowired
+ private ServiceManagerFactory smf;
+
+ @Autowired
+ private EntityManagerFactory emf;
+
+
+ public NotificationBatchJob() {
+
+ }
+
+
+ @PostConstruct
+ void init() {
+ postSendMeter = metricsService.getMeter( NotificationsService.class, "post-send" );
+ requestMeter = metricsService.getMeter( NotificationBatchJob.class, "requests" );
+ timerMetric = metricsService.getTimer( NotificationBatchJob.class, "execution" );
+ cycleMetric = metricsService.getHistogram( NotificationsService.class, "cycle" );
+ }
+
+
+ public void execute( JobExecution jobExecution ) throws Exception {
+
+ Timer.Context timer = timerMetric.time();
+ requestMeter.mark();
+ logger.info( "execute NotificationBatchJob {}", jobExecution );
+
+ JobData jobData = jobExecution.getJobData();
+ UUID applicationId = ( UUID ) jobData.getProperty( "applicationId" );
+ ServiceManager sm = smf.getServiceManager( applicationId );
+ NotificationsService notificationsService = ( NotificationsService ) sm.getService( "notifications" );
+
+ EntityManager em = emf.getEntityManager( applicationId );
+
+ try {
+ if ( em == null ) {
+ logger.info( "no EntityManager for applicationId {}", applicationId );
+ return;
+ }
+
+ UUID notificationId = ( UUID ) jobData.getProperty( "notificationId" );
+ Notification notification = em.get( notificationId, Notification.class );
+ if ( notification == null ) {
+ logger.info( "notificationId {} no longer exists", notificationId );
+ return;
+ }
+
+
+ try {
+ notificationsService.getQueueManager().processBatchAndReschedule( notification, jobExecution );
+ }
+ catch ( Exception e ) {
+ logger.error( "execute NotificationBatchJob failed", e );
+ em.setProperty( notification, "errorMessage", e.getMessage() );
+ throw e;
+ }
+ finally {
+ long diff = System.currentTimeMillis() - notification.getCreated();
+ cycleMetric.update( diff );
+ postSendMeter.mark();
+ }
+ }
+ finally {
+ timer.stop();
+ }
+
+ logger.info( "execute NotificationBatch completed normally" );
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9d7901ae/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationJob.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationJob.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationJob.java
new file mode 100644
index 0000000..f80ee1e
--- /dev/null
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationJob.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.services.notifications;
+
+import java.util.UUID;
+
+import javax.annotation.PostConstruct;
+
+import org.apache.usergrid.persistence.Notification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import org.apache.usergrid.batch.JobExecution;
+import org.apache.usergrid.batch.job.OnlyOnceJob;
+import org.apache.usergrid.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.EntityManager;
+import org.apache.usergrid.persistence.EntityManagerFactory;
+import org.apache.usergrid.persistence.entities.JobData;
+import org.apache.usergrid.services.ServiceManager;
+import org.apache.usergrid.services.ServiceManagerFactory;
+
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Timer;
+
+
+@Component( "notificationJob" )
+public class NotificationJob extends OnlyOnceJob {
+
+
+ private static final Logger logger = LoggerFactory.getLogger( NotificationJob.class );
+
+ @Autowired
+ private MetricsFactory metricsService;
+
+ @Autowired
+ private ServiceManagerFactory smf;
+
+ @Autowired
+ private EntityManagerFactory emf;
+ private Meter requests;
+ private Timer execution;
+ private Histogram end;
+
+
+ public NotificationJob() {
+
+ }
+
+
+ @PostConstruct
+ void init() {
+ requests = metricsService.getMeter( NotificationJob.class, "requests" );
+ execution = metricsService.getTimer( NotificationJob.class, "execution" );
+ end = metricsService.getHistogram( QueueJob.class, "end" );
+ }
+
+
+ @Override
+ public void doJob( JobExecution jobExecution ) throws Exception {
+
+ Timer.Context timer = execution.time();
+ requests.mark();
+
+ logger.info( "execute NotificationJob {}", jobExecution );
+
+ JobData jobData = jobExecution.getJobData();
+ UUID applicationId = ( UUID ) jobData.getProperty( "applicationId" );
+ ServiceManager sm = smf.getServiceManager( applicationId );
+ NotificationsService notificationsService = ( NotificationsService ) sm.getService( "notifications" );
+
+ EntityManager em = emf.getEntityManager( applicationId );
+ try {
+ if ( em == null ) {
+ logger.info( "no EntityManager for applicationId {}", applicationId );
+ return;
+ }
+ UUID notificationId = ( UUID ) jobData.getProperty( "notificationId" );
+ Notification notification = em.get( notificationId, Notification.class );
+ if ( notification == null ) {
+ logger.info( "notificationId {} no longer exists", notificationId );
+ return;
+ }
+
+ try {
+ notificationsService.getQueueManager().processBatchAndReschedule( notification, jobExecution );
+ }
+ catch ( Exception e ) {
+ logger.error( "execute NotificationJob failed", e );
+ em.setProperty( notification, "errorMessage", e.getMessage() );
+ throw e;
+ }
+ finally {
+ long diff = System.currentTimeMillis() - notification.getCreated();
+ end.update( diff );
+ }
+ }
+ finally {
+ timer.stop();
+ }
+
+ logger.info( "execute NotificationJob completed normally" );
+ }
+
+
+ @Override
+ protected long getDelay( JobExecution execution ) throws Exception {
+ return TaskManager.BATCH_DEATH_PERIOD;
+ }
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9d7901ae/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationServiceProxy.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationServiceProxy.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationServiceProxy.java
new file mode 100644
index 0000000..908482f
--- /dev/null
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationServiceProxy.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.services.notifications;
+
+import org.apache.usergrid.persistence.Notification;
+import org.apache.usergrid.persistence.Notifier;
+
+import java.util.Set;
+
+/**
+ * Created by ApigeeCorporation on 8/6/14.
+ */
+public interface NotificationServiceProxy {
+
+ public void finishedBatch(Notification notification, long successes, long failures) throws Exception;
+
+ void asyncCheckForInactiveDevices(Set<Notifier> notifiers) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9d7901ae/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsQueueManager.java
new file mode 100644
index 0000000..95dfb8e
--- /dev/null
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsQueueManager.java
@@ -0,0 +1,654 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.services.notifications;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Meter;
+import com.google.common.cache.*;
+import org.apache.usergrid.batch.JobExecution;
+import org.apache.usergrid.metrics.MetricsFactory;
+import org.apache.usergrid.mq.Message;
+import org.apache.usergrid.mq.QueueManager;
+import org.apache.usergrid.mq.QueueQuery;
+import org.apache.usergrid.mq.QueueResults;
+import org.apache.usergrid.persistence.*;
+import org.apache.usergrid.persistence.entities.*;
+import org.apache.usergrid.persistence.index.query.Query;
+import org.apache.usergrid.services.notifications.apns.APNsAdapter;
+import org.apache.usergrid.services.notifications.gcm.GCMAdapter;
+import org.apache.usergrid.utils.InflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import rx.Observable;
+import rx.Subscriber;
+import rx.functions.Func1;
+import rx.schedulers.Schedulers;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Created by ApigeeCorporation on 8/13/14.
+ */
+public class NotificationsQueueManager implements NotificationServiceProxy {
+ private static final String NOTIFICATION_CONCURRENT_BATCHES = "notification.concurrent.batches";
+ private static final long CONSECUTIVE_EMPTY_QUEUES = 10;
+
+ private static final Logger LOG = LoggerFactory.getLogger(NotificationsQueueManager.class);
+
+ //this is for tests, will not mark initial post complete, set to false for tests
+ public static boolean IS_TEST = false;
+ private final Meter sendMeter;
+ private final Histogram queueSize;
+ private final Counter outstandingQueue;
+ private static ExecutorService INACTIVE_DEVICE_CHECK_POOL = Executors.newFixedThreadPool(5);
+ public static final String NOTIFIER_ID_POSTFIX = ".notifier.id";
+ public static int BATCH_SIZE = 1000;
+ // timeout for message queue transaction
+ public static final long MESSAGE_TRANSACTION_TIMEOUT = TaskManager.MESSAGE_TRANSACTION_TIMEOUT;
+ // If this property is set Notifications are automatically expired in
+ // the isOkToSent() method after the specified number of milliseconds
+ public static final String PUSH_AUTO_EXPIRE_AFTER_PROPNAME = "usergrid.push-auto-expire-after";
+ private final EntityManager em;
+ private final QueueManager qm;
+ private final JobScheduler jobScheduler;
+ private final Properties props;
+ private final InflectionUtils utils;
+ private AtomicLong consecutiveEmptyQueues = new AtomicLong();
+ private Long pushAutoExpireAfter = null;
+ static final String MESSAGE_PROPERTY_DEVICE_UUID = "deviceUUID";
+
+ public final Map<String, ProviderAdapter> providerAdapters = new HashMap<String, ProviderAdapter>(3);
+ {
+ providerAdapters.put("apple", APNS_ADAPTER);
+ providerAdapters.put("google", new GCMAdapter());
+ providerAdapters.put("noop", TEST_ADAPTER);
+ };
+ // these 2 can be static, but GCM can't. future: would be nice to get gcm
+ // static as well...
+ public static ProviderAdapter APNS_ADAPTER = new APNsAdapter();
+ public static ProviderAdapter TEST_ADAPTER = new TestAdapter();
+
+ //cache to retrieve push manager, cached per notifier, so many notifications will get same push manager
+ private static LoadingCache<EntityManager, HashMap<Object,Notifier>> notifierCacheMap = CacheBuilder
+ .newBuilder()
+ .expireAfterWrite(5, TimeUnit.MINUTES)
+ .build(new CacheLoader<EntityManager, HashMap<Object, Notifier>>() {
+ @Override
+ public HashMap<Object, Notifier> load(EntityManager em) {
+ HashMap<Object, Notifier> notifierHashMap = new HashMap<Object, Notifier>();
+ Query query = new Query();
+ query.setCollection("notifiers");
+ query.setLimit(50);
+ PathQuery<Notifier> pathQuery = new PathQuery<Notifier>(em.getApplicationRef(), query);
+ Iterator<Notifier> notifierIterator = pathQuery.iterator(em);
+ while (notifierIterator.hasNext()) {
+ Notifier notifier = notifierIterator.next();
+ notifierHashMap.put(notifier.getName().toLowerCase(), notifier);
+ notifierHashMap.put(notifier.getUuid(), notifier);
+ notifierHashMap.put(notifier.getUuid().toString(), notifier);
+
+ }
+ return notifierHashMap;
+ }
+ });;
+
+ public NotificationsQueueManager(JobScheduler jobScheduler, EntityManager entityManager, Properties props, QueueManager queueManager, MetricsFactory metricsFactory){
+ this.em = entityManager;
+ this.qm = queueManager;
+ this.jobScheduler = jobScheduler;
+ this.props = props;
+ this.sendMeter = metricsFactory.getMeter(NotificationsService.class, "send");
+ this.queueSize = metricsFactory.getHistogram(NotificationsService.class, "queue_size");
+ this.outstandingQueue = metricsFactory.getCounter(NotificationsService.class,"current_queue");
+ utils = new InflectionUtils();
+ }
+
+ public boolean scheduleQueueJob(Notification notification) throws Exception{
+ return jobScheduler.scheduleQueueJob(notification);
+ }
+
+ public void queueNotification(final Notification notification, final JobExecution jobExecution) throws Exception {
+ if (notification.getCanceled() == Boolean.TRUE) {
+ LOG.info("notification " + notification.getUuid() + " canceled");
+ if (jobExecution != null) {
+ jobExecution.killed();
+ }
+ return;
+ }
+
+ long startTime = System.currentTimeMillis();
+ LOG.info("notification {} start queuing", notification.getUuid());
+ final PathQuery<Device> pathQuery = notification.getPathQuery(); //devices query
+ final AtomicInteger deviceCount = new AtomicInteger(); //count devices so you can make a judgement on batching
+ final AtomicInteger batchCount = new AtomicInteger(); //count devices so you can make a judgement on batching
+ final int numCurrentBatchesConfig = getNumConcurrentBatches();
+
+ final ConcurrentLinkedQueue<String> errorMessages = new ConcurrentLinkedQueue<String>(); //build up list of issues
+
+ //get devices in querystring, and make sure you have access
+ if (pathQuery != null) {
+ final Iterator<Device> iterator = pathQuery.iterator(em);
+ //if there are more pages (defined by PAGE_SIZE) you probably want this to be async, also if this is already a job then don't reschedule
+ if (iterator instanceof ResultsIterator && ((ResultsIterator) iterator).hasPages() && jobExecution == null) {
+ jobScheduler.scheduleQueueJob(notification, true);
+ return;
+ }
+
+ Observable.create(new IteratorObservable<Entity>(iterator)).parallel(new Func1<Observable<Entity>, Observable<Entity>>() {
+ @Override
+ public Observable<Entity> call(Observable<Entity> deviceObservable) {
+ return deviceObservable.map( new Func1<Entity,Entity>() {
+ @Override
+ public Entity call(Entity entity) {
+ try {
+ List<EntityRef> devicesRef = getDevices(entity); // resolve group
+ boolean maySchedule = false;
+ for (EntityRef deviceRef : devicesRef) {
+ maySchedule |= deviceCount.incrementAndGet() % BATCH_SIZE == 0;
+ Message message = new Message();
+ message.setProperty(MESSAGE_PROPERTY_DEVICE_UUID, deviceRef.getUuid());
+ qm.postToQueue( getJobQueueName(notification), message);
+ if(notification.getQueued() == null){
+ // update queued time
+ notification.setQueued(System.currentTimeMillis());
+ em.update(notification);
+ }
+ }
+
+ //start working if you are on a large batch,
+ if (maySchedule && numCurrentBatchesConfig >= batchCount.incrementAndGet()) {
+ processBatchAndReschedule(notification, jobExecution);
+ }
+
+ if(devicesRef.size() <=0){
+ errorMessages.add("Could not find devices for entity: "+entity.getUuid());
+ }
+
+ } catch (Exception deviceLoopException) {
+ LOG.error("Failed to add devices", deviceLoopException);
+ errorMessages.add("Failed to add devices for entity: "+entity.getUuid() + " error:"+ deviceLoopException);
+ }
+ return entity;
+ }
+ });
+ }
+ }, Schedulers.io()).toBlocking().lastOrDefault(null);
+
+ }
+
+ batchCount.set(Math.min(numCurrentBatchesConfig, batchCount.get()));
+ // update queued time
+ Map<String, Object> properties = new HashMap<String, Object>(2);
+ properties.put("queued", notification.getQueued());
+ properties.put("state", notification.getState());
+
+ //do i have devices, and have i already started batching.
+ if (deviceCount.get()>0 ) {
+ if(batchCount.get() <= 0) {
+ processBatchAndReschedule(notification, jobExecution);
+ }
+ } else {
+ //if i'm in a test value will be false, do not mark finished for test orchestration, not ideal need real tests
+ if(!IS_TEST) {
+ finishedBatch(notification, 0, 0);
+ errorMessages.add("No devices for notification " + notification.getUuid());
+ }
+ }
+
+ if(!IS_TEST && errorMessages.size()>0){
+ properties.put("deliveryErrors", errorMessages.toArray());
+ if(notification.getErrorMessage()==null){
+ notification.setErrorMessage("There was a problem delivering all of your notifications. See deliveryErrors in properties");
+ }
+ }
+ notification.addProperties(properties);
+ em.update(notification);
+ long elapsed = notification.getQueued() - startTime;
+
+ if (LOG.isInfoEnabled()) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("notification ").append(notification.getUuid());
+ sb.append(" done queuing to ").append(deviceCount);
+ sb.append(" devices in ").append(elapsed).append(" ms");
+ LOG.info(sb.toString());
+ }
+ }
+ public void finishedBatch(Notification notification, long successes, long failures) throws Exception {
+ finishedBatch(notification,successes,failures,false);
+ }
+ public void finishedBatch(Notification notification, long successes, long failures, boolean overrideComplete) throws Exception {
+ if (LOG.isDebugEnabled()) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("finishedBatch ").append(notification.getUuid());
+ sb.append(" successes: ").append(successes);
+ sb.append(" failures: ").append(failures);
+ LOG.info(sb.toString());
+ }
+
+ notification = em.get(notification, Notification.class); // refresh data
+ notification.updateStatistics(successes, failures);
+ notification.setModified(System.currentTimeMillis());
+ Map<String, Object> properties = new HashMap<String, Object>(4);
+ properties.put("statistics", notification.getStatistics());
+ properties.put("modified", notification.getModified());
+
+ if (isNotificationDeliveryComplete(notification) || overrideComplete) {
+ notification.setFinished(notification.getModified());
+ properties.put("finished", notification.getModified());
+ properties.put("state", notification.getState());
+ long elapsed = notification.getFinished()
+ - notification.getStarted();
+ long sent = notification.getStatistics().get("sent");
+ long errors = notification.getStatistics().get("errors");
+
+ if (LOG.isInfoEnabled()) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("notification ").append(notification.getUuid());
+ sb.append(" done sending to ").append(sent + errors);
+ sb.append(" devices in ").append(elapsed).append(" ms");
+ LOG.info(sb.toString());
+ }
+
+ } else {
+ LOG.info("notification finished batch: {}",
+ notification.getUuid());
+ }
+ em.updateProperties(notification, properties);
+
+ Set<Notifier> notifiers = new HashSet<Notifier>(getNotifierMap().values()); // remove dups
+ asyncCheckForInactiveDevices(notifiers);
+ }
+
+ private HashMap<Object,Notifier> getNotifierMap(){
+ try{
+ return notifierCacheMap.get(em);
+ }catch (ExecutionException ee){
+ LOG.error("failed to get from cache",ee);
+ return new HashMap<Object, Notifier>();
+ }
+ }
+ /*
+ * returns partial list of Device EntityRefs (up to BATCH_SIZE) - empty when
+ * done w/ delivery
+ */
+ private QueueResults getDeliveryBatch(EntityRef notification, int batchSize) throws Exception {
+
+ QueueQuery qq = new QueueQuery();
+ qq.setLimit(batchSize);
+ qq.setTimeout(MESSAGE_TRANSACTION_TIMEOUT);
+ QueueResults results = qm.getFromQueue(getJobQueueName(notification), qq);
+ LOG.debug("got batch of {} devices for notification {}",
+ results.size(), notification.getUuid());
+ return results;
+ }
+ /**
+ * executes a Notification batch and schedules the next one - called by
+ * NotificationBatchJob
+ */
+ public void processBatchAndReschedule(Notification notification, JobExecution jobExecution) throws Exception {
+ if (!isOkToSend(notification,jobExecution)) {
+ LOG.info("notification " + notification.getUuid() + " canceled");
+ if (jobExecution != null) {
+ jobExecution.killed();
+ }
+ return;
+ }
+
+ LOG.debug("processing batch {}", notification.getUuid());
+
+ QueueResults queueResults = getDeliveryBatch(notification, jobExecution == null ? BATCH_SIZE/2 : BATCH_SIZE ); //if first run then throttle the batch down by factor of 2 if its a job then try to grab alot of notifications and run them all
+
+ long reschedule_delay = jobScheduler.SCHEDULER_GRACE_PERIOD;
+ final TaskManager taskManager = new TaskManager( em, this, qm, notification, queueResults);
+ if (queueResults.size() > 0) {
+ consecutiveEmptyQueues.set(0);
+ sendBatchToProviders(taskManager, notification, queueResults.getMessages());
+ }else{
+ consecutiveEmptyQueues.incrementAndGet();
+ }
+
+ if (!isNotificationDeliveryComplete(notification) && consecutiveEmptyQueues.get() <= CONSECUTIVE_EMPTY_QUEUES) {
+ if(jobExecution==null) {
+ jobScheduler.scheduleBatchJob(notification, reschedule_delay);
+ }else{
+ processBatchAndReschedule(notification, jobExecution);
+ }
+ } else {
+ consecutiveEmptyQueues.set(0);
+ finishedBatch(notification, 0, 0,true);
+ }
+
+ LOG.debug("finished processing batch");
+ }
+
+ /**
+ * send batches of notifications to provider
+ * @param taskManager
+ * @param notification
+ * @param queueResults
+ * @throws Exception
+ */
+ private void sendBatchToProviders(final TaskManager taskManager, final Notification notification, List<Message> queueResults) throws Exception {
+
+ LOG.info("sending batch of {} devices for Notification: {}", queueResults.size(), notification.getUuid());
+ final Map<Object, Notifier> notifierMap = getNotifierMap();
+ queueSize.update(queueResults.size());
+ final Map<String, Object> payloads = notification.getPayloads();
+ final Map<String, Object> translatedPayloads = translatePayloads(payloads, notifierMap);
+ try {
+ Observable
+ .from(queueResults)
+ .parallel(new Func1<Observable<Message>, Observable<Message>>() {
+ @Override
+ public Observable<Message> call(Observable<Message> messageObservable) {
+ return messageObservable.map(new Func1<Message, Message>() {
+ @Override
+ public Message call(Message message) {
+ UUID deviceUUID = (UUID) message.getObjectProperty(MESSAGE_PROPERTY_DEVICE_UUID);
+ boolean foundNotifier = false;
+ for (Map.Entry<String, Object> entry : payloads.entrySet()) {
+ try {
+ String payloadKey = entry.getKey();
+ Notifier notifier = notifierMap.get(payloadKey.toLowerCase());
+ EntityRef deviceRef = new SimpleEntityRef(Device.ENTITY_TYPE, deviceUUID);
+
+ String providerId;
+ try {
+ providerId = getProviderId(deviceRef, notifier);
+ if (providerId == null) {
+ LOG.debug("Provider not found.{} {}", deviceRef,notifier.getName());
+ continue;
+ }
+ } catch (Exception providerException) {
+ LOG.error("Exception getting provider.", providerException);
+ continue;
+ }
+ Object payload = translatedPayloads.get(payloadKey);
+
+ Receipt receipt = new Receipt(notification.getUuid(), providerId, payload,deviceUUID);
+ TaskTracker tracker = new TaskTracker(notifier, taskManager, receipt, deviceUUID);
+ if (payload == null) {
+ LOG.debug("selected device {} for notification {} doesn't have a valid payload. skipping.", deviceUUID, notification.getUuid());
+ try {
+ tracker.failed(0, "failed to match payload to " + payloadKey + " notifier");
+ } catch (Exception e) {
+ LOG.debug("failed to mark device failed" + e);
+ }
+ continue;
+ }
+
+ if (LOG.isDebugEnabled()) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("sending notification ").append(notification.getUuid());
+ sb.append(" to device ").append(deviceUUID);
+ LOG.debug(sb.toString());
+ }
+
+ try {
+ ProviderAdapter providerAdapter = providerAdapters.get(notifier.getProvider());
+ providerAdapter.sendNotification(providerId, notifier, payload, notification, tracker);
+
+ } catch (Exception e) {
+ try {
+ tracker.failed(0, e.getMessage());
+ } catch (Exception trackerException) {
+ LOG.error("tracker failed", trackerException);
+ }
+ }
+ foundNotifier = true;
+ } finally {
+ sendMeter.mark();
+ }
+ }
+ if (!foundNotifier) {
+ try {
+ taskManager.skip(deviceUUID);
+ } catch (Exception trackerException) {
+ LOG.error("failed on skip", trackerException);
+ }
+ }
+ return message;
+ }
+ });
+ }
+ }, Schedulers.io())
+ .toBlocking()
+ .lastOrDefault(null);
+
+ //for gcm this will actually send notification
+ for (ProviderAdapter providerAdapter : providerAdapters.values()) {
+ try {
+ providerAdapter.doneSendingNotifications();
+ } catch (Exception e) {
+ LOG.error("providerAdapter.doneSendingNotifications: ", e);
+ }
+ }
+
+ } finally {
+ outstandingQueue.dec();
+ LOG.info("finished sending batch for notification {}", notification.getUuid());
+ }
+
+ }
+
+ private Map<String, Object> translatePayloads(Map<String, Object> payloads, Map<Object, Notifier> notifierMap) throws Exception {
+ Map<String, Object> translatedPayloads = new HashMap<String, Object>(
+ payloads.size());
+ for (Map.Entry<String, Object> entry : payloads.entrySet()) {
+ String payloadKey = entry.getKey();
+ Object payloadValue = entry.getValue();
+ Notifier notifier = notifierMap.get(payloadKey);
+ if (notifier != null) {
+ ProviderAdapter providerAdapter = providerAdapters.get(notifier
+ .getProvider());
+ if (providerAdapter != null) {
+ Object translatedPayload = payloadValue != null ? providerAdapter
+ .translatePayload(payloadValue) : null;
+ if (translatedPayload != null) {
+ translatedPayloads.put(payloadKey, translatedPayload);
+ }
+ }
+ }
+ }
+ return translatedPayloads;
+ }
+
+ private int getNumConcurrentBatches() {
+ return Integer.parseInt(props.getProperty(NOTIFICATION_CONCURRENT_BATCHES, "1"));
+ }
+
+ private static final class IteratorObservable<T> implements Observable.OnSubscribe<T> {
+ private final Iterator<T> input;
+ private IteratorObservable( final Iterator input ) {this.input = input;}
+
+ @Override
+ public void call( final Subscriber<? super T> subscriber ) {
+
+ /**
+ * You would replace this code with your file reading. Instead of emitting from an iterator,
+ * you would create a bean object that represents the entity, and then emit it
+ */
+
+ try {
+ while ( !subscriber.isUnsubscribed() && input.hasNext() ) {
+ //send our input to the next
+ subscriber.onNext( (T) input.next() );
+ }
+
+ //tell the subscriber we don't have any more data
+ subscriber.onCompleted();
+ }
+ catch ( Throwable t ) {
+ LOG.error("failed on subscriber",t);
+ subscriber.onError( t );
+ }
+ }
+ }
+
+ public void asyncCheckForInactiveDevices(Set<Notifier> notifiers) throws Exception {
+ for (final Notifier notifier : notifiers) {
+ INACTIVE_DEVICE_CHECK_POOL.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ checkForInactiveDevices(notifier);
+ } catch (Exception e) {
+ LOG.error("checkForInactiveDevices", e); // not
+ // essential so
+ // don't fail,
+ // but log
+ }
+ }
+ });
+ }
+ }
+
+ /** gets the list of inactive devices from the Provider and updates them */
+ private void checkForInactiveDevices(Notifier notifier) throws Exception {
+ ProviderAdapter providerAdapter = providerAdapters.get(notifier
+ .getProvider());
+ if (providerAdapter != null) {
+ LOG.debug("checking notifier {} for inactive devices", notifier);
+ Map<String, Date> inactiveDeviceMap = providerAdapter
+ .getInactiveDevices(notifier, em);
+
+ if (inactiveDeviceMap != null && inactiveDeviceMap.size() > 0) {
+ LOG.debug("processing {} inactive devices",
+ inactiveDeviceMap.size());
+ Map<String, Object> clearPushtokenMap = new HashMap<String, Object>(
+ 2);
+ clearPushtokenMap.put(notifier.getName() + NOTIFIER_ID_POSTFIX,
+ "");
+ clearPushtokenMap.put(notifier.getUuid() + NOTIFIER_ID_POSTFIX,
+ "");
+
+ // todo: this could be done in a single query
+ for (Map.Entry<String, Date> entry : inactiveDeviceMap
+ .entrySet()) {
+ // name
+ Query query = new Query();
+ query.addEqualityFilter(notifier.getName()
+ + NOTIFIER_ID_POSTFIX, entry.getKey());
+ Results results = em.searchCollection(em.getApplication(),
+ "devices", query);
+ for (Entity e : results.getEntities()) {
+ em.updateProperties(e, clearPushtokenMap);
+ }
+ // uuid
+ query = new Query();
+ query.addEqualityFilter(notifier.getUuid()
+ + NOTIFIER_ID_POSTFIX, entry.getKey());
+ results = em.searchCollection(em.getApplication(),
+ "devices", query);
+ for (Entity e : results.getEntities()) {
+ em.updateProperties(e, clearPushtokenMap);
+ }
+ }
+ }
+ LOG.debug("finished checking notifier {} for inactive devices",
+ notifier);
+ }
+ }
+
+ private boolean isOkToSend(Notification notification,
+ JobExecution jobExecution) {
+ String autoExpireAfterString = props.getProperty(PUSH_AUTO_EXPIRE_AFTER_PROPNAME);
+
+ if (autoExpireAfterString != null) {
+ pushAutoExpireAfter = Long.parseLong(autoExpireAfterString);
+ }
+ if (pushAutoExpireAfter != null) {
+ if (notification.getCreated() < System.currentTimeMillis() - pushAutoExpireAfter) {
+ notification.setExpire(System.currentTimeMillis() - 1L);
+ }
+ }
+
+ if (notification.getFinished() != null) {
+ LOG.info("notification {} already processed. not sending.",
+ notification.getUuid());
+ return false;
+ }
+ if (notification.getCanceled() == Boolean.TRUE) {
+ LOG.info("notification {} canceled. not sending.",
+ notification.getUuid());
+ return false;
+ }
+ if (notification.isExpired()) {
+ LOG.info("notification {} expired. not sending.",
+ notification.getUuid());
+ return false;
+ }
+ if (jobExecution != null
+ && notification.getDeliver() != null
+ && !notification.getDeliver().equals(jobExecution.getJobData().getProperty("deliver"))
+ ) {
+ LOG.info("notification {} was rescheduled. not sending.",
+ notification.getUuid());
+ return false;
+ }
+
+ return true;
+ }
+
+ private List<EntityRef> getDevices(EntityRef ref) throws Exception {
+ List<EntityRef> devices = Collections.EMPTY_LIST;
+ if ("device".equals(ref.getType())) {
+ devices = Collections.singletonList(ref);
+ } else if ("user".equals(ref.getType())) {
+ devices = em.getCollection(ref, "devices", null, Query.MAX_LIMIT, Query.Level.REFS, false).getRefs();
+ } else if ("group".equals(ref.getType())) {
+ devices = new ArrayList<EntityRef>();
+ for (EntityRef r : em.getCollection(ref, "users", null, Query.MAX_LIMIT, Query.Level.REFS, false).getRefs()) {
+ devices.addAll(getDevices(r));
+ }
+ }
+ return devices;
+ }
+
+
+ private String getProviderId(EntityRef device, Notifier notifier) throws Exception {
+ try {
+ Object value = em.getProperty(device, notifier.getName() + NOTIFIER_ID_POSTFIX);
+ if (value == null) {
+ value = em.getProperty(device, notifier.getUuid() + NOTIFIER_ID_POSTFIX);
+ }
+ return value != null ? value.toString() : null;
+ } catch (Exception e) {
+ LOG.error("Errer getting provider ID, proceding with rest of batch", e);
+ return null;
+ }
+ }
+
+
+ private boolean isNotificationDeliveryComplete(Notification notification) throws Exception {
+ if(notification.getQueued() == null){
+ return false;
+ }
+ String queuePath = getJobQueueName(notification);
+ return !qm.hasPendingReads(queuePath, null)
+ && !qm.hasOutstandingTransactions(queuePath, null)
+ && !qm.hasMessagesInQueue(queuePath, null);
+ }
+
+ private String getJobQueueName(EntityRef entityRef) {
+ return utils.pluralize(entityRef.getType()) + "/" + entityRef.getUuid();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9d7901ae/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
new file mode 100644
index 0000000..7dd6503
--- /dev/null
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.services.notifications;
+
+import java.util.*;
+
+import com.codahale.metrics.*;
+import com.codahale.metrics.Timer;
+import org.apache.usergrid.metrics.MetricsFactory;
+import org.apache.usergrid.mq.Message;
+import org.apache.usergrid.persistence.*;
+import org.apache.usergrid.persistence.index.query.Identifier;
+import org.apache.usergrid.persistence.index.query.Query;
+import org.apache.usergrid.services.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.entities.Device;
+import org.apache.usergrid.persistence.exceptions.RequiredPropertyNotFoundException;
+import org.apache.usergrid.services.exceptions.ForbiddenServiceOperationException;
+import static org.apache.usergrid.utils.InflectionUtils.pluralize;
+
+import org.apache.usergrid.services.notifications.apns.APNsAdapter;
+import org.apache.usergrid.services.notifications.gcm.GCMAdapter;
+import rx.Observable;
+import rx.functions.Func1;
+import rx.schedulers.Schedulers;
+
+public class NotificationsService extends AbstractCollectionService {
+
+
+ private MetricsFactory metricsService;
+ private Meter sendMeter;
+ private Meter postMeter;
+ private Timer postTimer;
+ private Histogram queueSize;
+ private Counter outstandingQueue;
+
+ private static final int PAGE = 100;
+ private static final Logger LOG = LoggerFactory.getLogger(NotificationsService.class);
+
+
+ public static final String NOTIFIER_ID_POSTFIX = ".notifier.id";
+
+ static final String MESSAGE_PROPERTY_DEVICE_UUID = "deviceUUID";
+
+ static {
+ Message.MESSAGE_PROPERTIES.put(
+ MESSAGE_PROPERTY_DEVICE_UUID, UUID.class);
+ }
+
+ // these 2 can be static, but GCM can't. future: would be nice to get gcm
+ // static as well...
+ public static ProviderAdapter APNS_ADAPTER = new APNsAdapter();
+ public static ProviderAdapter TEST_ADAPTER = new TestAdapter();
+
+ public final Map<String, ProviderAdapter> providerAdapters =
+ new HashMap<String, ProviderAdapter>(3);
+ {
+ providerAdapters.put("apple", APNS_ADAPTER);
+ providerAdapters.put("google", new GCMAdapter());
+ providerAdapters.put("noop", TEST_ADAPTER);
+ }
+
+ private NotificationsQueueManager notificationQueueManager;
+ private long gracePeriod;
+
+ public NotificationsService() {
+ LOG.info("/notifications");
+ }
+
+ @Override
+ public void init( ServiceInfo info ) {
+ super.init(info);
+ metricsService = getApplicationContext().getBean(MetricsFactory.class);
+ sendMeter = metricsService.getMeter(NotificationsService.class, "send");
+ postMeter = metricsService.getMeter(NotificationsService.class, "requests");
+ postTimer = metricsService.getTimer(this.getClass(), "execution_rest");
+ queueSize = metricsService.getHistogram(NotificationsService.class, "queue_size");
+ outstandingQueue = metricsService.getCounter(NotificationsService.class,"current_queue");
+ JobScheduler jobScheduler = new JobScheduler(sm,em);
+ notificationQueueManager = new NotificationsQueueManager(jobScheduler,em,sm.getProperties(),sm.getQueueManager(),metricsService);
+ gracePeriod = jobScheduler.SCHEDULER_GRACE_PERIOD;
+ }
+
+ public NotificationsQueueManager getQueueManager(){
+ return notificationQueueManager;
+ }
+
+ @Override
+ public ServiceContext getContext(ServiceAction action,
+ ServiceRequest request, ServiceResults previousResults,
+ ServicePayload payload) throws Exception {
+
+ ServiceContext context = super.getContext(action, request, previousResults, payload);
+
+ if (action == ServiceAction.POST) {
+ context.setQuery(null); // we don't use this, and it must be null to
+ // force the correct execution path
+ }
+ return context;
+ }
+
+ @Override
+ public ServiceResults postCollection(ServiceContext context) throws Exception {
+ Timer.Context timer = postTimer.time();
+ postMeter.mark();
+ try {
+ validate(null, context.getPayload());
+ PathQuery<Device> pathQuery = getPathQuery(context.getRequest().getOriginalParameters());
+ context.getProperties().put("state", Notification.State.CREATED);
+ context.getProperties().put("pathQuery", pathQuery);
+ context.setOwner(sm.getApplication());
+ ServiceResults results = super.postCollection(context);
+ Notification notification = (Notification) results.getEntity();
+ if(!notificationQueueManager.scheduleQueueJob(notification)){
+ notificationQueueManager.queueNotification(notification, null);
+ }
+ outstandingQueue.inc();
+ // future: somehow return 202?
+ return results;
+ }finally {
+ timer.stop();
+ }
+ }
+
+ private PathQuery<Device> getPathQuery(List<ServiceParameter> parameters)
+ throws Exception {
+
+ PathQuery pathQuery = null;
+ for (int i = 0; i < parameters.size() - 1; i += 2) {
+ String collection = pluralize(parameters.get(i).getName());
+ ServiceParameter sp = parameters.get(i + 1);
+ org.apache.usergrid.persistence.index.query.Query query = sp.getQuery();
+ if (query == null) {
+ query = new Query();
+ query.addIdentifier(sp.getIdentifier());
+ }
+ query.setLimit(PAGE);
+ query.setCollection(collection);
+
+ if (pathQuery == null) {
+ pathQuery = new PathQuery(em.getApplicationRef(), query);
+ } else {
+ pathQuery = pathQuery.chain(query);
+ }
+ }
+
+ return pathQuery;
+ }
+
+ @Override
+ public ServiceResults postItemsByQuery(ServiceContext context, Query query) throws Exception {
+ return postCollection(context);
+ }
+
+ @Override
+ public Entity updateEntity(ServiceRequest request, EntityRef ref,
+ ServicePayload payload) throws Exception {
+
+ validate(ref, payload);
+
+ Notification notification = em.get(ref, Notification.class);
+
+ if ("restart".equals(payload.getProperty("restart"))) { // for emergency
+ // use only
+ payload.getProperties().clear();
+ payload.setProperty("restart", "");
+ payload.setProperty("errorMessage", "");
+ payload.setProperty("deliver", System.currentTimeMillis() + gracePeriod);
+
+ // once finished, immutable
+ } else if (notification.getFinished() != null) {
+ throw new ForbiddenServiceOperationException(request,
+ "Notification immutable once sent.");
+
+ // once started, only cancel is allowed
+ } else if (notification.getStarted() != null) {
+ if (payload.getProperty("canceled") != Boolean.TRUE) {
+ throw new ForbiddenServiceOperationException(request,
+ "Notification has started. You may only set canceled.");
+ }
+ payload.getProperties().clear();
+ payload.setProperty("canceled", Boolean.TRUE);
+ }
+
+ Entity response = super.updateEntity(request, ref, payload);
+
+ Long deliver = (Long) payload.getProperty("deliver");
+ if (deliver != null) {
+ if (!deliver.equals(notification.getDeliver())) {
+ notificationQueueManager.processBatchAndReschedule((Notification) response, null);
+ }
+ }
+ return response;
+ }
+
+ @Override
+ protected boolean isDeleteAllowed(ServiceContext context, Entity entity) {
+ Notification notification = (Notification) entity;
+ return (notification.getStarted() == null);
+ }
+
+ public Set<String> getProviders() {
+ return providerAdapters.keySet();
+ }
+
+ // validate payloads
+ private void validate(EntityRef ref, ServicePayload servicePayload)
+ throws Exception {
+ Object obj_payloads = servicePayload.getProperty("payloads");
+ if (obj_payloads == null && ref == null) {
+ throw new RequiredPropertyNotFoundException("notification",
+ "payloads");
+ }
+ if (obj_payloads != null) {
+ if (!(obj_payloads instanceof Map)) {
+ throw new IllegalArgumentException(
+ "payloads must be a JSON Map");
+ }
+ final Map<String, Object> payloads = (Map<String, Object>) obj_payloads;
+ final Map<Object, Notifier> notifierMap = getNotifierMap(payloads);
+ Observable t = Observable.from(payloads.entrySet()).subscribeOn(Schedulers.io()).map(new Func1<Map.Entry<String, Object>, Object>() {
+ @Override
+ public Object call(Map.Entry<String, Object> entry) {
+ String notifierId = entry.getKey();
+ Notifier notifier = notifierMap.get(notifierId);
+ if (notifier == null) {
+ throw new IllegalArgumentException("notifier \""
+ + notifierId + "\" not found");
+ }
+ ProviderAdapter providerAdapter = providerAdapters.get(notifier
+ .getProvider());
+ Object payload = entry.getValue();
+ try {
+ return providerAdapter.translatePayload(payload); // validate
+ // specifically to
+ // provider
+ } catch (Exception e) {
+ return e;
+ }
+ }
+ });
+ Object e = t.toBlocking().lastOrDefault(null);
+ if(e instanceof Throwable){
+ throw new Exception((Exception)e);
+ }
+
+ }
+ }
+
+
+
+ public String getJobQueueName(EntityRef notification) {
+ return "notifications/" + notification.getUuid();
+ }
+
+
+
+ /* adds a single device for delivery - used only by tests */
+ public void addDevice(EntityRef notification, EntityRef device) throws Exception {
+
+ String jobQueueName = getJobQueueName(notification);
+ Message message = new Message();
+ message.setObjectProperty(MESSAGE_PROPERTY_DEVICE_UUID,
+ device.getUuid());
+ sm.getQueueManager().postToQueue(jobQueueName, message);
+ }
+
+ public Notification getSourceNotification(EntityRef receipt)
+ throws Exception {
+ Receipt r = em.get(receipt.getUuid(), Receipt.class);
+ return em.get(r.getNotificationUUID(), Notification.class);
+ }
+
+
+ /* create a map of Notifier UUIDs and/or names to Notifiers */
+ protected Map<Object, Notifier> getNotifierMap(Map payloads)
+ throws Exception {
+ Map<Object, Notifier> notifiers = new HashMap<Object, Notifier>(
+ payloads.size() * 2);
+ for (Object id : payloads.keySet()) {
+ Identifier identifier = Identifier.from(id);
+ Notifier notifier;
+ if (identifier.isUUID()) {
+ notifier = em.get(identifier.getUUID(), Notifier.class);
+ } else {
+ EntityRef ref = em.getAlias("notifier", identifier.getName());
+ notifier = em.get(ref, Notifier.class);
+ }
+ if (notifier != null) {
+ notifiers.put(notifier.getUuid(), notifier);
+ notifiers.put(notifier.getUuid().toString(), notifier);
+ if (notifier.getName() != null) {
+ notifiers.put(notifier.getName(), notifier);
+ }
+ }
+ }
+ return notifiers;
+ }
+
+
+ /**
+ * attempts to test the providers connections - throws an Exception on
+ * failure
+ */
+ public void testConnection(Notifier notifier) throws Exception {
+ ProviderAdapter providerAdapter = providerAdapters.get(notifier.getProvider());
+ if (providerAdapter != null) {
+ providerAdapter.testConnection(notifier);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9d7901ae/stack/services/src/main/java/org/apache/usergrid/services/notifications/ProviderAdapter.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ProviderAdapter.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ProviderAdapter.java
new file mode 100644
index 0000000..5489c2d
--- /dev/null
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ProviderAdapter.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.services.notifications;
+
+import java.util.Date;
+import java.util.Map;
+import org.apache.usergrid.persistence.EntityManager;
+import org.apache.usergrid.persistence.Notification;
+import org.apache.usergrid.persistence.Notifier;
+import org.apache.usergrid.services.ServicePayload;
+
+/**
+ * To send a Notification, the following methods should be called in this order:
+ * 1) testConnection() for each notifier to be used 2) translatePayload() for
+ * each payload to be sent 3) sendNotification() for each target 4)
+ * doneSendingNotifications() when all #2 have been called If there is an
+ * Exception in #1 or #2, you should consider the entire Notification to be
+ * invalid. Also, getInactiveDevices() should be called occasionally and may be
+ * called at any time.
+ */
+public interface ProviderAdapter {
+
+ public void testConnection(Notifier notifier) throws ConnectionException;
+
+ public void sendNotification(String providerId, Notifier notifier,
+ Object payload, Notification notification, TaskTracker tracker)
+ throws Exception;
+
+ /**
+ * must be called when done calling sendNotification() so any open batches
+ * may be closed out
+ */
+ public void doneSendingNotifications() throws Exception;
+
+ public Map<String, Date> getInactiveDevices(Notifier notifier,
+ EntityManager em) throws Exception;
+
+ public Object translatePayload(Object payload) throws Exception;
+
+ public void validateCreateNotifier(ServicePayload payload) throws Exception;
+}