You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/10/29 20:21:42 UTC
[01/22] git commit: pushy 4_0 upgrade
Repository: incubator-usergrid
Updated Branches:
refs/heads/two-dot-o-events 16ff75a98 -> 9153eb9d8
pushy 4_0 upgrade
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/d5d859b7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/d5d859b7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/d5d859b7
Branch: refs/heads/two-dot-o-events
Commit: d5d859b707df3e293702bac4c3240c0ca3bf895e
Parents: fc3c42c
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Oct 24 13:25:37 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Oct 24 13:25:37 2014 -0600
----------------------------------------------------------------------
.../_maven.repositories | 8 -
...14c97c6e3ef40c88590e1b196d3ec55b-javadoc.jar | Bin 160201 -> 0 bytes
...14c97c6e3ef40c88590e1b196d3ec55b-sources.jar | Bin 59744 -> 0 bytes
...63dec68314c97c6e3ef40c88590e1b196d3ec55b.jar | Bin 67751 -> 0 bytes
...7c6e3ef40c88590e1b196d3ec55b.jar.lastUpdated | 13 --
...63dec68314c97c6e3ef40c88590e1b196d3ec55b.pom | 166 -------------------
...7c6e3ef40c88590e1b196d3ec55b.pom.lastUpdated | 13 --
.../pushy-0.4-apigee.pom | 166 -------------------
stack/pom.xml | 2 +-
stack/services/pom.xml | 2 +-
.../notifications/apns/APNsAdapter.java | 32 ++--
11 files changed, 18 insertions(+), 384 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d5d859b7/stack/m2/repository/com/relayrides/pushy/0.4-apigee-63dec68314c97c6e3ef40c88590e1b196d3ec55b/_maven.repositories
----------------------------------------------------------------------
diff --git a/stack/m2/repository/com/relayrides/pushy/0.4-apigee-63dec68314c97c6e3ef40c88590e1b196d3ec55b/_maven.repositories b/stack/m2/repository/com/relayrides/pushy/0.4-apigee-63dec68314c97c6e3ef40c88590e1b196d3ec55b/_maven.repositories
deleted file mode 100644
index 4b4ac07..0000000
--- a/stack/m2/repository/com/relayrides/pushy/0.4-apigee-63dec68314c97c6e3ef40c88590e1b196d3ec55b/_maven.repositories
+++ /dev/null
@@ -1,8 +0,0 @@
-#NOTE: This is an internal implementation file, its format can be changed without prior notice.
-#Wed Aug 20 23:57:49 MDT 2014
-pushy-0.4-apigee-63dec68314c97c6e3ef40c88590e1b196d3ec55b.jar>local-dependencies=
-pushy-0.4-apigee-63dec68314c97c6e3ef40c88590e1b196d3ec55b.pom>=
-pushy-0.4-apigee-63dec68314c97c6e3ef40c88590e1b196d3ec55b-sources.jar>=
-pushy-0.4-apigee-63dec68314c97c6e3ef40c88590e1b196d3ec55b.jar>=
-pushy-0.4-apigee-63dec68314c97c6e3ef40c88590e1b196d3ec55b.pom>local-dependencies=
-pushy-0.4-apigee-63dec68314c97c6e3ef40c88590e1b196d3ec55b-javadoc.jar>=
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d5d859b7/stack/m2/repository/com/relayrides/pushy/0.4-apigee-63dec68314c97c6e3ef40c88590e1b196d3ec55b/pushy-0.4-apigee-63dec68314c97c6e3ef40c88590e1b196d3ec55b-javadoc.jar
----------------------------------------------------------------------
diff --git a/stack/m2/repository/com/relayrides/pushy/0.4-apigee-63dec68314c97c6e3ef40c88590e1b196d3ec55b/pushy-0.4-apigee-63dec68314c97c6e3ef40c88590e1b196d3ec55b-javadoc.jar b/stack/m2/repository/com/relayrides/pushy/0.4-apigee-63dec68314c97c6e3ef40c88590e1b196d3ec55b/pushy-0.4-apigee-63dec68314c97c6e3ef40c88590e1b196d3ec55b-javadoc.jar
deleted file mode 100644
index 969757f..0000000
Binary files a/stack/m2/repository/com/relayrides/pushy/0.4-apigee-63dec68314c97c6e3ef40c88590e1b196d3ec55b/pushy-0.4-apigee-63dec68314c97c6e3ef40c88590e1b196d3ec55b-javadoc.jar and /dev/null differ
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d5d859b7/stack/m2/repository/com/relayrides/pushy/0.4-apigee-63dec68314c97c6e3ef40c88590e1b196d3ec55b/pushy-0.4-apigee-63dec68314c97c6e3ef40c88590e1b196d3ec55b-sources.jar
----------------------------------------------------------------------
diff --git a/stack/m2/repository/com/relayrides/pushy/0.4-apigee-63dec68314c97c6e3ef40c88590e1b196d3ec55b/pushy-0.4-apigee-63dec68314c97c6e3ef40c88590e1b196d3ec55b-sources.jar b/stack/m2/repository/com/relayrides/pushy/0.4-apigee-63dec68314c97c6e3ef40c88590e1b196d3ec55b/pushy-0.4-apigee-63dec68314c97c6e3ef40c88590e1b196d3ec55b-sources.jar
deleted file mode 100644
index 9e167fb..0000000
Binary files a/stack/m2/repository/com/relayrides/pushy/0.4-apigee-63dec68314c97c6e3ef40c88590e1b196d3ec55b/pushy-0.4-apigee-63dec68314c97c6e3ef40c88590e1b196d3ec55b-sources.jar and /dev/null differ
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d5d859b7/stack/m2/repository/com/relayrides/pushy/0.4-apigee-63dec68314c97c6e3ef40c88590e1b196d3ec55b/pushy-0.4-apigee-63dec68314c97c6e3ef40c88590e1b196d3ec55b.jar
----------------------------------------------------------------------
diff --git a/stack/m2/repository/com/relayrides/pushy/0.4-apigee-63dec68314c97c6e3ef40c88590e1b196d3ec55b/pushy-0.4-apigee-63dec68314c97c6e3ef40c88590e1b196d3ec55b.jar b/stack/m2/repository/com/relayrides/pushy/0.4-apigee-63dec68314c97c6e3ef40c88590e1b196d3ec55b/pushy-0.4-apigee-63dec68314c97c6e3ef40c88590e1b196d3ec55b.jar
deleted file mode 100644
index 3246a7a..0000000
Binary files a/stack/m2/repository/com/relayrides/pushy/0.4-apigee-63dec68314c97c6e3ef40c88590e1b196d3ec55b/pushy-0.4-apigee-63dec68314c97c6e3ef40c88590e1b196d3ec55b.jar and /dev/null differ
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d5d859b7/stack/m2/repository/com/relayrides/pushy/0.4-apigee-63dec68314c97c6e3ef40c88590e1b196d3ec55b/pushy-0.4-apigee-63dec68314c97c6e3ef40c88590e1b196d3ec55b.jar.lastUpdated
----------------------------------------------------------------------
diff --git a/stack/m2/repository/com/relayrides/pushy/0.4-apigee-63dec68314c97c6e3ef40c88590e1b196d3ec55b/pushy-0.4-apigee-63dec68314c97c6e3ef40c88590e1b196d3ec55b.jar.lastUpdated b/stack/m2/repository/com/relayrides/pushy/0.4-apigee-63dec68314c97c6e3ef40c88590e1b196d3ec55b/pushy-0.4-apigee-63dec68314c97c6e3ef40c88590e1b196d3ec55b.jar.lastUpdated
deleted file mode 100644
index 592c733..0000000
--- a/stack/m2/repository/com/relayrides/pushy/0.4-apigee-63dec68314c97c6e3ef40c88590e1b196d3ec55b/pushy-0.4-apigee-63dec68314c97c6e3ef40c88590e1b196d3ec55b.jar.lastUpdated
+++ /dev/null
@@ -1,13 +0,0 @@
-#NOTE: This is an internal implementation file, its format can be changed without prior notice.
-#Tue Jul 29 08:11:52 MDT 2014
-http\://oss.sonatype.org/content/repositories/snapshots/.lastUpdated=1406643112428
-http\://download.java.net/maven/2/.lastUpdated=1406643110976
-http\://oss.sonatype.org/content/repositories/snapshots/.error=
-http\://download.java.net/maven/2/.error=
-http\://maven.springframework.org/milestone/.lastUpdated=1406643112690
-http\://repository.codehaus.org/.lastUpdated=1406643111574
-https\://repository-hector-dev.forge.cloudbees.com/snapshot/.error=
-http\://repository.codehaus.org/.error=
-http\://maven.springframework.org/milestone/.error=
-file\:///Users/ApigeeCorporation/Dev/apigee/apigee-URAP/m2/repository/.lastUpdated=1406643112703
-https\://repository-hector-dev.forge.cloudbees.com/snapshot/.lastUpdated=1406643111364
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d5d859b7/stack/m2/repository/com/relayrides/pushy/0.4-apigee-63dec68314c97c6e3ef40c88590e1b196d3ec55b/pushy-0.4-apigee-63dec68314c97c6e3ef40c88590e1b196d3ec55b.pom
----------------------------------------------------------------------
diff --git a/stack/m2/repository/com/relayrides/pushy/0.4-apigee-63dec68314c97c6e3ef40c88590e1b196d3ec55b/pushy-0.4-apigee-63dec68314c97c6e3ef40c88590e1b196d3ec55b.pom b/stack/m2/repository/com/relayrides/pushy/0.4-apigee-63dec68314c97c6e3ef40c88590e1b196d3ec55b/pushy-0.4-apigee-63dec68314c97c6e3ef40c88590e1b196d3ec55b.pom
deleted file mode 100644
index 78f6a8e..0000000
--- a/stack/m2/repository/com/relayrides/pushy/0.4-apigee-63dec68314c97c6e3ef40c88590e1b196d3ec55b/pushy-0.4-apigee-63dec68314c97c6e3ef40c88590e1b196d3ec55b.pom
+++ /dev/null
@@ -1,166 +0,0 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <groupId>com.relayrides</groupId>
- <artifactId>pushy</artifactId>
- <version>0.4-apigee-63dec68314c97c6e3ef40c88590e1b196d3ec55b</version>
- <name>Pushy</name>
- <description>A Java library for sending push notifications</description>
- <parent>
- <groupId>org.sonatype.oss</groupId>
- <artifactId>oss-parent</artifactId>
- <version>7</version>
- </parent>
- <licenses>
- <license>
- <name>The MIT License (MIT)</name>
- <url>http://opensource.org/licenses/MIT</url>
- <distribution>repo</distribution>
- </license>
- </licenses>
- <dependencies>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- <version>1.7.6</version>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>4.11</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-all</artifactId>
- <version>4.0.23.Final</version>
- </dependency>
- <dependency>
- <groupId>com.googlecode.json-simple</groupId>
- <artifactId>json-simple</artifactId>
- <version>1.1.1</version>
- <exclusions>
- <exclusion>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-simple</artifactId>
- <version>1.7.6</version>
- <scope>test</scope>
- </dependency>
- </dependencies>
- <properties>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- </properties>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>3.1</version>
- <configuration>
- <source>1.6</source>
- <target>1.6</target>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <version>2.16</version>
- <configuration>
- <argLine>-Dorg.slf4j.simpleLogger.defaultLogLevel=warn</argLine>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-javadoc-plugin</artifactId>
- <version>2.9.1</version>
- <configuration>
- <overview>${basedir}/src/main/java/overview.html</overview>
- <show>public</show>
- <links>
- <link>http://netty.io/4.0/api/</link>
- </links>
- </configuration>
- <executions>
- <execution>
- <id>attach-javadocs</id>
- <goals>
- <goal>jar</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-source-plugin</artifactId>
- <version>2.2.1</version>
- <executions>
- <execution>
- <id>attach-sources</id>
- <goals>
- <goal>jar</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
- <profiles>
- <profile>
- <id>release-sign-artifacts</id>
- <activation>
- <property>
- <name>performRelease</name>
- <value>true</value>
- </property>
- </activation>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-gpg-plugin</artifactId>
- <version>1.1</version>
- <executions>
- <execution>
- <id>sign-artifacts</id>
- <phase>verify</phase>
- <goals>
- <goal>sign</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
- </profile>
- </profiles>
- <organization>
- <name>RelayRides</name>
- <url>https://relayrides.com/</url>
- </organization>
- <developers>
- <developer>
- <id>jon</id>
- <name>Jon Chambers</name>
- <email>jon@relayrides.com</email>
- <url>https://github.com/jchambers</url>
- <organization>RelayRides</organization>
- <organizationUrl>https://relayrides.com/</organizationUrl>
- <roles>
- <role>developer</role>
- </roles>
- <timezone>-5</timezone>
- </developer>
- </developers>
- <inceptionYear>2013</inceptionYear>
- <url>http://relayrides.github.com/pushy/</url>
- <scm>
- <connection>scm:git:https://github.com/relayrides/pushy.git</connection>
- <developerConnection>scm:git:git@github.com:relayrides/pushy.git</developerConnection>
- <url>https://github.com/relayrides/pushy</url>
- </scm>
-</project>
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d5d859b7/stack/m2/repository/com/relayrides/pushy/0.4-apigee-63dec68314c97c6e3ef40c88590e1b196d3ec55b/pushy-0.4-apigee-63dec68314c97c6e3ef40c88590e1b196d3ec55b.pom.lastUpdated
----------------------------------------------------------------------
diff --git a/stack/m2/repository/com/relayrides/pushy/0.4-apigee-63dec68314c97c6e3ef40c88590e1b196d3ec55b/pushy-0.4-apigee-63dec68314c97c6e3ef40c88590e1b196d3ec55b.pom.lastUpdated b/stack/m2/repository/com/relayrides/pushy/0.4-apigee-63dec68314c97c6e3ef40c88590e1b196d3ec55b/pushy-0.4-apigee-63dec68314c97c6e3ef40c88590e1b196d3ec55b.pom.lastUpdated
deleted file mode 100644
index 1038f38..0000000
--- a/stack/m2/repository/com/relayrides/pushy/0.4-apigee-63dec68314c97c6e3ef40c88590e1b196d3ec55b/pushy-0.4-apigee-63dec68314c97c6e3ef40c88590e1b196d3ec55b.pom.lastUpdated
+++ /dev/null
@@ -1,13 +0,0 @@
-#NOTE: This is an internal implementation file, its format can be changed without prior notice.
-#Tue Jul 29 08:11:50 MDT 2014
-http\://oss.sonatype.org/content/repositories/snapshots/.lastUpdated=1406643109745
-http\://download.java.net/maven/2/.lastUpdated=1406643106235
-http\://oss.sonatype.org/content/repositories/snapshots/.error=
-http\://download.java.net/maven/2/.error=
-http\://maven.springframework.org/milestone/.lastUpdated=1406643110610
-http\://repository.codehaus.org/.lastUpdated=1406643107580
-https\://repository-hector-dev.forge.cloudbees.com/snapshot/.error=
-http\://repository.codehaus.org/.error=
-http\://maven.springframework.org/milestone/.error=
-file\:///Users/ApigeeCorporation/Dev/apigee/apigee-URAP/m2/repository/.lastUpdated=1406643110624
-https\://repository-hector-dev.forge.cloudbees.com/snapshot/.lastUpdated=1406643107290
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d5d859b7/stack/m2/repository/com/relayrides/pushy/0.4-apigee-63dec68314c97c6e3ef40c88590e1b196d3ec55b/pushy-0.4-apigee.pom
----------------------------------------------------------------------
diff --git a/stack/m2/repository/com/relayrides/pushy/0.4-apigee-63dec68314c97c6e3ef40c88590e1b196d3ec55b/pushy-0.4-apigee.pom b/stack/m2/repository/com/relayrides/pushy/0.4-apigee-63dec68314c97c6e3ef40c88590e1b196d3ec55b/pushy-0.4-apigee.pom
deleted file mode 100644
index 39f61e3..0000000
--- a/stack/m2/repository/com/relayrides/pushy/0.4-apigee-63dec68314c97c6e3ef40c88590e1b196d3ec55b/pushy-0.4-apigee.pom
+++ /dev/null
@@ -1,166 +0,0 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <groupId>com.relayrides</groupId>
- <artifactId>pushy</artifactId>
- <version>0.4-apigee</version>
- <name>Pushy</name>
- <description>A Java library for sending push notifications</description>
- <parent>
- <groupId>org.sonatype.oss</groupId>
- <artifactId>oss-parent</artifactId>
- <version>7</version>
- </parent>
- <licenses>
- <license>
- <name>The MIT License (MIT)</name>
- <url>http://opensource.org/licenses/MIT</url>
- <distribution>repo</distribution>
- </license>
- </licenses>
- <dependencies>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- <version>1.7.6</version>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>4.11</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-all</artifactId>
- <version>4.0.21.Final</version>
- </dependency>
- <dependency>
- <groupId>com.googlecode.json-simple</groupId>
- <artifactId>json-simple</artifactId>
- <version>1.1.1</version>
- <exclusions>
- <exclusion>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-simple</artifactId>
- <version>1.7.6</version>
- <scope>test</scope>
- </dependency>
- </dependencies>
- <properties>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- </properties>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>3.1</version>
- <configuration>
- <source>1.6</source>
- <target>1.6</target>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <version>2.16</version>
- <configuration>
- <argLine>-Dorg.slf4j.simpleLogger.defaultLogLevel=warn</argLine>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-javadoc-plugin</artifactId>
- <version>2.9.1</version>
- <configuration>
- <overview>${basedir}/src/main/java/overview.html</overview>
- <show>public</show>
- <links>
- <link>http://netty.io/4.0/api/</link>
- </links>
- </configuration>
- <executions>
- <execution>
- <id>attach-javadocs</id>
- <goals>
- <goal>jar</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-source-plugin</artifactId>
- <version>2.2.1</version>
- <executions>
- <execution>
- <id>attach-sources</id>
- <goals>
- <goal>jar</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
- <profiles>
- <profile>
- <id>release-sign-artifacts</id>
- <activation>
- <property>
- <name>performRelease</name>
- <value>true</value>
- </property>
- </activation>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-gpg-plugin</artifactId>
- <version>1.1</version>
- <executions>
- <execution>
- <id>sign-artifacts</id>
- <phase>verify</phase>
- <goals>
- <goal>sign</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
- </profile>
- </profiles>
- <organization>
- <name>RelayRides</name>
- <url>https://relayrides.com/</url>
- </organization>
- <developers>
- <developer>
- <id>jon</id>
- <name>Jon Chambers</name>
- <email>jon@relayrides.com</email>
- <url>https://github.com/jchambers</url>
- <organization>RelayRides</organization>
- <organizationUrl>https://relayrides.com/</organizationUrl>
- <roles>
- <role>developer</role>
- </roles>
- <timezone>-5</timezone>
- </developer>
- </developers>
- <inceptionYear>2013</inceptionYear>
- <url>http://relayrides.github.com/pushy/</url>
- <scm>
- <connection>scm:git:https://github.com/relayrides/pushy.git</connection>
- <developerConnection>scm:git:git@github.com:relayrides/pushy.git</developerConnection>
- <url>https://github.com/relayrides/pushy</url>
- </scm>
-</project>
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d5d859b7/stack/pom.xml
----------------------------------------------------------------------
diff --git a/stack/pom.xml b/stack/pom.xml
index b752b2b..2a2fcb0 100644
--- a/stack/pom.xml
+++ b/stack/pom.xml
@@ -1436,7 +1436,7 @@
<groupId>com.relayrides</groupId>
<artifactId>pushy</artifactId>
<!-- The sha in the version is the git commit used in this build. Check out the pushy source, then this commit to build the library locally -->
- <version>0.4-apigee-63dec68314c97c6e3ef40c88590e1b196d3ec55b</version>
+ <version>0.4</version>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d5d859b7/stack/services/pom.xml
----------------------------------------------------------------------
diff --git a/stack/services/pom.xml b/stack/services/pom.xml
index e401440..0537613 100644
--- a/stack/services/pom.xml
+++ b/stack/services/pom.xml
@@ -564,7 +564,7 @@
<groupId>com.relayrides</groupId>
<artifactId>pushy</artifactId>
<!-- The sha in the version is the git commit used in this build. Check out the pushy source, then this commit to build the library locally -->
- <version>0.4-apigee-63dec68314c97c6e3ef40c88590e1b196d3ec55b</version>
+ <version>0.4</version>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d5d859b7/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/APNsAdapter.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/APNsAdapter.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/APNsAdapter.java
index 01fda92..4277b8a 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/APNsAdapter.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/APNsAdapter.java
@@ -21,6 +21,7 @@ import com.google.common.cache.*;
import com.relayrides.pushy.apns.*;
import com.relayrides.pushy.apns.util.*;
+import io.netty.channel.nio.NioEventLoopGroup;
import org.apache.usergrid.persistence.entities.Notification;
import org.apache.usergrid.persistence.entities.Notifier;
import org.mortbay.util.ajax.JSON;
@@ -122,22 +123,8 @@ public class APNsAdapter implements ProviderAdapter {
public Map<String, Date> getInactiveDevices(Notifier notifier,
EntityManager em) throws Exception {
Map<String,Date> map = new HashMap<String,Date>();
- if(isMock(notifier)){
- return map;
- }
PushManager<SimpleApnsPushNotification> pushManager = getPushManager(notifier);
-
- List<ExpiredToken> tokens = null;
- try {
- tokens = pushManager.getExpiredTokens();
- }catch (FeedbackConnectionException fce){
- logger.debug("Failed to get tokens",fce);
- return map;
- }
- for(ExpiredToken token : tokens){
- String expiredToken = new String(token.getToken());
- map.put(expiredToken, token.getExpiration());
- }
+ pushManager.requestExpiredTokens();
return map;
}
@@ -179,13 +166,26 @@ public class APNsAdapter implements ProviderAdapter {
public PushManager<SimpleApnsPushNotification> load(Notifier notifier) {
try {
LinkedBlockingQueue<SimpleApnsPushNotification> queue = new LinkedBlockingQueue<SimpleApnsPushNotification>();
+ NioEventLoopGroup group = new NioEventLoopGroup();
PushManagerConfiguration config = new PushManagerConfiguration();
config.setConcurrentConnectionCount(Runtime.getRuntime().availableProcessors() * 2);
- PushManager<SimpleApnsPushNotification> pushManager = new PushManager<SimpleApnsPushNotification>(getApnsEnvironment(notifier), getSSLContext(notifier), null, null, queue, config);
+ PushManager<SimpleApnsPushNotification> pushManager = new PushManager<>(getApnsEnvironment(notifier), getSSLContext(notifier), group,null , queue, config,notifier.getName());
//only tested when a message is sent
pushManager.registerRejectedNotificationListener(new RejectedAPNsListener());
//this will get tested when start is called
pushManager.registerFailedConnectionListener(new FailedConnectionListener());
+
+ pushManager.registerExpiredTokenListener(new ExpiredTokenListener<SimpleApnsPushNotification>() {
+ @Override
+ public void handleExpiredTokens(PushManager<? extends SimpleApnsPushNotification> pushManager, Collection<ExpiredToken> expiredTokens) {
+ Map<String,Date> map = new HashMap<String,Date>();
+ for(ExpiredToken token : expiredTokens){
+ String expiredToken = new String(token.getToken());
+ map.put(expiredToken, token.getExpiration());
+ }
+ //TODO: the map built above is currently discarded; figure out a way to call back and clear out the EntityManager (em) references
+ }
+ });
try {
if (!pushManager.isStarted()) { //ensure manager is started
pushManager.start();
[05/22] git commit: fix failure sqs message
Posted by sn...@apache.org.
fix failure sqs message
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/99078639
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/99078639
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/99078639
Branch: refs/heads/two-dot-o-events
Commit: 990786392acf9818cfaf09cf649cea8053279945
Parents: e30a7b3
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Oct 27 15:27:07 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Oct 27 15:27:07 2014 -0600
----------------------------------------------------------------------
.../usergrid/persistence/queue/impl/SQSQueueManagerImpl.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/99078639/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
index 313b4d7..888370c 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
@@ -205,7 +205,7 @@ public class SQSQueueManagerImpl implements QueueManager {
}
DeleteMessageBatchRequest request = new DeleteMessageBatchRequest(url,entries);
DeleteMessageBatchResult result = sqs.deleteMessageBatch(request);
- boolean successful = result.getFailed().size() > 0;
+ boolean successful = result.getFailed().size() <= 0;
if(!successful){
LOG.error("Commit failed {} messages", result.getFailed().size());
}
[13/22] git commit: add comments
Posted by sn...@apache.org.
add comments
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/25e36e69
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/25e36e69
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/25e36e69
Branch: refs/heads/two-dot-o-events
Commit: 25e36e69cfe9911eb385546b5042864fd3a7e69e
Parents: 8bd1ab4
Author: Shawn Feldman <sf...@apache.org>
Authored: Wed Oct 29 10:25:39 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Wed Oct 29 10:25:39 2014 -0600
----------------------------------------------------------------------
.../usergrid/persistence/queue/impl/QueueScopeFactoryImpl.java | 2 +-
.../services/notifications/ApplicationQueueManager.java | 6 +++++-
.../usergrid/services/notifications/InactiveDeviceManager.java | 2 +-
.../services/notifications/ProviderAdapterFactory.java | 2 +-
.../apache/usergrid/services/notifications/gcm/GCMAdapter.java | 2 +-
5 files changed, 9 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/25e36e69/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueScopeFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueScopeFactoryImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueScopeFactoryImpl.java
index 92ed075..2f8c43f 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueScopeFactoryImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueScopeFactoryImpl.java
@@ -29,7 +29,7 @@ import org.apache.usergrid.persistence.queue.QueueScopeFactory;
import java.util.UUID;
/**
- * Classy class class.
+ * Returns scope for queues.
*/
public class QueueScopeFactoryImpl implements QueueScopeFactory {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/25e36e69/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
index 11b197c..1058c34 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
@@ -390,7 +390,11 @@ public class ApplicationQueueManager {
public void stop(){
for(ProviderAdapter adapter : getNotifierMap().values()){
- adapter.stop();
+ try {
+ adapter.stop();
+ }catch (Exception e){
+ LOG.error("failed to stop adapter",e);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/25e36e69/stack/services/src/main/java/org/apache/usergrid/services/notifications/InactiveDeviceManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/InactiveDeviceManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/InactiveDeviceManager.java
index c54a595..82841d2 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/InactiveDeviceManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/InactiveDeviceManager.java
@@ -33,7 +33,7 @@ import java.util.HashMap;
import java.util.Map;
/**
- * Classy class class.
+ * Removes inactive devices.
*/
public class InactiveDeviceManager {
private static final Logger LOG = LoggerFactory.getLogger(InactiveDeviceManager.class);
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/25e36e69/stack/services/src/main/java/org/apache/usergrid/services/notifications/ProviderAdapterFactory.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ProviderAdapterFactory.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ProviderAdapterFactory.java
index a4022c1..ea42480 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ProviderAdapterFactory.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ProviderAdapterFactory.java
@@ -28,7 +28,7 @@ import org.apache.usergrid.services.notifications.gcm.GCMAdapter;
import java.util.HashMap;
/**
- * Classy class class.
+ * Gets valid provider adapters.
*/
public class ProviderAdapterFactory {
private static final String[] providers = new String[]{"apple", "google", "noop"};
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/25e36e69/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java
index b65b25a..e0b32ee 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java
@@ -43,7 +43,7 @@ public class GCMAdapter implements ProviderAdapter {
private final Notifier notifier;
private EntityManager entityManager;
- private Map<Notifier, Batch> notifierBatches = new HashMap<Notifier, Batch>();
+ private Map<Notifier, Batch> notifierBatches = new HashMap<>();
public GCMAdapter(EntityManager entityManager,Notifier notifier){
this.notifier = notifier;
[22/22] git commit: Merge branch 'two-dot-o' into two-dot-o-events
Posted by sn...@apache.org.
Merge branch 'two-dot-o' into two-dot-o-events
Conflicts:
stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityDeleteListener.java
stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/9153eb9d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/9153eb9d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/9153eb9d
Branch: refs/heads/two-dot-o-events
Commit: 9153eb9d80f8af8f1b5a82d8fd03b91c0a77f202
Parents: 16ff75a 707c9b9
Author: Dave Johnson <dm...@apigee.com>
Authored: Wed Oct 29 15:21:10 2014 -0400
Committer: Dave Johnson <dm...@apigee.com>
Committed: Wed Oct 29 15:21:10 2014 -0400
----------------------------------------------------------------------
.../usergrid/corepersistence/CpWalker.java | 44 +-
.../results/FilteringLoader.java | 63 +-
.../results/ResultsLoaderFactory.java | 5 +-
.../results/ResultsLoaderFactoryImpl.java | 4 -
.../results/ResultsVerifier.java | 6 +-
.../results/VersionVerifier.java | 8 +-
.../corepersistence/util/CpEntityMapUtils.java | 5 +-
.../usergrid/persistence/entities/Notifier.java | 2 -
.../usergrid/persistence/index/EntityIndex.java | 2 +-
.../persistence/index/EntityIndexFactory.java | 2 -
.../usergrid/persistence/index/IndexScope.java | 9 +-
.../persistence/index/guice/IndexModule.java | 1 -
.../index/impl/EsEntityIndexBatchImpl.java | 3 +-
.../index/impl/EsEntityIndexImpl.java | 71 ++-
.../persistence/index/impl/EsProvider.java | 8 +-
.../index/query/CandidateResults.java | 7 +-
.../persistence/index/query/Identifier.java | 3 +-
.../usergrid/persistence/index/query/Query.java | 27 +-
.../persistence/index/query/Results.java | 4 +-
.../queue/impl/QueueScopeFactoryImpl.java | 2 +-
.../queue/impl/SQSQueueManagerImpl.java | 6 +-
.../_maven.repositories | 8 -
...14c97c6e3ef40c88590e1b196d3ec55b-javadoc.jar | Bin 160201 -> 0 bytes
...14c97c6e3ef40c88590e1b196d3ec55b-sources.jar | Bin 59744 -> 0 bytes
...63dec68314c97c6e3ef40c88590e1b196d3ec55b.jar | Bin 67751 -> 0 bytes
...7c6e3ef40c88590e1b196d3ec55b.jar.lastUpdated | 13 -
...63dec68314c97c6e3ef40c88590e1b196d3ec55b.pom | 166 ------
...7c6e3ef40c88590e1b196d3ec55b.pom.lastUpdated | 13 -
.../pushy-0.4-apigee.pom | 166 ------
stack/pom.xml | 2 +-
stack/services/pom.xml | 2 +-
.../notifications/ApplicationQueueManager.java | 585 ++-----------------
.../notifications/InactiveDeviceManager.java | 79 +++
.../notifications/NotificationsService.java | 35 +-
.../services/notifications/ProviderAdapter.java | 42 +-
.../notifications/ProviderAdapterFactory.java | 49 ++
.../services/notifications/QueueListener.java | 84 ++-
.../services/notifications/TaskManager.java | 17 +-
.../services/notifications/TestAdapter.java | 23 +-
.../notifications/apns/APNsAdapter.java | 177 ++----
.../notifications/apns/EntityPushManager.java | 74 +++
.../apns/ExpiredTokenListener.java | 52 ++
.../services/notifications/gcm/GCMAdapter.java | 134 +++--
.../impl/ApplicationQueueManagerImpl.java | 523 +++++++++++++++++
.../services/notifiers/NotifiersService.java | 30 +-
.../notifications/NotifiersServiceIT.java | 33 +-
.../apns/MockSuccessfulProviderAdapter.java | 38 +-
.../apns/NotificationsServiceIT.java | 67 +--
.../gcm/MockSuccessfulProviderAdapter.java | 33 +-
.../gcm/NotificationsServiceIT.java | 81 +--
50 files changed, 1293 insertions(+), 1515 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9153eb9d/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9153eb9d/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
[21/22] git commit: Formatting and import cleanup only.
Posted by sn...@apache.org.
Formatting and import cleanup only.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/707c9b99
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/707c9b99
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/707c9b99
Branch: refs/heads/two-dot-o-events
Commit: 707c9b9989cfcdf21b39977494afb36b8a495000
Parents: 610eb2d
Author: Dave Johnson <dm...@apigee.com>
Authored: Wed Oct 29 15:19:30 2014 -0400
Committer: Dave Johnson <dm...@apigee.com>
Committed: Wed Oct 29 15:19:30 2014 -0400
----------------------------------------------------------------------
.../corepersistence/CpEntityDeleteListener.java | 45 +++++++------
.../usergrid/corepersistence/CpSetup.java | 1 -
.../usergrid/corepersistence/CpWalker.java | 44 +++++++-----
.../results/FilteringLoader.java | 63 +++++++++--------
.../results/ResultsLoaderFactory.java | 5 +-
.../results/ResultsLoaderFactoryImpl.java | 4 --
.../results/ResultsVerifier.java | 6 +-
.../results/VersionVerifier.java | 8 ++-
.../corepersistence/util/CpEntityMapUtils.java | 5 +-
.../usergrid/persistence/index/EntityIndex.java | 4 +-
.../persistence/index/EntityIndexBatch.java | 6 --
.../persistence/index/EntityIndexFactory.java | 2 -
.../usergrid/persistence/index/IndexScope.java | 9 +--
.../persistence/index/guice/IndexModule.java | 1 -
.../index/impl/EsEntityIndexBatchImpl.java | 3 +-
.../index/impl/EsEntityIndexImpl.java | 71 ++++++++++----------
.../persistence/index/impl/EsProvider.java | 11 ++-
.../index/query/CandidateResults.java | 7 +-
.../persistence/index/query/Identifier.java | 3 +-
.../usergrid/persistence/index/query/Query.java | 27 +++++---
.../persistence/index/query/Results.java | 4 +-
21 files changed, 169 insertions(+), 160 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/707c9b99/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityDeleteListener.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityDeleteListener.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityDeleteListener.java
index 70df7d5..4ec4056 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityDeleteListener.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityDeleteListener.java
@@ -68,28 +68,35 @@ public class CpEntityDeleteListener {
return Observable.create( new ObservableIterator<MvccEntity>( "deleteEntities" ) {
@Override
protected Iterator<MvccEntity> getIterator() {
- Iterator<MvccEntity> iterator = entityMetadataSerialization.loadHistory( entityEvent.getCollectionScope(), entity.getId(), entity.getVersion(), serializationFig.getHistorySize() );
+ Iterator<MvccEntity> iterator = entityMetadataSerialization.loadHistory(
+ entityEvent.getCollectionScope(),
+ entity.getId(),
+ entity.getVersion(),
+ serializationFig.getHistorySize() );
return iterator;
}
} ).subscribeOn(Schedulers.io())
- .buffer(serializationFig.getBufferSize())
- .flatMap(new Func1<List<MvccEntity>, Observable<EntityVersion>>() {
- @Override
- public Observable<EntityVersion> call(List<MvccEntity> mvccEntities) {
- MutationBatch mutationBatch = keyspace.prepareMutationBatch();
- List<EntityVersion> versions = new ArrayList<>();
- //actually delete the edge from both the commit log and
- for (MvccEntity mvccEntity : mvccEntities) {
- versions.add(mvccEntity);
- mutationBatch.mergeShallow(entityMetadataSerialization.delete(entityEvent.getCollectionScope(), mvccEntity.getId(), mvccEntity.getVersion()));
- }
- try {
- mutationBatch.execute();
- } catch (ConnectionException e) {
- throw new RuntimeException("Unable to execute mutation", e);
- }
- return Observable.from(versions);
+ .buffer(serializationFig.getBufferSize())
+ .flatMap(new Func1<List<MvccEntity>, Observable<EntityVersion>>() {
+ @Override
+ public Observable<EntityVersion> call(List<MvccEntity> mvccEntities) {
+ MutationBatch mutationBatch = keyspace.prepareMutationBatch();
+ List<EntityVersion> versions = new ArrayList<>();
+ //actually delete the edge from both the commit log and
+ for (MvccEntity mvccEntity : mvccEntities) {
+ versions.add(mvccEntity);
+ mutationBatch.mergeShallow(entityMetadataSerialization.delete(
+ entityEvent.getCollectionScope(),
+ mvccEntity.getId(),
+ mvccEntity.getVersion()));
}
- });
+ try {
+ mutationBatch.execute();
+ } catch (ConnectionException e) {
+ throw new RuntimeException("Unable to execute mutation", e);
+ }
+ return Observable.from(versions);
+ }
+ });
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/707c9b99/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpSetup.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpSetup.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpSetup.java
index c1bab12..9f7b031 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpSetup.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpSetup.java
@@ -22,7 +22,6 @@ import com.google.inject.Injector;
import com.netflix.config.ConfigurationManager;
import java.util.Properties;
import java.util.UUID;
-import java.util.logging.Level;
import me.prettyprint.cassandra.service.CassandraHost;
import me.prettyprint.hector.api.ddl.ComparatorType;
import static me.prettyprint.hector.api.factory.HFactory.createColumnFamilyDefinition;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/707c9b99/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
index 27d4224..c3a9fe6 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
@@ -56,40 +56,52 @@ public class CpWalker {
}
- public void walkCollections( final CpEntityManager em, final EntityRef start, final CpVisitor visitor )
- throws Exception {
+ public void walkCollections( final CpEntityManager em, final EntityRef start,
+ final CpVisitor visitor ) throws Exception {
doWalkCollections( em, new SimpleId( start.getUuid(), start.getType() ), visitor );
}
- private void doWalkCollections( final CpEntityManager em, final Id applicationId, final CpVisitor visitor ) {
+ private void doWalkCollections(
+ final CpEntityManager em, final Id applicationId, final CpVisitor visitor ) {
final ApplicationScope applicationScope = em.getApplicationScope();
final GraphManager gm = em.getManagerCache().getGraphManager( applicationScope );
- logger.debug( "Loading edges types from {}:{}\n scope {}:{}", new Object[] {
- applicationId.getType(), applicationId.getUuid(), applicationScope.getApplication().getType(),
- applicationScope.getApplication().getUuid()
- } );
+ logger.debug( "Loading edges types from {}:{}\n scope {}:{}",
+ new Object[] {
+ applicationId.getType(),
+ applicationId.getUuid(),
+ applicationScope.getApplication().getType(),
+ applicationScope.getApplication().getUuid()
+ } );
//only search edge types that start with collections
Observable<String> edgeTypes = gm.getEdgeTypesFromSource(
- new SimpleSearchEdgeType( applicationId, CpNamingUtils.EDGE_COLL_SUFFIX, null ) );
+ new SimpleSearchEdgeType( applicationId, CpNamingUtils.EDGE_COLL_SUFFIX, null ) );
edgeTypes.flatMap( new Func1<String, Observable<Edge>>() {
@Override
public Observable<Edge> call( final String edgeType ) {
- logger.debug( "Loading edges of edgeType {} from {}:{}\n scope {}:{}", new Object[] {
- edgeType, applicationId.getType(), applicationId.getUuid(), applicationScope.getApplication().getType(),
+ logger.debug( "Loading edges of edgeType {} from {}:{}\n scope {}:{}",
+ new Object[] {
+ edgeType,
+ applicationId.getType(),
+ applicationId.getUuid(),
+ applicationScope.getApplication().getType(),
applicationScope.getApplication().getUuid()
} );
- return gm.loadEdgesFromSource( new SimpleSearchByEdgeType( applicationId, edgeType, Long.MAX_VALUE,
- SearchByEdgeType.Order.DESCENDING, null ) );
+ return gm.loadEdgesFromSource( new SimpleSearchByEdgeType(
+ applicationId,
+ edgeType,
+ Long.MAX_VALUE,
+ SearchByEdgeType.Order.DESCENDING,
+ null ) );
}
} ).doOnNext( new Action1<Edge>() {
@@ -98,16 +110,16 @@ public class CpWalker {
logger.info( "Re-indexing edge {}", edge );
- EntityRef targetNodeEntityRef =
- new SimpleEntityRef( edge.getTargetNode().getType(), edge.getTargetNode().getUuid() );
+ EntityRef targetNodeEntityRef = new SimpleEntityRef(
+ edge.getTargetNode().getType(), edge.getTargetNode().getUuid() );
Entity entity;
try {
entity = em.get( targetNodeEntityRef );
}
catch ( Exception ex ) {
- logger.error( "Error getting sourceEntity {}:{}, continuing", targetNodeEntityRef.getType(),
- targetNodeEntityRef.getUuid() );
+ logger.error( "Error getting sourceEntity {}:{}, continuing",
+ targetNodeEntityRef.getType(), targetNodeEntityRef.getUuid() );
return;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/707c9b99/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java
index 00faefd..8ca2211 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java
@@ -96,40 +96,32 @@ public class FilteringLoader implements ResultsLoader {
}
- /**
- * For each entity, holds the index it appears in our candidates for keeping ordering correct
- */
+ // For each entity, holds the index it appears in our candidates for keeping ordering correct
final Map<Id, Integer> orderIndex = new HashMap<>( crs.size() );
- /**
- * Maps the entity ids to our candidates
- */
+ // Maps the entity ids to our candidates
final Map<Id, CandidateResult> maxCandidateMapping = new HashMap<>( crs.size() );
- /**
- * Groups all candidate results by types. When search connections there will be multiple types,
- * so we want to batch
- * fetch them more efficiently
- */
- final HashMultimap<String, CandidateResult> groupedByScopes = HashMultimap.create( crs.size(), crs.size() );
+ // Groups all candidate results by types. When searching connections there will be multiple
+ // types, so we want to batch fetch them more efficiently
+
+ final HashMultimap<String, CandidateResult> groupedByScopes =
+ HashMultimap.create( crs.size(), crs.size() );
final Iterator<CandidateResult> iter = crs.iterator();
- /**
- * TODO, in this case we're "optimizing" due to the limitations of collection scope. Perhaps we should
- * change the API to just be an application, then an "owner" scope?
- */
+ // TODO, in this case we're "optimizing" due to the limitations of collection scope.
+ // Perhaps we should change the API to just be an application, then an "owner" scope?
- /**
- * Go through the candidates and group them by scope for more efficient retrieval. Also remove duplicates before we even make a network call
- */
+ // Go through the candidates and group them by scope for more efficient retrieval.
+ // Also remove duplicates before we even make a network call
for ( int i = 0; iter.hasNext(); i++ ) {
final CandidateResult currentCandidate = iter.next();
- final String collectionType =
- CpNamingUtils.getCollectionScopeNameFromEntityType( currentCandidate.getId().getType() );
+ final String collectionType = CpNamingUtils.getCollectionScopeNameFromEntityType(
+ currentCandidate.getId().getType() );
final Id entityId = currentCandidate.getId();
@@ -154,9 +146,12 @@ public class FilteringLoader implements ResultsLoader {
if ( UUIDComparator.staticCompare( currentVersion, previousMaxVersion ) > 0 ) {
//de-index it
- logger.debug( "Stale version of Entity uuid:{} type:{}, stale v:{}, latest v:{}", new Object[] {
- entityId.getUuid(), entityId.getType(), previousMaxVersion, currentVersion
- } );
+ logger.debug( "Stale version of Entity uuid:{} type:{}, stale v:{}, latest v:{}",
+ new Object[] {
+ entityId.getUuid(),
+ entityId.getType(),
+ previousMaxVersion,
+ currentVersion } );
//deindex this document, and remove the previous maxVersion
//we have to deindex this from our ownerId, since this is what gave us the reference
@@ -197,12 +192,13 @@ public class FilteringLoader implements ResultsLoader {
//now using the scope, load the collection
- // Get the collection scope and batch load all the versions. We put all entities in app/app for easy retrieval
- // unless persistence changes, we never want to read from any scope other than the app, app, scope name scope
+ // Get the collection scope and batch load all the versions. We put all entities in
+ // app/app for easy retrieval; unless persistence changes, we never want to read from
+ // any scope other than the app, app, scope name scope
final CollectionScope collScope = new CollectionScopeImpl(
- applicationScope.getApplication(), applicationScope.getApplication(), scopeName );
+ applicationScope.getApplication(), applicationScope.getApplication(), scopeName);
- final EntityCollectionManager ecm = managerCache.getEntityCollectionManager( collScope );
+ final EntityCollectionManager ecm = managerCache.getEntityCollectionManager( collScope);
//load the results into the loader for this scope for validation
@@ -228,7 +224,8 @@ public class FilteringLoader implements ResultsLoader {
}
- //NOTE DO NOT execute the batch here. It changes the results and we need consistent paging until we aggregate all results
+ // NOTE DO NOT execute the batch here.
+ // It changes the results and we need consistent paging until we aggregate all results
return resultsVerifier.getResults( sortedResults.values() );
}
@@ -239,10 +236,12 @@ public class FilteringLoader implements ResultsLoader {
}
- protected void deIndex( final EntityIndexBatch batch, final Id ownerId, final CandidateResult candidateResult ) {
+ protected void deIndex( final EntityIndexBatch batch, final Id ownerId,
+ final CandidateResult candidateResult ) {
- IndexScope indexScope = new IndexScopeImpl( ownerId,
- CpNamingUtils.getCollectionScopeNameFromEntityType( candidateResult.getId().getType() ) );
+ IndexScope indexScope = new IndexScopeImpl(
+ ownerId,
+ CpNamingUtils.getCollectionScopeNameFromEntityType( candidateResult.getId().getType()));
batch.deindex( indexScope, candidateResult );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/707c9b99/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoaderFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoaderFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoaderFactory.java
index ebd9380..779d604 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoaderFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoaderFactory.java
@@ -32,8 +32,7 @@ public interface ResultsLoaderFactory {
/**
* Get the load for results
- * @return
*/
- public ResultsLoader getLoader(final ApplicationScope applicationScope, final EntityRef ownerId,
- final Query.Level resultsLevel );
+ public ResultsLoader getLoader( final ApplicationScope applicationScope,
+ final EntityRef ownerId, final Query.Level resultsLevel );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/707c9b99/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoaderFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoaderFactoryImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoaderFactoryImpl.java
index e987882..0f39fe3 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoaderFactoryImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoaderFactoryImpl.java
@@ -31,10 +31,8 @@ import com.google.inject.Inject;
/**
* Factory for creating results
*/
-
public class ResultsLoaderFactoryImpl implements ResultsLoaderFactory {
-
private final CpManagerCache managerCache;
@@ -48,7 +46,6 @@ public class ResultsLoaderFactoryImpl implements ResultsLoaderFactory {
public ResultsLoader getLoader( final ApplicationScope applicationScope,
final EntityRef ownerId, final Query.Level resultsLevel ) {
-
ResultsVerifier verifier;
if ( resultsLevel == Query.Level.REFS ) {
@@ -61,7 +58,6 @@ public class ResultsLoaderFactoryImpl implements ResultsLoaderFactory {
verifier = new EntityVerifier(Query.MAX_LIMIT);
}
-
return new FilteringLoader( managerCache, verifier, ownerId, applicationScope );
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/707c9b99/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsVerifier.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsVerifier.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsVerifier.java
index 995c50a..1b92bcc 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsVerifier.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsVerifier.java
@@ -21,7 +21,6 @@ package org.apache.usergrid.corepersistence.results;
import java.util.Collection;
-
import org.apache.usergrid.persistence.Results;
import org.apache.usergrid.persistence.collection.EntityCollectionManager;
import org.apache.usergrid.persistence.index.query.CandidateResult;
@@ -38,10 +37,9 @@ public interface ResultsVerifier {
public void loadResults(Collection<Id> ids, EntityCollectionManager ecm);
/**
- * Return true if the candidate result is a valid result that should be retained. * If it
- * should not it should also be removed from the list of possible return values in this loader
+ * Return true if the candidate result is a valid result that should be retained. If it should
+ * not it should also be removed from the list of possible return values in this loader
* @param candidateResult
- * @return
*/
public boolean isValid(CandidateResult candidateResult);
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/707c9b99/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/VersionVerifier.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/VersionVerifier.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/VersionVerifier.java
index 3a03b7b..c541550 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/VersionVerifier.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/VersionVerifier.java
@@ -36,7 +36,7 @@ import com.fasterxml.uuid.UUIDComparator;
/**
- * A loader that verifies versions are correct in cassandra and match elasticsearch
+ * A loader that verifies versions are correct in Cassandra and match ElasticSearch
*/
public abstract class VersionVerifier implements ResultsVerifier {
@@ -69,7 +69,11 @@ public abstract class VersionVerifier implements ResultsVerifier {
if ( UUIDComparator.staticCompare( savedVersion, candidateResult.getVersion() ) > 0 ) {
logger.debug( "Stale version of Entity uuid:{} type:{}, stale v:{}, latest v:{}",
- new Object[] { entityId.getUuid(), entityId.getType(), candidateResult.getVersion(), savedVersion
+ new Object[] {
+ entityId.getUuid(),
+ entityId.getType(),
+ candidateResult.getVersion(),
+ savedVersion
} );
return false;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/707c9b99/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpEntityMapUtils.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpEntityMapUtils.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpEntityMapUtils.java
index feed396..c110509 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpEntityMapUtils.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpEntityMapUtils.java
@@ -68,7 +68,8 @@ public class CpEntityMapUtils {
return fromMap( null, map, entityType, topLevel );
}
- public static Entity fromMap( Entity entity, Map<String, Object> map, String entityType, boolean topLevel ) {
+ public static Entity fromMap(
+ Entity entity, Map<String, Object> map, String entityType, boolean topLevel ) {
if ( entity == null ) {
entity = new Entity();
@@ -263,7 +264,7 @@ public class CpEntityMapUtils {
// field names lat and lon trigger ElasticSearch geo location
locMap.put("lat", locField.getValue().getLatitude());
locMap.put("lon", locField.getValue().getLongitude());
- entityMap.put( field.getName(), field.getValue());
+ entityMap.put( field.getName(), field.getValue());
} else if (f instanceof ByteArrayField) {
ByteArrayField bf = ( ByteArrayField ) f;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/707c9b99/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
index 44ad05c..5f4606c 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
@@ -30,8 +30,8 @@ import org.apache.usergrid.persistence.model.entity.Id;
public interface EntityIndex {
/**
- * This should ONLY ever be called once on application create. Otherwise we're introducing slowness into our system
- *
+ * This should ONLY ever be called once on application create.
+ * Otherwise we're introducing slowness into our system
*/
public void initializeIndex();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/707c9b99/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
index f98025b..1a11e9b 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
@@ -27,9 +27,6 @@ import org.apache.usergrid.persistence.model.entity.Id;
public interface EntityIndexBatch {
-
-
-
/**
* Create index for Entity
* @param indexScope The scope for the index
@@ -59,9 +56,6 @@ public interface EntityIndexBatch {
*/
public EntityIndexBatch deindex(final IndexScope scope, final Id id, final UUID version);
-
- //TODO: Create a delete method that delete's by Id. This will delete all documents from ES with the same entity Id
-
/**
* Execute the batch
*/
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/707c9b99/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexFactory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexFactory.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexFactory.java
index 1a97b5a..78a5137 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexFactory.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexFactory.java
@@ -19,9 +19,7 @@
package org.apache.usergrid.persistence.index;
-
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-
import com.google.inject.assistedinject.Assisted;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/707c9b99/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexScope.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexScope.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexScope.java
index 9fbb7f5..5b70304 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexScope.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexScope.java
@@ -18,8 +18,6 @@
*/
package org.apache.usergrid.persistence.index;
-
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.model.entity.Id;
@@ -31,11 +29,10 @@ public interface IndexScope {
*/
public String getName();
-
/**
- * @return A uuid that is unique to this context. It can be any uuid (time uuid preferred). Can be an application id
- * if this is indexed in a collection, or the collection owner. In a graph structure, this will be the source
- * node in the graph
+ * @return A uuid that is unique to this context. It can be any uuid (time uuid preferred).
+ * Can be an application id if this is indexed in a collection, or the collection owner.
+ * In a graph structure, this will be the source node in the graph
*/
public Id getOwner();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/707c9b99/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
index 13661cc..edc938b 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
@@ -22,7 +22,6 @@ package org.apache.usergrid.persistence.index.guice;
import org.apache.usergrid.persistence.index.IndexFig;
import com.google.inject.AbstractModule;
import com.google.inject.assistedinject.FactoryModuleBuilder;
-import org.apache.usergrid.persistence.collection.guice.CollectionModule;
import org.apache.usergrid.persistence.index.EntityIndex;
import org.apache.usergrid.persistence.index.EntityIndexFactory;
import org.apache.usergrid.persistence.index.impl.EsEntityIndexImpl;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/707c9b99/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index 151e850..fab135a 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@ -312,7 +312,8 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
}
else if ( f instanceof UUIDField ) {
- entityMap.put( STRING_PREFIX + field.getName().toLowerCase(), field.getValue().toString().toLowerCase() );
+ entityMap.put( STRING_PREFIX + field.getName().toLowerCase(),
+ field.getValue().toString().toLowerCase() );
}
else {
entityMap.put( field.getName().toLowerCase(), field.getValue() );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/707c9b99/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index 7584386..fc09b5a 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -130,15 +130,12 @@ public class EsEntityIndexImpl implements EntityIndex {
CreateIndexResponse cir = admin.indices().prepareCreate( indexName ).execute().actionGet();
log.info( "Created new Index Name [{}] ACK=[{}]", indexName, cir.isAcknowledged() );
+ // create the document, this ensures the index is ready
+
+ // Immediately create a document and remove it to ensure the entire cluster is ready
+ // to receive documents. Occasionally we see errors. See this post:
+ // http://elasticsearch-users.115913.n3.nabble.com/IndexMissingException-on-create-index-followed-by-refresh-td1832793.html
- //create the document, this ensures the index is ready
- /**
- * Immediately create a document and remove it to ensure the entire cluster is ready to receive documents
- * . Occasionally we see
- * errors. See this post.
- * http://elasticsearch-users.115913.n3.nabble.com/IndexMissingException-on-create-index-followed-by-refresh-td1832793.html
- *
- */
testNewIndex();
}
catch ( IndexAlreadyExistsException expected ) {
@@ -167,18 +164,18 @@ public class EsEntityIndexImpl implements EntityIndex {
public boolean doOp() {
final String tempId = UUIDGenerator.newTimeUUID().toString();
+ client.prepareIndex( indexName, VERIFY_TYPE, tempId )
+ .setSource( DEFAULT_PAYLOAD ).get();
- client.prepareIndex( indexName, VERIFY_TYPE, tempId ).setSource( DEFAULT_PAYLOAD ).get();
-
- log.info( "Successfully created new document with docId {} in index {} and type {}", tempId, indexName,
- VERIFY_TYPE );
+ log.info( "Successfully created new document with docId {} in index {} and type {}",
+ tempId, indexName, VERIFY_TYPE );
- //delete all types, this way if we miss one it will get cleaned up
+ // delete all types, this way if we miss one it will get cleaned up
+ client.prepareDeleteByQuery( indexName ).setTypes( VERIFY_TYPE )
+ .setQuery( MATCH_ALL_QUERY_BUILDER ).get();
- client.prepareDeleteByQuery( indexName ).setTypes( VERIFY_TYPE ).setQuery( MATCH_ALL_QUERY_BUILDER )
- .get();
-
- log.info( "Successfully deleted all documents in index {} and type {}", indexName, VERIFY_TYPE );
+ log.info( "Successfully deleted all documents in index {} and type {}",
+ indexName, VERIFY_TYPE );
return true;
}
@@ -189,18 +186,19 @@ public class EsEntityIndexImpl implements EntityIndex {
/**
- * Setup ElasticSearch type mappings as a template that applies to all new indexes. Applies to all indexes that
- * start with our prefix.
+ * Setup ElasticSearch type mappings as a template that applies to all new indexes.
+ * Applies to all indexes that start with our prefix.
*/
private void createMappings() throws IOException {
- XContentBuilder xcb =
- IndexingUtils.createDoubleStringIndexMapping( XContentFactory.jsonBuilder(), "_default_" );
+ XContentBuilder xcb = IndexingUtils
+ .createDoubleStringIndexMapping( XContentFactory.jsonBuilder(), "_default_" );
- PutIndexTemplateResponse pitr = client.admin().indices().preparePutTemplate( "usergrid_template" )
- .setTemplate( config.getIndexPrefix() + "*" ).addMapping( "_default_",
- xcb ) // set mapping as the default for all types
- .execute().actionGet();
+ PutIndexTemplateResponse pitr = client.admin().indices()
+ .preparePutTemplate( "usergrid_template" )
+ .setTemplate( config.getIndexPrefix() + "*" )
+ .addMapping( "_default_", xcb ) // set mapping as the default for all types
+ .execute().actionGet();
}
@@ -226,9 +224,8 @@ public class EsEntityIndexImpl implements EntityIndex {
SearchResponse searchResponse;
if ( query.getCursor() == null ) {
- SearchRequestBuilder srb =
- client.prepareSearch( indexName ).setTypes( indexType ).setScroll( cursorTimeout + "m" )
- .setQuery( qb );
+ SearchRequestBuilder srb = client.prepareSearch( indexName )
+ .setTypes( indexType ).setScroll( cursorTimeout + "m" ) .setQuery( qb );
FilterBuilder fb = query.createFilterBuilder();
if ( fb != null ) {
@@ -252,21 +249,22 @@ public class EsEntityIndexImpl implements EntityIndex {
// type prefix to use. So, here we add an order by clause for every possible type
// that you can order by: string, number and boolean and we ask ElasticSearch
// to ignore any fields that are not present.
+
final String stringFieldName = STRING_PREFIX + sp.getPropertyName();
- final FieldSortBuilder stringSort =
- SortBuilders.fieldSort( stringFieldName ).order( order ).ignoreUnmapped( true );
+ final FieldSortBuilder stringSort = SortBuilders.fieldSort( stringFieldName )
+ .order( order ).ignoreUnmapped( true );
srb.addSort( stringSort );
log.debug( " Sort: {} order by {}", stringFieldName, order.toString() );
final String numberFieldName = NUMBER_PREFIX + sp.getPropertyName();
- final FieldSortBuilder numberSort =
- SortBuilders.fieldSort( numberFieldName ).order( order ).ignoreUnmapped( true );
+ final FieldSortBuilder numberSort = SortBuilders.fieldSort( numberFieldName )
+ .order( order ).ignoreUnmapped( true );
srb.addSort( numberSort );
log.debug( " Sort: {} order by {}", numberFieldName, order.toString() );
final String booleanFieldName = BOOLEAN_PREFIX + sp.getPropertyName();
- final FieldSortBuilder booleanSort =
- SortBuilders.fieldSort( booleanFieldName ).order( order ).ignoreUnmapped( true );
+ final FieldSortBuilder booleanSort = SortBuilders.fieldSort( booleanFieldName )
+ .order( order ).ignoreUnmapped( true );
srb.addSort( booleanSort );
log.debug( " Sort: {} order by {}", booleanFieldName, order.toString() );
}
@@ -283,7 +281,8 @@ public class EsEntityIndexImpl implements EntityIndex {
}
log.debug( "Executing query with cursor: {} ", scrollId );
- SearchScrollRequestBuilder ssrb = client.prepareSearchScroll( scrollId ).setScroll( cursorTimeout + "m" );
+ SearchScrollRequestBuilder ssrb = client.prepareSearchScroll( scrollId )
+ .setScroll( cursorTimeout + "m" );
searchResponse = ssrb.execute().actionGet();
}
@@ -329,7 +328,7 @@ public class EsEntityIndexImpl implements EntityIndex {
return true;
}
catch ( IndexMissingException e ) {
- log.error( "Unable to refresh index after create. Waiting before sleeping.", e );
+ log.error( "Unable to refresh index after create. Waiting before sleeping.", e);
throw e;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/707c9b99/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsProvider.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsProvider.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsProvider.java
index 9b37952..efc61a9 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsProvider.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsProvider.java
@@ -25,16 +25,13 @@ import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Properties;
-import java.util.logging.Level;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.usergrid.persistence.core.util.AvailablePortFinder;
import org.apache.usergrid.persistence.index.IndexFig;
import org.elasticsearch.client.Client;
-import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.NodeBuilder;
import org.slf4j.Logger;
@@ -112,7 +109,7 @@ public class EsProvider {
.build();
log.info("-----------------------------------------------------------------------");
- log.info("Starting ElasticSearch embedded server with settings: \n" + settings.getAsMap() );
+ log.info("Starting ElasticSearch embedded server settings: \n"+settings.getAsMap());
log.info("-----------------------------------------------------------------------");
Node node = NodeBuilder.nodeBuilder().settings(settings)
@@ -172,8 +169,10 @@ public class EsProvider {
log.debug("Creating ElasticSearch client with settings: " + settings.getAsMap());
- //use this client when connecting via socket only, such as ssh tunnel or other firewall issues
-// newClient = new TransportClient(settings).addTransportAddress( new InetSocketTransportAddress("localhost", 9300) );
+ // use this client when connecting via socket only,
+ // such as ssh tunnel or other firewall issues
+ // newClient = new TransportClient(settings).addTransportAddress(
+ // new InetSocketTransportAddress("localhost", 9300) );
//use this client for quick connectivity
Node node = NodeBuilder.nodeBuilder().settings(settings)
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/707c9b99/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/CandidateResults.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/CandidateResults.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/CandidateResults.java
index d8c225f..a71bcbe 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/CandidateResults.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/CandidateResults.java
@@ -27,8 +27,8 @@ import org.slf4j.LoggerFactory;
/**
- * Internal results class, should not be returned as results to a user. Only returns candidate entity results
- *
+ * Internal results class, should not be returned as results to a user.
+ * Only returns candidate entity results
*/
public class CandidateResults implements Iterable<CandidateResult> {
@@ -66,8 +66,7 @@ public class CandidateResults implements Iterable<CandidateResult> {
return query;
}
-
-
+
public int size() {
return candidates.size();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/707c9b99/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Identifier.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Identifier.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Identifier.java
index 8d886d5..66dcd3b 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Identifier.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Identifier.java
@@ -43,7 +43,8 @@ public class Identifier implements Serializable {
Object value;
static Pattern emailRegEx = Pattern.compile( EMAIL_REX );
- //"Pattern nameRegEx" below used to be [a-zA-Z0-9_\\-./], changed it to contain a 'space' to address https://issues.apache.org/jira/browse/USERGRID-94
+ // "Pattern nameRegEx" below used to be [a-zA-Z0-9_\\-./], changed it to contain a 'space' to
+ // address https://issues.apache.org/jira/browse/USERGRID-94
static Pattern nameRegEx = Pattern.compile( "[a-zA-Z0-9_\\-./ ]*" );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/707c9b99/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Query.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Query.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Query.java
index a1e25da..778134e 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Query.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Query.java
@@ -115,12 +115,13 @@ public class Query {
public Query( Query q ) {
if ( q != null ) {
type = q.type;
- sortPredicates = q.sortPredicates != null ? new ArrayList<SortPredicate>( q.sortPredicates ) : null;
+ sortPredicates = q.sortPredicates != null
+ ? new ArrayList<SortPredicate>( q.sortPredicates ) : null;
startResult = q.startResult;
cursor = q.cursor;
limit = q.limit;
- selectAssignments =
- q.selectAssignments != null ? new LinkedHashMap<String, String>( q.selectAssignments ) : null;
+ selectAssignments = q.selectAssignments != null
+ ? new LinkedHashMap<String, String>( q.selectAssignments ) : null;
mergeSelectResults = q.mergeSelectResults;
//level = q.level;
connection = q.connection;
@@ -132,9 +133,10 @@ public class Query {
resolution = q.resolution;
pad = q.pad;
rootOperand = q.rootOperand;
- identifiers = q.identifiers != null ? new ArrayList<Identifier>( q.identifiers ) : null;
- counterFilters =
- q.counterFilters != null ? new ArrayList<CounterFilterPredicate>( q.counterFilters ) : null;
+ identifiers = q.identifiers != null
+ ? new ArrayList<Identifier>( q.identifiers ) : null;
+ counterFilters = q.counterFilters != null
+ ? new ArrayList<CounterFilterPredicate>( q.counterFilters ) : null;
collection = q.collection;
}
}
@@ -253,7 +255,8 @@ public class Query {
}
- public static Query fromQueryParams( Map<String, List<String>> params ) throws QueryParseException {
+ public static Query fromQueryParams( Map<String, List<String>> params )
+ throws QueryParseException {
Query q = null;
CounterResolution resolution = null;
List<Identifier> identifiers = null;
@@ -621,8 +624,8 @@ public class Query {
for ( SortPredicate s : sortPredicates ) {
if ( s.getPropertyName().equals( sort.getPropertyName() ) ) {
- throw new QueryParseException(
- String.format( "Attempted to set sort order for %s more than once", s.getPropertyName() ) );
+ throw new QueryParseException( String.format(
+ "Attempted to set sort order for %s more than once", s.getPropertyName() ) );
}
}
sortPredicates.add( sort );
@@ -1094,7 +1097,9 @@ public class Query {
private final Query.SortDirection direction;
- public SortPredicate(@JsonProperty("propertyName") String propertyName, @JsonProperty("direction") Query.SortDirection direction ) {
+ public SortPredicate(@JsonProperty("propertyName") String propertyName,
+ @JsonProperty("direction") Query.SortDirection direction ) {
+
if ( propertyName == null ) {
throw new NullPointerException( "Property name was null" );
}
@@ -1232,7 +1237,7 @@ public class Query {
}
}
- if ( ( user == null ) && ( group == null ) && ( category == null ) && ( name == null ) ) {
+ if ( ( user == null ) && ( group == null ) && ( category == null ) && ( name == null)) {
return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/707c9b99/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Results.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Results.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Results.java
index d0c0571..89745d0 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Results.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Results.java
@@ -53,7 +53,9 @@ public class Results implements Iterable<Entity> {
final EntityCollectionManagerFactory ecmf;
- public Results( Query query, List<CandidateResult> candidates, EntityCollectionManagerFactory ecmf ) {
+ public Results( Query query, List<CandidateResult> candidates,
+ EntityCollectionManagerFactory ecmf ) {
+
this.query = query;
this.candidates = candidates;
this.ecmf = ecmf;
[06/22] git commit: add config for inactive dev retrieval
Posted by sn...@apache.org.
add config for inactive dev retrieval
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/f3343f1f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/f3343f1f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/f3343f1f
Branch: refs/heads/two-dot-o-events
Commit: f3343f1f8561ddc36c632e665ef8706163bbbcd0
Parents: 9907863
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Oct 27 15:33:49 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Oct 27 15:33:49 2014 -0600
----------------------------------------------------------------------
.../services/notifications/QueueListener.java | 16 +++++++++++-----
1 file changed, 11 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f3343f1f/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
index 48c110b..286daf1 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
@@ -64,7 +64,7 @@ public class QueueListener {
private long sleepWhenNoneFound = 0;
- private long sleepBetweenRuns = 5000;
+ private long sleepBetweenRuns = 0;
private ExecutorService pool;
private List<Future> futures;
@@ -73,6 +73,7 @@ public class QueueListener {
private Integer batchSize = 10;
private String queueName;
public QueueManager TEST_QUEUE_MANAGER;
+ private int consecutiveCallsToRemoveDevices;
public QueueListener(ServiceManagerFactory smf, EntityManagerFactory emf, MetricsFactory metricsService, Properties props){
this.queueManagerFactory = CpSetup.getInjector().getInstance(QueueManagerFactory.class);
@@ -93,12 +94,14 @@ public class QueueListener {
int threadCount = 0;
try {
- sleepBetweenRuns = new Long(properties.getProperty("usergrid.notifications.listener.sleep.between", "0")).longValue();
+ sleepBetweenRuns = new Long(properties.getProperty("usergrid.notifications.listener.sleep.between", ""+sleepBetweenRuns)).longValue();
sleepWhenNoneFound = new Long(properties.getProperty("usergrid.notifications.listener.sleep.after", ""+DEFAULT_SLEEP)).longValue();
batchSize = new Integer(properties.getProperty("usergrid.notifications.listener.batchSize", (""+batchSize)));
queueName = ApplicationQueueManager.getQueueNames(properties);
int maxThreads = new Integer(properties.getProperty("usergrid.notifications.listener.maxThreads", ""+MAX_THREADS));
+ consecutiveCallsToRemoveDevices = new Integer(properties.getProperty("usergrid.notifications.inactive.interval", ""+100));
+
futures = new ArrayList<Future>(maxThreads);
//create our thread pool based on our threadcount.
@@ -154,7 +157,6 @@ public class QueueListener {
LOG.info("retrieved batch of {} messages from queue {} ", messages.size(),queueName);
if (messages.size() > 0) {
- runCount++;
HashMap<UUID, List<QueueMessage>> messageMap = new HashMap<>(messages.size());
//group messages into hash map by app id
for (QueueMessage message : messages) {
@@ -207,9 +209,13 @@ public class QueueListener {
LOG.info("sleep between rounds...sleep...{}", sleepBetweenRuns);
Thread.sleep(sleepBetweenRuns);
}
- if(runCount % 100 == 0){
+ if(runCount++ % consecutiveCallsToRemoveDevices == 0){
for(ApplicationQueueManager applicationQueueManager : queueManagerMap.values()){
- applicationQueueManager.asyncCheckForInactiveDevices();
+ try {
+ applicationQueueManager.asyncCheckForInactiveDevices();
+ }catch (Exception inactiveDeviceException){
+ LOG.error("Inactive Device Get failed",inactiveDeviceException);
+ }
}
//clear everything
queueManagerMap.clear();
[11/22] git commit: refactor static classes out of adapters
Posted by sn...@apache.org.
refactor static classes out of adapters
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/0f4584c4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/0f4584c4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/0f4584c4
Branch: refs/heads/two-dot-o-events
Commit: 0f4584c47a775d97d0b2f3e297e5a5be8b06a31b
Parents: ad0cce9
Author: Shawn Feldman <sf...@apache.org>
Authored: Wed Oct 29 09:50:03 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Wed Oct 29 09:50:03 2014 -0600
----------------------------------------------------------------------
.../usergrid/persistence/entities/Notifier.java | 5 -
.../notifications/ApplicationQueueManager.java | 76 ++++++-----
.../notifications/InactiveDeviceManager.java | 13 +-
.../notifications/NotificationsService.java | 24 +---
.../services/notifications/ProviderAdapter.java | 12 +-
.../notifications/ProviderAdapterFactory.java | 49 ++++++++
.../services/notifications/QueueListener.java | 65 +++++++---
.../services/notifications/TestAdapter.java | 21 +++-
.../notifications/apns/APNsAdapter.java | 126 +++++++------------
.../notifications/apns/EntityPushManager.java | 6 +-
.../apns/ExpiredTokenListener.java | 2 +-
.../services/notifications/gcm/GCMAdapter.java | 32 +++--
.../services/notifiers/NotifiersService.java | 23 ++--
.../notifications/NotifiersServiceIT.java | 33 +----
.../apns/MockSuccessfulProviderAdapter.java | 37 ++----
.../apns/NotificationsServiceIT.java | 43 ++-----
.../gcm/MockSuccessfulProviderAdapter.java | 31 ++---
.../gcm/NotificationsServiceIT.java | 74 -----------
18 files changed, 292 insertions(+), 380 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0f4584c4/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notifier.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notifier.java b/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notifier.java
index ba3f2fc..819ccde 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notifier.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notifier.java
@@ -132,10 +132,5 @@ public class Notifier extends TypedEntity {
this.apiKey = apiKey;
}
- @JsonIgnore
- public EntityManager getEntityManager(){return entityManager;}
-
- public void setEntityManager(EntityManager entityManager){ this.entityManager = entityManager;}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0f4584c4/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
index 3fd84a6..11b197c 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
@@ -39,6 +39,7 @@ import rx.functions.Action1;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
+import java.security.Provider;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
@@ -61,9 +62,7 @@ public class ApplicationQueueManager {
private final MetricsFactory metricsFactory;
private final String queueName;
- HashMap<Object, Notifier> notifierHashMap; // only retrieve notifiers once
-
- public final Map<String, ProviderAdapter> providerAdapters;
+ HashMap<Object, ProviderAdapter> notifierHashMap; // only retrieve notifiers once
public ApplicationQueueManager(JobScheduler jobScheduler, EntityManager entityManager, QueueManager queueManager, MetricsFactory metricsFactory, Properties properties){
@@ -72,12 +71,7 @@ public class ApplicationQueueManager {
this.jobScheduler = jobScheduler;
this.metricsFactory = metricsFactory;
this.queueName = getQueueNames(properties);
- providerAdapters = new HashMap<String, ProviderAdapter>(3);
- {
- providerAdapters.put("apple", new APNsAdapter());
- providerAdapters.put("google", new GCMAdapter());
- providerAdapters.put("noop", new TestAdapter());
- };
+
}
public boolean scheduleQueueJob(Notification notification) throws Exception{
@@ -106,7 +100,7 @@ public class ApplicationQueueManager {
final AtomicInteger deviceCount = new AtomicInteger(); //count devices so you can make a judgement on batching
final ConcurrentLinkedQueue<String> errorMessages = new ConcurrentLinkedQueue<String>(); //build up list of issues
- final HashMap<Object,Notifier> notifierMap = getNotifierMap();
+ final HashMap<Object,ProviderAdapter> notifierMap = getNotifierMap();
//get devices in querystring, and make sure you have access
if (pathQuery != null) {
@@ -147,9 +141,9 @@ public class ApplicationQueueManager {
//find the device notifier info, match it to the payload
for (Map.Entry<String, Object> entry : payloads.entrySet()) {
- Notifier notifier = notifierMap.get(entry.getKey().toLowerCase());
+ ProviderAdapter adapter = notifierMap.get(entry.getKey().toLowerCase());
now = System.currentTimeMillis();
- String providerId = getProviderId(deviceRef, notifier);
+ String providerId = getProviderId(deviceRef, adapter.getNotifier());
if (providerId != null) {
notifierId = providerId;
notifierKey = entry.getKey().toLowerCase();
@@ -236,10 +230,10 @@ public class ApplicationQueueManager {
* only need to get notifiers once. will reset on next batch
* @return
*/
- public HashMap<Object,Notifier> getNotifierMap(){
+ public HashMap<Object,ProviderAdapter> getNotifierMap(){
if(notifierHashMap == null) {
long now = System.currentTimeMillis();
- notifierHashMap = new HashMap<Object, Notifier>();
+ notifierHashMap = new HashMap<Object, ProviderAdapter>();
Query query = new Query();
query.setCollection("notifiers");
query.setLimit(100);
@@ -251,12 +245,12 @@ public class ApplicationQueueManager {
int count = 0;
while (notifierIterator.hasNext()) {
Notifier notifier = notifierIterator.next();
- notifier.setEntityManager(em);
String name = notifier.getName() != null ? notifier.getName() : "";
UUID uuid = notifier.getUuid() != null ? notifier.getUuid() : UUID.randomUUID();
- notifierHashMap.put(name.toLowerCase(), notifier);
- notifierHashMap.put(uuid, notifier);
- notifierHashMap.put(uuid.toString(), notifier);
+ ProviderAdapter providerAdapter = ProviderAdapterFactory.getProviderAdapter(notifier,em);
+ notifierHashMap.put(name.toLowerCase(), providerAdapter);
+ notifierHashMap.put(uuid, providerAdapter);
+ notifierHashMap.put(uuid.toString(), providerAdapter);
if(count++ >= 100){
LOG.error("ApplicationQueueManager: too many notifiers...breaking out ", notifierHashMap.size());
break;
@@ -276,7 +270,7 @@ public class ApplicationQueueManager {
LOG.info("sending batch of {} notifications.", messages.size());
final Meter sendMeter = metricsFactory.getMeter(NotificationsService.class, "send");
- final Map<Object, Notifier> notifierMap = getNotifierMap();
+ final Map<Object, ProviderAdapter> notifierMap = getNotifierMap();
final ApplicationQueueManager proxy = this;
final ConcurrentHashMap<UUID,TaskManager> taskMap = new ConcurrentHashMap<UUID, TaskManager>(messages.size());
final ConcurrentHashMap<UUID,Notification> notificationMap = new ConcurrentHashMap<UUID, Notification>(messages.size());
@@ -310,10 +304,10 @@ public class ApplicationQueueManager {
try {
String notifierName = message.getNotifierKey().toLowerCase();
- Notifier notifier = notifierMap.get(notifierName.toLowerCase());
+ ProviderAdapter providerAdapter = notifierMap.get(notifierName.toLowerCase());
Object payload = translatedPayloads.get(notifierName);
Receipt receipt = new Receipt(notification.getUuid(), message.getNotifierId(), payload, deviceUUID);
- TaskTracker tracker = new TaskTracker(notifier, taskManager, receipt, deviceUUID);
+ TaskTracker tracker = new TaskTracker(providerAdapter.getNotifier(), taskManager, receipt, deviceUUID);
if(!isOkToSend(notification)){
tracker.failed(0, "Notification is duplicate/expired/cancelled.");
}else {
@@ -323,8 +317,7 @@ public class ApplicationQueueManager {
} else {
long now = System.currentTimeMillis();
try {
- ProviderAdapter providerAdapter = providerAdapters.get(notifier.getProvider());
- providerAdapter.sendNotification(message.getNotifierId(), notifier, payload, notification, tracker);
+ providerAdapter.sendNotification(message.getNotifierId(), payload, notification, tracker);
} catch (Exception e) {
tracker.failed(0, e.getMessage());
} finally {
@@ -362,7 +355,7 @@ public class ApplicationQueueManager {
@Override
public HashMap<UUID, ApplicationQueueMessage> call(List<ApplicationQueueMessage> queueMessages) {
//for gcm this will actually send notification
- for (ProviderAdapter providerAdapter : providerAdapters.values()) {
+ for (ProviderAdapter providerAdapter : notifierMap.values()) {
try {
providerAdapter.doneSendingNotifications();
} catch (Exception e) {
@@ -395,24 +388,26 @@ public class ApplicationQueueManager {
return o;
}
+ public void stop(){
+ for(ProviderAdapter adapter : getNotifierMap().values()){
+ adapter.stop();
+ }
+ }
+
/**
* Call the adapter with the notifier
*/
- private Map<String, Object> translatePayloads(Map<String, Object> payloads, Map<Object, Notifier> notifierMap) throws Exception {
- Map<String, Object> translatedPayloads = new HashMap<String, Object>(
- payloads.size());
+ private Map<String, Object> translatePayloads(Map<String, Object> payloads, Map<Object, ProviderAdapter> notifierMap) throws Exception {
+ Map<String, Object> translatedPayloads = new HashMap<String, Object>( payloads.size());
for (Map.Entry<String, Object> entry : payloads.entrySet()) {
String payloadKey = entry.getKey().toLowerCase();
Object payloadValue = entry.getValue();
- Notifier notifier = notifierMap.get(payloadKey);
- if (notifier != null) {
- ProviderAdapter providerAdapter = providerAdapters.get(notifier.getProvider());
- if (providerAdapter != null) {
- Object translatedPayload = payloadValue != null ? providerAdapter.translatePayload(payloadValue) : null;
- if (translatedPayload != null) {
- translatedPayloads.put(payloadKey, translatedPayload);
- }
+ ProviderAdapter providerAdapter = notifierMap.get(payloadKey);
+ if (providerAdapter != null) {
+ Object translatedPayload = payloadValue != null ? providerAdapter.translatePayload(payloadValue) : null;
+ if (translatedPayload != null) {
+ translatedPayloads.put(payloadKey, translatedPayload);
}
}
}
@@ -453,15 +448,14 @@ public class ApplicationQueueManager {
}
public void asyncCheckForInactiveDevices() throws Exception {
- Collection<Notifier> notifiers = getNotifierMap().values();
- for (final Notifier notifier : notifiers) {
+ Collection<ProviderAdapter> providerAdapters = getNotifierMap().values();
+ for (final ProviderAdapter providerAdapter : providerAdapters) {
try {
- ProviderAdapter providerAdapter = providerAdapters.get(notifier.getProvider());
if (providerAdapter != null) {
- LOG.debug("checking notifier {} for inactive devices", notifier);
- providerAdapter.removeInactiveDevices(notifier, em);
+ LOG.debug("checking notifier {} for inactive devices", providerAdapter.getNotifier());
+ providerAdapter.removeInactiveDevices();
- LOG.debug("finished checking notifier {} for inactive devices",notifier);
+ LOG.debug("finished checking notifier {} for inactive devices",providerAdapter.getNotifier());
}
} catch (Exception e) {
LOG.error("checkForInactiveDevices", e); // not
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0f4584c4/stack/services/src/main/java/org/apache/usergrid/services/notifications/InactiveDeviceManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/InactiveDeviceManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/InactiveDeviceManager.java
index ecee485..c54a595 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/InactiveDeviceManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/InactiveDeviceManager.java
@@ -38,13 +38,14 @@ import java.util.Map;
public class InactiveDeviceManager {
private static final Logger LOG = LoggerFactory.getLogger(InactiveDeviceManager.class);
private final Notifier notifier;
+ private EntityManager entityManager;
- public InactiveDeviceManager(Notifier notifier){
+ public InactiveDeviceManager(Notifier notifier,EntityManager entityManager){
this.notifier = notifier;
+ this.entityManager = entityManager;
}
public void removeInactiveDevices( Map<String,Date> inactiveDeviceMap ){
final String notfierPostFix = ApplicationQueueManager.NOTIFIER_ID_POSTFIX;
- final EntityManager em = notifier.getEntityManager();
if (inactiveDeviceMap != null && inactiveDeviceMap.size() > 0) {
LOG.debug("processing {} inactive devices", inactiveDeviceMap.size());
Map<String, Object> clearPushtokenMap = new HashMap<String, Object>( 2);
@@ -57,16 +58,16 @@ public class InactiveDeviceManager {
// name
Query query = new Query();
query.addEqualityFilter(notifier.getName() + notfierPostFix, entry.getKey());
- Results results = em.searchCollection(em.getApplication(), "devices", query);
+ Results results = entityManager.searchCollection(entityManager.getApplication(), "devices", query);
for (Entity e : results.getEntities()) {
- em.updateProperties(e, clearPushtokenMap);
+ entityManager.updateProperties(e, clearPushtokenMap);
}
// uuid
query = new Query();
query.addEqualityFilter(notifier.getUuid() + notfierPostFix, entry.getKey());
- results = em.searchCollection(em.getApplication(), "devices", query);
+ results = entityManager.searchCollection(entityManager.getApplication(), "devices", query);
for (Entity e : results.getEntities()) {
- em.updateProperties(e, clearPushtokenMap);
+ entityManager.updateProperties(e, clearPushtokenMap);
}
}catch (Exception e){
LOG.error("failed to remove token",e);
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0f4584c4/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
index 5420d29..a64704c 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
@@ -72,18 +72,6 @@ public class NotificationsService extends AbstractCollectionService {
MESSAGE_PROPERTY_DEVICE_UUID, UUID.class);
}
- // these 2 can be static, but GCM can't. future: would be nice to get gcm
- // static as well...
- public static ProviderAdapter APNS_ADAPTER = new APNsAdapter();
- public static ProviderAdapter TEST_ADAPTER = new TestAdapter();
-
- public final Map<String, ProviderAdapter> providerAdapters =
- new HashMap<String, ProviderAdapter>(3);
- {
- providerAdapters.put("apple", APNS_ADAPTER);
- providerAdapters.put("google", new GCMAdapter());
- providerAdapters.put("noop", TEST_ADAPTER);
- }
private ApplicationQueueManager notificationQueueManager;
private long gracePeriod;
@@ -250,10 +238,6 @@ public class NotificationsService extends AbstractCollectionService {
return (notification.getStarted() == null);
}
- public Set<String> getProviders() {
- return providerAdapters.keySet();
- }
-
// validate payloads
private void validate(EntityRef ref, ServicePayload servicePayload)
throws Exception {
@@ -278,8 +262,7 @@ public class NotificationsService extends AbstractCollectionService {
throw new IllegalArgumentException("notifier \""
+ notifierId + "\" not found");
}
- ProviderAdapter providerAdapter = providerAdapters.get(notifier
- .getProvider());
+ ProviderAdapter providerAdapter = ProviderAdapterFactory.getProviderAdapter(notifier,em);
Object payload = entry.getValue();
try {
return providerAdapter.translatePayload(payload); // validate
@@ -354,10 +337,9 @@ public class NotificationsService extends AbstractCollectionService {
* failure
*/
public void testConnection(Notifier notifier) throws Exception {
- ProviderAdapter providerAdapter = providerAdapters.get(notifier.getProvider());
+ ProviderAdapter providerAdapter = ProviderAdapterFactory.getProviderAdapter(notifier,em);
if (providerAdapter != null) {
- notifier.setEntityManager(em);
- providerAdapter.testConnection(notifier);
+ providerAdapter.testConnection();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0f4584c4/stack/services/src/main/java/org/apache/usergrid/services/notifications/ProviderAdapter.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ProviderAdapter.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ProviderAdapter.java
index 33e921f..8acd006 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ProviderAdapter.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ProviderAdapter.java
@@ -34,10 +34,9 @@ import org.apache.usergrid.services.ServicePayload;
*/
public interface ProviderAdapter {
- public void testConnection(Notifier notifier) throws ConnectionException;
+ public void testConnection() throws ConnectionException;
- public void sendNotification(String providerId, Notifier notifier,
- Object payload, Notification notification, TaskTracker tracker)
+ public void sendNotification(String providerId, Object payload, Notification notification, TaskTracker tracker)
throws Exception;
/**
@@ -46,10 +45,13 @@ public interface ProviderAdapter {
*/
public void doneSendingNotifications() throws Exception;
- public void removeInactiveDevices(Notifier notifier,
- EntityManager em) throws Exception;
+ public void removeInactiveDevices() throws Exception;
public Object translatePayload(Object payload) throws Exception;
public void validateCreateNotifier(ServicePayload payload) throws Exception;
+
+ public void stop();
+
+ public Notifier getNotifier();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0f4584c4/stack/services/src/main/java/org/apache/usergrid/services/notifications/ProviderAdapterFactory.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ProviderAdapterFactory.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ProviderAdapterFactory.java
new file mode 100644
index 0000000..a4022c1
--- /dev/null
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ProviderAdapterFactory.java
@@ -0,0 +1,49 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. The ASF licenses this file to You
+ * * under the Apache License, Version 2.0 (the "License"); you may not
+ * * use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License. For additional information regarding
+ * * copyright in this work, please see the NOTICE file in the top level
+ * * directory of this distribution.
+ *
+ */
+
+package org.apache.usergrid.services.notifications;
+
+import org.apache.usergrid.persistence.EntityManager;
+import org.apache.usergrid.persistence.entities.Notifier;
+import org.apache.usergrid.services.notifications.apns.APNsAdapter;
+import org.apache.usergrid.services.notifications.gcm.GCMAdapter;
+
+import java.util.HashMap;
+
+/**
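+ * Factory that builds the ProviderAdapter for a notifier's provider ("apple", "google" or "noop"); returns null for any other provider.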
+ */
+public class ProviderAdapterFactory {
+ private static final String[] providers = new String[]{"apple", "google", "noop"};
+ public static ProviderAdapter getProviderAdapter(Notifier notifier, EntityManager entityManager){
+ ProviderAdapter adapter = null;
+ switch(notifier.getProvider().toLowerCase()){
+ case "apple" : adapter = new APNsAdapter(entityManager,notifier); break;
+ case "google" : adapter = new GCMAdapter(entityManager ,notifier); break;
+ case "noop" : adapter = new TestAdapter(notifier); break;
+ }
+ return adapter;
+
+ }
+
+ public static String[] getValidProviders() {
+ return providers;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0f4584c4/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
index fffc8cd..3c788a9 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
@@ -18,6 +18,7 @@ package org.apache.usergrid.services.notifications;
import com.codahale.metrics.*;
import com.codahale.metrics.Timer;
+import com.google.common.cache.*;
import org.apache.usergrid.corepersistence.CpSetup;
import org.apache.usergrid.metrics.MetricsFactory;
@@ -35,10 +36,7 @@ import org.slf4j.LoggerFactory;
import rx.Observable;
import javax.annotation.PostConstruct;
import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class QueueListener {
@@ -148,7 +146,9 @@ public class QueueListener {
QueueManager queueManager = TEST_QUEUE_MANAGER != null ? TEST_QUEUE_MANAGER : queueManagerFactory.getQueueManager(queueScope);
// run until there are no more active jobs
long runCount = 0;
- Map<UUID,ApplicationQueueManager> queueManagerMap = new ConcurrentHashMap<>(); //keep a cache of queuemangers then clear them at an interval
+ //cache of ApplicationQueueManager instances keyed by application id, so every batch for the same application reuses one manager
+ LoadingCache<UUID, ApplicationQueueManager> queueManagerMap = getQueueManagerCache(queueManager);
+
while ( true ) {
try {
@@ -175,20 +175,7 @@ public class QueueListener {
//send each set of app ids together
for (Map.Entry<UUID, List<QueueMessage>> entry : messageMap.entrySet()) {
UUID applicationId = entry.getKey();
- EntityManager entityManager = emf.getEntityManager(applicationId);
- ServiceManager serviceManager = smf.getServiceManager(applicationId);
-
ApplicationQueueManager manager = queueManagerMap.get(applicationId);
- if(manager==null) {
- manager = new ApplicationQueueManager(
- new JobScheduler(serviceManager, entityManager),
- entityManager,
- queueManager,
- metricsService,
- properties
- );
- queueManagerMap.put(applicationId,manager);
- }
LOG.info("send batch for app {} of {} messages", entry.getKey(), entry.getValue().size());
Observable current = manager.sendBatchToProviders(entry.getValue(),queueName);
if(merge == null)
@@ -210,7 +197,7 @@ public class QueueListener {
Thread.sleep(sleepBetweenRuns);
}
if(++runCount % consecutiveCallsToRemoveDevices == 0){
- for(ApplicationQueueManager applicationQueueManager : queueManagerMap.values()){
+ for(ApplicationQueueManager applicationQueueManager : queueManagerMap.asMap().values()){
try {
applicationQueueManager.asyncCheckForInactiveDevices();
}catch (Exception inactiveDeviceException){
@@ -218,7 +205,6 @@ public class QueueListener {
}
}
//clear everything
- queueManagerMap.clear();
runCount=0;
}
}
@@ -244,6 +230,43 @@ public class QueueListener {
}
}
+ private LoadingCache<UUID, ApplicationQueueManager> getQueueManagerCache(final QueueManager queueManager) {
+ return CacheBuilder
+ .newBuilder()
+ .expireAfterAccess(10, TimeUnit.MINUTES)
+ .removalListener(new RemovalListener<UUID, ApplicationQueueManager>() {
+ @Override
+ public void onRemoval(
+ RemovalNotification<UUID, ApplicationQueueManager> queueManagerNotifiication) {
+ try {
+ queueManagerNotifiication.getValue().stop();
+ } catch (Exception ie) {
+ LOG.error("Failed to shutdown from cache", ie);
+ }
+ }
+ }).build(new CacheLoader<UUID, ApplicationQueueManager>() {
+ @Override
+ public ApplicationQueueManager load(final UUID applicationId) {
+ try {
+ EntityManager entityManager = emf.getEntityManager(applicationId);
+ ServiceManager serviceManager = smf.getServiceManager(applicationId);
+
+ ApplicationQueueManager manager = new ApplicationQueueManager(
+ new JobScheduler(serviceManager, entityManager),
+ entityManager,
+ queueManager,
+ metricsService,
+ properties
+ );
+ return manager;
+ } catch (Exception e) {
+ LOG.error("Could not instantiate queue manager", e);
+ return null;
+ }
+ }
+ });
+ }
+
public void stop(){
LOG.info("stop processes");
@@ -263,4 +286,6 @@ public class QueueListener {
}
public int getBatchSize(){return batchSize;}
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0f4584c4/stack/services/src/main/java/org/apache/usergrid/services/notifications/TestAdapter.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/TestAdapter.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/TestAdapter.java
index b4eb767..1007dc6 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/TestAdapter.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/TestAdapter.java
@@ -38,25 +38,26 @@ public class TestAdapter implements ProviderAdapter {
private static final Logger log = LoggerFactory.getLogger(TestAdapter.class);
private static final int DELAY = 1; // if delay > 0, uses threadpool
+ private final Notifier notifier;
private ExecutorService pool = null;
- public TestAdapter() {
+ public TestAdapter(Notifier notifier) {
if (DELAY > 0) {
pool = Executors
.newFixedThreadPool(APNsAdapter.MAX_CONNECTION_POOL_SIZE);
}
+ this.notifier = notifier;
}
@Override
- public void testConnection(Notifier notifier) throws ConnectionException {
+ public void testConnection() throws ConnectionException {
}
@Override
public void sendNotification(
String providerId,
- Notifier notifier,
- final Object payload,
+ final Object payload,
Notification notification,
TaskTracker tracker)
throws Exception {
@@ -89,7 +90,7 @@ public class TestAdapter implements ProviderAdapter {
}
@Override
- public void removeInactiveDevices(Notifier notifier, EntityManager em) throws Exception {
+ public void removeInactiveDevices() throws Exception {
log.debug("getInactiveDevices()");
}
@@ -101,4 +102,14 @@ public class TestAdapter implements ProviderAdapter {
@Override
public void validateCreateNotifier(ServicePayload payload) throws Exception {
}
+
+ @Override
+ public void stop() {
+
+ }
+
+ @Override
+ public Notifier getNotifier() {
+ return notifier;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0f4584c4/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/APNsAdapter.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/APNsAdapter.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/APNsAdapter.java
index 7ab1dc3..da5d3e5 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/APNsAdapter.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/APNsAdapter.java
@@ -60,13 +60,18 @@ public class APNsAdapter implements ProviderAdapter {
validEnvironments.add("mock");
}
- public APNsAdapter(){}
+ private final Notifier notifier;
+
+ private EntityManager entityManager;
+ private EntityPushManager pushManager;
+
+ public APNsAdapter(EntityManager entityManager, Notifier notifier){
+ this.entityManager = entityManager;
+ this.notifier = notifier;
+ }
@Override
- public void testConnection(Notifier notifier) throws ConnectionException {
- if(isMock(notifier)){
- delayRandom(notifier); return;
- }
+ public void testConnection() throws ConnectionException {
TestAPNsNotification notification = TestAPNsNotification.create(TEST_TOKEN, TEST_PAYLOAD);
try {
CountDownLatch latch = new CountDownLatch(1);
@@ -99,8 +104,7 @@ public class APNsAdapter implements ProviderAdapter {
}
@Override
- public void sendNotification(String providerId, Notifier notifier,
- Object payload, Notification notification, TaskTracker tracker)
+ public void sendNotification(String providerId, Object payload, Notification notification, TaskTracker tracker)
throws Exception {
APNsNotification apnsNotification = APNsNotification.create(providerId, payload.toString(), notification, tracker);
PushManager<SimpleApnsPushNotification> pushManager = getPushManager(notifier);
@@ -119,74 +123,35 @@ public class APNsAdapter implements ProviderAdapter {
}
@Override
- public void removeInactiveDevices(Notifier notifier,EntityManager em) throws Exception {
+ public void removeInactiveDevices() throws Exception {
PushManager<SimpleApnsPushNotification> pushManager = getPushManager(notifier);
pushManager.requestExpiredTokens();
}
private EntityPushManager getPushManager(Notifier notifier) throws ExecutionException {
- EntityPushManager pushManager = apnsServiceMap.get(notifier);
if(pushManager != null && !pushManager.isStarted() && pushManager.isShutDown()){
- apnsServiceMap.invalidate(notifier);
- pushManager = apnsServiceMap.get(notifier);
+ PushManagerConfiguration config = new PushManagerConfiguration();
+ config.setConcurrentConnectionCount(Runtime.getRuntime().availableProcessors() * 2);
+ EntityPushManager pushManager = new EntityPushManager(notifier,entityManager, config);
+ //only tested when a message is sent
+ pushManager.registerRejectedNotificationListener(new RejectedAPNsListener());
+ //this will get tested when start is called
+ pushManager.registerFailedConnectionListener(new FailedConnectionListener());
+ //unregisters expired devices
+ pushManager.registerExpiredTokenListener(new ExpiredTokenListener());
+
+ try {
+ if (!pushManager.isStarted()) { //ensure manager is started
+ pushManager.start();
+ }
+ }catch(IllegalStateException ise){
+ logger.debug("failed to start",ise);//could have failed because its started
+ }
+ return pushManager;
}
return pushManager;
}
- //cache to retrieve push manager, cached per notifier, so many notifications will get same push manager
- private static LoadingCache<Notifier, EntityPushManager> apnsServiceMap = CacheBuilder
- .newBuilder()
- .expireAfterAccess(10, TimeUnit.MINUTES)
- .removalListener(new RemovalListener<Notifier, EntityPushManager>() {
- @Override
- public void onRemoval(
- RemovalNotification<Notifier,EntityPushManager> notification) {
- try {
- EntityPushManager manager = notification.getValue();
- if (!manager.isShutDown()) {
- List<SimpleApnsPushNotification> notifications = manager.shutdown(3000);
- for (SimpleApnsPushNotification notification1 : notifications) {
- try {
- ((APNsNotification) notification1).messageSendFailed(new Exception("Cache Expired: Shutting down sender"));
- }catch (Exception e){
- logger.error("Failed to mark notification",e);
- }
- }
- }
- } catch (Exception ie) {
- logger.error("Failed to shutdown from cache", ie);
- }
- }
- }).build(new CacheLoader<Notifier, EntityPushManager>() {
- @Override
- public EntityPushManager load(final Notifier notifier) {
- try {
- PushManagerConfiguration config = new PushManagerConfiguration();
- config.setConcurrentConnectionCount(Runtime.getRuntime().availableProcessors() * 2);
- EntityPushManager pushManager = new EntityPushManager(notifier, config);
- //only tested when a message is sent
- pushManager.registerRejectedNotificationListener(new RejectedAPNsListener());
- //this will get tested when start is called
- pushManager.registerFailedConnectionListener(new FailedConnectionListener());
- //unregistered expired devices
- pushManager.registerExpiredTokenListener(new ExpiredTokenListener());
-
- try {
- if (!pushManager.isStarted()) { //ensure manager is started
- pushManager.start();
- }
- }catch(IllegalStateException ise){
- logger.debug("failed to start",ise);//could have failed because its started
- }
- return pushManager;
- } catch (Exception e) {
- logger.error("Could not instantiate pushmanager", e);
- return null;
- }
- }
- });
-
-
@Override
public Object translatePayload(Object objPayload) throws Exception {
@@ -220,23 +185,26 @@ public class APNsAdapter implements ProviderAdapter {
"p12Certificate");
}
}
- public boolean isMock(Notifier notifier){
- return notifier.getEnvironment() !=null ? notifier.getEnvironment().equals("mock") : false ;
- }
- public boolean delayRandom(Notifier notifier) {
- boolean wasDelayed = false;
- if (isMock(notifier)) {
+
+ @Override
+ public void stop(){
try {
- Thread.sleep(
- new Random().nextInt(300)
- );
- wasDelayed = true;
- } catch (InterruptedException ie) {
- //delay was stopped
+ if (!pushManager.isShutDown()) {
+ List<SimpleApnsPushNotification> notifications = pushManager.shutdown(3000);
+ for (SimpleApnsPushNotification notification1 : notifications) {
+ try {
+ ((APNsNotification) notification1).messageSendFailed(new Exception("Cache Expired: Shutting down sender"));
+ }catch (Exception e){
+ logger.error("Failed to mark notification",e);
+ }
+ }
+ }
+ } catch (Exception ie) {
+ logger.error("Failed to shutdown from cache", ie);
}
- }
- return wasDelayed;
}
+ @Override
+ public Notifier getNotifier(){return notifier;}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0f4584c4/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/EntityPushManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/EntityPushManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/EntityPushManager.java
index 8283458..d675081 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/EntityPushManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/EntityPushManager.java
@@ -38,14 +38,16 @@ import java.util.concurrent.LinkedBlockingDeque;
*/
public class EntityPushManager extends PushManager<SimpleApnsPushNotification> {
private final Notifier notifier;
+ private final EntityManager entityManager;
- public EntityPushManager( Notifier notifier, PushManagerConfiguration configuration) {
+ public EntityPushManager( Notifier notifier, EntityManager entityManager, PushManagerConfiguration configuration) {
super(getApnsEnvironment(notifier), getSSLContext(notifier), null, null, new LinkedBlockingDeque<SimpleApnsPushNotification>(), configuration, notifier.getName());
this.notifier = notifier;
+ this.entityManager = entityManager;
}
public EntityManager getEntityManager() {
- return notifier.getEntityManager();
+ return entityManager;
}
public Notifier getNotifier() {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0f4584c4/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/ExpiredTokenListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/ExpiredTokenListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/ExpiredTokenListener.java
index 3daa20c..6408dfd 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/ExpiredTokenListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/ExpiredTokenListener.java
@@ -53,7 +53,7 @@ public class ExpiredTokenListener implements com.relayrides.pushy.apns.ExpiredTo
}
if(pushManager instanceof EntityPushManager){
EntityPushManager entityPushManager = (EntityPushManager) pushManager;
- InactiveDeviceManager inactiveDeviceManager = new InactiveDeviceManager(entityPushManager.getNotifier());
+ InactiveDeviceManager inactiveDeviceManager = new InactiveDeviceManager(entityPushManager.getNotifier(),entityPushManager.getEntityManager());
inactiveDeviceManager.removeInactiveDevices(inactiveDeviceMap);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0f4584c4/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java
index f8de5ff..b65b25a 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java
@@ -40,11 +40,17 @@ public class GCMAdapter implements ProviderAdapter {
private static final Logger LOG = LoggerFactory.getLogger(GCMAdapter.class);
private static final int SEND_RETRIES = 3;
private static int BATCH_SIZE = 1000;
+ private final Notifier notifier;
+ private EntityManager entityManager;
private Map<Notifier, Batch> notifierBatches = new HashMap<Notifier, Batch>();
+ public GCMAdapter(EntityManager entityManager,Notifier notifier){
+ this.notifier = notifier;
+ this.entityManager = entityManager;
+ }
@Override
- public void testConnection(Notifier notifier) throws ConnectionException {
+ public void testConnection() throws ConnectionException {
Sender sender = new Sender(notifier.getApiKey());
Message message = new Message.Builder().build();
try {
@@ -56,7 +62,7 @@ public class GCMAdapter implements ProviderAdapter {
}
@Override
- public void sendNotification(String providerId, Notifier notifier,
+ public void sendNotification(String providerId,
Object payload, Notification notification, TaskTracker tracker)
throws Exception {
Map<String,Object> map = (Map<String, Object>) payload;
@@ -66,12 +72,11 @@ public class GCMAdapter implements ProviderAdapter {
expireSeconds = expireSeconds <= 2419200 ? expireSeconds : 2419200; //send the max gcm value documented here http://developer.android.com/google/gcm/adv.html#ttl
map.put(expiresKey, expireSeconds);
}
- Batch batch = getBatch(notifier, map);
+ Batch batch = getBatch( map);
batch.add(providerId, tracker);
}
- synchronized private Batch getBatch(Notifier notifier,
- Map<String, Object> payload) {
+ synchronized private Batch getBatch( Map<String, Object> payload) {
Batch batch = notifierBatches.get(notifier);
if (batch == null && payload != null) {
batch = new Batch(notifier, payload);
@@ -88,13 +93,12 @@ public class GCMAdapter implements ProviderAdapter {
}
@Override
- public void removeInactiveDevices(Notifier notifier,
- EntityManager em) throws Exception {
- Batch batch = getBatch(notifier, null);
+ public void removeInactiveDevices( ) throws Exception {
+ Batch batch = getBatch( null);
Map<String,Date> map = null;
if(batch != null) {
map = batch.getAndClearInactiveDevices();
- InactiveDeviceManager deviceManager = new InactiveDeviceManager(notifier);
+ InactiveDeviceManager deviceManager = new InactiveDeviceManager(notifier,entityManager);
deviceManager.removeInactiveDevices(map);
}
@@ -126,6 +130,16 @@ public class GCMAdapter implements ProviderAdapter {
}
}
+ @Override
+ public void stop() {
+
+ }
+
+ @Override
+ public Notifier getNotifier() {
+ return notifier;
+ }
+
private class Batch {
private Notifier notifier;
private Map payload;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0f4584c4/stack/services/src/main/java/org/apache/usergrid/services/notifiers/NotifiersService.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifiers/NotifiersService.java b/stack/services/src/main/java/org/apache/usergrid/services/notifiers/NotifiersService.java
index 8c41e0a..ae089fe 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifiers/NotifiersService.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifiers/NotifiersService.java
@@ -18,6 +18,10 @@ package org.apache.usergrid.services.notifiers;
import org.apache.usergrid.persistence.Entity;
import org.apache.usergrid.persistence.entities.Notifier;
+import org.apache.usergrid.services.notifications.ProviderAdapterFactory;
+import org.apache.usergrid.services.notifications.TestAdapter;
+import org.apache.usergrid.services.notifications.apns.APNsAdapter;
+import org.apache.usergrid.services.notifications.gcm.GCMAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.usergrid.services.*;
@@ -25,6 +29,8 @@ import org.apache.usergrid.services.notifications.NotificationsService;
import org.apache.usergrid.services.notifications.ProviderAdapter;
import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Set;
public class NotifiersService extends AbstractCollectionService {
@@ -45,22 +51,21 @@ public class NotifiersService extends AbstractCollectionService {
NotificationsService ns = (NotificationsService) sm
.getService("notifications");
- Set<String> providers = ns.getProviders();
String provider = payload.getStringProperty("provider");
- if (!providers.contains(provider)) {
- throw new IllegalArgumentException("provider must be one of: "
- + Arrays.toString(providers.toArray()));
- }
- ProviderAdapter providerAdapter = ns.providerAdapters.get(provider);
- providerAdapter.validateCreateNotifier(payload);
ServiceResults results = super.postCollection(context);
-
- Notifier notifier =(Notifier) results.getEntity();
+ Notifier notifier = (Notifier) results.getEntity();
if (notifier != null) {
try {
+ ProviderAdapter providerAdapter = ProviderAdapterFactory.getProviderAdapter(notifier, em);
+
+ if (providerAdapter==null) {
+ throw new IllegalArgumentException("provider must be one of: "
+ + Arrays.toString(ProviderAdapterFactory.getValidProviders()));
+ }
+ providerAdapter.validateCreateNotifier(payload);
ns.testConnection(notifier);
} catch (Exception e) {
logger.info("notifier testConnection() failed", e);
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0f4584c4/stack/services/src/test/java/org/apache/usergrid/services/notifications/NotifiersServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/notifications/NotifiersServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/services/notifications/NotifiersServiceIT.java
index 61cf1e6..7ced70c 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/notifications/NotifiersServiceIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/notifications/NotifiersServiceIT.java
@@ -44,7 +44,6 @@ public class NotifiersServiceIT extends AbstractServiceIT {
@Before
public void before() throws Exception {
ns = (NotificationsService) app.getSm().getService("notifications");
- MockSuccessfulProviderAdapter.install(ns);
}
@Test
@@ -76,16 +75,6 @@ public class NotifiersServiceIT extends AbstractServiceIT {
// ok
}
- // mock action (based on verified actual behavior) //
- ns.providerAdapters
- .put("google",
- new org.apache.usergrid.services.notifications.gcm.MockSuccessfulProviderAdapter() {
- @Override
- public void testConnection(Notifier notifier)
- throws ConnectionException {
- throw new ConnectionException("", null);
- }
- });
app.put("apiKey", "xxx");
@@ -100,8 +89,6 @@ public class NotifiersServiceIT extends AbstractServiceIT {
@Test
public void badAPNsEnvironment() throws Exception {
- MockSuccessfulProviderAdapter.uninstall(ns);
-
app.clear();
app.put("provider", "apple");
app.put("environment", "xxx");
@@ -141,16 +128,7 @@ public class NotifiersServiceIT extends AbstractServiceIT {
@Test
public void badAPNsCertificate() throws Exception {
- // mock error (based on verified actual behavior) //
- ns.providerAdapters.put("apple", new MockSuccessfulProviderAdapter() {
- @Override
- public void testConnection(Notifier notifier)
- throws ConnectionException {
- Exception e = new SocketException(
- "Connection closed by remote host");
- throw new ConnectionException(e.getMessage(), e);
- }
- });
+
app.clear();
app.put("provider", "apple");
@@ -174,15 +152,6 @@ public class NotifiersServiceIT extends AbstractServiceIT {
@Test
public void badAPNsPassword() throws Exception {
- // mock error (based on verified actual behavior) //
- ns.providerAdapters.put("apple", new MockSuccessfulProviderAdapter() {
- @Override
- public void testConnection(Notifier notifier)
- throws ConnectionException {
- Exception e = new Exception("invalid ssl config");
- throw new ConnectionException(e.getMessage(), e);
- }
- });
app.clear();
app.put("provider", "apple");
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0f4584c4/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/MockSuccessfulProviderAdapter.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/MockSuccessfulProviderAdapter.java b/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/MockSuccessfulProviderAdapter.java
index e864e4f..b3096b7 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/MockSuccessfulProviderAdapter.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/MockSuccessfulProviderAdapter.java
@@ -23,34 +23,15 @@ import org.apache.usergrid.services.notifications.NotificationsService;
import org.apache.usergrid.services.notifications.ProviderAdapter;
import org.apache.usergrid.services.notifications.TaskTracker;
-import java.util.Date;
-import java.util.Map;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import org.apache.usergrid.persistence.EntityManager;
import org.apache.usergrid.services.ServicePayload;
public class MockSuccessfulProviderAdapter implements ProviderAdapter {
private static ProviderAdapter realProviderAdapter;
- public static void install(NotificationsService ns) {
- install(ns, false);
- }
-
- public static void install(NotificationsService ns, boolean doAsync) {
- if (realProviderAdapter != null)
- realProviderAdapter = ns.providerAdapters.get("apple");
- ns.providerAdapters.put("apple", new MockSuccessfulProviderAdapter(
- doAsync));
- }
-
- public static void uninstall(NotificationsService ns) {
- if (realProviderAdapter != null) {
- ns.providerAdapters.put("apple", realProviderAdapter);
- }
- }
private ExecutorService pool;
@@ -65,7 +46,7 @@ public class MockSuccessfulProviderAdapter implements ProviderAdapter {
}
@Override
- public void testConnection(Notifier notifier) throws ConnectionException {
+ public void testConnection() throws ConnectionException {
}
@Override
@@ -74,8 +55,7 @@ public class MockSuccessfulProviderAdapter implements ProviderAdapter {
}
@Override
- public void removeInactiveDevices(Notifier notifier,
- EntityManager em) {
+ public void removeInactiveDevices() {
}
@Override
@@ -83,12 +63,21 @@ public class MockSuccessfulProviderAdapter implements ProviderAdapter {
}
@Override
+ public void stop() {
+
+ }
+
+ @Override
+ public Notifier getNotifier() {
+ return null;
+ }
+
+ @Override
public void doneSendingNotifications() throws Exception {
}
@Override
- public void sendNotification(final String providerId,
- final Notifier notifier, final Object payload,
+ public void sendNotification(final String providerId, final Object payload,
final Notification notification, final TaskTracker tracker)
throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0f4584c4/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
index 31a653c..2a4ec73 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
@@ -327,8 +327,6 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
@Test
public void badPayloads() throws Exception {
- MockSuccessfulProviderAdapter.uninstall(ns);
-
// bad payloads format
app.clear();
@@ -400,24 +398,6 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
// mock action (based on verified actual behavior) //
- if (!USE_REAL_CONNECTIONS) {
- ns.providerAdapters.put("apple",
- new MockSuccessfulProviderAdapter() {
- @Override
- public void sendNotification(String providerId,
- Notifier notifier, Object payload,
- Notification notification, TaskTracker tracker)
- throws Exception {
- APNsNotification apnsNotification = APNsNotification
- .create(providerId, payload.toString(),
- notification, tracker);
- apnsNotification.messageSent();
- apnsNotification
- .messageSendFailed( RejectedNotificationReason.INVALID_TOKEN);
- }
- });
- }
-
// create push notification //
HashMap<String, Object> properties = new LinkedHashMap<String, Object>();
@@ -628,16 +608,16 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
// mock error (based on verified actual behavior) //
if (!USE_REAL_CONNECTIONS) {
- ns.providerAdapters.put("apple",
- new MockSuccessfulProviderAdapter() {
- @Override
- public void testConnection(Notifier notifier)
- throws ConnectionException {
- Exception e = new SocketException(
- "Connection closed by remote host");
- throw new ConnectionException(e.getMessage(), e);
- }
- });
+// ns.providerAdapters.put("apple",
+// new MockSuccessfulProviderAdapter() {
+// @Override
+// public void testConnection(Notifier notifier)
+// throws ConnectionException {
+// Exception e = new SocketException(
+// "Connection closed by remote host");
+// throw new ConnectionException(e.getMessage(), e);
+// }
+// });
}
// create push notification //
@@ -685,8 +665,7 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
// mock action (based on verified actual behavior) //
if (!USE_REAL_CONNECTIONS) {
- ns.providerAdapters.put("apple",
- new MockSuccessfulProviderAdapter());
+
}
// create push notification //
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0f4584c4/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/MockSuccessfulProviderAdapter.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/MockSuccessfulProviderAdapter.java b/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/MockSuccessfulProviderAdapter.java
index 3836fe8..c41e5ab 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/MockSuccessfulProviderAdapter.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/MockSuccessfulProviderAdapter.java
@@ -32,23 +32,13 @@ public class MockSuccessfulProviderAdapter implements ProviderAdapter {
private static ProviderAdapter realProviderAdapter;
- public static void install(NotificationsService ns) {
- if (realProviderAdapter != null)
- realProviderAdapter = ns.providerAdapters.get("google");
- ns.providerAdapters.put("google", new MockSuccessfulProviderAdapter());
- }
- public static void uninstall(NotificationsService ns) {
- if (realProviderAdapter != null) {
- ns.providerAdapters.put("google", realProviderAdapter);
- }
- }
public MockSuccessfulProviderAdapter() {
}
@Override
- public void testConnection(Notifier notifier) throws ConnectionException {
+ public void testConnection() throws ConnectionException {
}
@Override
@@ -56,14 +46,20 @@ public class MockSuccessfulProviderAdapter implements ProviderAdapter {
return payload.toString();
}
+
+
@Override
- public void removeInactiveDevices(Notifier notifier,
- EntityManager em) throws Exception {
+ public void validateCreateNotifier(ServicePayload payload) throws Exception {
+ }
+
+ @Override
+ public void stop() {
}
@Override
- public void validateCreateNotifier(ServicePayload payload) throws Exception {
+ public Notifier getNotifier() {
+ return null;
}
@Override
@@ -71,7 +67,12 @@ public class MockSuccessfulProviderAdapter implements ProviderAdapter {
}
@Override
- public void sendNotification(String providerId, Notifier notifier,
+ public void removeInactiveDevices() throws Exception {
+
+ }
+
+ @Override
+ public void sendNotification(String providerId,
Object payload, Notification notification, final TaskTracker tracker)
throws Exception {
new Thread() {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0f4584c4/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
index ea0b7da..3795be1 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
@@ -240,51 +240,6 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
}
}
- @Ignore("todo: how can I mock this?")
- @Test
- public void providerIdUpdate() throws Exception {
-
- // mock action (based on verified actual behavior) //
- final String newProviderId = "newProviderId";
- ns.providerAdapters.put("google", new MockSuccessfulProviderAdapter() {
- @Override
- public void sendNotification(String providerId, Notifier notifier,
- Object payload, Notification notification,
- TaskTracker tracker) throws Exception {
- tracker.completed(newProviderId);
- }
- });
-
- // create push notification //
-
- app.clear();
- String payload = "Hello, World!";
- Map<String, String> payloads = new HashMap<String, String>(1);
- payloads.put(notifier.getUuid().toString(), payload);
- app.put("payloads", payloads);
- app.put("queued", System.currentTimeMillis());
- app.put("debug",true);
-
- Entity e = app.testRequest(ServiceAction.POST, 1,"devices",device1.getUuid(), "notifications")
- .getEntity();
- app.testRequest(ServiceAction.GET, 1, "notifications", e.getUuid());
-
- Notification notification = app.getEm().get(e.getUuid(),
- Notification.class);
- assertEquals(
- notification.getPayloads().get(notifier.getUuid().toString()),
- payload);
-
- ns.addDevice(notification, device1);
-
- // perform push //
- notification = scheduleNotificationAndWait(notification);
- checkReceipts(notification, 1);
-
- Device device = (Device) app.getEm().get(device1).toTypedEntity();
- assertEquals(newProviderId,
- device.getProperty(notifier.getName() + NOTIFIER_ID_POSTFIX));
- }
@Test
public void badPayloads() throws Exception {
@@ -353,20 +308,6 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
@Test
public void badToken() throws Exception {
- // mock action (based on verified actual behavior) //
- if (!USE_REAL_CONNECTIONS) {
- ns.providerAdapters.put("google",
- new MockSuccessfulProviderAdapter() {
- @Override
- public void sendNotification(String providerId,
- Notifier notifier, Object payload,
- Notification notification, TaskTracker tracker)
- throws Exception {
- tracker.failed("InvalidRegistration",
- "InvalidRegistration");
- }
- });
- }
// create push notification //
@@ -410,21 +351,6 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
@Test
public void badAPIKey() throws Exception {
- if (!USE_REAL_CONNECTIONS) {
- // mock action (based on verified actual behavior) //
- ns.providerAdapters.put("google",
- new MockSuccessfulProviderAdapter() {
- @Override
- public void sendNotification(String providerId,
- Notifier notifier, Object payload,
- Notification notification, TaskTracker tracker)
- throws Exception {
- Exception e = new IOException();
- throw new ConnectionException(e.getMessage(), e);
- }
- });
- }
-
// create push notification //
app.clear();
[20/22] git commit: fix google push logic
Posted by sn...@apache.org.
fix google push logic
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/610eb2da
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/610eb2da
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/610eb2da
Branch: refs/heads/two-dot-o-events
Commit: 610eb2da7202a899d41637ddd513488861e2bb16
Parents: ca78792
Author: Shawn Feldman <sf...@apache.org>
Authored: Wed Oct 29 11:59:50 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Wed Oct 29 11:59:50 2014 -0600
----------------------------------------------------------------------
.../services/notifications/gcm/GCMAdapter.java | 91 +++++++++++---------
1 file changed, 52 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/610eb2da/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java
index e5867ea..36cb7f0 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java
@@ -79,20 +79,24 @@ public class GCMAdapter implements ProviderAdapter {
batch.add(providerId, tracker);
}
- synchronized private Batch getBatch( Map<String, Object> payload) {
- long hash = MurmurHash.hash64(payload);
- Batch batch = batches.get(hash);
- if (batch == null && payload != null) {
- batch = new Batch(notifier,payload);
- batches.put(hash,batch);
+ private Batch getBatch( Map<String, Object> payload) {
+ synchronized (this) {
+ long hash = MurmurHash.hash64(payload);
+ Batch batch = batches.get(hash);
+ if (batch == null && payload != null) {
+ batch = new Batch(notifier, payload);
+ batches.put(hash, batch);
+ }
+ return batch;
}
- return batch;
}
@Override
- synchronized public void doneSendingNotifications() throws Exception {
- for (Batch batch : batches.values()) {
- batch.send();
+ public void doneSendingNotifications() throws Exception {
+ synchronized (this) {
+ for (Batch batch : batches.values()) {
+ batch.send();
+ }
}
}
@@ -163,11 +167,18 @@ public class GCMAdapter implements ProviderAdapter {
return map;
}
- synchronized void add(String id, TaskTracker tracker) throws Exception {
- ids.add(id);
- trackers.add(tracker);
- if (ids.size() == BATCH_SIZE) {
- send();
+ void add(String id, TaskTracker tracker) throws Exception {
+ synchronized (this) {
+ if(!ids.contains(id)) { //dedupe to a device
+ ids.add(id);
+ trackers.add(tracker);
+ if (ids.size() == BATCH_SIZE) {
+ send();
+ }
+ }else{
+ tracker.completed();
+ }
+
}
}
@@ -177,33 +188,35 @@ public class GCMAdapter implements ProviderAdapter {
// anything that JSONValue can handle is fine.
// (What is necessary here is that the Map needs to have a nested
// structure.)
- synchronized void send() throws Exception {
- if (ids.size() == 0)
- return;
- Sender sender = new Sender(notifier.getApiKey());
- Message.Builder builder = new Message.Builder();
- builder.setData(payload);
- Message message = builder.build();
-
- MulticastResult multicastResult = sender.send(message, ids, SEND_RETRIES);
- LOG.debug("sendNotification result: {}", multicastResult);
-
- for (int i = 0; i < multicastResult.getResults().size(); i++) {
- Result result = multicastResult.getResults().get(i);
-
- if (result.getMessageId() != null) {
- String canonicalRegId = result.getCanonicalRegistrationId();
- trackers.get(i).completed(canonicalRegId);
- } else {
- String error = result.getErrorCodeName();
- trackers.get(i).failed(error, error);
- if (Constants.ERROR_NOT_REGISTERED.equals(error) || Constants.ERROR_INVALID_REGISTRATION.equals(error)) {
- inactiveDevices.put(ids.get(i), new Date());
+ void send() throws Exception {
+ synchronized (this) {
+ if (ids.size() == 0)
+ return;
+ Sender sender = new Sender(notifier.getApiKey());
+ Message.Builder builder = new Message.Builder();
+ builder.setData(payload);
+ Message message = builder.build();
+
+ MulticastResult multicastResult = sender.send(message, ids, SEND_RETRIES);
+ LOG.debug("sendNotification result: {}", multicastResult);
+
+ for (int i = 0; i < multicastResult.getResults().size(); i++) {
+ Result result = multicastResult.getResults().get(i);
+
+ if (result.getMessageId() != null) {
+ String canonicalRegId = result.getCanonicalRegistrationId();
+ trackers.get(i).completed(canonicalRegId);
+ } else {
+ String error = result.getErrorCodeName();
+ trackers.get(i).failed(error, error);
+ if (Constants.ERROR_NOT_REGISTERED.equals(error) || Constants.ERROR_INVALID_REGISTRATION.equals(error)) {
+ inactiveDevices.put(ids.get(i), new Date());
+ }
}
}
+ this.ids.clear();
+ this.trackers.clear();
}
- this.ids.clear();
- this.trackers.clear();
}
}
}
[17/22] git commit: comments on interface
Posted by sn...@apache.org.
comments on interface
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/c51c0ce4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/c51c0ce4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/c51c0ce4
Branch: refs/heads/two-dot-o-events
Commit: c51c0ce4e55cbf503c0fe2194d3da4697ab190f2
Parents: 8f4720d
Author: Shawn Feldman <sf...@apache.org>
Authored: Wed Oct 29 11:08:31 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Wed Oct 29 11:08:31 2014 -0600
----------------------------------------------------------------------
.../notifications/ApplicationQueueManager.java | 21 +++++++++++++++++++-
1 file changed, 20 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c51c0ce4/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
index 8012b42..101c839 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
@@ -28,7 +28,7 @@ import rx.Observable;
import java.util.List;
/**
- * Classy class class.
+ * Manages Queues for Applications
*/
public interface ApplicationQueueManager {
@@ -38,11 +38,30 @@ public interface ApplicationQueueManager {
public static final String DEFAULT_QUEUE_NAME = "push_v1";
+ /**
+ * send notification to queue
+ * @param notification
+ * @param jobExecution
+ * @throws Exception
+ */
void queueNotification(Notification notification, JobExecution jobExecution) throws Exception;
+ /**
+ * send notifications to providers
+ * @param messages
+ * @param queuePath
+ * @return
+ */
Observable sendBatchToProviders(List<QueueMessage> messages, String queuePath);
+ /**
+ * stop processing and send message to providers to stop
+ */
void stop();
+ /**
+ * check for inactive devices, apple and google require this
+ * @throws Exception
+ */
void asyncCheckForInactiveDevices() throws Exception;
}
[12/22] git commit: remove unneeded refs
Posted by sn...@apache.org.
remove unneeded refs
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/8bd1ab46
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/8bd1ab46
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/8bd1ab46
Branch: refs/heads/two-dot-o-events
Commit: 8bd1ab46f230c3ab7e28a9c2c295844735e28ff5
Parents: 0f4584c
Author: Shawn Feldman <sf...@apache.org>
Authored: Wed Oct 29 10:01:02 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Wed Oct 29 10:01:02 2014 -0600
----------------------------------------------------------------------
.../services/notifiers/NotifiersService.java | 17 +----------------
1 file changed, 1 insertion(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8bd1ab46/stack/services/src/main/java/org/apache/usergrid/services/notifiers/NotifiersService.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifiers/NotifiersService.java b/stack/services/src/main/java/org/apache/usergrid/services/notifiers/NotifiersService.java
index ae089fe..3e9ca3e 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifiers/NotifiersService.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifiers/NotifiersService.java
@@ -16,22 +16,14 @@
*/
package org.apache.usergrid.services.notifiers;
-import org.apache.usergrid.persistence.Entity;
import org.apache.usergrid.persistence.entities.Notifier;
import org.apache.usergrid.services.notifications.ProviderAdapterFactory;
-import org.apache.usergrid.services.notifications.TestAdapter;
-import org.apache.usergrid.services.notifications.apns.APNsAdapter;
-import org.apache.usergrid.services.notifications.gcm.GCMAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.usergrid.services.*;
import org.apache.usergrid.services.notifications.NotificationsService;
import org.apache.usergrid.services.notifications.ProviderAdapter;
-
import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
public class NotifiersService extends AbstractCollectionService {
@@ -46,15 +38,7 @@ public class NotifiersService extends AbstractCollectionService {
@Override
public ServiceResults postCollection(ServiceContext context)
throws Exception {
-
ServicePayload payload = context.getPayload();
-
- NotificationsService ns = (NotificationsService) sm
- .getService("notifications");
-
- String provider = payload.getStringProperty("provider");
-
-
ServiceResults results = super.postCollection(context);
Notifier notifier = (Notifier) results.getEntity();
if (notifier != null) {
@@ -66,6 +50,7 @@ public class NotifiersService extends AbstractCollectionService {
+ Arrays.toString(ProviderAdapterFactory.getValidProviders()));
}
providerAdapter.validateCreateNotifier(payload);
+ NotificationsService ns = (NotificationsService) sm.getService("notifications");
ns.testConnection(notifier);
} catch (Exception e) {
logger.info("notifier testConnection() failed", e);
[19/22] git commit: fix gcm batching
Posted by sn...@apache.org.
fix gcm batching
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/ca787926
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/ca787926
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/ca787926
Branch: refs/heads/two-dot-o-events
Commit: ca78792686f1cd06c984045d4a27921009134866
Parents: a62df3a
Author: Shawn Feldman <sf...@apache.org>
Authored: Wed Oct 29 11:44:51 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Wed Oct 29 11:44:51 2014 -0600
----------------------------------------------------------------------
.../services/notifications/gcm/GCMAdapter.java | 24 +++++++++++---------
1 file changed, 13 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ca787926/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java
index e0b32ee..e5867ea 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java
@@ -16,6 +16,7 @@
*/
package org.apache.usergrid.services.notifications.gcm;
+import com.clearspring.analytics.hash.MurmurHash;
import com.google.android.gcm.server.*;
import org.apache.usergrid.persistence.entities.Notification;
import org.apache.usergrid.persistence.entities.Notifier;
@@ -34,6 +35,8 @@ import org.apache.usergrid.services.notifications.TaskTracker;
import java.io.IOException;
import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
public class GCMAdapter implements ProviderAdapter {
@@ -43,11 +46,12 @@ public class GCMAdapter implements ProviderAdapter {
private final Notifier notifier;
private EntityManager entityManager;
- private Map<Notifier, Batch> notifierBatches = new HashMap<>();
+ private ConcurrentHashMap<Long,Batch> batches;
public GCMAdapter(EntityManager entityManager,Notifier notifier){
this.notifier = notifier;
this.entityManager = entityManager;
+ batches = new ConcurrentHashMap<>();
}
@Override
public void testConnection() throws ConnectionException {
@@ -62,8 +66,7 @@ public class GCMAdapter implements ProviderAdapter {
}
@Override
- public void sendNotification(String providerId,
- Object payload, Notification notification, TaskTracker tracker)
+ public void sendNotification(String providerId, Object payload, Notification notification, TaskTracker tracker)
throws Exception {
Map<String,Object> map = (Map<String, Object>) payload;
final String expiresKey = "time_to_live";
@@ -77,17 +80,18 @@ public class GCMAdapter implements ProviderAdapter {
}
synchronized private Batch getBatch( Map<String, Object> payload) {
- Batch batch = notifierBatches.get(notifier);
+ long hash = MurmurHash.hash64(payload);
+ Batch batch = batches.get(hash);
if (batch == null && payload != null) {
- batch = new Batch(notifier, payload);
- notifierBatches.put(notifier, batch);
+ batch = new Batch(notifier,payload);
+ batches.put(hash,batch);
}
return batch;
}
@Override
synchronized public void doneSendingNotifications() throws Exception {
- for (Batch batch : notifierBatches.values()) {
+ for (Batch batch : batches.values()) {
batch.send();
}
}
@@ -95,9 +99,8 @@ public class GCMAdapter implements ProviderAdapter {
@Override
public void removeInactiveDevices( ) throws Exception {
Batch batch = getBatch( null);
- Map<String,Date> map = null;
if(batch != null) {
- map = batch.getAndClearInactiveDevices();
+ Map<String,Date> map = batch.getAndClearInactiveDevices();
InactiveDeviceManager deviceManager = new InactiveDeviceManager(notifier,entityManager);
deviceManager.removeInactiveDevices(map);
}
@@ -147,7 +150,7 @@ public class GCMAdapter implements ProviderAdapter {
private List<TaskTracker> trackers;
private Map<String, Date> inactiveDevices = new HashMap<String, Date>();
- Batch(Notifier notifier, Map<String, Object> payload) {
+ Batch(Notifier notifier, Map<String,Object> payload) {
this.notifier = notifier;
this.payload = payload;
this.ids = new ArrayList<String>();
@@ -163,7 +166,6 @@ public class GCMAdapter implements ProviderAdapter {
synchronized void add(String id, TaskTracker tracker) throws Exception {
ids.add(id);
trackers.add(tracker);
-
if (ids.size() == BATCH_SIZE) {
send();
}
[15/22] git commit: adding docs
Posted by sn...@apache.org.
adding docs
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/955a92bc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/955a92bc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/955a92bc
Branch: refs/heads/two-dot-o-events
Commit: 955a92bc4d29a652f5398e980ac2322095f44577
Parents: 78e34b2
Author: Shawn Feldman <sf...@apache.org>
Authored: Wed Oct 29 10:44:39 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Wed Oct 29 10:44:39 2014 -0600
----------------------------------------------------------------------
.../services/notifications/ProviderAdapter.java | 30 ++++++++++++++++++++
.../notifications/apns/APNsAdapter.java | 25 ++++++++--------
2 files changed, 43 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/955a92bc/stack/services/src/main/java/org/apache/usergrid/services/notifications/ProviderAdapter.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ProviderAdapter.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ProviderAdapter.java
index 8acd006..1783882 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ProviderAdapter.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ProviderAdapter.java
@@ -34,8 +34,20 @@ import org.apache.usergrid.services.ServicePayload;
*/
public interface ProviderAdapter {
+ /**
+ * test the connection
+ * @throws ConnectionException
+ */
public void testConnection() throws ConnectionException;
+ /**
+ * send a notification
+ * @param providerId
+ * @param payload
+ * @param notification
+ * @param tracker
+ * @throws Exception
+ */
public void sendNotification(String providerId, Object payload, Notification notification, TaskTracker tracker)
throws Exception;
@@ -45,12 +57,30 @@ public interface ProviderAdapter {
*/
public void doneSendingNotifications() throws Exception;
+ /**
+ * remove inactive devices
+ * @throws Exception
+ */
public void removeInactiveDevices() throws Exception;
+ /**
+ * translate payload for each notifier
+ * @param payload
+ * @return
+ * @throws Exception
+ */
public Object translatePayload(Object payload) throws Exception;
+ /**
+ * Validate payload from services
+ * @param payload
+ * @throws Exception
+ */
public void validateCreateNotifier(ServicePayload payload) throws Exception;
+ /**
+ * stop the adapter when you are done, so it can quit processing notifications
+ */
public void stop();
public Notifier getNotifier();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/955a92bc/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/APNsAdapter.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/APNsAdapter.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/APNsAdapter.java
index da5d3e5..79a3b9e 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/APNsAdapter.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/APNsAdapter.java
@@ -187,22 +187,23 @@ public class APNsAdapter implements ProviderAdapter {
}
@Override
- public void stop(){
- try {
- if (!pushManager.isShutDown()) {
- List<SimpleApnsPushNotification> notifications = pushManager.shutdown(3000);
- for (SimpleApnsPushNotification notification1 : notifications) {
- try {
- ((APNsNotification) notification1).messageSendFailed(new Exception("Cache Expired: Shutting down sender"));
- }catch (Exception e){
- logger.error("Failed to mark notification",e);
- }
+ public void stop() {
+ try {
+ if (!pushManager.isShutDown()) {
+ List<SimpleApnsPushNotification> notifications = pushManager.shutdown(3000);
+ for (SimpleApnsPushNotification notification1 : notifications) {
+ try {
+ ((APNsNotification) notification1).messageSendFailed(new Exception("Cache Expired: Shutting down sender"));
+ } catch (Exception e) {
+ logger.error("Failed to mark notification", e);
}
}
- } catch (Exception ie) {
- logger.error("Failed to shutdown from cache", ie);
}
+ } catch (Exception ie) {
+ logger.error("Failed to shutdown from cache", ie);
+ }
}
+
@Override
public Notifier getNotifier(){return notifier;}
[14/22] git commit: Merge branch 'two-dot-o' of
https://git-wip-us.apache.org/repos/asf/incubator-usergrid into pushy_4-0
Posted by sn...@apache.org.
Merge branch 'two-dot-o' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into pushy_4-0
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/78e34b2b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/78e34b2b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/78e34b2b
Branch: refs/heads/two-dot-o-events
Commit: 78e34b2bfba8e699a4525426f0888b82f5f8b597
Parents: 25e36e6 a100c07
Author: Shawn Feldman <sf...@apache.org>
Authored: Wed Oct 29 10:29:06 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Wed Oct 29 10:29:06 2014 -0600
----------------------------------------------------------------------
.../main/resources/usergrid-default.properties | 3 +
.../corepersistence/CpEntityManager.java | 73 +-
.../corepersistence/CpEntityManagerFactory.java | 53 +-
.../corepersistence/CpManagerCache.java | 117 ++-
.../corepersistence/CpRelationManager.java | 944 +++++++++++--------
.../persistence/index/query/EntityResults.java | 6 +-
stack/loadtests/src/main/scripts/gatling-mvn.sh | 28 +
7 files changed, 767 insertions(+), 457 deletions(-)
----------------------------------------------------------------------
[02/22] git commit: rewrite remove inactive devices
Posted by sn...@apache.org.
rewrite remove inactive devices
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/e1b2e736
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/e1b2e736
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/e1b2e736
Branch: refs/heads/two-dot-o-events
Commit: e1b2e7365e84c4d0a5a02fdae10d7adac80e5263
Parents: d5d859b
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Oct 24 16:07:40 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Oct 24 16:07:40 2014 -0600
----------------------------------------------------------------------
.../usergrid/persistence/entities/Notifier.java | 10 ++-
.../notifications/ApplicationQueueManager.java | 90 +++++---------------
.../notifications/InactiveDeviceManager.java | 77 +++++++++++++++++
.../notifications/NotificationsService.java | 1 +
.../services/notifications/ProviderAdapter.java | 2 +-
.../services/notifications/QueueListener.java | 24 ++++--
.../notifications/apns/APNsAdapter.java | 68 ++++-----------
.../notifications/apns/EntityPushManager.java | 72 ++++++++++++++++
.../apns/ExpiredTokenListener.java | 60 +++++++++++++
.../services/notifications/gcm/GCMAdapter.java | 7 +-
10 files changed, 280 insertions(+), 131 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e1b2e736/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notifier.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notifier.java b/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notifier.java
index be0b447..ba3f2fc 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notifier.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notifier.java
@@ -18,6 +18,7 @@ package org.apache.usergrid.persistence.entities;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.usergrid.persistence.EntityManager;
import org.apache.usergrid.persistence.TypedEntity;
import org.apache.usergrid.persistence.annotations.EntityProperty;
@@ -34,6 +35,8 @@ public class Notifier extends TypedEntity {
public static final String ENTITY_TYPE = "notifier";
+ protected EntityManager entityManager;
+
@EntityProperty(aliasProperty = true, unique = true, basic = true)
protected String name;
@@ -63,8 +66,6 @@ public class Notifier extends TypedEntity {
uuid = id;
}
-
-
@Override
@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
public String getName() {
@@ -131,5 +132,10 @@ public class Notifier extends TypedEntity {
this.apiKey = apiKey;
}
+ @JsonIgnore
+ public EntityManager getEntityManager(){return entityManager;}
+
+ public void setEntityManager(EntityManager entityManager){ this.entityManager = entityManager;}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e1b2e736/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
index a83513f..3fd84a6 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
@@ -63,16 +63,7 @@ public class ApplicationQueueManager {
HashMap<Object, Notifier> notifierHashMap; // only retrieve notifiers once
- public final Map<String, ProviderAdapter> providerAdapters = new HashMap<String, ProviderAdapter>(3);
- {
- providerAdapters.put("apple", APNS_ADAPTER);
- providerAdapters.put("google", new GCMAdapter());
- providerAdapters.put("noop", TEST_ADAPTER);
- };
- // these 2 can be static, but GCM can't. future: would be nice to get gcm
- // static as well...
- public static ProviderAdapter APNS_ADAPTER = new APNsAdapter();
- public static ProviderAdapter TEST_ADAPTER = new TestAdapter();
+ public final Map<String, ProviderAdapter> providerAdapters;
public ApplicationQueueManager(JobScheduler jobScheduler, EntityManager entityManager, QueueManager queueManager, MetricsFactory metricsFactory, Properties properties){
@@ -81,6 +72,12 @@ public class ApplicationQueueManager {
this.jobScheduler = jobScheduler;
this.metricsFactory = metricsFactory;
this.queueName = getQueueNames(properties);
+ providerAdapters = new HashMap<String, ProviderAdapter>(3);
+ {
+ providerAdapters.put("apple", new APNsAdapter());
+ providerAdapters.put("google", new GCMAdapter());
+ providerAdapters.put("noop", new TestAdapter());
+ };
}
public boolean scheduleQueueJob(Notification notification) throws Exception{
@@ -254,6 +251,7 @@ public class ApplicationQueueManager {
int count = 0;
while (notifierIterator.hasNext()) {
Notifier notifier = notifierIterator.next();
+ notifier.setEntityManager(em);
String name = notifier.getName() != null ? notifier.getName() : "";
UUID uuid = notifier.getUuid() != null ? notifier.getUuid() : UUID.randomUUID();
notifierHashMap.put(name.toLowerCase(), notifier);
@@ -454,71 +452,27 @@ public class ApplicationQueueManager {
}
}
- public void asyncCheckForInactiveDevices(Set<Notifier> notifiers) throws Exception {
+ public void asyncCheckForInactiveDevices() throws Exception {
+ Collection<Notifier> notifiers = getNotifierMap().values();
for (final Notifier notifier : notifiers) {
- INACTIVE_DEVICE_CHECK_POOL.execute(new Runnable() {
- @Override
- public void run() {
- try {
- checkForInactiveDevices(notifier);
- } catch (Exception e) {
- LOG.error("checkForInactiveDevices", e); // not
- // essential so
- // don't fail,
- // but log
- }
- }
- });
- }
- }
+ try {
+ ProviderAdapter providerAdapter = providerAdapters.get(notifier.getProvider());
+ if (providerAdapter != null) {
+ LOG.debug("checking notifier {} for inactive devices", notifier);
+ providerAdapter.removeInactiveDevices(notifier, em);
- /** gets the list of inactive devices from the Provider and updates them */
- private void checkForInactiveDevices(Notifier notifier) throws Exception {
- ProviderAdapter providerAdapter = providerAdapters.get(notifier
- .getProvider());
- if (providerAdapter != null) {
- LOG.debug("checking notifier {} for inactive devices", notifier);
- Map<String, Date> inactiveDeviceMap = providerAdapter
- .getInactiveDevices(notifier, em);
-
- if (inactiveDeviceMap != null && inactiveDeviceMap.size() > 0) {
- LOG.debug("processing {} inactive devices",
- inactiveDeviceMap.size());
- Map<String, Object> clearPushtokenMap = new HashMap<String, Object>(
- 2);
- clearPushtokenMap.put(notifier.getName() + NOTIFIER_ID_POSTFIX,
- "");
- clearPushtokenMap.put(notifier.getUuid() + NOTIFIER_ID_POSTFIX,
- "");
-
- // todo: this could be done in a single query
- for (Map.Entry<String, Date> entry : inactiveDeviceMap
- .entrySet()) {
- // name
- Query query = new Query();
- query.addEqualityFilter(notifier.getName()
- + NOTIFIER_ID_POSTFIX, entry.getKey());
- Results results = em.searchCollection(em.getApplication(),
- "devices", query);
- for (Entity e : results.getEntities()) {
- em.updateProperties(e, clearPushtokenMap);
- }
- // uuid
- query = new Query();
- query.addEqualityFilter(notifier.getUuid()
- + NOTIFIER_ID_POSTFIX, entry.getKey());
- results = em.searchCollection(em.getApplication(),
- "devices", query);
- for (Entity e : results.getEntities()) {
- em.updateProperties(e, clearPushtokenMap);
- }
+ LOG.debug("finished checking notifier {} for inactive devices",notifier);
}
+ } catch (Exception e) {
+ LOG.error("checkForInactiveDevices", e); // not
+ // essential so
+ // don't fail,
+ // but log
}
- LOG.debug("finished checking notifier {} for inactive devices",
- notifier);
}
}
+
private boolean isOkToSend(Notification notification) {
Map<String,Long> stats = notification.getStatistics();
if (stats != null && notification.getExpectedCount() == (stats.get("sent")+ stats.get("errors"))) {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e1b2e736/stack/services/src/main/java/org/apache/usergrid/services/notifications/InactiveDeviceManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/InactiveDeviceManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/InactiveDeviceManager.java
new file mode 100644
index 0000000..ecee485
--- /dev/null
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/InactiveDeviceManager.java
@@ -0,0 +1,77 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. The ASF licenses this file to You
+ * * under the Apache License, Version 2.0 (the "License"); you may not
+ * * use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License. For additional information regarding
+ * * copyright in this work, please see the NOTICE file in the top level
+ * * directory of this distribution.
+ *
+ */
+
+package org.apache.usergrid.services.notifications;
+
+import org.apache.usergrid.persistence.Entity;
+import org.apache.usergrid.persistence.EntityManager;
+import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.entities.Notifier;
+import org.apache.usergrid.persistence.index.query.Query;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Classy class class.
+ */
+public class InactiveDeviceManager {
+ private static final Logger LOG = LoggerFactory.getLogger(InactiveDeviceManager.class);
+ private final Notifier notifier;
+
+ public InactiveDeviceManager(Notifier notifier){
+ this.notifier = notifier;
+ }
+ public void removeInactiveDevices( Map<String,Date> inactiveDeviceMap ){
+ final String notfierPostFix = ApplicationQueueManager.NOTIFIER_ID_POSTFIX;
+ final EntityManager em = notifier.getEntityManager();
+ if (inactiveDeviceMap != null && inactiveDeviceMap.size() > 0) {
+ LOG.debug("processing {} inactive devices", inactiveDeviceMap.size());
+ Map<String, Object> clearPushtokenMap = new HashMap<String, Object>( 2);
+ clearPushtokenMap.put(notifier.getName() + notfierPostFix, "");
+ clearPushtokenMap.put(notifier.getUuid() + notfierPostFix, "");
+
+ // todo: this could be done in a single query
+ for (Map.Entry<String, Date> entry : inactiveDeviceMap.entrySet()) {
+ try {
+ // name
+ Query query = new Query();
+ query.addEqualityFilter(notifier.getName() + notfierPostFix, entry.getKey());
+ Results results = em.searchCollection(em.getApplication(), "devices", query);
+ for (Entity e : results.getEntities()) {
+ em.updateProperties(e, clearPushtokenMap);
+ }
+ // uuid
+ query = new Query();
+ query.addEqualityFilter(notifier.getUuid() + notfierPostFix, entry.getKey());
+ results = em.searchCollection(em.getApplication(), "devices", query);
+ for (Entity e : results.getEntities()) {
+ em.updateProperties(e, clearPushtokenMap);
+ }
+ }catch (Exception e){
+ LOG.error("failed to remove token",e);
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e1b2e736/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
index 69f66e5..5420d29 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
@@ -356,6 +356,7 @@ public class NotificationsService extends AbstractCollectionService {
public void testConnection(Notifier notifier) throws Exception {
ProviderAdapter providerAdapter = providerAdapters.get(notifier.getProvider());
if (providerAdapter != null) {
+ notifier.setEntityManager(em);
providerAdapter.testConnection(notifier);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e1b2e736/stack/services/src/main/java/org/apache/usergrid/services/notifications/ProviderAdapter.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ProviderAdapter.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ProviderAdapter.java
index 5268674..33e921f 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ProviderAdapter.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ProviderAdapter.java
@@ -46,7 +46,7 @@ public interface ProviderAdapter {
*/
public void doneSendingNotifications() throws Exception;
- public Map<String, Date> getInactiveDevices(Notifier notifier,
+ public void removeInactiveDevices(Notifier notifier,
EntityManager em) throws Exception;
public Object translatePayload(Object payload) throws Exception;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e1b2e736/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
index 7ce315b..432ad7f 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
@@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory;
import rx.Observable;
import javax.annotation.PostConstruct;
import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -151,7 +152,7 @@ public class QueueListener {
LOG.info("retrieved batch of {} messages from queue {} ", messages.size(),queueName);
if (messages.size() > 0) {
-
+ Map<UUID,ApplicationQueueManager> queueManagerMap = new ConcurrentHashMap<>();
HashMap<UUID, List<QueueMessage>> messageMap = new HashMap<>(messages.size());
//group messages into hash map by app id
for (QueueMessage message : messages) {
@@ -172,14 +173,18 @@ public class QueueListener {
UUID applicationId = entry.getKey();
EntityManager entityManager = emf.getEntityManager(applicationId);
ServiceManager serviceManager = smf.getServiceManager(applicationId);
- final ApplicationQueueManager manager = new ApplicationQueueManager(
- new JobScheduler(serviceManager, entityManager),
- entityManager,
- queueManager,
- metricsService,
- properties
- );
+ ApplicationQueueManager manager = queueManagerMap.get(applicationId);
+ if(manager==null) {
+ manager = new ApplicationQueueManager(
+ new JobScheduler(serviceManager, entityManager),
+ entityManager,
+ queueManager,
+ metricsService,
+ properties
+ );
+ queueManagerMap.put(applicationId,manager);
+ }
LOG.info("send batch for app {} of {} messages", entry.getKey(), entry.getValue().size());
Observable current = manager.sendBatchToProviders(entry.getValue(),queueName);
if(merge == null)
@@ -192,6 +197,9 @@ public class QueueListener {
merge.toBlocking().lastOrDefault(null);
}
queueManager.commitMessages(messages);
+ for(ApplicationQueueManager applicationQueueManager : queueManagerMap.values()){
+ applicationQueueManager.asyncCheckForInactiveDevices();
+ }
meter.mark(messages.size());
LOG.info("sent batch {} messages duration {} ms", messages.size(),System.currentTimeMillis() - now);
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e1b2e736/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/APNsAdapter.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/APNsAdapter.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/APNsAdapter.java
index 4277b8a..7ab1dc3 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/APNsAdapter.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/APNsAdapter.java
@@ -47,8 +47,7 @@ import javax.net.ssl.SSLContext;
*/
public class APNsAdapter implements ProviderAdapter {
- private static final Logger logger = LoggerFactory
- .getLogger(APNsAdapter.class);
+ private static final Logger logger = LoggerFactory.getLogger(APNsAdapter.class);
public static int MAX_CONNECTION_POOL_SIZE = 15;
private static final Set<String> validEnvironments = new HashSet<String>();
@@ -72,7 +71,7 @@ public class APNsAdapter implements ProviderAdapter {
try {
CountDownLatch latch = new CountDownLatch(1);
notification.setLatch(latch);
- PushManager<SimpleApnsPushNotification> pushManager = getPushManager(notifier);
+ EntityPushManager pushManager = getPushManager(notifier);
addToQueue(pushManager, notification);
latch.await(10000,TimeUnit.MILLISECONDS);
if(notification.hasFailed()){
@@ -80,10 +79,10 @@ public class APNsAdapter implements ProviderAdapter {
throw new ConnectionException("Bad certificate. Double-check your environment.",notification.getCause() != null ? notification.getCause() : new Exception("Bad certificate."));
}
notification.finished();
- } catch (Exception e) {
- notification.finished();
+ } catch (Exception e) {
+ notification.finished();
- if (e instanceof ConnectionException) {
+ if (e instanceof ConnectionException) {
throw (ConnectionException) e;
}
if (e instanceof InterruptedException) {
@@ -120,16 +119,13 @@ public class APNsAdapter implements ProviderAdapter {
}
@Override
- public Map<String, Date> getInactiveDevices(Notifier notifier,
- EntityManager em) throws Exception {
- Map<String,Date> map = new HashMap<String,Date>();
+ public void removeInactiveDevices(Notifier notifier,EntityManager em) throws Exception {
PushManager<SimpleApnsPushNotification> pushManager = getPushManager(notifier);
pushManager.requestExpiredTokens();
- return map;
}
- private PushManager<SimpleApnsPushNotification> getPushManager(Notifier notifier) throws ExecutionException {
- PushManager<SimpleApnsPushNotification> pushManager = apnsServiceMap.get(notifier);
+ private EntityPushManager getPushManager(Notifier notifier) throws ExecutionException {
+ EntityPushManager pushManager = apnsServiceMap.get(notifier);
if(pushManager != null && !pushManager.isStarted() && pushManager.isShutDown()){
apnsServiceMap.invalidate(notifier);
pushManager = apnsServiceMap.get(notifier);
@@ -138,15 +134,15 @@ public class APNsAdapter implements ProviderAdapter {
}
//cache to retrieve push manager, cached per notifier, so many notifications will get same push manager
- private static LoadingCache<Notifier, PushManager<SimpleApnsPushNotification>> apnsServiceMap = CacheBuilder
+ private static LoadingCache<Notifier, EntityPushManager> apnsServiceMap = CacheBuilder
.newBuilder()
.expireAfterAccess(10, TimeUnit.MINUTES)
- .removalListener(new RemovalListener<Notifier, PushManager<SimpleApnsPushNotification>>() {
+ .removalListener(new RemovalListener<Notifier, EntityPushManager>() {
@Override
public void onRemoval(
- RemovalNotification<Notifier, PushManager<SimpleApnsPushNotification>> notification) {
+ RemovalNotification<Notifier,EntityPushManager> notification) {
try {
- PushManager<SimpleApnsPushNotification> manager = notification.getValue();
+ EntityPushManager manager = notification.getValue();
if (!manager.isShutDown()) {
List<SimpleApnsPushNotification> notifications = manager.shutdown(3000);
for (SimpleApnsPushNotification notification1 : notifications) {
@@ -161,31 +157,20 @@ public class APNsAdapter implements ProviderAdapter {
logger.error("Failed to shutdown from cache", ie);
}
}
- }).build(new CacheLoader<Notifier, PushManager<SimpleApnsPushNotification>>() {
+ }).build(new CacheLoader<Notifier, EntityPushManager>() {
@Override
- public PushManager<SimpleApnsPushNotification> load(Notifier notifier) {
+ public EntityPushManager load(final Notifier notifier) {
try {
- LinkedBlockingQueue<SimpleApnsPushNotification> queue = new LinkedBlockingQueue<SimpleApnsPushNotification>();
- NioEventLoopGroup group = new NioEventLoopGroup();
PushManagerConfiguration config = new PushManagerConfiguration();
config.setConcurrentConnectionCount(Runtime.getRuntime().availableProcessors() * 2);
- PushManager<SimpleApnsPushNotification> pushManager = new PushManager<>(getApnsEnvironment(notifier), getSSLContext(notifier), group,null , queue, config,notifier.getName());
+ EntityPushManager pushManager = new EntityPushManager(notifier, config);
//only tested when a message is sent
pushManager.registerRejectedNotificationListener(new RejectedAPNsListener());
//this will get tested when start is called
pushManager.registerFailedConnectionListener(new FailedConnectionListener());
+ //unregistered expired devices
+ pushManager.registerExpiredTokenListener(new ExpiredTokenListener());
- pushManager.registerExpiredTokenListener(new ExpiredTokenListener<SimpleApnsPushNotification>() {
- @Override
- public void handleExpiredTokens(PushManager<? extends SimpleApnsPushNotification> pushManager, Collection<ExpiredToken> expiredTokens) {
- Map<String,Date> map = new HashMap<String,Date>();
- for(ExpiredToken token : expiredTokens){
- String expiredToken = new String(token.getToken());
- map.put(expiredToken, token.getExpiration());
- }
- //TODO figure out way to call back and clear out em references
- }
- });
try {
if (!pushManager.isStarted()) { //ensure manager is started
pushManager.start();
@@ -253,24 +238,7 @@ public class APNsAdapter implements ProviderAdapter {
return wasDelayed;
}
- private static ApnsEnvironment getApnsEnvironment(Notifier notifier){
- return notifier.isProduction()
- ? ApnsEnvironment.getProductionEnvironment()
- : ApnsEnvironment.getSandboxEnvironment();
- }
- private static SSLContext getSSLContext(Notifier notifier) {
- try {
- KeyStore keyStore = KeyStore.getInstance("PKCS12");
- String password = notifier.getCertificatePassword();
- char[] passChars =(password != null ? password : "").toCharArray();
- InputStream stream = notifier.getP12CertificateStream();
- keyStore.load(stream,passChars);
- SSLContext context = SSLContextUtil.createDefaultSSLContext(keyStore, passChars);
- return context;
- }catch (Exception e){
- throw new RuntimeException("Error getting certificate",e);
- }
- }
}
+
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e1b2e736/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/EntityPushManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/EntityPushManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/EntityPushManager.java
new file mode 100644
index 0000000..29841f0
--- /dev/null
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/EntityPushManager.java
@@ -0,0 +1,72 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. The ASF licenses this file to You
+ * * under the Apache License, Version 2.0 (the "License"); you may not
+ * * use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License. For additional information regarding
+ * * copyright in this work, please see the NOTICE file in the top level
+ * * directory of this distribution.
+ *
+ */
+
+package org.apache.usergrid.services.notifications.apns;
+
+import com.relayrides.pushy.apns.ApnsEnvironment;
+import com.relayrides.pushy.apns.PushManager;
+import com.relayrides.pushy.apns.PushManagerConfiguration;
+import com.relayrides.pushy.apns.util.SSLContextUtil;
+import com.relayrides.pushy.apns.util.SimpleApnsPushNotification;
+import org.apache.usergrid.persistence.EntityManager;
+import org.apache.usergrid.persistence.entities.Notifier;
+
+import javax.net.ssl.SSLContext;
+import java.io.InputStream;
+import java.security.KeyStore;
+import java.util.concurrent.LinkedBlockingDeque;
+
+/**
+ * Classy class class.
+ */
+public class EntityPushManager extends PushManager<SimpleApnsPushNotification> {
+ private final Notifier notifier;
+
+ public EntityPushManager( Notifier notifier, PushManagerConfiguration configuration) {
+ super(getApnsEnvironment(notifier), getSSLContext(notifier), null, null, new LinkedBlockingDeque<SimpleApnsPushNotification>(), configuration, notifier.getName());
+ this.notifier = notifier;
+ }
+
+ public EntityManager getEntityManager() {
+ return notifier.getEntityManager();
+ }
+
+ public Notifier getNotifier() {
+ return notifier;
+ }
+ private static ApnsEnvironment getApnsEnvironment(Notifier notifier){
+ return notifier.isProduction()
+ ? ApnsEnvironment.getProductionEnvironment()
+ : ApnsEnvironment.getSandboxEnvironment();
+ }
+ private static SSLContext getSSLContext(Notifier notifier) {
+ try {
+ KeyStore keyStore = KeyStore.getInstance("PKCS12");
+ String password = notifier.getCertificatePassword();
+ char[] passChars =(password != null ? password : "").toCharArray();
+ InputStream stream = notifier.getP12CertificateStream();
+ keyStore.load(stream,passChars);
+ SSLContext context = SSLContextUtil.createDefaultSSLContext(keyStore, passChars);
+ return context;
+ }catch (Exception e){
+ throw new RuntimeException("Error getting certificate",e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e1b2e736/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/ExpiredTokenListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/ExpiredTokenListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/ExpiredTokenListener.java
new file mode 100644
index 0000000..4c38013
--- /dev/null
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/ExpiredTokenListener.java
@@ -0,0 +1,60 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. The ASF licenses this file to You
+ * * under the Apache License, Version 2.0 (the "License"); you may not
+ * * use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License. For additional information regarding
+ * * copyright in this work, please see the NOTICE file in the top level
+ * * directory of this distribution.
+ *
+ */
+
+package org.apache.usergrid.services.notifications.apns;
+
+import com.relayrides.pushy.apns.ExpiredToken;
+import com.relayrides.pushy.apns.PushManager;
+import com.relayrides.pushy.apns.util.SimpleApnsPushNotification;
+import org.apache.usergrid.persistence.Entity;
+import org.apache.usergrid.persistence.EntityManager;
+import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.entities.Notifier;
+import org.apache.usergrid.persistence.index.query.Query;
+import org.apache.usergrid.services.notifications.ApplicationQueueManager;
+import org.apache.usergrid.services.notifications.InactiveDeviceManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Classy class class.
+ */
+public class ExpiredTokenListener implements com.relayrides.pushy.apns.ExpiredTokenListener<SimpleApnsPushNotification> {
+
+
+ @Override
+ public void handleExpiredTokens(PushManager<? extends SimpleApnsPushNotification> pushManager, Collection<ExpiredToken> expiredTokens) {
+ Map<String,Date> inactiveDeviceMap = new HashMap<>();
+ for(ExpiredToken token : expiredTokens){
+ String expiredToken = new String(token.getToken());
+ inactiveDeviceMap.put(expiredToken, token.getExpiration());
+ }
+ if(pushManager instanceof EntityPushManager){
+ EntityPushManager entityPushManager = (EntityPushManager) pushManager;
+ InactiveDeviceManager inactiveDeviceManager = new InactiveDeviceManager(entityPushManager.getNotifier());
+ inactiveDeviceManager.removeInactiveDevices(inactiveDeviceMap);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e1b2e736/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java
index dca4a73..f8de5ff 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java
@@ -19,6 +19,7 @@ package org.apache.usergrid.services.notifications.gcm;
import com.google.android.gcm.server.*;
import org.apache.usergrid.persistence.entities.Notification;
import org.apache.usergrid.persistence.entities.Notifier;
+import org.apache.usergrid.services.notifications.InactiveDeviceManager;
import org.mortbay.util.ajax.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -87,14 +88,16 @@ public class GCMAdapter implements ProviderAdapter {
}
@Override
- public Map<String, Date> getInactiveDevices(Notifier notifier,
+ public void removeInactiveDevices(Notifier notifier,
EntityManager em) throws Exception {
Batch batch = getBatch(notifier, null);
Map<String,Date> map = null;
if(batch != null) {
map = batch.getAndClearInactiveDevices();
+ InactiveDeviceManager deviceManager = new InactiveDeviceManager(notifier);
+ deviceManager.removeInactiveDevices(map);
}
- return map;
+
}
@Override
[07/22] git commit: fix config of inactive devices
Posted by sn...@apache.org.
fix config of inactive devices
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/bbed017f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/bbed017f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/bbed017f
Branch: refs/heads/two-dot-o-events
Commit: bbed017f80d04fb09b925304d0ae838ecfd3d329
Parents: f3343f1
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Oct 27 17:23:20 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Oct 27 17:23:20 2014 -0600
----------------------------------------------------------------------
.../org/apache/usergrid/services/notifications/QueueListener.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bbed017f/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
index 286daf1..ef87b3a 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
@@ -209,7 +209,7 @@ public class QueueListener {
LOG.info("sleep between rounds...sleep...{}", sleepBetweenRuns);
Thread.sleep(sleepBetweenRuns);
}
- if(runCount++ % consecutiveCallsToRemoveDevices == 0){
+ if(++runCount % consecutiveCallsToRemoveDevices == 0){
for(ApplicationQueueManager applicationQueueManager : queueManagerMap.values()){
try {
applicationQueueManager.asyncCheckForInactiveDevices();
[18/22] git commit: remove em
Posted by sn...@apache.org.
remove em
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/a62df3a3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/a62df3a3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/a62df3a3
Branch: refs/heads/two-dot-o-events
Commit: a62df3a317e0a4d4b100e73025959396d6f30710
Parents: c51c0ce
Author: Shawn Feldman <sf...@apache.org>
Authored: Wed Oct 29 11:13:48 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Wed Oct 29 11:13:48 2014 -0600
----------------------------------------------------------------------
.../java/org/apache/usergrid/persistence/entities/Notifier.java | 3 ---
1 file changed, 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a62df3a3/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notifier.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notifier.java b/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notifier.java
index 819ccde..1de2753 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notifier.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notifier.java
@@ -18,7 +18,6 @@ package org.apache.usergrid.persistence.entities;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
-import org.apache.usergrid.persistence.EntityManager;
import org.apache.usergrid.persistence.TypedEntity;
import org.apache.usergrid.persistence.annotations.EntityProperty;
@@ -35,8 +34,6 @@ public class Notifier extends TypedEntity {
public static final String ENTITY_TYPE = "notifier";
- protected EntityManager entityManager;
-
@EntityProperty(aliasProperty = true, unique = true, basic = true)
protected String name;
[04/22] git commit: fix build error
Posted by sn...@apache.org.
fix build error
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/e30a7b32
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/e30a7b32
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/e30a7b32
Branch: refs/heads/two-dot-o-events
Commit: e30a7b32258b95b1d5d74d6c9ba448dcc80a42f1
Parents: 19aa1a6
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Oct 24 16:19:36 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Oct 24 16:19:36 2014 -0600
----------------------------------------------------------------------
.../apns/MockSuccessfulProviderAdapter.java | 5 ++---
.../apns/NotificationsServiceIT.java | 19 +++----------------
.../gcm/MockSuccessfulProviderAdapter.java | 4 ++--
3 files changed, 7 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e30a7b32/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/MockSuccessfulProviderAdapter.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/MockSuccessfulProviderAdapter.java b/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/MockSuccessfulProviderAdapter.java
index 8c26ef4..e864e4f 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/MockSuccessfulProviderAdapter.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/MockSuccessfulProviderAdapter.java
@@ -74,9 +74,8 @@ public class MockSuccessfulProviderAdapter implements ProviderAdapter {
}
@Override
- public Map<String, Date> getInactiveDevices(Notifier notifier,
- EntityManager em) throws Exception {
- return null;
+ public void removeInactiveDevices(Notifier notifier,
+ EntityManager em) {
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e30a7b32/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
index a834400..31a653c 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
@@ -686,15 +686,7 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
// mock action (based on verified actual behavior) //
if (!USE_REAL_CONNECTIONS) {
ns.providerAdapters.put("apple",
- new MockSuccessfulProviderAdapter() {
- @Override
- public Map<String, Date> getInactiveDevices(
- Notifier notifier, EntityManager em)
- throws Exception {
- return Collections.singletonMap(PUSH_TOKEN,
- new Date());
- }
- });
+ new MockSuccessfulProviderAdapter());
}
// create push notification //
@@ -716,9 +708,6 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
assertEquals(
notification.getPayloads().get(notifier.getUuid().toString()),
payload);
-//
-// ns.addDevice(notification, device1);
-// ns.addDevice(notification, device2);
assertNotNull(device1.getProperty(notifier.getName()
+ NOTIFIER_ID_POSTFIX));
@@ -731,11 +720,9 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
// check provider IDs //
device1 = app.getEm().get(device1, Device.class);
- assertNull(device1
- .getProperty(notifier.getName() + NOTIFIER_ID_POSTFIX));
+ assertNull(device1 .getProperty(notifier.getName() + NOTIFIER_ID_POSTFIX));
device2 = app.getEm().get(device2, Device.class);
- assertNull(device2
- .getProperty(notifier.getName() + NOTIFIER_ID_POSTFIX));
+ assertNull(device2 .getProperty(notifier.getName() + NOTIFIER_ID_POSTFIX));
}
@Test
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e30a7b32/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/MockSuccessfulProviderAdapter.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/MockSuccessfulProviderAdapter.java b/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/MockSuccessfulProviderAdapter.java
index 4c258fe..3836fe8 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/MockSuccessfulProviderAdapter.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/MockSuccessfulProviderAdapter.java
@@ -57,9 +57,9 @@ public class MockSuccessfulProviderAdapter implements ProviderAdapter {
}
@Override
- public Map<String, Date> getInactiveDevices(Notifier notifier,
+ public void removeInactiveDevices(Notifier notifier,
EntityManager em) throws Exception {
- return null;
+
}
@Override
[10/22] git commit: add comments
Posted by sn...@apache.org.
add comments
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/ad0cce91
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/ad0cce91
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/ad0cce91
Branch: refs/heads/two-dot-o-events
Commit: ad0cce91dadc20525a2442a8c8d0a102cbeb83ad
Parents: bb25b5b
Author: Shawn Feldman <sf...@apache.org>
Authored: Tue Oct 28 08:48:59 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Tue Oct 28 08:48:59 2014 -0600
----------------------------------------------------------------------
.../org/apache/usergrid/services/notifications/QueueListener.java | 2 +-
.../usergrid/services/notifications/apns/EntityPushManager.java | 2 +-
.../usergrid/services/notifications/apns/ExpiredTokenListener.java | 2 +-
3 files changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ad0cce91/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
index ef87b3a..fffc8cd 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
@@ -97,10 +97,10 @@ public class QueueListener {
sleepBetweenRuns = new Long(properties.getProperty("usergrid.notifications.listener.sleep.between", ""+sleepBetweenRuns)).longValue();
sleepWhenNoneFound = new Long(properties.getProperty("usergrid.notifications.listener.sleep.after", ""+DEFAULT_SLEEP)).longValue();
batchSize = new Integer(properties.getProperty("usergrid.notifications.listener.batchSize", (""+batchSize)));
+ consecutiveCallsToRemoveDevices = new Integer(properties.getProperty("usergrid.notifications.inactive.interval", ""+200));
queueName = ApplicationQueueManager.getQueueNames(properties);
int maxThreads = new Integer(properties.getProperty("usergrid.notifications.listener.maxThreads", ""+MAX_THREADS));
- consecutiveCallsToRemoveDevices = new Integer(properties.getProperty("usergrid.notifications.inactive.interval", ""+100));
futures = new ArrayList<Future>(maxThreads);
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ad0cce91/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/EntityPushManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/EntityPushManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/EntityPushManager.java
index 29841f0..8283458 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/EntityPushManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/EntityPushManager.java
@@ -34,7 +34,7 @@ import java.security.KeyStore;
import java.util.concurrent.LinkedBlockingDeque;
/**
- * Classy class class.
+ * Store notifier within PushManager so it can be retrieved later. Need this for the async token listener
*/
public class EntityPushManager extends PushManager<SimpleApnsPushNotification> {
private final Notifier notifier;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ad0cce91/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/ExpiredTokenListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/ExpiredTokenListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/ExpiredTokenListener.java
index 4c38013..3daa20c 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/ExpiredTokenListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/ExpiredTokenListener.java
@@ -39,7 +39,7 @@ import java.util.HashMap;
import java.util.Map;
/**
- * Classy class class.
+ * Listen for token expirations and remove
*/
public class ExpiredTokenListener implements com.relayrides.pushy.apns.ExpiredTokenListener<SimpleApnsPushNotification> {
[09/22] git commit: log failure
Posted by sn...@apache.org.
log failure
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/bb25b5b3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/bb25b5b3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/bb25b5b3
Branch: refs/heads/two-dot-o-events
Commit: bb25b5b38be755c4183b803189183ff953a712c1
Parents: 45d3ce4
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Oct 27 17:54:01 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Oct 27 17:54:01 2014 -0600
----------------------------------------------------------------------
.../usergrid/persistence/queue/impl/SQSQueueManagerImpl.java | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bb25b5b3/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
index 888370c..8c8ffff 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
@@ -207,7 +207,9 @@ public class SQSQueueManagerImpl implements QueueManager {
DeleteMessageBatchResult result = sqs.deleteMessageBatch(request);
boolean successful = result.getFailed().size() <= 0;
if(!successful){
- LOG.error("Commit failed {} messages", result.getFailed().size());
+ for( BatchResultErrorEntry failed : result.getFailed()) {
+ LOG.error("Commit failed reason: {} messages id: {}", failed.getMessage(),failed.getId());
+ }
}
}
[08/22] git commit: Merge branch 'two-dot-o' of
https://git-wip-us.apache.org/repos/asf/incubator-usergrid into pushy_4-0
Posted by sn...@apache.org.
Merge branch 'two-dot-o' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into pushy_4-0
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/45d3ce45
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/45d3ce45
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/45d3ce45
Branch: refs/heads/two-dot-o-events
Commit: 45d3ce454fa3897ab4ddffa45967da4927c1f7b2
Parents: bbed017 960ce6b
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Oct 27 17:39:51 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Oct 27 17:39:51 2014 -0600
----------------------------------------------------------------------
stack/loadtests/README.md | 52 -----
stack/loadtests/gatling/LICENSE | 202 -------------------
stack/loadtests/gatling/conf/application.conf | 21 --
stack/loadtests/gatling/conf/gatling.conf | 162 ---------------
stack/loadtests/gatling/conf/logback.xml | 35 ----
stack/loadtests/gatling/conf/recorder.conf | 51 -----
.../gatling/lib/Saxon-HE-9.5.1-6-compressed.jar | Bin 3813075 -> 0 bytes
.../gatling/lib/akka-actor_2.10-2.3.6.jar | Bin 2583959 -> 0 bytes
.../lib/async-http-client-1.9.0-BETA13.jar | Bin 579954 -> 0 bytes
stack/loadtests/gatling/lib/boon-0.26.jar | Bin 1026950 -> 0 bytes
.../loadtests/gatling/lib/commons-pool-1.6.jar | Bin 111119 -> 0 bytes
.../lib/compiler-interface-0.13.5-sources.jar | Bin 30056 -> 0 bytes
.../lib/concurrentlinkedhashmap-lru-1.4.jar | Bin 116575 -> 0 bytes
stack/loadtests/gatling/lib/config-1.2.1.jar | Bin 219554 -> 0 bytes
.../gatling/lib/fastring_2.10-0.2.4.jar | Bin 98640 -> 0 bytes
.../gatling/lib/gatling-app-2.0.0-RC5.jar | Bin 73052 -> 0 bytes
.../gatling/lib/gatling-charts-2.0.0-RC5.jar | Bin 500609 -> 0 bytes
.../lib/gatling-charts-highcharts-2.0.0-RC5.jar | Bin 214683 -> 0 bytes
.../gatling/lib/gatling-core-2.0.0-RC5.jar | Bin 1678475 -> 0 bytes
.../gatling/lib/gatling-http-2.0.0-RC5.jar | Bin 1222752 -> 0 bytes
.../gatling/lib/gatling-jdbc-2.0.0-RC5.jar | Bin 41648 -> 0 bytes
.../gatling/lib/gatling-jms-2.0.0-RC5.jar | Bin 174279 -> 0 bytes
.../gatling/lib/gatling-metrics-2.0.0-RC5.jar | Bin 72446 -> 0 bytes
.../gatling/lib/gatling-recorder-2.0.0-RC5.jar | Bin 815471 -> 0 bytes
.../gatling/lib/gatling-redis-2.0.0-RC5.jar | Bin 19970 -> 0 bytes
.../gatling/lib/geronimo-jms_1.1_spec-1.1.1.jar | Bin 32359 -> 0 bytes
.../gatling/lib/incremental-compiler-0.13.5.jar | Bin 2214694 -> 0 bytes
.../gatling/lib/jackson-annotations-2.4.0.jar | Bin 38605 -> 0 bytes
.../gatling/lib/jackson-core-2.4.2.jar | Bin 225316 -> 0 bytes
.../gatling/lib/jackson-databind-2.4.2.jar | Bin 1075759 -> 0 bytes
stack/loadtests/gatling/lib/jodd-core-3.6.jar | Bin 373882 -> 0 bytes
.../loadtests/gatling/lib/jodd-lagarto-3.6.jar | Bin 204738 -> 0 bytes
stack/loadtests/gatling/lib/jodd-log-3.6.jar | Bin 14547 -> 0 bytes
.../gatling/lib/jsonpath_2.10-0.5.0.jar | Bin 180090 -> 0 bytes
stack/loadtests/gatling/lib/jzlib-1.1.3.jar | Bin 71976 -> 0 bytes
.../gatling/lib/logback-classic-1.1.2.jar | Bin 270750 -> 0 bytes
.../gatling/lib/logback-core-1.1.2.jar | Bin 427729 -> 0 bytes
.../loadtests/gatling/lib/netty-3.9.4.Final.jar | Bin 1310154 -> 0 bytes
stack/loadtests/gatling/lib/opencsv-2.3.jar | Bin 19827 -> 0 bytes
.../gatling/lib/redisclient_2.10-2.13.jar | Bin 712616 -> 0 bytes
.../gatling/lib/sbt-interface-0.13.5.jar | Bin 52012 -> 0 bytes
stack/loadtests/gatling/lib/scala-compiler.jar | Bin 14445780 -> 0 bytes
stack/loadtests/gatling/lib/scala-library.jar | Bin 7126372 -> 0 bytes
stack/loadtests/gatling/lib/scala-reflect.jar | Bin 3203471 -> 0 bytes
.../gatling/lib/scala-swing-2.10.4.jar | Bin 707298 -> 0 bytes
.../lib/scalalogging-slf4j_2.10-1.1.0.jar | Bin 79003 -> 0 bytes
.../loadtests/gatling/lib/scopt_2.10-3.2.0.jar | Bin 122918 -> 0 bytes
stack/loadtests/gatling/lib/slf4j-api-1.7.7.jar | Bin 29257 -> 0 bytes
stack/loadtests/gatling/lib/t-digest-3.0.jar | Bin 49754 -> 0 bytes
stack/loadtests/gatling/lib/threetenbp-1.0.jar | Bin 507797 -> 0 bytes
.../gatling/lib/uncommons-maths-1.2.3.jar | Bin 49923 -> 0 bytes
stack/loadtests/gatling/lib/zinc-0.3.5.3.jar | Bin 392810 -> 0 bytes
stack/loadtests/gatling/scripts/gatling-ug.sh | 49 -----
.../gatling/user-files/data/search.csv | 3 -
.../gatling/user-files/request-bodies/.keep | 0
stack/loadtests/loadtest_setup.sh | 45 -----
stack/loadtests/pom.xml | 156 ++++++++++++++
.../data-generators/EntityDataGenerator.scala | 57 ------
.../data-generators/FeederGenerator.scala | 101 ----------
.../scenarios/ApplicationScenarios.scala | 45 -----
.../scenarios/ConnectionScenarios.scala | 30 ---
.../usergrid/scenarios/DeviceScenarios.scala | 65 ------
.../usergrid/scenarios/GeoScenarios.scala | 43 ----
.../scenarios/NotificationScenarios.scala | 71 -------
.../usergrid/scenarios/NotifierScenarios.scala | 65 ------
.../scenarios/OrganizationScenarios.scala | 42 ----
.../usergrid/scenarios/TokenScenarios.scala | 59 ------
.../usergrid/scenarios/UserScenarios.scala | 50 -----
.../org/apache/usergrid/settings/Headers.scala | 43 ----
.../org/apache/usergrid/settings/Settings.scala | 54 -----
.../org/apache/usergrid/settings/Utils.scala | 87 --------
.../simulations/GetEntitySimulation.scala | 41 ----
.../simulations/PostDevicesSimulation.scala | 42 ----
.../simulations/PostUsersSimulation.scala | 47 -----
.../PushTargetDeviceSimulation.scala | 53 -----
.../simulations/PushTargetUserSimulation.scala | 68 -------
.../datagenerators/EntityDataGenerator.scala | 59 ++++++
.../datagenerators/FeederGenerator.scala | 114 +++++++++++
.../scenarios/ApplicationScenarios.scala | 46 +++++
.../scenarios/ConnectionScenarios.scala | 36 ++++
.../usergrid/scenarios/DeviceScenarios.scala | 85 ++++++++
.../usergrid/scenarios/GeoScenarios.scala | 44 ++++
.../scenarios/NotificationScenarios.scala | 74 +++++++
.../usergrid/scenarios/NotifierScenarios.scala | 66 ++++++
.../scenarios/OrganizationScenarios.scala | 43 ++++
.../usergrid/scenarios/TokenScenarios.scala | 60 ++++++
.../usergrid/scenarios/UserScenarios.scala | 50 +++++
.../org/apache/usergrid/settings/Headers.scala | 43 ++++
.../org/apache/usergrid/settings/Settings.scala | 50 +++++
.../org/apache/usergrid/settings/Utils.scala | 91 +++++++++
.../simulations/GetEntitySimulation.scala | 44 ++++
.../simulations/PostDevicesSimulation.scala | 45 +++++
.../simulations/PostUsersSimulation.scala | 50 +++++
...PushNotificationTargetDeviceSimulation.scala | 57 ++++++
.../PushNotificationTargetUserSimulation.scala | 72 +++++++
stack/loadtests/src/main/scripts/gatling-ug.sh | 51 +++++
stack/loadtests/src/test/resources/gatling.conf | 154 ++++++++++++++
stack/loadtests/src/test/resources/logback.xml | 37 ++++
.../loadtests/src/test/resources/recorder.conf | 37 ++++
stack/loadtests/src/test/scala/Engine.scala | 32 +++
.../src/test/scala/IDEPathHelper.scala | 37 ++++
stack/loadtests/src/test/scala/Recorder.scala | 28 +++
102 files changed, 1661 insertions(+), 1683 deletions(-)
----------------------------------------------------------------------
[16/22] git commit: move queue manager behind interface
Posted by sn...@apache.org.
move queue manager behind interface
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/8f4720db
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/8f4720db
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/8f4720db
Branch: refs/heads/two-dot-o-events
Commit: 8f4720db79534172f964378c08b5f51884e046d0
Parents: 955a92b
Author: Shawn Feldman <sf...@apache.org>
Authored: Wed Oct 29 11:05:27 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Wed Oct 29 11:05:27 2014 -0600
----------------------------------------------------------------------
.../notifications/ApplicationQueueManager.java | 532 +------------------
.../notifications/InactiveDeviceManager.java | 3 +-
.../notifications/NotificationsService.java | 14 +-
.../services/notifications/QueueListener.java | 7 +-
.../services/notifications/TaskManager.java | 17 +-
.../apns/ExpiredTokenListener.java | 8 -
.../impl/ApplicationQueueManagerImpl.java | 523 ++++++++++++++++++
.../apns/NotificationsServiceIT.java | 7 +-
.../gcm/NotificationsServiceIT.java | 7 +-
9 files changed, 569 insertions(+), 549 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8f4720db/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
index 1058c34..8012b42 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
@@ -1,526 +1,48 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. The ASF licenses this file to You
+ * * under the Apache License, Version 2.0 (the "License"); you may not
+ * * use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License. For additional information regarding
+ * * copyright in this work, please see the NOTICE file in the top level
+ * * directory of this distribution.
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
*/
+
package org.apache.usergrid.services.notifications;
-import com.clearspring.analytics.hash.MurmurHash;
-import com.clearspring.analytics.stream.frequency.CountMinSketch;
-import com.codahale.metrics.Meter;
import org.apache.usergrid.batch.JobExecution;
-import org.apache.usergrid.metrics.MetricsFactory;
-import org.apache.usergrid.persistence.*;
-import org.apache.usergrid.persistence.entities.Device;
import org.apache.usergrid.persistence.entities.Notification;
-import org.apache.usergrid.persistence.entities.Notifier;
-import org.apache.usergrid.persistence.entities.Receipt;
-import org.apache.usergrid.persistence.index.query.Query;
-import org.apache.usergrid.persistence.queue.QueueManager;
import org.apache.usergrid.persistence.queue.QueueMessage;
-import org.apache.usergrid.services.notifications.apns.APNsAdapter;
-import org.apache.usergrid.services.notifications.gcm.GCMAdapter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import rx.Observable;
-import rx.Subscriber;
-import rx.functions.Action1;
-import rx.functions.Func1;
-import rx.schedulers.Schedulers;
-
-import java.security.Provider;
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.List;
-public class ApplicationQueueManager {
+/**
+ * Contract for queueing notifications, sending queued batches to providers, and checking for inactive devices.
+ */
+public interface ApplicationQueueManager {
- public static String DEFAULT_QUEUE_NAME = "push_v1";
public static final String DEFAULT_QUEUE_PROPERTY = "usergrid.notifications.listener.queue";
- private static final Logger LOG = LoggerFactory.getLogger(ApplicationQueueManager.class);
-
- //this is for tests, will not mark initial post complete, set to false for tests
- private static ExecutorService INACTIVE_DEVICE_CHECK_POOL = Executors.newFixedThreadPool(5);
public static final String NOTIFIER_ID_POSTFIX = ".notifier.id";
- private final EntityManager em;
- private final QueueManager qm;
- private final JobScheduler jobScheduler;
- private final MetricsFactory metricsFactory;
- private final String queueName;
-
- HashMap<Object, ProviderAdapter> notifierHashMap; // only retrieve notifiers once
-
-
- public ApplicationQueueManager(JobScheduler jobScheduler, EntityManager entityManager, QueueManager queueManager, MetricsFactory metricsFactory, Properties properties){
- this.em = entityManager;
- this.qm = queueManager;
- this.jobScheduler = jobScheduler;
- this.metricsFactory = metricsFactory;
- this.queueName = getQueueNames(properties);
-
- }
-
- public boolean scheduleQueueJob(Notification notification) throws Exception{
- return jobScheduler.scheduleQueueJob(notification);
- }
-
- public void queueNotification(final Notification notification, final JobExecution jobExecution) throws Exception {
- if(scheduleQueueJob(notification)){
- em.update(notification);
- return;
- }
- final Meter queueMeter = metricsFactory.getMeter(ApplicationQueueManager.class,"queue");
- long startTime = System.currentTimeMillis();
-
- if (notification.getCanceled() == Boolean.TRUE) {
- LOG.info("notification " + notification.getUuid() + " canceled");
- if (jobExecution != null) {
- jobExecution.killed();
- }
- return;
- }
-
- LOG.info("notification {} start queuing", notification.getUuid());
-
- final PathQuery<Device> pathQuery = notification.getPathQuery() ; //devices query
- final AtomicInteger deviceCount = new AtomicInteger(); //count devices so you can make a judgement on batching
- final ConcurrentLinkedQueue<String> errorMessages = new ConcurrentLinkedQueue<String>(); //build up list of issues
-
- final HashMap<Object,ProviderAdapter> notifierMap = getNotifierMap();
-
- //get devices in querystring, and make sure you have access
- if (pathQuery != null) {
- LOG.info("notification {} start query", notification.getUuid());
- final Iterator<Device> iterator = pathQuery.iterator(em);
- //if there are more pages (defined by PAGE_SIZE) you probably want this to be async, also if this is already a job then don't reschedule
- if (iterator instanceof ResultsIterator && ((ResultsIterator) iterator).hasPages() && jobExecution == null) {
- jobScheduler.scheduleQueueJob(notification, true);
- em.update(notification);
- return;
- }
- final CountMinSketch sketch = new CountMinSketch(0.0001,.99,7364181); //add probablistic counter to find dups
- final UUID appId = em.getApplication().getUuid();
- final Map<String,Object> payloads = notification.getPayloads();
-
- final Func1<Entity,Entity> entityListFunct = new Func1<Entity, Entity>() {
- @Override
- public Entity call(Entity entity) {
-
- try {
-
- long now = System.currentTimeMillis();
- List<EntityRef> devicesRef = getDevices(entity); // resolve group
-
- LOG.info("notification {} queue {} devices, duration "+(System.currentTimeMillis()-now)+" ms", notification.getUuid(), devicesRef.size());
-
- for (EntityRef deviceRef : devicesRef) {
- LOG.info("notification {} starting to queue device {} ", notification.getUuid(), deviceRef.getUuid());
- long hash = MurmurHash.hash(deviceRef.getUuid());
- if (sketch.estimateCount(hash) > 0) { //look for duplicates
- LOG.warn("Maybe Found duplicate device: {}", deviceRef.getUuid());
- continue;
- } else {
- sketch.add(hash, 1);
- }
- String notifierId = null;
- String notifierKey = null;
-
- //find the device notifier info, match it to the payload
- for (Map.Entry<String, Object> entry : payloads.entrySet()) {
- ProviderAdapter adapter = notifierMap.get(entry.getKey().toLowerCase());
- now = System.currentTimeMillis();
- String providerId = getProviderId(deviceRef, adapter.getNotifier());
- if (providerId != null) {
- notifierId = providerId;
- notifierKey = entry.getKey().toLowerCase();
- break;
- }
- LOG.info("Provider query for notification {} device {} took "+(System.currentTimeMillis()-now)+" ms",notification.getUuid(),deviceRef.getUuid());
- }
-
- if (notifierId == null) {
- LOG.info("Notifier did not match for device {} ", deviceRef);
- continue;
- }
-
- ApplicationQueueMessage message = new ApplicationQueueMessage(appId, notification.getUuid(), deviceRef.getUuid(), notifierKey, notifierId);
- if (notification.getQueued() == null) {
- // update queued time
- now = System.currentTimeMillis();
- notification.setQueued(System.currentTimeMillis());
- LOG.info("notification {} device {} queue time set. duration "+(System.currentTimeMillis()-now)+" ms", notification.getUuid(), deviceRef.getUuid());
- }
- now = System.currentTimeMillis();
- qm.sendMessage(message);
- LOG.info("notification {} post-queue to device {} duration " + (System.currentTimeMillis() - now) + " ms "+queueName+" queue", notification.getUuid(), deviceRef.getUuid());
- deviceCount.incrementAndGet();
- queueMeter.mark();
- }
- } catch (Exception deviceLoopException) {
- LOG.error("Failed to add devices", deviceLoopException);
- errorMessages.add("Failed to add devices for entity: " + entity.getUuid() + " error:" + deviceLoopException);
- }
- return entity;
- }
- };
-
- long now = System.currentTimeMillis();
- Observable o = rx.Observable.create(new IteratorObservable<Entity>(iterator))
- .parallel(new Func1<Observable<Entity>, Observable<Entity>>() {
- @Override
- public rx.Observable<Entity> call(rx.Observable<Entity> deviceObservable) {
- return deviceObservable.map(entityListFunct);
- }
- }, Schedulers.io())
- .doOnError(new Action1<Throwable>() {
- @Override
- public void call(Throwable throwable) {
- LOG.error("Failed while writing", throwable);
- }
- });
- o.toBlocking().lastOrDefault(null);
- LOG.info("notification {} done queueing duration {} ms", notification.getUuid(), System.currentTimeMillis() - now);
- }
-
- // update queued time
- Map<String, Object> properties = new HashMap<String, Object>(2);
- properties.put("queued", notification.getQueued());
- properties.put("state", notification.getState());
- if(errorMessages.size()>0){
- if (notification.getErrorMessage() == null) {
- notification.setErrorMessage("There was a problem delivering all of your notifications. See deliveryErrors in properties");
- }
- }
-
- notification.setExpectedCount(deviceCount.get());
- notification.addProperties(properties);
- long now = System.currentTimeMillis();
-
-
- LOG.info("notification {} updated notification duration {} ms", notification.getUuid(), System.currentTimeMillis() - now);
-
- //do i have devices, and have i already started batching.
- if (deviceCount.get() <= 0 || !notification.getDebug()) {
- TaskManager taskManager = new TaskManager(em, this, notification);
- //if i'm in a test value will be false, do not mark finished for test orchestration, not ideal need real tests
- taskManager.finishedBatch(false,true);
- }else {
- em.update(notification);
- }
-
- long elapsed = notification.getQueued() != null ? notification.getQueued() - startTime : 0;
- LOG.info("notification {} done queuing to {} devices in " + elapsed + " ms", notification.getUuid().toString(), deviceCount.get());
- }
-
- /**
- * only need to get notifiers once. will reset on next batch
- * @return
- */
- public HashMap<Object,ProviderAdapter> getNotifierMap(){
- if(notifierHashMap == null) {
- long now = System.currentTimeMillis();
- notifierHashMap = new HashMap<Object, ProviderAdapter>();
- Query query = new Query();
- query.setCollection("notifiers");
- query.setLimit(100);
- PathQuery<Notifier> pathQuery = new PathQuery<Notifier>(
- new SimpleEntityRef(em.getApplicationRef()),
- query
- );
- Iterator<Notifier> notifierIterator = pathQuery.iterator(em);
- int count = 0;
- while (notifierIterator.hasNext()) {
- Notifier notifier = notifierIterator.next();
- String name = notifier.getName() != null ? notifier.getName() : "";
- UUID uuid = notifier.getUuid() != null ? notifier.getUuid() : UUID.randomUUID();
- ProviderAdapter providerAdapter = ProviderAdapterFactory.getProviderAdapter(notifier,em);
- notifierHashMap.put(name.toLowerCase(), providerAdapter);
- notifierHashMap.put(uuid, providerAdapter);
- notifierHashMap.put(uuid.toString(), providerAdapter);
- if(count++ >= 100){
- LOG.error("ApplicationQueueManager: too many notifiers...breaking out ", notifierHashMap.size());
- break;
- }
- }
- LOG.info("ApplicationQueueManager: fetching notifiers finished size={}, duration {} ms", notifierHashMap.size(),System.currentTimeMillis() - now);
- }
- return notifierHashMap;
- }
-
- /**
- * send batches of notifications to provider
- * @param messages
- * @throws Exception
- */
- public Observable sendBatchToProviders( final List<QueueMessage> messages, final String queuePath) {
- LOG.info("sending batch of {} notifications.", messages.size());
- final Meter sendMeter = metricsFactory.getMeter(NotificationsService.class, "send");
-
- final Map<Object, ProviderAdapter> notifierMap = getNotifierMap();
- final ApplicationQueueManager proxy = this;
- final ConcurrentHashMap<UUID,TaskManager> taskMap = new ConcurrentHashMap<UUID, TaskManager>(messages.size());
- final ConcurrentHashMap<UUID,Notification> notificationMap = new ConcurrentHashMap<UUID, Notification>(messages.size());
-
- final Func1<QueueMessage, ApplicationQueueMessage> func = new Func1<QueueMessage, ApplicationQueueMessage>() {
- @Override
- public ApplicationQueueMessage call(QueueMessage queueMessage) {
- boolean messageCommitted = false;
- ApplicationQueueMessage message = null;
- try {
- message = (ApplicationQueueMessage) queueMessage.getBody();
- LOG.info("start sending notification for device {} for Notification: {} on thread "+Thread.currentThread().getId(), message.getDeviceId(), message.getNotificationId());
-
- UUID deviceUUID = message.getDeviceId();
-
- Notification notification = notificationMap.get(message.getNotificationId());
- if (notification == null) {
- notification = em.get(message.getNotificationId(), Notification.class);
- notificationMap.put(message.getNotificationId(), notification);
- }
- TaskManager taskManager = taskMap.get(message.getNotificationId());
- if (taskManager == null) {
- taskManager = new TaskManager(em, proxy, notification);
- taskMap.putIfAbsent(message.getNotificationId(), taskManager);
- taskManager = taskMap.get(message.getNotificationId());
- }
-
- final Map<String, Object> payloads = notification.getPayloads();
- final Map<String, Object> translatedPayloads = translatePayloads(payloads, notifierMap);
- LOG.info("sending notification for device {} for Notification: {}", deviceUUID, notification.getUuid());
-
- try {
- String notifierName = message.getNotifierKey().toLowerCase();
- ProviderAdapter providerAdapter = notifierMap.get(notifierName.toLowerCase());
- Object payload = translatedPayloads.get(notifierName);
- Receipt receipt = new Receipt(notification.getUuid(), message.getNotifierId(), payload, deviceUUID);
- TaskTracker tracker = new TaskTracker(providerAdapter.getNotifier(), taskManager, receipt, deviceUUID);
- if(!isOkToSend(notification)){
- tracker.failed(0, "Notification is duplicate/expired/cancelled.");
- }else {
- if (payload == null) {
- LOG.debug("selected device {} for notification {} doesn't have a valid payload. skipping.", deviceUUID, notification.getUuid());
- tracker.failed(0, "failed to match payload to " + message.getNotifierId() + " notifier");
- } else {
- long now = System.currentTimeMillis();
- try {
- providerAdapter.sendNotification(message.getNotifierId(), payload, notification, tracker);
- } catch (Exception e) {
- tracker.failed(0, e.getMessage());
- } finally {
- LOG.info("sending to device {} for Notification: {} duration " + (System.currentTimeMillis() - now) + " ms", deviceUUID, notification.getUuid());
- }
- }
- }
- messageCommitted = true;
- } finally {
- sendMeter.mark();
- }
-
- } catch (Exception e) {
- LOG.error("Failure while sending",e);
- try {
- if(!messageCommitted && queuePath != null) {
- qm.commitMessage(queueMessage);
- }
- }catch (Exception queueException){
- LOG.error("Failed to commit message.",queueException);
- }
- }
- return message;
- }
- };
- Observable o = rx.Observable.from(messages)
- .parallel(new Func1<rx.Observable<QueueMessage>, rx.Observable<ApplicationQueueMessage>>() {
- @Override
- public rx.Observable<ApplicationQueueMessage> call(rx.Observable<QueueMessage> messageObservable) {
- return messageObservable.map(func);
- }
- }, Schedulers.io())
- .buffer(messages.size())
- .map(new Func1<List<ApplicationQueueMessage>, HashMap<UUID, ApplicationQueueMessage>>() {
- @Override
- public HashMap<UUID, ApplicationQueueMessage> call(List<ApplicationQueueMessage> queueMessages) {
- //for gcm this will actually send notification
- for (ProviderAdapter providerAdapter : notifierMap.values()) {
- try {
- providerAdapter.doneSendingNotifications();
- } catch (Exception e) {
- LOG.error("providerAdapter.doneSendingNotifications: ", e);
- }
- }
- //TODO: check if a notification is done and mark it
- HashMap<UUID, ApplicationQueueMessage> notifications = new HashMap<UUID, ApplicationQueueMessage>();
- for (ApplicationQueueMessage message : queueMessages) {
- if (notifications.get(message.getNotificationId()) == null) {
- try {
- TaskManager taskManager = taskMap.get(message.getNotificationId());
- notifications.put(message.getNotificationId(), message);
- taskManager.finishedBatch();
- } catch (Exception e) {
- LOG.error("Failed to finish batch", e);
- }
- }
-
- }
- return notifications;
- }
- })
- .doOnError(new Action1<Throwable>() {
- @Override
- public void call(Throwable throwable) {
- LOG.error("Failed while sending",throwable);
- }
- });
- return o;
- }
-
- public void stop(){
- for(ProviderAdapter adapter : getNotifierMap().values()){
- try {
- adapter.stop();
- }catch (Exception e){
- LOG.error("failed to stop adapter",e);
- }
- }
- }
-
-
- /**
- * Call the adapter with the notifier
- */
- private Map<String, Object> translatePayloads(Map<String, Object> payloads, Map<Object, ProviderAdapter> notifierMap) throws Exception {
- Map<String, Object> translatedPayloads = new HashMap<String, Object>( payloads.size());
- for (Map.Entry<String, Object> entry : payloads.entrySet()) {
- String payloadKey = entry.getKey().toLowerCase();
- Object payloadValue = entry.getValue();
- ProviderAdapter providerAdapter = notifierMap.get(payloadKey);
- if (providerAdapter != null) {
- Object translatedPayload = payloadValue != null ? providerAdapter.translatePayload(payloadValue) : null;
- if (translatedPayload != null) {
- translatedPayloads.put(payloadKey, translatedPayload);
- }
- }
- }
- return translatedPayloads;
- }
-
- public static String getQueueNames(Properties properties) {
- String name = properties.getProperty(ApplicationQueueManager.DEFAULT_QUEUE_PROPERTY,ApplicationQueueManager.DEFAULT_QUEUE_NAME);
- return name;
- }
-
- private static final class IteratorObservable<T> implements rx.Observable.OnSubscribe<T> {
- private final Iterator<T> input;
- private IteratorObservable( final Iterator input ) {this.input = input;}
-
- @Override
- public void call( final Subscriber<? super T> subscriber ) {
-
- /**
- * You would replace this code with your file reading. Instead of emitting from an iterator,
- * you would create a bean object that represents the entity, and then emit it
- */
-
- try {
- while ( !subscriber.isUnsubscribed() && input.hasNext() ) {
- //send our input to the next
- subscriber.onNext( (T) input.next() );
- }
-
- //tell the subscriber we don't have any more data
- subscriber.onCompleted();
- }
- catch ( Throwable t ) {
- LOG.error("failed on subscriber",t);
- subscriber.onError( t );
- }
- }
- }
-
- public void asyncCheckForInactiveDevices() throws Exception {
- Collection<ProviderAdapter> providerAdapters = getNotifierMap().values();
- for (final ProviderAdapter providerAdapter : providerAdapters) {
- try {
- if (providerAdapter != null) {
- LOG.debug("checking notifier {} for inactive devices", providerAdapter.getNotifier());
- providerAdapter.removeInactiveDevices();
-
- LOG.debug("finished checking notifier {} for inactive devices",providerAdapter.getNotifier());
- }
- } catch (Exception e) {
- LOG.error("checkForInactiveDevices", e); // not
- // essential so
- // don't fail,
- // but log
- }
- }
- }
-
-
- private boolean isOkToSend(Notification notification) {
- Map<String,Long> stats = notification.getStatistics();
- if (stats != null && notification.getExpectedCount() == (stats.get("sent")+ stats.get("errors"))) {
- LOG.info("notification {} already processed. not sending.",
- notification.getUuid());
- return false;
- }
- if (notification.getCanceled() == Boolean.TRUE) {
- LOG.info("notification {} canceled. not sending.",
- notification.getUuid());
- return false;
- }
- if (notification.isExpired()) {
- LOG.info("notification {} expired. not sending.",
- notification.getUuid());
- return false;
- }
- return true;
- }
-
- private List<EntityRef> getDevices(EntityRef ref) throws Exception {
- List<EntityRef> devices = Collections.EMPTY_LIST;
- if ("device".equals(ref.getType())) {
- devices = Collections.singletonList(ref);
- } else if ("user".equals(ref.getType())) {
- devices = em.getCollection(ref, "devices", null, Query.MAX_LIMIT,
- Query.Level.REFS, false).getRefs();
- } else if ("group".equals(ref.getType())) {
- devices = new ArrayList<EntityRef>();
- for (EntityRef r : em.getCollection(ref, "users", null,
- Query.MAX_LIMIT, Query.Level.REFS, false).getRefs()) {
- devices.addAll(getDevices(r));
- }
- }
- return devices;
- }
+ public static final String DEFAULT_QUEUE_NAME = "push_v1";
+ void queueNotification(Notification notification, JobExecution jobExecution) throws Exception;
- private String getProviderId(EntityRef device, Notifier notifier) throws Exception {
- try {
- Object value = em.getProperty(device, notifier.getName() + NOTIFIER_ID_POSTFIX);
- if (value == null) {
- value = em.getProperty(device, notifier.getUuid() + NOTIFIER_ID_POSTFIX);
- }
- return value != null ? value.toString() : null;
- } catch (Exception e) {
- LOG.error("Errer getting provider ID, proceding with rest of batch", e);
- return null;
- }
- }
+ Observable sendBatchToProviders(List<QueueMessage> messages, String queuePath);
+ void stop();
-}
\ No newline at end of file
+ void asyncCheckForInactiveDevices() throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8f4720db/stack/services/src/main/java/org/apache/usergrid/services/notifications/InactiveDeviceManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/InactiveDeviceManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/InactiveDeviceManager.java
index 82841d2..108a4a0 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/InactiveDeviceManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/InactiveDeviceManager.java
@@ -25,6 +25,7 @@ import org.apache.usergrid.persistence.EntityManager;
import org.apache.usergrid.persistence.Results;
import org.apache.usergrid.persistence.entities.Notifier;
import org.apache.usergrid.persistence.index.query.Query;
+import org.apache.usergrid.services.notifications.impl.ApplicationQueueManagerImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,7 +46,7 @@ public class InactiveDeviceManager {
this.entityManager = entityManager;
}
public void removeInactiveDevices( Map<String,Date> inactiveDeviceMap ){
- final String notfierPostFix = ApplicationQueueManager.NOTIFIER_ID_POSTFIX;
+ final String notfierPostFix = ApplicationQueueManagerImpl.NOTIFIER_ID_POSTFIX;
if (inactiveDeviceMap != null && inactiveDeviceMap.size() > 0) {
LOG.debug("processing {} inactive devices", inactiveDeviceMap.size());
Map<String, Object> clearPushtokenMap = new HashMap<String, Object>( 2);
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8f4720db/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
index a64704c..5de4d7d 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
@@ -29,13 +29,12 @@ import org.apache.usergrid.persistence.entities.Notifier;
import org.apache.usergrid.persistence.entities.Receipt;
import org.apache.usergrid.persistence.index.query.Identifier;
import org.apache.usergrid.persistence.index.query.Query;
-import org.apache.usergrid.persistence.model.entity.SimpleId;
import org.apache.usergrid.persistence.queue.QueueManager;
import org.apache.usergrid.persistence.queue.QueueManagerFactory;
import org.apache.usergrid.persistence.queue.QueueScope;
import org.apache.usergrid.persistence.queue.QueueScopeFactory;
-import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
import org.apache.usergrid.services.*;
+import org.apache.usergrid.services.notifications.impl.ApplicationQueueManagerImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,9 +43,6 @@ import org.apache.usergrid.persistence.exceptions.RequiredPropertyNotFoundExcept
import org.apache.usergrid.services.exceptions.ForbiddenServiceOperationException;
import static org.apache.usergrid.utils.InflectionUtils.pluralize;
-import org.apache.usergrid.services.notifications.apns.APNsAdapter;
-import org.apache.usergrid.services.notifications.gcm.GCMAdapter;
-import org.springframework.beans.factory.annotation.Autowired;
import rx.Observable;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
@@ -63,8 +59,6 @@ public class NotificationsService extends AbstractCollectionService {
//need a mocking framework, this is to substitute for no mocking
public static QueueManager TEST_QUEUE_MANAGER = null;
- public static final String NOTIFIER_ID_POSTFIX = ".notifier.id";
-
static final String MESSAGE_PROPERTY_DEVICE_UUID = "deviceUUID";
static {
@@ -94,12 +88,12 @@ public class NotificationsService extends AbstractCollectionService {
postMeter = metricsService.getMeter(NotificationsService.class, "requests");
postTimer = metricsService.getTimer(this.getClass(), "execution_rest");
JobScheduler jobScheduler = new JobScheduler(sm,em);
- String name = ApplicationQueueManager.getQueueNames(props);
+ String name = ApplicationQueueManagerImpl.getQueueNames(props);
QueueScopeFactory queueScopeFactory = CpSetup.getInjector().getInstance(QueueScopeFactory.class);
QueueScope queueScope = queueScopeFactory.getScope(smf.getManagementAppId(), name);
queueManagerFactory = CpSetup.getInjector().getInstance(QueueManagerFactory.class);
QueueManager queueManager = TEST_QUEUE_MANAGER !=null ? TEST_QUEUE_MANAGER : queueManagerFactory.getQueueManager(queueScope);
- notificationQueueManager = new ApplicationQueueManager(jobScheduler,em,queueManager,metricsService,props);
+ notificationQueueManager = new ApplicationQueueManagerImpl(jobScheduler,em,queueManager,metricsService,props);
gracePeriod = jobScheduler.SCHEDULER_GRACE_PERIOD;
}
@@ -262,7 +256,7 @@ public class NotificationsService extends AbstractCollectionService {
throw new IllegalArgumentException("notifier \""
+ notifierId + "\" not found");
}
- ProviderAdapter providerAdapter = ProviderAdapterFactory.getProviderAdapter(notifier,em);
+ ProviderAdapter providerAdapter = ProviderAdapterFactory.getProviderAdapter(notifier, em);
Object payload = entry.getValue();
try {
return providerAdapter.translatePayload(payload); // validate
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8f4720db/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
index 3c788a9..b5aaeda 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
@@ -25,12 +25,11 @@ import org.apache.usergrid.metrics.MetricsFactory;
import org.apache.usergrid.persistence.EntityManager;
import org.apache.usergrid.persistence.EntityManagerFactory;
-import org.apache.usergrid.persistence.model.entity.SimpleId;
import org.apache.usergrid.persistence.queue.*;
import org.apache.usergrid.persistence.queue.QueueManager;
-import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
import org.apache.usergrid.services.ServiceManager;
import org.apache.usergrid.services.ServiceManagerFactory;
+import org.apache.usergrid.services.notifications.impl.ApplicationQueueManagerImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
@@ -96,7 +95,7 @@ public class QueueListener {
sleepWhenNoneFound = new Long(properties.getProperty("usergrid.notifications.listener.sleep.after", ""+DEFAULT_SLEEP)).longValue();
batchSize = new Integer(properties.getProperty("usergrid.notifications.listener.batchSize", (""+batchSize)));
consecutiveCallsToRemoveDevices = new Integer(properties.getProperty("usergrid.notifications.inactive.interval", ""+200));
- queueName = ApplicationQueueManager.getQueueNames(properties);
+ queueName = ApplicationQueueManagerImpl.getQueueNames(properties);
int maxThreads = new Integer(properties.getProperty("usergrid.notifications.listener.maxThreads", ""+MAX_THREADS));
@@ -251,7 +250,7 @@ public class QueueListener {
EntityManager entityManager = emf.getEntityManager(applicationId);
ServiceManager serviceManager = smf.getServiceManager(applicationId);
- ApplicationQueueManager manager = new ApplicationQueueManager(
+ ApplicationQueueManagerImpl manager = new ApplicationQueueManagerImpl(
new JobScheduler(serviceManager, entityManager),
entityManager,
queueManager,
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8f4720db/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
index 5902a93..148a2dc 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
@@ -23,19 +23,15 @@ import org.apache.usergrid.persistence.entities.Device;
import org.apache.usergrid.persistence.entities.Notification;
import org.apache.usergrid.persistence.entities.Notifier;
import org.apache.usergrid.persistence.entities.Receipt;
-import org.apache.usergrid.persistence.queue.QueueManager;
-import org.apache.usergrid.persistence.queue.QueueMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
public class TaskManager {
private static final Logger LOG = LoggerFactory.getLogger(TaskManager.class);
- private final ApplicationQueueManager proxy;
private Notification notification;
private AtomicLong successes = new AtomicLong();
@@ -43,10 +39,9 @@ public class TaskManager {
private EntityManager em;
private boolean hasFinished;
- public TaskManager(EntityManager em,ApplicationQueueManager proxy, Notification notification) {
+ public TaskManager(EntityManager em, Notification notification) {
this.em = em;
this.notification = notification;
- this.proxy = proxy;
hasFinished = false;
}
@@ -132,14 +127,14 @@ public class TaskManager {
protected void replaceProviderId(EntityRef device, Notifier notifier,
String newProviderId) throws Exception {
Object value = em.getProperty(device, notifier.getName()
- + NotificationsService.NOTIFIER_ID_POSTFIX);
+ + ApplicationQueueManager.NOTIFIER_ID_POSTFIX);
if (value != null) {
- em.setProperty(device, notifier.getName() + NotificationsService.NOTIFIER_ID_POSTFIX, newProviderId);
+ em.setProperty(device, notifier.getName() + ApplicationQueueManager.NOTIFIER_ID_POSTFIX, newProviderId);
} else {
value = em.getProperty(device, notifier.getUuid()
- + NotificationsService.NOTIFIER_ID_POSTFIX);
+ + ApplicationQueueManager.NOTIFIER_ID_POSTFIX);
if (value != null) {
- em.setProperty(device, notifier.getUuid() + NotificationsService.NOTIFIER_ID_POSTFIX, newProviderId);
+ em.setProperty(device, notifier.getUuid() + ApplicationQueueManager.NOTIFIER_ID_POSTFIX, newProviderId);
}
}
}
@@ -183,7 +178,7 @@ public class TaskManager {
LOG.info("notification finished batch: {} of {} devices in " + latency + "ms", notification.getUuid(), totals);
em.update(notification);
-// Set<Notifier> notifiers = new HashSet<>(proxy.getNotifierMap().values()); // remove dups
+// Set<Notifier> notifiers = new HashSet<>(proxy.getAdapterMap().values()); // remove dups
// proxy.asyncCheckForInactiveDevices(notifiers);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8f4720db/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/ExpiredTokenListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/ExpiredTokenListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/ExpiredTokenListener.java
index 6408dfd..1f7984a 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/ExpiredTokenListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/ExpiredTokenListener.java
@@ -23,15 +23,7 @@ package org.apache.usergrid.services.notifications.apns;
import com.relayrides.pushy.apns.ExpiredToken;
import com.relayrides.pushy.apns.PushManager;
import com.relayrides.pushy.apns.util.SimpleApnsPushNotification;
-import org.apache.usergrid.persistence.Entity;
-import org.apache.usergrid.persistence.EntityManager;
-import org.apache.usergrid.persistence.Results;
-import org.apache.usergrid.persistence.entities.Notifier;
-import org.apache.usergrid.persistence.index.query.Query;
-import org.apache.usergrid.services.notifications.ApplicationQueueManager;
import org.apache.usergrid.services.notifications.InactiveDeviceManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.Date;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8f4720db/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
new file mode 100644
index 0000000..c8c5165
--- /dev/null
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
@@ -0,0 +1,523 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.services.notifications.impl;
+
+import com.clearspring.analytics.hash.MurmurHash;
+import com.clearspring.analytics.stream.frequency.CountMinSketch;
+import com.codahale.metrics.Meter;
+import org.apache.usergrid.batch.JobExecution;
+import org.apache.usergrid.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.*;
+import org.apache.usergrid.persistence.entities.Device;
+import org.apache.usergrid.persistence.entities.Notification;
+import org.apache.usergrid.persistence.entities.Notifier;
+import org.apache.usergrid.persistence.entities.Receipt;
+import org.apache.usergrid.persistence.index.query.Query;
+import org.apache.usergrid.persistence.queue.QueueManager;
+import org.apache.usergrid.persistence.queue.QueueMessage;
+import org.apache.usergrid.services.notifications.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import rx.Observable;
+import rx.Subscriber;
+import rx.functions.Action1;
+import rx.functions.Func1;
+import rx.schedulers.Schedulers;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ApplicationQueueManagerImpl.class);
+
+ //this is for tests, will not mark initial post complete, set to false for tests
+
+ private final EntityManager em;
+ private final QueueManager qm;
+ private final JobScheduler jobScheduler;
+ private final MetricsFactory metricsFactory;
+ private final String queueName;
+
+ HashMap<Object, ProviderAdapter> notifierHashMap; // only retrieve notifiers once
+
+
+ /**
+  * Stores the collaborators this manager works with and resolves the queue name
+  * from the supplied properties via {@link #getQueueNames(Properties)}.
+  */
+ public ApplicationQueueManagerImpl(JobScheduler jobScheduler, EntityManager entityManager, QueueManager queueManager, MetricsFactory metricsFactory, Properties properties){
+     this.jobScheduler = jobScheduler;
+     this.metricsFactory = metricsFactory;
+     this.em = entityManager;
+     this.qm = queueManager;
+     this.queueName = getQueueNames(properties);
+ }
+
+ /** Hands the notification to the job scheduler and passes its boolean answer through unchanged. */
+ private boolean scheduleQueueJob(Notification notification) throws Exception{
+     final boolean scheduled = jobScheduler.scheduleQueueJob(notification);
+     return scheduled;
+ }
+
+ /**
+  * Puts one queue message on the queue for every device targeted by the notification.
+  * <p>
+  * If the job scheduler takes the notification ({@code scheduleQueueJob} returns true) the
+  * notification is updated and nothing is queued here. A canceled notification is skipped and the
+  * job, if one was passed, is marked killed. Otherwise the notification's path query is iterated,
+  * each resulting entity is expanded to device refs via {@code getDevices}, and a message is sent
+  * for every device that has a provider id for one of the notifiers named in the payloads.
+  *
+  * @param notification the notification to queue
+  * @param jobExecution the job this call runs under; may be null, in which case a paged query
+  *                     result is rescheduled as a job instead of being processed inline
+  * @throws Exception if scheduling, querying or updating the notification fails
+  */
+ @Override
+ public void queueNotification(final Notification notification, final JobExecution jobExecution) throws Exception {
+     if(scheduleQueueJob(notification)){
+         em.update(notification);
+         return;
+     }
+     final Meter queueMeter = metricsFactory.getMeter(ApplicationQueueManagerImpl.class,"queue");
+     long startTime = System.currentTimeMillis();
+
+     // equals() rather than == so a Boolean that is not the canonical Boolean.TRUE instance still counts
+     if (Boolean.TRUE.equals(notification.getCanceled())) {
+         LOG.info("notification " + notification.getUuid() + " canceled");
+         if (jobExecution != null) {
+             jobExecution.killed();
+         }
+         return;
+     }
+
+     LOG.info("notification {} start queuing", notification.getUuid());
+
+     final PathQuery<Device> pathQuery = notification.getPathQuery() ; //devices query
+     final AtomicInteger deviceCount = new AtomicInteger(); //count devices so you can make a judgement on batching
+     final ConcurrentLinkedQueue<String> errorMessages = new ConcurrentLinkedQueue<String>(); //build up list of issues
+
+
+     //get devices in querystring, and make sure you have access
+     if (pathQuery != null) {
+         final HashMap<Object,ProviderAdapter> notifierMap = getAdapterMap();
+         LOG.info("notification {} start query", notification.getUuid());
+         final Iterator<Device> iterator = pathQuery.iterator(em);
+         //if there are more pages (defined by PAGE_SIZE) you probably want this to be async, also if this is already a job then don't reschedule
+         if (iterator instanceof ResultsIterator && ((ResultsIterator) iterator).hasPages() && jobExecution == null) {
+             jobScheduler.scheduleQueueJob(notification, true);
+             em.update(notification);
+             return;
+         }
+         final CountMinSketch sketch = new CountMinSketch(0.0001,.99,7364181); //add probablistic counter to find dups
+         final UUID appId = em.getApplication().getUuid();
+         final Map<String,Object> payloads = notification.getPayloads();
+
+         // Applied to every entity produced by the path query: expands it to devices and queues them.
+         final Func1<Entity,Entity> entityListFunct = new Func1<Entity, Entity>() {
+             @Override
+             public Entity call(Entity entity) {
+
+                 try {
+
+                     long now = System.currentTimeMillis();
+                     List<EntityRef> devicesRef = getDevices(entity); // resolve group
+
+                     LOG.info("notification {} queue {} devices, duration "+(System.currentTimeMillis()-now)+" ms", notification.getUuid(), devicesRef.size());
+
+                     for (EntityRef deviceRef : devicesRef) {
+                         LOG.info("notification {} starting to queue device {} ", notification.getUuid(), deviceRef.getUuid());
+                         long hash = MurmurHash.hash(deviceRef.getUuid());
+                         if (sketch.estimateCount(hash) > 0) { //look for duplicates
+                             LOG.warn("Maybe Found duplicate device: {}", deviceRef.getUuid());
+                             continue;
+                         } else {
+                             sketch.add(hash, 1);
+                         }
+                         String notifierId = null;
+                         String notifierKey = null;
+
+                         //find the device notifier info, match it to the payload
+                         for (Map.Entry<String, Object> entry : payloads.entrySet()) {
+                             ProviderAdapter adapter = notifierMap.get(entry.getKey().toLowerCase());
+                             if (adapter == null) {
+                                 // The payload key names no known notifier. Move on to the next payload
+                                 // instead of throwing and abandoning the rest of this entity's devices.
+                                 LOG.debug("notification {} payload key {} has no matching notifier", notification.getUuid(), entry.getKey());
+                                 continue;
+                             }
+                             now = System.currentTimeMillis();
+                             String providerId = getProviderId(deviceRef, adapter.getNotifier());
+                             if (providerId != null) {
+                                 notifierId = providerId;
+                                 notifierKey = entry.getKey().toLowerCase();
+                                 break;
+                             }
+                             LOG.info("Provider query for notification {} device {} took "+(System.currentTimeMillis()-now)+" ms",notification.getUuid(),deviceRef.getUuid());
+                         }
+
+                         if (notifierId == null) {
+                             LOG.info("Notifier did not match for device {} ", deviceRef);
+                             continue;
+                         }
+
+                         ApplicationQueueMessage message = new ApplicationQueueMessage(appId, notification.getUuid(), deviceRef.getUuid(), notifierKey, notifierId);
+                         if (notification.getQueued() == null) {
+                             // update queued time
+                             now = System.currentTimeMillis();
+                             notification.setQueued(System.currentTimeMillis());
+                             LOG.info("notification {} device {} queue time set. duration "+(System.currentTimeMillis()-now)+" ms", notification.getUuid(), deviceRef.getUuid());
+                         }
+                         now = System.currentTimeMillis();
+                         qm.sendMessage(message);
+                         LOG.info("notification {} post-queue to device {} duration " + (System.currentTimeMillis() - now) + " ms "+queueName+" queue", notification.getUuid(), deviceRef.getUuid());
+                         deviceCount.incrementAndGet();
+                         queueMeter.mark();
+                     }
+                 } catch (Exception deviceLoopException) {
+                     // one failing entity must not stop the others; remember the problem and carry on
+                     LOG.error("Failed to add devices", deviceLoopException);
+                     errorMessages.add("Failed to add devices for entity: " + entity.getUuid() + " error:" + deviceLoopException);
+                 }
+                 return entity;
+             }
+         };
+
+         long now = System.currentTimeMillis();
+         Observable o = rx.Observable.create(new IteratorObservable<Entity>(iterator))
+                 .parallel(new Func1<Observable<Entity>, Observable<Entity>>() {
+                     @Override
+                     public rx.Observable<Entity> call(rx.Observable<Entity> deviceObservable) {
+                         return deviceObservable.map(entityListFunct);
+                     }
+                 }, Schedulers.io())
+                 .doOnError(new Action1<Throwable>() {
+                     @Override
+                     public void call(Throwable throwable) {
+                         LOG.error("Failed while writing", throwable);
+                     }
+                 });
+         // block until every entity has been processed
+         o.toBlocking().lastOrDefault(null);
+         LOG.info("notification {} done queueing duration {} ms", notification.getUuid(), System.currentTimeMillis() - now);
+     }
+
+     // update queued time
+     Map<String, Object> properties = new HashMap<String, Object>(2);
+     properties.put("queued", notification.getQueued());
+     properties.put("state", notification.getState());
+     if(errorMessages.size()>0){
+         if (notification.getErrorMessage() == null) {
+             notification.setErrorMessage("There was a problem delivering all of your notifications. See deliveryErrors in properties");
+         }
+     }
+
+     notification.setExpectedCount(deviceCount.get());
+     notification.addProperties(properties);
+     // NOTE(review): nothing runs between taking this timestamp and logging it, so the duration below is always ~0 ms
+     long now = System.currentTimeMillis();
+
+
+     LOG.info("notification {} updated notification duration {} ms", notification.getUuid(), System.currentTimeMillis() - now);
+
+     //do i have devices, and have i already started batching.
+     if (deviceCount.get() <= 0 || !notification.getDebug()) {
+         TaskManager taskManager = new TaskManager(em, notification);
+         //if i'm in a test value will be false, do not mark finished for test orchestration, not ideal need real tests
+         taskManager.finishedBatch(false,true);
+     }else {
+         em.update(notification);
+     }
+
+     long elapsed = notification.getQueued() != null ? notification.getQueued() - startTime : 0;
+     LOG.info("notification {} done queuing to {} devices in " + elapsed + " ms", notification.getUuid().toString(), deviceCount.get());
+ }
+
+ /**
+  * Returns the lookup map of provider adapters, building it on first use.
+  * <p>
+  * The map is filled by querying the application's "notifiers" collection the first time this is
+  * called and the same map is returned by later calls on this instance. Each adapter is registered
+  * under three keys: the lower-cased notifier name, the notifier UUID, and the UUID as a string.
+  *
+  * @return the adapter map for this manager
+  */
+ private HashMap<Object,ProviderAdapter> getAdapterMap(){
+     if(notifierHashMap == null) {
+         long now = System.currentTimeMillis();
+         notifierHashMap = new HashMap<Object, ProviderAdapter>();
+         Query query = new Query();
+         query.setCollection("notifiers");
+         query.setLimit(100);
+         PathQuery<Notifier> pathQuery = new PathQuery<Notifier>(
+                 new SimpleEntityRef(em.getApplicationRef()),
+                 query
+         );
+         Iterator<Notifier> notifierIterator = pathQuery.iterator(em);
+         int count = 0;
+         while (notifierIterator.hasNext()) {
+             Notifier notifier = notifierIterator.next();
+             // fall back to placeholder keys so a notifier without name or uuid cannot break the map
+             String name = notifier.getName() != null ? notifier.getName() : "";
+             UUID uuid = notifier.getUuid() != null ? notifier.getUuid() : UUID.randomUUID();
+             ProviderAdapter providerAdapter = ProviderAdapterFactory.getProviderAdapter(notifier,em);
+             notifierHashMap.put(name.toLowerCase(), providerAdapter);
+             notifierHashMap.put(uuid, providerAdapter);
+             notifierHashMap.put(uuid.toString(), providerAdapter);
+             if(count++ >= 100){
+                 // the message needs a {} placeholder, otherwise SLF4J drops the size argument
+                 LOG.error("ApplicationQueueManager: too many notifiers...breaking out {}", notifierHashMap.size());
+                 break;
+             }
+         }
+         LOG.info("ApplicationQueueManager: fetching notifiers finished size={}, duration {} ms", notifierHashMap.size(),System.currentTimeMillis() - now);
+     }
+     return notifierHashMap;
+ }
+
+ /**
+  * Builds an observable that sends a batch of queued notifications to their providers.
+  * <p>
+  * Each queue message is mapped in parallel on the io scheduler: its notification is loaded (and
+  * cached for the batch), the payload for its notifier is translated, and the provider adapter is
+  * asked to send it. Once all messages have been mapped, every adapter's
+  * {@code doneSendingNotifications()} is called and {@code finishedBatch()} is invoked once per
+  * notification that appeared in the batch. Nothing is sent until the returned observable is
+  * subscribed to.
+  *
+  * @param messages  the queue messages to process; each body is expected to be an ApplicationQueueMessage
+  * @param queuePath when non-null, a message whose processing threw before completion is committed here
+  * @return an observable emitting a map from notification id to one message of that notification
+  */
+ @Override
+ public Observable sendBatchToProviders(final List<QueueMessage> messages, final String queuePath) {
+     LOG.info("sending batch of {} notifications.", messages.size());
+     final Meter sendMeter = metricsFactory.getMeter(NotificationsService.class, "send");
+
+     final Map<Object, ProviderAdapter> notifierMap = getAdapterMap();
+     // per-batch caches so each notification is loaded, and gets a TaskManager, only once
+     final ConcurrentHashMap<UUID,TaskManager> taskMap = new ConcurrentHashMap<UUID, TaskManager>(messages.size());
+     final ConcurrentHashMap<UUID,Notification> notificationMap = new ConcurrentHashMap<UUID, Notification>(messages.size());
+
+     final Func1<QueueMessage, ApplicationQueueMessage> func = new Func1<QueueMessage, ApplicationQueueMessage>() {
+         @Override
+         public ApplicationQueueMessage call(QueueMessage queueMessage) {
+             boolean messageCommitted = false;
+             ApplicationQueueMessage message = null;
+             try {
+                 message = (ApplicationQueueMessage) queueMessage.getBody();
+                 LOG.info("start sending notification for device {} for Notification: {} on thread "+Thread.currentThread().getId(), message.getDeviceId(), message.getNotificationId());
+
+                 UUID deviceUUID = message.getDeviceId();
+
+                 Notification notification = notificationMap.get(message.getNotificationId());
+                 if (notification == null) {
+                     notification = em.get(message.getNotificationId(), Notification.class);
+                     notificationMap.put(message.getNotificationId(), notification);
+                 }
+                 TaskManager taskManager = taskMap.get(message.getNotificationId());
+                 if (taskManager == null) {
+                     // putIfAbsent + re-read so concurrent threads end up sharing one TaskManager
+                     taskManager = new TaskManager(em, notification);
+                     taskMap.putIfAbsent(message.getNotificationId(), taskManager);
+                     taskManager = taskMap.get(message.getNotificationId());
+                 }
+
+                 final Map<String, Object> payloads = notification.getPayloads();
+                 final Map<String, Object> translatedPayloads = translatePayloads(payloads, notifierMap);
+                 LOG.info("sending notification for device {} for Notification: {}", deviceUUID, notification.getUuid());
+
+                 try {
+                     String notifierName = message.getNotifierKey().toLowerCase();
+                     ProviderAdapter providerAdapter = notifierMap.get(notifierName);
+                     Object payload = translatedPayloads.get(notifierName);
+                     Receipt receipt = new Receipt(notification.getUuid(), message.getNotifierId(), payload, deviceUUID);
+                     TaskTracker tracker = new TaskTracker(providerAdapter.getNotifier(), taskManager, receipt, deviceUUID);
+                     if(!isOkToSend(notification)){
+                         tracker.failed(0, "Notification is duplicate/expired/cancelled.");
+                     }else {
+                         if (payload == null) {
+                             LOG.debug("selected device {} for notification {} doesn't have a valid payload. skipping.", deviceUUID, notification.getUuid());
+                             tracker.failed(0, "failed to match payload to " + message.getNotifierId() + " notifier");
+                         } else {
+                             long now = System.currentTimeMillis();
+                             try {
+                                 providerAdapter.sendNotification(message.getNotifierId(), payload, notification, tracker);
+                             } catch (Exception e) {
+                                 tracker.failed(0, e.getMessage());
+                             } finally {
+                                 LOG.info("sending to device {} for Notification: {} duration " + (System.currentTimeMillis() - now) + " ms", deviceUUID, notification.getUuid());
+                             }
+                         }
+                     }
+                     messageCommitted = true;
+                 } finally {
+                     sendMeter.mark();
+                 }
+
+             } catch (Exception e) {
+                 LOG.error("Failure while sending",e);
+                 try {
+                     if(!messageCommitted && queuePath != null) {
+                         qm.commitMessage(queueMessage);
+                     }
+                 }catch (Exception queueException){
+                     LOG.error("Failed to commit message.",queueException);
+                 }
+             }
+             // may be null when the body could not be read; the batch step below skips nulls
+             return message;
+         }
+     };
+     Observable o = rx.Observable.from(messages)
+             .parallel(new Func1<rx.Observable<QueueMessage>, rx.Observable<ApplicationQueueMessage>>() {
+                 @Override
+                 public rx.Observable<ApplicationQueueMessage> call(rx.Observable<QueueMessage> messageObservable) {
+                     return messageObservable.map(func);
+                 }
+             }, Schedulers.io())
+             .buffer(messages.size())
+             .map(new Func1<List<ApplicationQueueMessage>, HashMap<UUID, ApplicationQueueMessage>>() {
+                 @Override
+                 public HashMap<UUID, ApplicationQueueMessage> call(List<ApplicationQueueMessage> queueMessages) {
+                     //for gcm this will actually send notification
+                     for (ProviderAdapter providerAdapter : notifierMap.values()) {
+                         try {
+                             providerAdapter.doneSendingNotifications();
+                         } catch (Exception e) {
+                             LOG.error("providerAdapter.doneSendingNotifications: ", e);
+                         }
+                     }
+                     //TODO: check if a notification is done and mark it
+                     HashMap<UUID, ApplicationQueueMessage> notifications = new HashMap<UUID, ApplicationQueueMessage>();
+                     for (ApplicationQueueMessage message : queueMessages) {
+                         if (message == null) {
+                             // The mapping step returns null when a message body could not be read.
+                             // Skip it here so one bad message does not error the whole batch.
+                             continue;
+                         }
+                         if (notifications.get(message.getNotificationId()) == null) {
+                             try {
+                                 TaskManager taskManager = taskMap.get(message.getNotificationId());
+                                 notifications.put(message.getNotificationId(), message);
+                                 taskManager.finishedBatch();
+                             } catch (Exception e) {
+                                 LOG.error("Failed to finish batch", e);
+                             }
+                         }
+
+                     }
+                     return notifications;
+                 }
+             })
+             .doOnError(new Action1<Throwable>() {
+                 @Override
+                 public void call(Throwable throwable) {
+                     LOG.error("Failed while sending",throwable);
+                 }
+             });
+     return o;
+ }
+
+ /**
+  * Stops every adapter returned by {@code getAdapterMap()}. A failure to stop one adapter is
+  * logged and the remaining adapters are still stopped.
+  */
+ @Override
+ public void stop(){
+     final Collection<ProviderAdapter> adapters = getAdapterMap().values();
+     for(ProviderAdapter providerAdapter : adapters){
+         try {
+             providerAdapter.stop();
+         }catch (Exception stopFailure){
+             LOG.error("failed to stop adapter",stopFailure);
+         }
+     }
+ }
+
+
+ /**
+  * Converts each raw payload into its provider-specific form.
+  * <p>
+  * Payload keys are lower-cased and looked up in the notifier map. Entries with no matching
+  * adapter, a null payload, or a null translation result are left out of the returned map.
+  *
+  * @param payloads    raw payloads keyed by notifier name or id
+  * @param notifierMap adapters keyed as produced by {@code getAdapterMap()}
+  * @return translated payloads keyed by the lower-cased payload key
+  */
+ private Map<String, Object> translatePayloads(Map<String, Object> payloads, Map<Object, ProviderAdapter> notifierMap) throws Exception {
+     final Map<String, Object> result = new HashMap<String, Object>( payloads.size());
+     for (Map.Entry<String, Object> payloadEntry : payloads.entrySet()) {
+         final String key = payloadEntry.getKey().toLowerCase();
+         final Object rawPayload = payloadEntry.getValue();
+         final ProviderAdapter adapter = notifierMap.get(key);
+         if (adapter == null || rawPayload == null) {
+             continue;
+         }
+         final Object translated = adapter.translatePayload(rawPayload);
+         if (translated != null) {
+             result.put(key, translated);
+         }
+     }
+     return result;
+ }
+
+ /**
+  * Reads the queue name from the given properties.
+  *
+  * @param properties configuration to read from
+  * @return the value of DEFAULT_QUEUE_PROPERTY, or DEFAULT_QUEUE_NAME when the property is not set
+  */
+ public static String getQueueNames(Properties properties) {
+     return properties.getProperty(ApplicationQueueManagerImpl.DEFAULT_QUEUE_PROPERTY, ApplicationQueueManagerImpl.DEFAULT_QUEUE_NAME);
+ }
+
+ /**
+  * Rx {@code OnSubscribe} that pushes the elements of an iterator to a subscriber, then completes.
+  */
+ private static final class IteratorObservable<T> implements rx.Observable.OnSubscribe<T> {
+     private final Iterator<T> input;
+
+     // The parameter is a raw Iterator; the caller in this class passes an Iterator<Device>
+     // to an IteratorObservable<Entity>.
+     private IteratorObservable( final Iterator input ) {
+         this.input = input;
+     }
+
+     @Override
+     public void call( final Subscriber<? super T> subscriber ) {
+         try {
+             // emit until the iterator is drained or the subscriber goes away
+             while ( !subscriber.isUnsubscribed() && input.hasNext() ) {
+                 final T element = (T) input.next();
+                 subscriber.onNext( element );
+             }
+
+             // signal that there is no more data
+             subscriber.onCompleted();
+         }
+         catch ( Throwable failure ) {
+             LOG.error("failed on subscriber",failure);
+             subscriber.onError( failure );
+         }
+     }
+ }
+
+ /**
+  * Asks every adapter returned by {@code getAdapterMap()} to remove its inactive devices.
+  * A failure in one adapter is logged and does not stop the others.
+  */
+ @Override
+ public void asyncCheckForInactiveDevices() throws Exception {
+     for (final ProviderAdapter adapter : getAdapterMap().values()) {
+         if (adapter == null) {
+             continue;
+         }
+         try {
+             LOG.debug("checking notifier {} for inactive devices", adapter.getNotifier());
+             adapter.removeInactiveDevices();
+
+             LOG.debug("finished checking notifier {} for inactive devices",adapter.getNotifier());
+         } catch (Exception checkFailure) {
+             // not essential, so don't fail, but log
+             LOG.error("checkForInactiveDevices", checkFailure);
+         }
+     }
+ }
+
+
+ /**
+  * Decides whether a notification should still be sent.
+  * <p>
+  * Sending is refused when the recorded "sent" plus "errors" statistics already equal the
+  * expected count, when the notification is canceled, or when it has expired.
+  *
+  * @param notification the notification about to be sent
+  * @return true if none of the refusal conditions applies
+  */
+ private boolean isOkToSend(Notification notification) {
+     Map<String,Long> stats = notification.getStatistics();
+     if (stats != null) {
+         // a counter that has not been written yet counts as zero instead of failing on unboxing
+         Long sent = stats.get("sent");
+         Long errors = stats.get("errors");
+         long processed = (sent != null ? sent : 0L) + (errors != null ? errors : 0L);
+         if (notification.getExpectedCount() == processed) {
+             LOG.info("notification {} already processed. not sending.",
+                     notification.getUuid());
+             return false;
+         }
+     }
+     // equals() rather than == so a Boolean that is not the canonical Boolean.TRUE instance still counts
+     if (Boolean.TRUE.equals(notification.getCanceled())) {
+         LOG.info("notification {} canceled. not sending.",
+                 notification.getUuid());
+         return false;
+     }
+     if (notification.isExpired()) {
+         LOG.info("notification {} expired. not sending.",
+                 notification.getUuid());
+         return false;
+     }
+     return true;
+ }
+
+ /**
+  * Resolves an entity reference to the device references it stands for.
+  * <p>
+  * A "device" ref yields itself, a "user" ref yields the refs in its "devices" collection, and a
+  * "group" ref yields the devices of every ref in its "users" collection. Any other type yields
+  * an empty list.
+  *
+  * @param ref the entity to resolve
+  * @return the device refs for {@code ref}; never null
+  * @throws Exception if a collection lookup fails
+  */
+ private List<EntityRef> getDevices(EntityRef ref) throws Exception {
+     final String type = ref.getType();
+     if ("device".equals(type)) {
+         return Collections.singletonList(ref);
+     }
+     if ("user".equals(type)) {
+         return em.getCollection(ref, "devices", null, Query.MAX_LIMIT,
+                 Query.Level.REFS, false).getRefs();
+     }
+     if ("group".equals(type)) {
+         List<EntityRef> devices = new ArrayList<EntityRef>();
+         for (EntityRef user : em.getCollection(ref, "users", null,
+                 Query.MAX_LIMIT, Query.Level.REFS, false).getRefs()) {
+             devices.addAll(getDevices(user));
+         }
+         return devices;
+     }
+     // typed empty list instead of the raw Collections.EMPTY_LIST constant
+     return Collections.<EntityRef>emptyList();
+ }
+
+
+ /**
+  * Looks up the provider id stored on a device for the given notifier.
+  * <p>
+  * The property keyed by notifier name is tried first, then the one keyed by notifier uuid
+  * (each with NOTIFIER_ID_POSTFIX appended). Lookup failures are logged and reported as null.
+  *
+  * @return the provider id as a string, or null when none is stored or the lookup failed
+  */
+ private String getProviderId(EntityRef device, Notifier notifier) throws Exception {
+     try {
+         final Object byName = em.getProperty(device, notifier.getName() + NOTIFIER_ID_POSTFIX);
+         final Object providerId = byName != null
+                 ? byName
+                 : em.getProperty(device, notifier.getUuid() + NOTIFIER_ID_POSTFIX);
+         if (providerId == null) {
+             return null;
+         }
+         return providerId.toString();
+     } catch (Exception lookupFailure) {
+         LOG.error("Errer getting provider ID, proceding with rest of batch", lookupFailure);
+         return null;
+     }
+ }
+
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8f4720db/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
index 2a4ec73..02f881d 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
@@ -16,10 +16,8 @@
*/
package org.apache.usergrid.services.notifications.apns;
-import com.relayrides.pushy.apns.*;
import com.relayrides.pushy.apns.util.*;
import org.apache.commons.io.IOUtils;
-import org.apache.usergrid.services.ServiceParameter;
import org.apache.usergrid.persistence.*;
import org.apache.usergrid.persistence.entities.*;
import org.apache.usergrid.persistence.index.query.Query;
@@ -30,7 +28,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.InputStream;
-import java.net.SocketException;
import java.util.*;
import org.apache.usergrid.services.ServiceAction;
@@ -40,7 +37,7 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
-import static org.apache.usergrid.services.notifications.NotificationsService.NOTIFIER_ID_POSTFIX;
+import static org.apache.usergrid.services.notifications.impl.ApplicationQueueManagerImpl.NOTIFIER_ID_POSTFIX;
// todo: test reschedule on delivery time change
// todo: test restart of queuing
@@ -68,7 +65,7 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
@BeforeClass
public static void setup(){
- ApplicationQueueManager.DEFAULT_QUEUE_NAME = "test";
+
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8f4720db/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
index 3795be1..ad1c9f2 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
@@ -18,22 +18,19 @@ package org.apache.usergrid.services.notifications.gcm;
import org.apache.usergrid.persistence.*;
import org.apache.usergrid.persistence.entities.*;
-import org.apache.usergrid.persistence.index.query.Query;
-import org.apache.usergrid.services.ServiceParameter;
import org.apache.usergrid.services.TestQueueManager;
import org.apache.usergrid.services.notifications.*;
import org.junit.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.lang.reflect.Field;
import java.util.*;
import org.apache.usergrid.services.ServiceAction;
import static org.junit.Assert.*;
-import static org.apache.usergrid.services.notifications.NotificationsService.NOTIFIER_ID_POSTFIX;
+import static org.apache.usergrid.services.notifications.ApplicationQueueManager.NOTIFIER_ID_POSTFIX;
public class NotificationsServiceIT extends AbstractServiceNotificationIT {
@@ -59,7 +56,7 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
@BeforeClass
public static void setup(){
- ApplicationQueueManager.DEFAULT_QUEUE_NAME = "test";
+
}
@Override
@Before
[03/22] git commit: fix build error
Posted by sn...@apache.org.
fix build error
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/19aa1a62
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/19aa1a62
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/19aa1a62
Branch: refs/heads/two-dot-o-events
Commit: 19aa1a620ac40eddb7e663c5510b83aa6476da94
Parents: e1b2e73
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Oct 24 16:11:52 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Oct 24 16:11:52 2014 -0600
----------------------------------------------------------------------
.../services/notifications/QueueListener.java | 16 ++++++++++++----
.../services/notifications/TestAdapter.java | 4 +---
2 files changed, 13 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/19aa1a62/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
index 432ad7f..48c110b 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
@@ -144,6 +144,8 @@ public class QueueListener {
QueueScope queueScope = queueScopeFactory.getScope(smf.getManagementAppId(), queueName);
QueueManager queueManager = TEST_QUEUE_MANAGER != null ? TEST_QUEUE_MANAGER : queueManagerFactory.getQueueManager(queueScope);
// run until there are no more active jobs
+ long runCount = 0;
+ Map<UUID,ApplicationQueueManager> queueManagerMap = new ConcurrentHashMap<>(); //keep a cache of queuemangers then clear them at an interval
while ( true ) {
try {
@@ -152,7 +154,7 @@ public class QueueListener {
LOG.info("retrieved batch of {} messages from queue {} ", messages.size(),queueName);
if (messages.size() > 0) {
- Map<UUID,ApplicationQueueManager> queueManagerMap = new ConcurrentHashMap<>();
+ runCount++;
HashMap<UUID, List<QueueMessage>> messageMap = new HashMap<>(messages.size());
//group messages into hash map by app id
for (QueueMessage message : messages) {
@@ -197,9 +199,7 @@ public class QueueListener {
merge.toBlocking().lastOrDefault(null);
}
queueManager.commitMessages(messages);
- for(ApplicationQueueManager applicationQueueManager : queueManagerMap.values()){
- applicationQueueManager.asyncCheckForInactiveDevices();
- }
+
meter.mark(messages.size());
LOG.info("sent batch {} messages duration {} ms", messages.size(),System.currentTimeMillis() - now);
@@ -207,6 +207,14 @@ public class QueueListener {
LOG.info("sleep between rounds...sleep...{}", sleepBetweenRuns);
Thread.sleep(sleepBetweenRuns);
}
+ if(runCount % 100 == 0){
+ for(ApplicationQueueManager applicationQueueManager : queueManagerMap.values()){
+ applicationQueueManager.asyncCheckForInactiveDevices();
+ }
+ //clear everything
+ queueManagerMap.clear();
+ runCount=0;
+ }
}
else{
LOG.info("no messages...sleep...{}", sleepWhenNoneFound);
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/19aa1a62/stack/services/src/main/java/org/apache/usergrid/services/notifications/TestAdapter.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/TestAdapter.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/TestAdapter.java
index 2f28364..b4eb767 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/TestAdapter.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/TestAdapter.java
@@ -89,10 +89,8 @@ public class TestAdapter implements ProviderAdapter {
}
@Override
- public Map<String, Date> getInactiveDevices(Notifier notifier,
- EntityManager em) throws Exception {
+ public void removeInactiveDevices(Notifier notifier, EntityManager em) throws Exception {
log.debug("getInactiveDevices()");
- return null;
}
@Override