You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2016/04/26 17:02:50 UTC
[26/50] [abbrv] usergrid git commit: Concurreny / threading changes.
Remove Quorum reads that aren't necessarily consistency related problems.
Concurreny / threading changes. Remove Quorum reads that aren't necessarily consistency related problems.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/794bbd44
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/794bbd44
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/794bbd44
Branch: refs/heads/asf-site
Commit: 794bbd44c9a546b11cdc9729d15ffcf24c26662d
Parents: 8c0338c
Author: Michael Russo <mr...@apigee.com>
Authored: Wed Apr 13 18:53:09 2016 +0200
Committer: Michael Russo <mr...@apigee.com>
Committed: Wed Apr 13 18:55:00 2016 +0200
----------------------------------------------------------------------
.../asyncevents/AsyncEventServiceImpl.java | 2 +-
.../corepersistence/index/IndexProcessorFig.java | 2 +-
.../corepersistence/service/ServiceSchedulerFig.java | 2 +-
.../collection/mvcc/stage/write/WriteUniqueVerify.java | 9 +++++++--
.../collection/serialization/SerializationFig.java | 3 +++
.../persistence/core/executor/TaskExecutorFactory.java | 10 +++++++---
.../apache/usergrid/persistence/queue/QueueFig.java | 4 ++++
.../services/notifications/gcm/GCMAdapter.java | 13 ++++++++-----
tests/integration/test/notifications/notifications.js | 5 -----
9 files changed, 32 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/794bbd44/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
index d180919..82ad5be 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
@@ -493,7 +493,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
if(message == null) {
// provide some time back pressure before performing a quorum read
- if ( System.currentTimeMillis() > elasticsearchIndexEvent.getCreationTime() + queueFig.getLocalQuorumTimeout() ) {
+ if ( queueFig.getQuorumFallback() && System.currentTimeMillis() > elasticsearchIndexEvent.getCreationTime() + queueFig.getLocalQuorumTimeout() ) {
if(logger.isDebugEnabled()){
logger.debug("ES batch with id {} not found, reading with strong consistency", messageId);
http://git-wip-us.apache.org/repos/asf/usergrid/blob/794bbd44/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
index b94da65..c05c047 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
@@ -77,7 +77,7 @@ public interface IndexProcessorFig extends GuicyFig {
/**
* The number of worker threads used to read index write requests from the queue.
*/
- @Default("16")
+ @Default("8")
@Key(ELASTICSEARCH_WORKER_COUNT)
int getWorkerCount();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/794bbd44/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ServiceSchedulerFig.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ServiceSchedulerFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ServiceSchedulerFig.java
index e585ee3..764bba1 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ServiceSchedulerFig.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ServiceSchedulerFig.java
@@ -50,7 +50,7 @@ public interface ServiceSchedulerFig extends GuicyFig {
- @Default("100")
+ @Default("50")
@Key( SERVICE_IMPORT_THREADS)
int getImportThreadPoolSize();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/794bbd44/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
index 780b83b..d05f838 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
@@ -66,6 +66,8 @@ public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>>
private final UniqueValueSerializationStrategy uniqueValueStrat;
+ public static int uniqueVerifyPoolSize = 100;
+
protected final SerializationFig serializationFig;
protected final Keyspace keyspace;
@@ -83,6 +85,8 @@ public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>>
this.uniqueValueStrat = uniqueValueSerializiationStrategy;
this.serializationFig = serializationFig;
+
+ uniqueVerifyPoolSize = this.serializationFig.getUniqueVerifyPoolSize();
}
@@ -175,7 +179,8 @@ public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>>
@Override
protected Map<String, Field> getFallback() {
- return executeStrategy(fig.getConsistentReadCL());
+ // fallback with same CL as there are many reasons the 1st execution failed, not just due to consistency problems
+ return executeStrategy(fig.getReadCL());
}
public Map<String, Field> executeStrategy(ConsistencyLevel consistencyLevel){
@@ -219,5 +224,5 @@ public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>>
public static final HystrixCommand.Setter
REPLAY_GROUP = HystrixCommand.Setter.withGroupKey(
HystrixCommandGroupKey.Factory.asKey( "uniqueVerify" ) ).andThreadPoolPropertiesDefaults(
- HystrixThreadPoolProperties.Setter().withCoreSize( 100 ) );
+ HystrixThreadPoolProperties.Setter().withCoreSize( uniqueVerifyPoolSize ) );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/794bbd44/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java
index ca9cd99..96759ba 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java
@@ -54,4 +54,7 @@ public interface SerializationFig extends GuicyFig {
@Default( "5000000" )
int getMaxEntitySize();
+ @Key ( "usergrid.uniqueverify.poolsize" )
+ @Default( "150" )
+ int getUniqueVerifyPoolSize();
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/794bbd44/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java
index 5e7761c..c1c6207 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java
@@ -168,8 +168,10 @@ public class TaskExecutorFactory {
@Override
public void rejectedExecution( final Runnable r, final ThreadPoolExecutor executor ) {
- logger.warn( "{} task queue full, rejecting task {} and running in thread {}", poolName, r,
- Thread.currentThread().getName() );
+ if(logger.isDebugEnabled()) {
+ logger.debug("{} task queue full, rejecting task {} and running in thread {}", poolName, r,
+ Thread.currentThread().getName());
+ }
//We've decided we want to have a "caller runs" policy, to just invoke the task when rejected
@@ -191,7 +193,9 @@ public class TaskExecutorFactory {
@Override
public void rejectedExecution( final Runnable r, final ThreadPoolExecutor executor ) {
- logger.warn( "{} task queue full, dropping task {}", poolName, r );
+ if(logger.isDebugEnabled()) {
+ logger.warn("{} task queue full, dropping task {}", poolName, r);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/794bbd44/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
index ca6e011..533314b 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
@@ -99,4 +99,8 @@ public interface QueueFig extends GuicyFig {
@Default( "3000" ) // 3 seconds
int getQueuePollTimeshift();
+ @Key( "usergrid.queue.quorum.fallback")
+ @Default("false") // 30 seconds
+ boolean getQuorumFallback();
+
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/794bbd44/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java
index e334a54..af0bc78 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java
@@ -242,11 +242,14 @@ public class GCMAdapter implements ProviderAdapter {
payload.remove(priorityKey);
}
-
- // add our source notification payload data into the Message Builder
- // Message.Builder requires the payload to be Map<String,String> so blindly cast
- Map<String,String> dataMap = (Map<String,String>) payload;
- dataMap.forEach( (key, value) -> builder.addData(key, value));
+//
+// // add our source notification payload data into the Message Builder
+// // Message.Builder requires the payload to be Map<String,String> so blindly cast
+// Map<String,String> dataMap = (Map<String,String>) payload;
+//
+// dataMap.forEach( (key, value) -> builder.addData(key, value));
+
+ builder.addData("data", JSON.toString(payload));
Message message = builder.build();
MulticastResult multicastResult;
http://git-wip-us.apache.org/repos/asf/usergrid/blob/794bbd44/tests/integration/test/notifications/notifications.js
----------------------------------------------------------------------
diff --git a/tests/integration/test/notifications/notifications.js b/tests/integration/test/notifications/notifications.js
index 644510f..7a255e5 100644
--- a/tests/integration/test/notifications/notifications.js
+++ b/tests/integration/test/notifications/notifications.js
@@ -278,7 +278,6 @@ module.exports = {
function (err, notification) {
should(err).be.null;
notification.should.not.be.null;
- notification.expectedCount.should.be.equal(1);
setTimeout(function() {
// wait a second before proceeding
@@ -306,7 +305,6 @@ module.exports = {
function (err, notification) {
should(err).be.null;
notification.should.not.be.null;
- notification.expectedCount.should.be.equal(1);
setTimeout(function() {
// wait a second before proceeding
@@ -334,9 +332,6 @@ module.exports = {
should(err).be.null;
notification.should.not.be.null;
- // we set up 2 groups of the same 5 users. if duplicate filtering is working,
- // we'll only have 5 expected
- notification.expectedCount.should.be.equal(5);
setTimeout(function() {
// wait a second before proceeding