You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2016/10/01 01:15:38 UTC
[1/7] usergrid git commit: Add new dispatcher for blocking io actors.
Repository: usergrid
Updated Branches:
refs/heads/master 9947d488d -> 77d202690
Add new dispatcher for blocking io actors.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/10ac8d08
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/10ac8d08
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/10ac8d08
Branch: refs/heads/master
Commit: 10ac8d0849b64517da7f85fc5fa3da68d73bef9d
Parents: fdb7c27
Author: Michael Russo <mr...@apigee.com>
Authored: Fri Sep 23 12:25:54 2016 -0700
Committer: Michael Russo <mr...@apigee.com>
Committed: Fri Sep 23 12:25:54 2016 -0700
----------------------------------------------------------------------
.../persistence/actorsystem/ActorSystemFig.java | 41 +++++++++++++++++++-
.../actorsystem/ActorSystemManagerImpl.java | 10 +++++
.../uniquevalues/UniqueValuesRouter.java | 4 +-
3 files changed, 53 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/10ac8d08/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemFig.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemFig.java
index 5d7b6aa..6980e45 100644
--- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemFig.java
+++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemFig.java
@@ -40,6 +40,15 @@ public interface ActorSystemFig extends GuicyFig, Serializable {
String CLUSTER_PORT = "usergrid.cluster.port";
+ String CLUSTER_IO_EXECUTOR_TYPE = "usergrid.cluster.io.executor";
+
+ String CLUSTER_IO_EXECUTOR_THREAD_POOL_SIZE = "usergrid.cluster.io.thread-pool-size";
+
+ String CLUSTER_IO_EXECUTOR_REJECTION_POLICY = "usergrid.cluster.io.rejection-policy";
+
+
+
+
/**
* Use Cluster or nah
@@ -76,8 +85,38 @@ public interface ActorSystemFig extends GuicyFig, Serializable {
@Default("2551")
String getPort();
-
+ /**
+ * Hostname this node advertises to the cluster, i.e. the name it should be referenced as
+ */
@Key("usergrid.cluster.hostname")
@Default("")
String getHostname();
+
+ /**
+ * Possible executor types for any blocking IO actors in the actor system.
+ */
+ @Key(CLUSTER_IO_EXECUTOR_TYPE)
+ @Default("thread-pool-executor")
+ String getClusterIoExecutorType();
+
+ /**
+ * Number of threads to be used when using the fixed thread pool size in the blocking IO executor
+ * Not relevant if anything other than "thread-pool-executor" is configured.
+ */
+ @Key(CLUSTER_IO_EXECUTOR_THREAD_POOL_SIZE)
+ @Default("25")
+ int getClusterIoExecutorThreadPoolSize();
+
+ /** Only used with "thread-pool-executor" and the following values are valid:
+ *
+ * abort-policy
+ * caller-runs-policy
+ * discard-oldest-policy
+ * discard-policy
+ *
+ * Not relevant if anything other than "thread-pool-executor" is configured.
+ */
+ @Key(CLUSTER_IO_EXECUTOR_REJECTION_POLICY)
+ @Default("caller-runs-policy")
+ String getClusterIoExecutorRejectionPolicy();
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/10ac8d08/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
index ed9344c..7e7df9c 100644
--- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
+++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
@@ -293,6 +293,16 @@ public class ActorSystemManagerImpl implements ActorSystemManager {
put( "akka", new HashMap<String, Object>() {{
+ put( "blocking-io-dispatcher", new HashMap<String, Object>() {{
+ put( "type", "Dispatcher" );
+ put( "executor", actorSystemFig.getClusterIoExecutorType() );
+ put( actorSystemFig.getClusterIoExecutorType() , new HashMap<String, Object>() {{
+ put( "fixed-pool-size", actorSystemFig.getClusterIoExecutorThreadPoolSize() );
+ put( "rejection-policy",actorSystemFig.getClusterIoExecutorRejectionPolicy() );
+ }} );
+ }} );
+
+
put( "remote", new HashMap<String, Object>() {{
put( "netty.tcp", new HashMap<String, Object>() {{
put( "hostname", hostname );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/10ac8d08/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesRouter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesRouter.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesRouter.java
index 376af66..7176202 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesRouter.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesRouter.java
@@ -42,7 +42,9 @@ public class UniqueValuesRouter extends UntypedActor {
public UniqueValuesRouter(Injector injector ) {
router = getContext().actorOf(
- FromConfig.getInstance().props(Props.create(UniqueValueActor.class)), "router");
+ FromConfig.getInstance()
+ .props(Props.create(UniqueValueActor.class)
+ .withDispatcher("blocking-io-dispatcher")), "router");
// TODO: is there some way to pass the injector here without getting this exception:
// NotSerializableException: No configured serialization-bindings for class [InjectorImpl]
[5/7] usergrid git commit: Implement better self leave for akka
clustering if JVM goes down.
Posted by mr...@apache.org.
Implement better self leave for akka clustering if JVM goes down.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/33319f36
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/33319f36
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/33319f36
Branch: refs/heads/master
Commit: 33319f36a28b08f9230d5b6daabd19762b24058a
Parents: 027e40d
Author: Michael Russo <mr...@apigee.com>
Authored: Mon Sep 26 15:41:53 2016 -0700
Committer: Michael Russo <mr...@apigee.com>
Committed: Mon Sep 26 15:41:53 2016 -0700
----------------------------------------------------------------------
.../actorsystem/ActorSystemManager.java | 2 +-
.../actorsystem/ActorSystemManagerImpl.java | 21 ++++++++++----------
.../actorsystem/ClusterListener.java | 6 +++---
.../apache/usergrid/rest/ShutdownListener.java | 11 ++--------
4 files changed, 17 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/33319f36/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java
index 17754f0..322ac6a 100644
--- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java
+++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java
@@ -75,5 +75,5 @@ public interface ActorSystemManager {
*/
void publishToAllRegions( String topic, Object message, ActorRef sender );
- void shutdownAll();
+ void leaveCluster();
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/33319f36/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
index 1021b1a..cc32d1c 100644
--- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
+++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
@@ -376,13 +376,13 @@ public class ActorSystemManagerImpl implements ActorSystemManager {
}
}
- // add a shutdown hook to clean all actor systems if the JVM exits without the servlet container knowing
-// Runtime.getRuntime().addShutdownHook(new Thread() {
-// @Override
-// public void run() {
-// shutdownAll();
-// }
-// });
+ //add a shutdown hook to clean all actor systems if the JVM exits without the servlet container knowing
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ leaveCluster();
+ }
+ });
}
@@ -466,10 +466,11 @@ public class ActorSystemManagerImpl implements ActorSystemManager {
}
@Override
- public void shutdownAll(){
+ public void leaveCluster(){
- logger.info("Shutting down Akka cluster: {}", clusterSystem.name());
- clusterSystem.shutdown();
+ Cluster cluster = Cluster.get(clusterSystem);
+ logger.info("Downing self: {} from cluster: {}", cluster.selfAddress(), clusterSystem.name());
+ cluster.leave(cluster.selfAddress());
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/33319f36/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ClusterListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ClusterListener.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ClusterListener.java
index d0a758d..15bc372 100644
--- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ClusterListener.java
+++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ClusterListener.java
@@ -81,13 +81,13 @@ public class ClusterListener extends UntypedActor {
logger.info("Unreachable member {} is accessible on the network.", event.member());
// logger.info("Unreachable member {} is accessible on the network, " +
-// "application must have died. Marking member down", event.member());
+// "application must have died. Removing member ", event.member());
//
-// cluster.down(event.member().address());
+// cluster.leave(event.member().address());
}else{
logger.warn("Unreachable member {} is not accessible on the network, " +
- "there must be a network issue. Not marking member down", event.member());
+ "there must be a network issue. Not removing member", event.member());
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/33319f36/stack/rest/src/main/java/org/apache/usergrid/rest/ShutdownListener.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/ShutdownListener.java b/stack/rest/src/main/java/org/apache/usergrid/rest/ShutdownListener.java
index f3707a7..8c96473 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/ShutdownListener.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/ShutdownListener.java
@@ -20,21 +20,14 @@ package org.apache.usergrid.rest;
import com.google.inject.Injector;
import org.apache.usergrid.batch.service.JobSchedulerService;
-import org.apache.usergrid.batch.service.SchedulerService;
import org.apache.usergrid.persistence.actorsystem.ActorSystemManager;
-import org.apache.usergrid.persistence.cassandra.CassandraService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
-import org.springframework.context.ApplicationContextAware;
import org.springframework.web.context.support.WebApplicationContextUtils;
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
-import javax.servlet.http.HttpSessionAttributeListener;
-import javax.servlet.http.HttpSessionEvent;
-import javax.servlet.http.HttpSessionListener;
-import javax.servlet.http.HttpSessionBindingEvent;
import java.util.Properties;
@@ -71,8 +64,8 @@ public class ShutdownListener implements ServletContextListener {
Injector injector = ctx.getBean(Injector.class);
ActorSystemManager actorSystemManager = injector.getInstance(ActorSystemManager.class);
- // stop the Akka actor system
- //actorSystemManager.shutdownAll();
+ // leave akka cluster
+ actorSystemManager.leaveCluster();
boolean started = Boolean.parseBoolean(
properties.getProperty(JobServiceBoostrap.START_SCHEDULER_PROP, "true"));
[6/7] usergrid git commit: Fix re-index memory leak with flatmap
observable and speed up re-index.
Posted by mr...@apache.org.
Fix re-index memory leak with flatmap observable and speed up re-index.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/ef8899a1
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/ef8899a1
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/ef8899a1
Branch: refs/heads/master
Commit: ef8899a100b8488d4dfd528ce94a1cb8bea582fe
Parents: 33319f3
Author: Michael Russo <mr...@apigee.com>
Authored: Fri Sep 30 18:14:37 2016 -0700
Committer: Michael Russo <mr...@apigee.com>
Committed: Fri Sep 30 18:14:37 2016 -0700
----------------------------------------------------------------------
.../asyncevents/AsyncEventServiceImpl.java | 20 ++++-------
.../index/IndexProcessorFig.java | 2 +-
.../index/ReIndexServiceImpl.java | 37 +++++++++-----------
.../EntityCollectionManagerFactoryImpl.java | 2 ++
4 files changed, 25 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/ef8899a1/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
index 0bff887..a108e40 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
@@ -876,23 +876,15 @@ public class AsyncEventServiceImpl implements AsyncEventService {
public void indexBatch(final List<EdgeScope> edges, final long updatedSince) {
- IndexOperationMessage batch = new IndexOperationMessage();
+ final List<EntityIndexEvent> batch = new ArrayList<>();
+ edges.forEach(e -> {
- for ( EdgeScope e : edges){
+ //change to id scope to avoid serialization issues
+ batch.add(new EntityIndexEvent(queueFig.getPrimaryRegion(), new EntityIdScope(e.getApplicationScope(), e.getEdge().getTargetNode()), updatedSince));
- EntityIndexOperation entityIndexOperation =
- new EntityIndexOperation( e.getApplicationScope(), e.getEdge().getTargetNode(), updatedSince);
-
- IndexOperationMessage indexOperationMessage =
- eventBuilder.buildEntityIndex( entityIndexOperation ).toBlocking().lastOrDefault(null);
-
- if (indexOperationMessage != null){
- batch.ingest(indexOperationMessage);
- }
-
- }
+ });
- queueIndexOperationMessage(batch);
+ offerBatch( batch );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/ef8899a1/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
index c05c047..1038408 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
@@ -90,7 +90,7 @@ public interface IndexProcessorFig extends GuicyFig {
@Key(ELASTICSEARCH_QUEUE_IMPL)
String getQueueImplementation();
- @Default("100")
+ @Default("500")
@Key(REINDEX_BUFFER_SIZE)
int getReindexBufferSize();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/ef8899a1/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
index e3b179d..19fbcfa 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
@@ -24,6 +24,7 @@ import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.usergrid.persistence.index.EntityIndexFactory;
@@ -113,6 +114,8 @@ public class ReIndexServiceImpl implements ReIndexService {
//load our last emitted Scope if a cursor is present
+ final AtomicInteger count = new AtomicInteger();
+
final Optional<EdgeScope> cursor = parseCursor( reIndexRequestBuilder.getCursor() );
@@ -161,29 +164,21 @@ public class ReIndexServiceImpl implements ReIndexService {
}
- Observable<List<EdgeScope>> runningReIndex = allEntityIdsObservable.getEdgesToEntities( applicationScopes,
+ allEntityIdsObservable.getEdgesToEntities( applicationScopes,
reIndexRequestBuilder.getCollectionName(), cursorSeek.getSeekValue() )
-
.buffer( indexProcessorFig.getReindexBufferSize())
- .flatMap( edgeScopes -> Observable.just(edgeScopes)
- .doOnNext(edges -> {
-
- logger.info("Sending batch of {} to be indexed.", edges.size());
- indexService.indexBatch(edges, modifiedSince);
- })
- .subscribeOn( Schedulers.io() ), indexProcessorFig.getReindexConcurrencyFactor());
-
-
- // start our sampler and state persistence
- // take a sample every sample interval to allow us to resume state with minimal loss
- // create our flushing collector and flush the edge scopes to it
- runningReIndex.collect(() -> new FlushingCollector(jobId),
- ((flushingCollector, edgeScopes) -> flushingCollector.flushBuffer(edgeScopes)))
- .doOnNext( flushingCollector-> flushingCollector.complete() )
- //subscribe on our I/O scheduler and run the task
- .subscribeOn( Schedulers.io() ).subscribe(); //want reindex to continually run so leave subscribe.
-
-
+ .doOnNext( edgeScopes -> {
+ logger.info("Sending batch of {} to be indexed.", edgeScopes.size());
+ indexService.indexBatch(edgeScopes, modifiedSince);
+ count.addAndGet(edgeScopes.size() );
+ if( edgeScopes.size() > 0 ) {
+ writeCursorState(jobId, edgeScopes.get(edgeScopes.size() - 1));
+ }
+ writeStateMeta( jobId, Status.INPROGRESS, count.get(), System.currentTimeMillis() ); })
+ .doOnCompleted(() -> writeStateMeta( jobId, Status.COMPLETE, count.get(), System.currentTimeMillis() ))
+ .subscribeOn( Schedulers.io() ).subscribe();
+
+
return new ReIndexStatus( jobId, Status.STARTED, 0, 0 );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/ef8899a1/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
index fcaa51d..aa962dd 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
@@ -44,6 +44,8 @@ import org.apache.usergrid.persistence.core.astyanax.CassandraConfig;
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.concurrent.ExecutionException;
[3/7] usergrid git commit: Move failure detector settings to cluster
overall. Temporarily stop downing nodes when unreachable at application
layer.
Posted by mr...@apache.org.
Move failure detector settings to cluster overall. Temporarily stop downing nodes when unreachable at application layer.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/50b936d5
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/50b936d5
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/50b936d5
Branch: refs/heads/master
Commit: 50b936d563901683bfcc1ab35f9ad34494c1c95d
Parents: 2a89d2d
Author: Michael Russo <mr...@apigee.com>
Authored: Mon Sep 26 11:14:55 2016 -0700
Committer: Michael Russo <mr...@apigee.com>
Committed: Mon Sep 26 11:14:55 2016 -0700
----------------------------------------------------------------------
.../actorsystem/ActorSystemManagerImpl.java | 11 +++++++++-
.../actorsystem/ClusterListener.java | 21 ++++++++++++++------
.../uniquevalues/UniqueValuesServiceImpl.java | 10 +---------
3 files changed, 26 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/50b936d5/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
index 7e7df9c..1021b1a 100644
--- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
+++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
@@ -20,6 +20,7 @@ package org.apache.usergrid.persistence.actorsystem;
import akka.actor.*;
+import akka.cluster.Cluster;
import akka.cluster.client.ClusterClient;
import akka.cluster.client.ClusterClientReceptionist;
import akka.cluster.client.ClusterClientSettings;
@@ -312,13 +313,21 @@ public class ActorSystemManagerImpl implements ActorSystemManager {
}} );
put( "cluster", new HashMap<String, Object>() {{
- put( "max-nr-of-instances-per-node", numInstancesPerNode);
+ put( "max-nr-of-instances-per-node", numInstancesPerNode); // this sets default if router does not set
put( "roles", Collections.singletonList("io") );
put( "seed-nodes", new ArrayList<String>() {{
for (String seed : seeds) {
add( seed );
}
}} );
+ put( "failure-detector", new HashMap<String, Object>() {{
+ put( "threshold", "20" );
+ put( "acceptable-heartbeat-pause", "6 s" );
+ put( "heartbeat-interval", "1 s" );
+ put( "heartbeat-request", new HashMap<String, Object>() {{
+ put( "expected-response-after", "3 s" );
+ }} );
+ }} );
}} );
}} );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/50b936d5/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ClusterListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ClusterListener.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ClusterListener.java
index a568295..d0a758d 100644
--- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ClusterListener.java
+++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ClusterListener.java
@@ -78,10 +78,12 @@ public class ClusterListener extends UntypedActor {
java.lang.Runtime.getRuntime().exec("ping -c 1 "+hostname).waitFor() == 0;
if(networkReachable){
- logger.info("Unreachable member {} is accessible on the network, " +
- "application must have died. Marking member down", event.member());
+ logger.info("Unreachable member {} is accessible on the network.", event.member());
- cluster.down(event.member().address());
+// logger.info("Unreachable member {} is accessible on the network, " +
+// "application must have died. Marking member down", event.member());
+//
+// cluster.down(event.member().address());
}else{
logger.warn("Unreachable member {} is not accessible on the network, " +
@@ -95,9 +97,16 @@ public class ClusterListener extends UntypedActor {
} else if (message instanceof ClusterEvent.MemberEvent) {
ClusterEvent.MemberEvent event = (ClusterEvent.MemberEvent) message;
- if(logger.isTraceEnabled()){
- logger.trace("MemberEvent occurred for member: {}, Event: {}", event.member(), event.toString());
- }
+ logger.info("MemberEvent occurred for member: {}, Event: {}", event.member(), event.toString());
+
+ } else if (message instanceof ClusterEvent.LeaderChanged) {
+ ClusterEvent.LeaderChanged event = (ClusterEvent.LeaderChanged) message;
+ logger.info("LeaderChanged occurred for leader: {}, getLeader: {}, Event: {}",
+ event.leader(), event.getLeader(), event.toString());
+
+ } else if (message instanceof ClusterEvent.MemberExited) {
+ ClusterEvent.MemberExited event = (ClusterEvent.MemberExited) message;
+ logger.info("MemberExited occurred for member: {}, Event: {}", event.member(), event.toString());
} else {
unhandled(message);
http://git-wip-us.apache.org/repos/asf/usergrid/blob/50b936d5/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java
index 82dc8cc..1b13d01 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java
@@ -342,15 +342,7 @@ public class UniqueValuesServiceImpl implements UniqueValuesService {
put( "enabled", "on" );
put( "allow-local-routees", "on" );
put( "use-role", "io" );
- put( "max-nr-of-instances-per-node", numInstancesPerNode );
- put( "failure-detector", new HashMap<String, Object>() {{
- put( "threshold", "20" );
- put( "acceptable-heartbeat-pause", "6 s" );
- put( "heartbeat-interval", "1 s" );
- put( "heartbeat-request", new HashMap<String, Object>() {{
- put( "expected-response-after", "3 s" );
- }} );
- }} );
+ put( "max-nr-of-instances-per-node", numInstancesPerNode ); // this sets value specific to this router
}} );
}} );
[7/7] usergrid git commit: Merge branch 'hotfix-20160819'
Posted by mr...@apache.org.
Merge branch 'hotfix-20160819'
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/77d20269
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/77d20269
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/77d20269
Branch: refs/heads/master
Commit: 77d2026907b03625ad7e1ef742c8656712497c8d
Parents: 9947d48 ef8899a
Author: Michael Russo <mr...@apigee.com>
Authored: Fri Sep 30 18:15:27 2016 -0700
Committer: Michael Russo <mr...@apigee.com>
Committed: Fri Sep 30 18:15:27 2016 -0700
----------------------------------------------------------------------
.../asyncevents/AsyncEventServiceImpl.java | 20 +++-------
.../index/IndexProcessorFig.java | 2 +-
.../index/ReIndexServiceImpl.java | 37 ++++++++---------
stack/corepersistence/actorsystem/pom.xml | 12 +++---
.../persistence/actorsystem/ActorSystemFig.java | 41 ++++++++++++++++++-
.../actorsystem/ActorSystemManager.java | 2 +-
.../actorsystem/ActorSystemManagerImpl.java | 42 +++++++++++++++-----
.../actorsystem/ClusterListener.java | 23 +++++++----
stack/corepersistence/collection/pom.xml | 12 +++---
.../EntityCollectionManagerFactoryImpl.java | 2 +
.../uniquevalues/UniqueValuesRouter.java | 4 +-
.../uniquevalues/UniqueValuesServiceImpl.java | 10 +----
.../apache/usergrid/rest/ShutdownListener.java | 11 +----
13 files changed, 131 insertions(+), 87 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/77d20269/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/77d20269/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
----------------------------------------------------------------------
[2/7] usergrid git commit: Update the threshold settings.
Posted by mr...@apache.org.
Update the threshold settings.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/2a89d2d9
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/2a89d2d9
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/2a89d2d9
Branch: refs/heads/master
Commit: 2a89d2d9a9f17c97724f5ca7f28822b4d6d74f0b
Parents: 10ac8d0
Author: Michael Russo <mr...@apigee.com>
Authored: Fri Sep 23 13:12:59 2016 -0700
Committer: Michael Russo <mr...@apigee.com>
Committed: Fri Sep 23 13:12:59 2016 -0700
----------------------------------------------------------------------
.../persistence/collection/uniquevalues/UniqueValuesRouter.java | 2 +-
.../collection/uniquevalues/UniqueValuesServiceImpl.java | 4 ++--
2 files changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/2a89d2d9/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesRouter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesRouter.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesRouter.java
index 7176202..47db3a5 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesRouter.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesRouter.java
@@ -44,7 +44,7 @@ public class UniqueValuesRouter extends UntypedActor {
router = getContext().actorOf(
FromConfig.getInstance()
.props(Props.create(UniqueValueActor.class)
- .withDispatcher("blocking-io-dispatcher")), "router");
+ .withDispatcher("akka.blocking-io-dispatcher")), "router");
// TODO: is there some way to pass the injector here without getting this exception:
// NotSerializableException: No configured serialization-bindings for class [InjectorImpl]
http://git-wip-us.apache.org/repos/asf/usergrid/blob/2a89d2d9/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java
index 8bdb02c..82dc8cc 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java
@@ -344,8 +344,8 @@ public class UniqueValuesServiceImpl implements UniqueValuesService {
put( "use-role", "io" );
put( "max-nr-of-instances-per-node", numInstancesPerNode );
put( "failure-detector", new HashMap<String, Object>() {{
- put( "threshold", "10" );
- put( "acceptable-heartbeat-pause", "3 s" );
+ put( "threshold", "20" );
+ put( "acceptable-heartbeat-pause", "6 s" );
put( "heartbeat-interval", "1 s" );
put( "heartbeat-request", new HashMap<String, Object>() {{
put( "expected-response-after", "3 s" );
[4/7] usergrid git commit: Bump akka version to 2.4.10 (latest
current stable version)
Posted by mr...@apache.org.
Bump akka version to 2.4.10 (latest current stable version)
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/027e40dd
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/027e40dd
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/027e40dd
Branch: refs/heads/master
Commit: 027e40ddaebffbc365850158c527fa930a0e175f
Parents: 50b936d
Author: Michael Russo <mr...@apigee.com>
Authored: Mon Sep 26 12:28:14 2016 -0700
Committer: Michael Russo <mr...@apigee.com>
Committed: Mon Sep 26 12:28:14 2016 -0700
----------------------------------------------------------------------
stack/corepersistence/actorsystem/pom.xml | 12 ++++++------
stack/corepersistence/collection/pom.xml | 12 ++++++------
2 files changed, 12 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/027e40dd/stack/corepersistence/actorsystem/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/actorsystem/pom.xml b/stack/corepersistence/actorsystem/pom.xml
index 1933c65..a17c6d3 100644
--- a/stack/corepersistence/actorsystem/pom.xml
+++ b/stack/corepersistence/actorsystem/pom.xml
@@ -35,37 +35,37 @@
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.11</artifactId>
- <version>2.4.0</version>
+ <version>2.4.10</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-remote_2.11</artifactId>
- <version>2.4.0</version>
+ <version>2.4.10</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-cluster_2.11</artifactId>
- <version>2.4.0</version>
+ <version>2.4.10</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-cluster-tools_2.11</artifactId>
- <version>2.4.0</version>
+ <version>2.4.10</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-cluster-metrics_2.11</artifactId>
- <version>2.4.0</version>
+ <version>2.4.10</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-slf4j_2.11</artifactId>
- <version>2.4.0</version>
+ <version>2.4.10</version>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/usergrid/blob/027e40dd/stack/corepersistence/collection/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/pom.xml b/stack/corepersistence/collection/pom.xml
index 20f8612..2c7702b 100644
--- a/stack/corepersistence/collection/pom.xml
+++ b/stack/corepersistence/collection/pom.xml
@@ -50,37 +50,37 @@
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.11</artifactId>
- <version>2.4.0</version>
+ <version>2.4.10</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-remote_2.11</artifactId>
- <version>2.4.0</version>
+ <version>2.4.10</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-cluster_2.11</artifactId>
- <version>2.4.0</version>
+ <version>2.4.10</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-cluster-tools_2.11</artifactId>
- <version>2.4.0</version>
+ <version>2.4.10</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-cluster-metrics_2.11</artifactId>
- <version>2.4.0</version>
+ <version>2.4.10</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-slf4j_2.11</artifactId>
- <version>2.4.0</version>
+ <version>2.4.10</version>
</dependency>
<!-- tests -->