You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2016/10/06 15:01:23 UTC

[01/11] usergrid git commit: Add new dispatcher for blocking io actors.

Repository: usergrid
Updated Branches:
  refs/heads/usergrid-1318-queue fbce160a1 -> 71fe06fec


Add new dispatcher for blocking io actors.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/10ac8d08
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/10ac8d08
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/10ac8d08

Branch: refs/heads/usergrid-1318-queue
Commit: 10ac8d0849b64517da7f85fc5fa3da68d73bef9d
Parents: fdb7c27
Author: Michael Russo <mr...@apigee.com>
Authored: Fri Sep 23 12:25:54 2016 -0700
Committer: Michael Russo <mr...@apigee.com>
Committed: Fri Sep 23 12:25:54 2016 -0700

----------------------------------------------------------------------
 .../persistence/actorsystem/ActorSystemFig.java | 41 +++++++++++++++++++-
 .../actorsystem/ActorSystemManagerImpl.java     | 10 +++++
 .../uniquevalues/UniqueValuesRouter.java        |  4 +-
 3 files changed, 53 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/10ac8d08/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemFig.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemFig.java
index 5d7b6aa..6980e45 100644
--- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemFig.java
+++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemFig.java
@@ -40,6 +40,15 @@ public interface ActorSystemFig extends GuicyFig, Serializable {
 
     String CLUSTER_PORT = "usergrid.cluster.port";
 
+    String CLUSTER_IO_EXECUTOR_TYPE = "usergrid.cluster.io.executor";
+
+    String CLUSTER_IO_EXECUTOR_THREAD_POOL_SIZE = "usergrid.cluster.io.thread-pool-size";
+
+    String CLUSTER_IO_EXECUTOR_REJECTION_POLICY = "usergrid.cluster.io.rejection-policy";
+
+
+
+
 
     /**
      * Use Cluster or nah
@@ -76,8 +85,38 @@ public interface ActorSystemFig extends GuicyFig, Serializable {
     @Default("2551")
     String getPort();
 
-
+    /**
+     *  Hostname used for advertising to the cluster what itself should be reference as
+     */
     @Key("usergrid.cluster.hostname")
     @Default("")
     String getHostname();
+
+    /**
+     *  Possible executor types for any blocking IO actors in the actor system.
+     */
+    @Key(CLUSTER_IO_EXECUTOR_TYPE)
+    @Default("thread-pool-executor")
+    String getClusterIoExecutorType();
+
+    /**
+     *  Number of threads to be used when using the fixed thread pool size in the blocking IO executor
+     *  Not relevant if anything other than "thread-pool-executor" is configured.
+     */
+    @Key(CLUSTER_IO_EXECUTOR_THREAD_POOL_SIZE)
+    @Default("25")
+    int getClusterIoExecutorThreadPoolSize();
+
+    /** Only used with "thread-pool-executor" and the following values are valid:
+     *
+     *  abort-policy
+     *  caller-runs-policy
+     *  discard-oldest-policy
+     *  discard-policy
+     *
+     *  Not relevant if anything other than "thread-pool-executor" is configured.
+     */
+    @Key(CLUSTER_IO_EXECUTOR_REJECTION_POLICY)
+    @Default("caller-runs-policy")
+    String getClusterIoExecutorRejectionPolicy();
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/10ac8d08/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
index ed9344c..7e7df9c 100644
--- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
+++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
@@ -293,6 +293,16 @@ public class ActorSystemManagerImpl implements ActorSystemManager {
 
                 put( "akka", new HashMap<String, Object>() {{
 
+                    put( "blocking-io-dispatcher", new HashMap<String, Object>() {{
+                        put( "type", "Dispatcher" );
+                        put( "executor", actorSystemFig.getClusterIoExecutorType() );
+                        put( actorSystemFig.getClusterIoExecutorType() , new HashMap<String, Object>() {{
+                            put( "fixed-pool-size", actorSystemFig.getClusterIoExecutorThreadPoolSize() );
+                            put( "rejection-policy",actorSystemFig.getClusterIoExecutorRejectionPolicy() );
+                        }} );
+                    }} );
+
+
                     put( "remote", new HashMap<String, Object>() {{
                         put( "netty.tcp", new HashMap<String, Object>() {{
                             put( "hostname", hostname );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/10ac8d08/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesRouter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesRouter.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesRouter.java
index 376af66..7176202 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesRouter.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesRouter.java
@@ -42,7 +42,9 @@ public class UniqueValuesRouter extends UntypedActor {
     public UniqueValuesRouter(Injector injector ) {
 
         router = getContext().actorOf(
-            FromConfig.getInstance().props(Props.create(UniqueValueActor.class)), "router");
+            FromConfig.getInstance()
+                .props(Props.create(UniqueValueActor.class)
+                    .withDispatcher("blocking-io-dispatcher")), "router");
 
         // TODO: is there some way to pass the injector here without getting this exception:
         // NotSerializableException: No configured serialization-bindings for class [InjectorImpl]


[08/11] usergrid git commit: Logging improvements, fix to CassandraConfig

Posted by sn...@apache.org.
Logging improvements, fix to CassandraConfig


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/a254ee67
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/a254ee67
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/a254ee67

Branch: refs/heads/usergrid-1318-queue
Commit: a254ee67c5f3f3a2383ea2417e0e6d8ea88576a9
Parents: fbce160
Author: Dave Johnson <sn...@apache.org>
Authored: Thu Oct 6 10:56:02 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Thu Oct 6 10:56:02 2016 -0400

----------------------------------------------------------------------
 .../apache/usergrid/persistence/core/CassandraConfig.java   | 2 --
 .../usergrid/persistence/core/CassandraConfigImpl.java      | 2 +-
 .../persistence/core/datastax/impl/DataStaxClusterImpl.java | 9 ++++-----
 3 files changed, 5 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/a254ee67/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfig.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfig.java
index 499561e..7bd2a74 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfig.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfig.java
@@ -20,8 +20,6 @@ package org.apache.usergrid.persistence.core;
 
 
 import com.netflix.astyanax.model.ConsistencyLevel;
-import org.apache.cassandra.db.marshal.AbstractCompositeType;
-import org.apache.log4j.lf5.viewer.categoryexplorer.CategoryPath;
 
 
 /**

http://git-wip-us.apache.org/repos/asf/usergrid/blob/a254ee67/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfigImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfigImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfigImpl.java
index 9cdec95..1503093 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfigImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfigImpl.java
@@ -205,7 +205,7 @@ public class CassandraConfigImpl implements CassandraConfig {
 
     @Override
     public String getStrategy() {
-        return cassandraFig.getStrategyLocal();
+        return cassandraFig.getStrategy();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/usergrid/blob/a254ee67/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DataStaxClusterImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DataStaxClusterImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DataStaxClusterImpl.java
index 67f6123..b36adab 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DataStaxClusterImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DataStaxClusterImpl.java
@@ -48,15 +48,13 @@ public class DataStaxClusterImpl implements DataStaxCluster {
         this.cassandraConfig = cassandraFig;
         this.cluster = buildCluster();
 
-        // always initialize the keyspaces
-        this.createApplicationKeyspace();
-
         logger.info("Initialized datastax cluster client. Hosts={}, Idle Timeout={}s,  Pool Timeout={}s",
             cluster.getMetadata().getAllHosts().toString(),
             cluster.getConfiguration().getPoolingOptions().getIdleTimeoutSeconds(),
             cluster.getConfiguration().getPoolingOptions().getPoolTimeoutMillis() / 1000);
 
-
+        // always initialize the keyspaces
+        this.createApplicationKeyspace();
     }
 
     @Override
@@ -149,7 +147,8 @@ public class DataStaxClusterImpl implements DataStaxCluster {
         final String createQueueMessageKeyspace = String.format(
             "CREATE KEYSPACE IF NOT EXISTS %s WITH replication = %s",
             CQLUtils.quote( cassandraConfig.getApplicationLocalKeyspace()),
-            CQLUtils.getFormattedReplication( cassandraConfig.getStrategyLocal(), cassandraConfig.getStrategyOptionsLocal())
+            CQLUtils.getFormattedReplication(
+                cassandraConfig.getStrategyLocal(), cassandraConfig.getStrategyOptionsLocal())
 
         );
 


[03/11] usergrid git commit: Move failure detector settings to cluster overall. Temporarily stop downing nodes when unreachable at application layer.

Posted by sn...@apache.org.
Move failure detector settings to cluster overall. Temporarily stop downing nodes when unreachable at application layer.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/50b936d5
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/50b936d5
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/50b936d5

Branch: refs/heads/usergrid-1318-queue
Commit: 50b936d563901683bfcc1ab35f9ad34494c1c95d
Parents: 2a89d2d
Author: Michael Russo <mr...@apigee.com>
Authored: Mon Sep 26 11:14:55 2016 -0700
Committer: Michael Russo <mr...@apigee.com>
Committed: Mon Sep 26 11:14:55 2016 -0700

----------------------------------------------------------------------
 .../actorsystem/ActorSystemManagerImpl.java     | 11 +++++++++-
 .../actorsystem/ClusterListener.java            | 21 ++++++++++++++------
 .../uniquevalues/UniqueValuesServiceImpl.java   | 10 +---------
 3 files changed, 26 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/50b936d5/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
index 7e7df9c..1021b1a 100644
--- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
+++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
@@ -20,6 +20,7 @@ package org.apache.usergrid.persistence.actorsystem;
 
 
 import akka.actor.*;
+import akka.cluster.Cluster;
 import akka.cluster.client.ClusterClient;
 import akka.cluster.client.ClusterClientReceptionist;
 import akka.cluster.client.ClusterClientSettings;
@@ -312,13 +313,21 @@ public class ActorSystemManagerImpl implements ActorSystemManager {
                     }} );
 
                     put( "cluster", new HashMap<String, Object>() {{
-                        put( "max-nr-of-instances-per-node", numInstancesPerNode);
+                        put( "max-nr-of-instances-per-node", numInstancesPerNode); // this sets default if router does not set
                         put( "roles", Collections.singletonList("io") );
                         put( "seed-nodes", new ArrayList<String>() {{
                             for (String seed : seeds) {
                                 add( seed );
                             }
                         }} );
+                        put( "failure-detector", new HashMap<String, Object>() {{
+                            put( "threshold", "20" );
+                            put( "acceptable-heartbeat-pause", "6 s" );
+                            put( "heartbeat-interval", "1 s" );
+                            put( "heartbeat-request", new HashMap<String, Object>() {{
+                                put( "expected-response-after", "3 s" );
+                            }} );
+                        }} );
                     }} );
 
                 }} );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/50b936d5/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ClusterListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ClusterListener.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ClusterListener.java
index a568295..d0a758d 100644
--- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ClusterListener.java
+++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ClusterListener.java
@@ -78,10 +78,12 @@ public class ClusterListener extends UntypedActor {
                 java.lang.Runtime.getRuntime().exec("ping -c 1 "+hostname).waitFor() == 0;
             if(networkReachable){
 
-                logger.info("Unreachable member {} is accessible on the network, " +
-                    "application must have died. Marking member down", event.member());
+                logger.info("Unreachable member {} is accessible on the network.", event.member());
 
-                cluster.down(event.member().address());
+//                logger.info("Unreachable member {} is accessible on the network, " +
+//                    "application must have died. Marking member down", event.member());
+//
+//                cluster.down(event.member().address());
             }else{
 
                 logger.warn("Unreachable member {} is not accessible on the network, " +
@@ -95,9 +97,16 @@ public class ClusterListener extends UntypedActor {
 
         } else if (message instanceof ClusterEvent.MemberEvent) {
             ClusterEvent.MemberEvent event = (ClusterEvent.MemberEvent) message;
-            if(logger.isTraceEnabled()){
-                logger.trace("MemberEvent occurred for member: {}, Event: {}", event.member(), event.toString());
-            }
+            logger.info("MemberEvent occurred for member: {}, Event: {}", event.member(), event.toString());
+
+        } else if (message instanceof ClusterEvent.LeaderChanged) {
+            ClusterEvent.LeaderChanged event = (ClusterEvent.LeaderChanged) message;
+            logger.info("LeaderChanged occurred for leader: {}, getLeader: {}, Event: {}",
+                event.leader(), event.getLeader(), event.toString());
+
+        } else if (message instanceof ClusterEvent.MemberExited) {
+            ClusterEvent.MemberExited event = (ClusterEvent.MemberExited) message;
+            logger.info("MemberExited occurred for member: {}, Event: {}", event.member(), event.toString());
 
         } else {
             unhandled(message);

http://git-wip-us.apache.org/repos/asf/usergrid/blob/50b936d5/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java
index 82dc8cc..1b13d01 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java
@@ -342,15 +342,7 @@ public class UniqueValuesServiceImpl implements UniqueValuesService {
                 put( "enabled", "on" );
                 put( "allow-local-routees", "on" );
                 put( "use-role", "io" );
-                put( "max-nr-of-instances-per-node", numInstancesPerNode );
-                put( "failure-detector", new HashMap<String, Object>() {{
-                    put( "threshold", "20" );
-                    put( "acceptable-heartbeat-pause", "6 s" );
-                    put( "heartbeat-interval", "1 s" );
-                    put( "heartbeat-request", new HashMap<String, Object>() {{
-                        put( "expected-response-after", "3 s" );
-                    }} );
-                }} );
+                put( "max-nr-of-instances-per-node", numInstancesPerNode ); // this sets value specific to this router
             }} );
         }} );
 


[02/11] usergrid git commit: Update the threshold settings.

Posted by sn...@apache.org.
Update the threshold settings.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/2a89d2d9
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/2a89d2d9
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/2a89d2d9

Branch: refs/heads/usergrid-1318-queue
Commit: 2a89d2d9a9f17c97724f5ca7f28822b4d6d74f0b
Parents: 10ac8d0
Author: Michael Russo <mr...@apigee.com>
Authored: Fri Sep 23 13:12:59 2016 -0700
Committer: Michael Russo <mr...@apigee.com>
Committed: Fri Sep 23 13:12:59 2016 -0700

----------------------------------------------------------------------
 .../persistence/collection/uniquevalues/UniqueValuesRouter.java  | 2 +-
 .../collection/uniquevalues/UniqueValuesServiceImpl.java         | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/2a89d2d9/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesRouter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesRouter.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesRouter.java
index 7176202..47db3a5 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesRouter.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesRouter.java
@@ -44,7 +44,7 @@ public class UniqueValuesRouter extends UntypedActor {
         router = getContext().actorOf(
             FromConfig.getInstance()
                 .props(Props.create(UniqueValueActor.class)
-                    .withDispatcher("blocking-io-dispatcher")), "router");
+                    .withDispatcher("akka.blocking-io-dispatcher")), "router");
 
         // TODO: is there some way to pass the injector here without getting this exception:
         // NotSerializableException: No configured serialization-bindings for class [InjectorImpl]

http://git-wip-us.apache.org/repos/asf/usergrid/blob/2a89d2d9/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java
index 8bdb02c..82dc8cc 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java
@@ -344,8 +344,8 @@ public class UniqueValuesServiceImpl implements UniqueValuesService {
                 put( "use-role", "io" );
                 put( "max-nr-of-instances-per-node", numInstancesPerNode );
                 put( "failure-detector", new HashMap<String, Object>() {{
-                    put( "threshold", "10" );
-                    put( "acceptable-heartbeat-pause", "3 s" );
+                    put( "threshold", "20" );
+                    put( "acceptable-heartbeat-pause", "6 s" );
                     put( "heartbeat-interval", "1 s" );
                     put( "heartbeat-request", new HashMap<String, Object>() {{
                         put( "expected-response-after", "3 s" );


[09/11] usergrid git commit: Logging improvements, fix to CassandraConfig

Posted by sn...@apache.org.
Logging improvements, fix to CassandraConfig


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/907fd0c1
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/907fd0c1
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/907fd0c1

Branch: refs/heads/usergrid-1318-queue
Commit: 907fd0c185c10b0c25e5a250915f599a62e87d37
Parents: a254ee6
Author: Dave Johnson <sn...@apache.org>
Authored: Thu Oct 6 10:56:40 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Thu Oct 6 10:56:40 2016 -0400

----------------------------------------------------------------------
 .../queue/src/test/resources/log4j.properties               | 9 +++------
 1 file changed, 3 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/907fd0c1/stack/corepersistence/queue/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/resources/log4j.properties b/stack/corepersistence/queue/src/test/resources/log4j.properties
index eb45d2a..2653fd6 100644
--- a/stack/corepersistence/queue/src/test/resources/log4j.properties
+++ b/stack/corepersistence/queue/src/test/resources/log4j.properties
@@ -24,10 +24,7 @@ log4j.appender.stdout.layout.ConversionPattern=%d %p (%t) %c{1} - %m%n
 log4j.logger.org.apache.cassandra=WARN
 log4j.logger.org.glassfish=WARN
 
-#log4j.logger.org.apache.usergrid.persistence.actorsystem=DEBUG
-#log4j.logger.org.apache.usergrid.persistence.actorsystem=DEBUG
-#log4j.logger.org.apache.usergrid.persistence.qakka=DEBUG
-
-log4j.logger.org.apache.usergrid.persistence.qakka=DEBUG
-log4j.logger.org.apache.usergrid.persistence.queue=DEBUG
+log4j.logger.org.apache.usergrid=INFO
+log4j.logger.org.apache.usergrid.persistence.qakka=INFO
+log4j.logger.org.apache.usergrid.persistence.queue=INFO
 log4j.logger.org.apache.usergrid.corepersistence.asyncevents=INFO


[07/11] usergrid git commit: Merge branch 'hotfix-20160819'

Posted by sn...@apache.org.
Merge branch 'hotfix-20160819'


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/77d20269
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/77d20269
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/77d20269

Branch: refs/heads/usergrid-1318-queue
Commit: 77d2026907b03625ad7e1ef742c8656712497c8d
Parents: 9947d48 ef8899a
Author: Michael Russo <mr...@apigee.com>
Authored: Fri Sep 30 18:15:27 2016 -0700
Committer: Michael Russo <mr...@apigee.com>
Committed: Fri Sep 30 18:15:27 2016 -0700

----------------------------------------------------------------------
 .../asyncevents/AsyncEventServiceImpl.java      | 20 +++-------
 .../index/IndexProcessorFig.java                |  2 +-
 .../index/ReIndexServiceImpl.java               | 37 ++++++++---------
 stack/corepersistence/actorsystem/pom.xml       | 12 +++---
 .../persistence/actorsystem/ActorSystemFig.java | 41 ++++++++++++++++++-
 .../actorsystem/ActorSystemManager.java         |  2 +-
 .../actorsystem/ActorSystemManagerImpl.java     | 42 +++++++++++++++-----
 .../actorsystem/ClusterListener.java            | 23 +++++++----
 stack/corepersistence/collection/pom.xml        | 12 +++---
 .../EntityCollectionManagerFactoryImpl.java     |  2 +
 .../uniquevalues/UniqueValuesRouter.java        |  4 +-
 .../uniquevalues/UniqueValuesServiceImpl.java   | 10 +----
 .../apache/usergrid/rest/ShutdownListener.java  | 11 +----
 13 files changed, 131 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/77d20269/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/usergrid/blob/77d20269/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
----------------------------------------------------------------------


[11/11] usergrid git commit: Merge branch 'master' into usergrid-1318-queue

Posted by sn...@apache.org.
Merge branch 'master' into usergrid-1318-queue


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/71fe06fe
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/71fe06fe
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/71fe06fe

Branch: refs/heads/usergrid-1318-queue
Commit: 71fe06fec4f89451b5a9000bc728a7c2e4e5cf68
Parents: 907fd0c 3709535
Author: Dave Johnson <sn...@apache.org>
Authored: Thu Oct 6 10:57:38 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Thu Oct 6 10:57:38 2016 -0400

----------------------------------------------------------------------
 .../asyncevents/AsyncEventServiceImpl.java      | 20 +++-------
 .../index/IndexProcessorFig.java                |  2 +-
 .../index/ReIndexServiceImpl.java               | 37 ++++++++---------
 stack/corepersistence/actorsystem/pom.xml       | 12 +++---
 .../persistence/actorsystem/ActorSystemFig.java | 41 ++++++++++++++++++-
 .../actorsystem/ActorSystemManager.java         |  2 +-
 .../actorsystem/ActorSystemManagerImpl.java     | 42 +++++++++++++++-----
 .../actorsystem/ClusterListener.java            | 23 +++++++----
 stack/corepersistence/collection/pom.xml        | 12 +++---
 .../EntityCollectionManagerFactoryImpl.java     |  2 +
 .../uniquevalues/UniqueValuesRouter.java        |  4 +-
 .../uniquevalues/UniqueValuesServiceImpl.java   | 10 +----
 .../apache/usergrid/rest/ShutdownListener.java  | 11 +----
 13 files changed, 131 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/71fe06fe/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/usergrid/blob/71fe06fe/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/usergrid/blob/71fe06fe/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java
----------------------------------------------------------------------


[05/11] usergrid git commit: Implement better self leave for akka clustering if JVM goes down.

Posted by sn...@apache.org.
Implement better self leave for akka clustering if JVM goes down.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/33319f36
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/33319f36
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/33319f36

Branch: refs/heads/usergrid-1318-queue
Commit: 33319f36a28b08f9230d5b6daabd19762b24058a
Parents: 027e40d
Author: Michael Russo <mr...@apigee.com>
Authored: Mon Sep 26 15:41:53 2016 -0700
Committer: Michael Russo <mr...@apigee.com>
Committed: Mon Sep 26 15:41:53 2016 -0700

----------------------------------------------------------------------
 .../actorsystem/ActorSystemManager.java         |  2 +-
 .../actorsystem/ActorSystemManagerImpl.java     | 21 ++++++++++----------
 .../actorsystem/ClusterListener.java            |  6 +++---
 .../apache/usergrid/rest/ShutdownListener.java  | 11 ++--------
 4 files changed, 17 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/33319f36/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java
index 17754f0..322ac6a 100644
--- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java
+++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java
@@ -75,5 +75,5 @@ public interface ActorSystemManager {
      */
     void publishToAllRegions( String topic, Object message, ActorRef sender );
 
-    void shutdownAll();
+    void leaveCluster();
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/33319f36/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
index 1021b1a..cc32d1c 100644
--- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
+++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
@@ -376,13 +376,13 @@ public class ActorSystemManagerImpl implements ActorSystemManager {
                 }
             }
 
-            // add a shutdown hook to clean all actor systems if the JVM exits without the servlet container knowing
-//            Runtime.getRuntime().addShutdownHook(new Thread() {
-//                @Override
-//                public void run() {
-//                    shutdownAll();
-//                }
-//            });
+            //add a shutdown hook to clean all actor systems if the JVM exits without the servlet container knowing
+            Runtime.getRuntime().addShutdownHook(new Thread() {
+                @Override
+                public void run() {
+                    leaveCluster();
+                }
+            });
 
         }
 
@@ -466,10 +466,11 @@ public class ActorSystemManagerImpl implements ActorSystemManager {
     }
 
     @Override
-    public void shutdownAll(){
+    public void leaveCluster(){
 
-        logger.info("Shutting down Akka cluster: {}", clusterSystem.name());
-        clusterSystem.shutdown();
+        Cluster cluster = Cluster.get(clusterSystem);
+        logger.info("Downing self: {} from cluster: {}", cluster.selfAddress(), clusterSystem.name());
+        cluster.leave(cluster.selfAddress());
     }
 
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/33319f36/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ClusterListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ClusterListener.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ClusterListener.java
index d0a758d..15bc372 100644
--- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ClusterListener.java
+++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ClusterListener.java
@@ -81,13 +81,13 @@ public class ClusterListener extends UntypedActor {
                 logger.info("Unreachable member {} is accessible on the network.", event.member());
 
 //                logger.info("Unreachable member {} is accessible on the network, " +
-//                    "application must have died. Marking member down", event.member());
+//                    "application must have died. Removing member ", event.member());
 //
-//                cluster.down(event.member().address());
+//                cluster.leave(event.member().address());
             }else{
 
                 logger.warn("Unreachable member {} is not accessible on the network, " +
-                    "there must be a network issue. Not marking member down", event.member());
+                    "there must be a network issue. Not removing member", event.member());
 
             }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/33319f36/stack/rest/src/main/java/org/apache/usergrid/rest/ShutdownListener.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/ShutdownListener.java b/stack/rest/src/main/java/org/apache/usergrid/rest/ShutdownListener.java
index f3707a7..8c96473 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/ShutdownListener.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/ShutdownListener.java
@@ -20,21 +20,14 @@ package org.apache.usergrid.rest;
 
 import com.google.inject.Injector;
 import org.apache.usergrid.batch.service.JobSchedulerService;
-import org.apache.usergrid.batch.service.SchedulerService;
 import org.apache.usergrid.persistence.actorsystem.ActorSystemManager;
-import org.apache.usergrid.persistence.cassandra.CassandraService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.context.ApplicationContext;
-import org.springframework.context.ApplicationContextAware;
 import org.springframework.web.context.support.WebApplicationContextUtils;
 
 import javax.servlet.ServletContextEvent;
 import javax.servlet.ServletContextListener;
-import javax.servlet.http.HttpSessionAttributeListener;
-import javax.servlet.http.HttpSessionEvent;
-import javax.servlet.http.HttpSessionListener;
-import javax.servlet.http.HttpSessionBindingEvent;
 import java.util.Properties;
 
 
@@ -71,8 +64,8 @@ public class ShutdownListener implements ServletContextListener {
         Injector injector = ctx.getBean(Injector.class);
         ActorSystemManager actorSystemManager = injector.getInstance(ActorSystemManager.class);
 
-        // stop the Akka actor system
-        //actorSystemManager.shutdownAll();
+        // leave akka cluster
+        actorSystemManager.leaveCluster();
 
         boolean started = Boolean.parseBoolean(
             properties.getProperty(JobServiceBoostrap.START_SCHEDULER_PROP, "true"));


[10/11] usergrid git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/usergrid

Posted by sn...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/usergrid

* 'master' of https://git-wip-us.apache.org/repos/asf/usergrid:
  Fix re-index memory leak with flatmap observable and speed up re-index.
  Implement better self leave for akka clustering if JVM goes down.
  Bump akka version to 2.4.10 ( lastest current stable version)
  Move failure detector settings to cluster overall. Temporarily stop downing nodes when unreachable at application layer.
  Update the threshold settings.
  Add new dispatcher for blocking io actors.
  allow no password when creating admin users when SSO-enabled
  Load properties into Guice by using the getProperty method instead of generic putAll from Map superclass as it getProperty could be overridden
  add count to paged organizations get, and fix limit=1000
  Refactor superuser authentication/login such that permissions are handled more appropriately, allowing access to all parts of the system.
  Enhance superuser basic auth filter to login to shiro with a token just like the sysadmin tokens.
  Adding paging to get all orgs endpoint.  Enhance Akka cluster so it properly downs nodes that are restarted but still ok on the network. Don't load all orgs when validating sysadmin creds.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/3709535f
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/3709535f
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/3709535f

Branch: refs/heads/usergrid-1318-queue
Commit: 3709535f96c2bb5fde4e71f997e95f14e7cb8354
Parents: 7fdba4a 77d2026
Author: Dave Johnson <sn...@apache.org>
Authored: Thu Oct 6 10:57:12 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Thu Oct 6 10:57:12 2016 -0400

----------------------------------------------------------------------
 .../asyncevents/AsyncEventServiceImpl.java      |  20 +---
 .../index/IndexProcessorFig.java                |   2 +-
 .../index/ReIndexServiceImpl.java               |  37 +++---
 stack/corepersistence/actorsystem/pom.xml       |  12 +-
 .../persistence/actorsystem/ActorSystemFig.java |  41 ++++++-
 .../actorsystem/ActorSystemManager.java         |   2 +-
 .../actorsystem/ActorSystemManagerImpl.java     |  57 ++++++---
 .../actorsystem/ClusterListener.java            | 116 +++++++++++++++++++
 stack/corepersistence/collection/pom.xml        |  12 +-
 .../EntityCollectionManagerFactoryImpl.java     |   2 +
 .../uniquevalues/UniqueValuesRouter.java        |   4 +-
 .../uniquevalues/UniqueValuesServiceImpl.java   |  10 +-
 .../apache/usergrid/rest/ShutdownListener.java  |  11 +-
 .../organizations/OrganizationsResource.java    |  48 ++++++--
 .../security/SecuredResourceFilterFactory.java  |  35 +++---
 .../shiro/filters/BasicAuthSecurityFilter.java  |  54 ++++++++-
 .../rest/applications/ApplicationDeleteIT.java  |   1 -
 .../usergrid/rest/applications/SecurityIT.java  |  31 +++++
 .../activities/ActivityResourceIT.java          |   5 -
 .../test/resource/endpoints/NamedResource.java  |   9 ++
 .../cassandra/ManagementServiceImpl.java        |   5 +-
 .../AbstractPasswordCredentials.java            |   3 +-
 .../shiro/credentials/AdminUserPassword.java    |   2 +
 .../shiro/principals/AdminUserPrincipal.java    |  25 +---
 .../usergrid/services/AbstractService.java      |   8 ++
 25 files changed, 408 insertions(+), 144 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/3709535f/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
----------------------------------------------------------------------


[06/11] usergrid git commit: Fix re-index memory leak with flatmap observable and speed up re-index.

Posted by sn...@apache.org.
Fix re-index memory leak with flatmap observable and speed up re-index.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/ef8899a1
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/ef8899a1
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/ef8899a1

Branch: refs/heads/usergrid-1318-queue
Commit: ef8899a100b8488d4dfd528ce94a1cb8bea582fe
Parents: 33319f3
Author: Michael Russo <mr...@apigee.com>
Authored: Fri Sep 30 18:14:37 2016 -0700
Committer: Michael Russo <mr...@apigee.com>
Committed: Fri Sep 30 18:14:37 2016 -0700

----------------------------------------------------------------------
 .../asyncevents/AsyncEventServiceImpl.java      | 20 ++++-------
 .../index/IndexProcessorFig.java                |  2 +-
 .../index/ReIndexServiceImpl.java               | 37 +++++++++-----------
 .../EntityCollectionManagerFactoryImpl.java     |  2 ++
 4 files changed, 25 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/ef8899a1/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
index 0bff887..a108e40 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
@@ -876,23 +876,15 @@ public class AsyncEventServiceImpl implements AsyncEventService {
 
     public void indexBatch(final List<EdgeScope> edges, final long updatedSince) {
 
-        IndexOperationMessage batch = new IndexOperationMessage();
+        final List<EntityIndexEvent> batch = new ArrayList<>();
+        edges.forEach(e -> {
 
-        for ( EdgeScope e : edges){
+            //change to id scope to avoid serialization issues
+            batch.add(new EntityIndexEvent(queueFig.getPrimaryRegion(), new EntityIdScope(e.getApplicationScope(), e.getEdge().getTargetNode()), updatedSince));
 
-            EntityIndexOperation entityIndexOperation =
-                new EntityIndexOperation( e.getApplicationScope(), e.getEdge().getTargetNode(), updatedSince);
-
-            IndexOperationMessage indexOperationMessage =
-                eventBuilder.buildEntityIndex( entityIndexOperation ).toBlocking().lastOrDefault(null);
-
-            if (indexOperationMessage != null){
-                batch.ingest(indexOperationMessage);
-            }
-
-        }
+        });
 
-        queueIndexOperationMessage(batch);
+        offerBatch( batch );
     }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/ef8899a1/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
index c05c047..1038408 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
@@ -90,7 +90,7 @@ public interface IndexProcessorFig extends GuicyFig {
     @Key(ELASTICSEARCH_QUEUE_IMPL)
     String getQueueImplementation();
 
-    @Default("100")
+    @Default("500")
     @Key(REINDEX_BUFFER_SIZE)
     int getReindexBufferSize();
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/ef8899a1/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
index e3b179d..19fbcfa 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
@@ -24,6 +24,7 @@ import java.time.Instant;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 
 import org.apache.usergrid.persistence.index.EntityIndexFactory;
@@ -113,6 +114,8 @@ public class ReIndexServiceImpl implements ReIndexService {
 
         //load our last emitted Scope if a cursor is present
 
+        final AtomicInteger count = new AtomicInteger();
+
         final Optional<EdgeScope> cursor = parseCursor( reIndexRequestBuilder.getCursor() );
 
 
@@ -161,29 +164,21 @@ public class ReIndexServiceImpl implements ReIndexService {
 
         }
 
-        Observable<List<EdgeScope>> runningReIndex = allEntityIdsObservable.getEdgesToEntities( applicationScopes,
+        allEntityIdsObservable.getEdgesToEntities( applicationScopes,
             reIndexRequestBuilder.getCollectionName(), cursorSeek.getSeekValue() )
-
             .buffer( indexProcessorFig.getReindexBufferSize())
-            .flatMap( edgeScopes -> Observable.just(edgeScopes)
-                .doOnNext(edges -> {
-
-                    logger.info("Sending batch of {} to be indexed.", edges.size());
-                    indexService.indexBatch(edges, modifiedSince);
-                })
-                .subscribeOn( Schedulers.io() ), indexProcessorFig.getReindexConcurrencyFactor());
-
-
-        // start our sampler and state persistence
-        // take a sample every sample interval to allow us to resume state with minimal loss
-        // create our flushing collector and flush the edge scopes to it
-        runningReIndex.collect(() -> new FlushingCollector(jobId),
-            ((flushingCollector, edgeScopes) -> flushingCollector.flushBuffer(edgeScopes)))
-                .doOnNext( flushingCollector-> flushingCollector.complete() )
-                //subscribe on our I/O scheduler and run the task
-                .subscribeOn( Schedulers.io() ).subscribe(); //want reindex to continually run so leave subscribe.
-
-
+            .doOnNext( edgeScopes -> {
+                logger.info("Sending batch of {} to be indexed.", edgeScopes.size());
+                indexService.indexBatch(edgeScopes, modifiedSince);
+                count.addAndGet(edgeScopes.size() );
+                if( edgeScopes.size() > 0 ) {
+                    writeCursorState(jobId, edgeScopes.get(edgeScopes.size() - 1));
+                }
+                writeStateMeta( jobId, Status.INPROGRESS, count.get(), System.currentTimeMillis() ); })
+            .doOnCompleted(() -> writeStateMeta( jobId, Status.COMPLETE, count.get(), System.currentTimeMillis() ))
+            .subscribeOn( Schedulers.io() ).subscribe();
+
+        
         return new ReIndexStatus( jobId, Status.STARTED, 0, 0 );
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/ef8899a1/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
index fcaa51d..aa962dd 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
@@ -44,6 +44,8 @@ import org.apache.usergrid.persistence.core.astyanax.CassandraConfig;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.ExecutionException;
 


[04/11] usergrid git commit: Bump akka version to 2.4.10 ( lastest current stable version)

Posted by sn...@apache.org.
Bump akka version to 2.4.10 ( lastest current stable version)


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/027e40dd
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/027e40dd
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/027e40dd

Branch: refs/heads/usergrid-1318-queue
Commit: 027e40ddaebffbc365850158c527fa930a0e175f
Parents: 50b936d
Author: Michael Russo <mr...@apigee.com>
Authored: Mon Sep 26 12:28:14 2016 -0700
Committer: Michael Russo <mr...@apigee.com>
Committed: Mon Sep 26 12:28:14 2016 -0700

----------------------------------------------------------------------
 stack/corepersistence/actorsystem/pom.xml | 12 ++++++------
 stack/corepersistence/collection/pom.xml  | 12 ++++++------
 2 files changed, 12 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/027e40dd/stack/corepersistence/actorsystem/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/actorsystem/pom.xml b/stack/corepersistence/actorsystem/pom.xml
index 1933c65..a17c6d3 100644
--- a/stack/corepersistence/actorsystem/pom.xml
+++ b/stack/corepersistence/actorsystem/pom.xml
@@ -35,37 +35,37 @@
         <dependency>
             <groupId>com.typesafe.akka</groupId>
             <artifactId>akka-actor_2.11</artifactId>
-            <version>2.4.0</version>
+            <version>2.4.10</version>
         </dependency>
 
         <dependency>
             <groupId>com.typesafe.akka</groupId>
             <artifactId>akka-remote_2.11</artifactId>
-            <version>2.4.0</version>
+            <version>2.4.10</version>
         </dependency>
 
         <dependency>
             <groupId>com.typesafe.akka</groupId>
             <artifactId>akka-cluster_2.11</artifactId>
-            <version>2.4.0</version>
+            <version>2.4.10</version>
         </dependency>
 
         <dependency>
             <groupId>com.typesafe.akka</groupId>
             <artifactId>akka-cluster-tools_2.11</artifactId>
-            <version>2.4.0</version>
+            <version>2.4.10</version>
         </dependency>
 
         <dependency>
             <groupId>com.typesafe.akka</groupId>
             <artifactId>akka-cluster-metrics_2.11</artifactId>
-            <version>2.4.0</version>
+            <version>2.4.10</version>
         </dependency>
 
         <dependency>
             <groupId>com.typesafe.akka</groupId>
             <artifactId>akka-slf4j_2.11</artifactId>
-            <version>2.4.0</version>
+            <version>2.4.10</version>
         </dependency>
 
         <dependency>

http://git-wip-us.apache.org/repos/asf/usergrid/blob/027e40dd/stack/corepersistence/collection/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/pom.xml b/stack/corepersistence/collection/pom.xml
index 20f8612..2c7702b 100644
--- a/stack/corepersistence/collection/pom.xml
+++ b/stack/corepersistence/collection/pom.xml
@@ -50,37 +50,37 @@
         <dependency>
             <groupId>com.typesafe.akka</groupId>
             <artifactId>akka-actor_2.11</artifactId>
-            <version>2.4.0</version>
+            <version>2.4.10</version>
         </dependency>
 
         <dependency>
             <groupId>com.typesafe.akka</groupId>
             <artifactId>akka-remote_2.11</artifactId>
-            <version>2.4.0</version>
+            <version>2.4.10</version>
         </dependency>
 
         <dependency>
             <groupId>com.typesafe.akka</groupId>
             <artifactId>akka-cluster_2.11</artifactId>
-            <version>2.4.0</version>
+            <version>2.4.10</version>
         </dependency>
 
         <dependency>
             <groupId>com.typesafe.akka</groupId>
             <artifactId>akka-cluster-tools_2.11</artifactId>
-            <version>2.4.0</version>
+            <version>2.4.10</version>
         </dependency>
 
         <dependency>
             <groupId>com.typesafe.akka</groupId>
             <artifactId>akka-cluster-metrics_2.11</artifactId>
-            <version>2.4.0</version>
+            <version>2.4.10</version>
         </dependency>
 
         <dependency>
             <groupId>com.typesafe.akka</groupId>
             <artifactId>akka-slf4j_2.11</artifactId>
-            <version>2.4.0</version>
+            <version>2.4.10</version>
         </dependency>
 
         <!-- tests -->