You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/03/12 16:23:41 UTC
[1/3] incubator-usergrid git commit: Added the ability to specify the
number of workers
Repository: incubator-usergrid
Updated Branches:
refs/heads/USERGRID-466 e8dc17de7 -> 19c6ad0bc
Added the ability to specify the number of workers
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/9aec790b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/9aec790b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/9aec790b
Branch: refs/heads/USERGRID-466
Commit: 9aec790b3fd8486ce6dba67125f94a5d912e8596
Parents: d81dfaa
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Mar 11 18:27:16 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed Mar 11 18:27:16 2015 -0600
----------------------------------------------------------------------
.../usergrid/persistence/index/IndexFig.java | 9 ++++
.../index/impl/EsIndexBufferConsumerImpl.java | 55 +++++++++++++++-----
.../index/guice/TestIndexModule.java | 4 +-
3 files changed, 52 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9aec790b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
index cde86fd..445789f 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
@@ -81,6 +81,11 @@ public interface IndexFig extends GuicyFig {
*/
public static final String FAILURE_REJECTED_RETRY_WAIT_TIME = "elasticsearch.rejected_retry_wait";
+ /**
+ * The number of worker threads to consume from the queue
+ */
+ public static final String ELASTICSEARCH_WORKER_COUNT = "elasticsearch.worker_count";
+
public static final String QUERY_LIMIT_DEFAULT = "index.query.limit.default";
@Default( "127.0.0.1" )
@@ -181,4 +186,8 @@ public interface IndexFig extends GuicyFig {
@Key( INDEX_QUEUE_READ_TIMEOUT )
int getIndexQueueTimeout();
+ @Default("2")
+ @Key( ELASTICSEARCH_WORKER_COUNT )
+ int getWorkerCount();
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9aec790b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index 862b1ae..836ec3d 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -49,9 +49,12 @@ import rx.functions.Func1;
import rx.functions.Func2;
import rx.schedulers.Schedulers;
+import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
/**
* Consumer for IndexOperationMessages
@@ -69,15 +72,18 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
private final Meter flushMeter;
private final Timer produceTimer;
private final BufferQueue bufferQueue;
+ private final IndexFig indexFig;
+ private final AtomicLong counter = new AtomicLong( );
//the actively running subscription
- private Subscription subscription;
+ private List<Subscription> subscriptions;
private Object mutex = new Object();
@Inject
- public EsIndexBufferConsumerImpl( final IndexFig config, final EsProvider
- provider, final MetricsFactory metricsFactory, final BufferQueue bufferQueue ){
+ public EsIndexBufferConsumerImpl( final IndexFig config, final EsProvider provider, final MetricsFactory
+ metricsFactory, final BufferQueue bufferQueue, final IndexFig indexFig ){
+
this.flushTimer = metricsFactory.getTimer(EsIndexBufferConsumerImpl.class, "index.buffer.flush");
this.flushMeter = metricsFactory.getMeter(EsIndexBufferConsumerImpl.class, "index.buffer.meter");
this.indexSizeCounter = metricsFactory.getCounter(EsIndexBufferConsumerImpl.class, "index.buffer.size");
@@ -86,15 +92,42 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
this.client = provider.getClient();
this.produceTimer = metricsFactory.getTimer(EsIndexBufferConsumerImpl.class,"index.buffer.consumer.messageFetch");
this.bufferQueue = bufferQueue;
+ this.indexFig = indexFig;
-
+ subscriptions = new ArrayList<>( indexFig.getWorkerCount() );
//batch up sets of some size and send them in batch
start();
}
+ /**
+ * Loop through and start the workers
+ */
public void start() {
+ final int count = indexFig.getWorkerCount();
+
+ for(int i = 0; i < count; i ++){
+ startWorker();
+ }
+ }
+
+
+ /**
+ * Stop the workers
+ */
+ public void stop() {
+ synchronized ( mutex ) {
+ //stop consuming
+
+ for(final Subscription subscription: subscriptions){
+ subscription.unsubscribe();
+ }
+ }
+ }
+
+
+ private void startWorker(){
synchronized ( mutex) {
final AtomicInteger countFail = new AtomicInteger();
@@ -104,7 +137,7 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
public void call( final Subscriber<? super List<IndexOperationMessage>> subscriber ) {
//name our thread so it's easy to see
- Thread.currentThread().setName( "QueueConsumer_" + Thread.currentThread().getId() );
+ Thread.currentThread().setName( "QueueConsumer_" + counter.incrementAndGet() );
List<IndexOperationMessage> drainList;
do {
@@ -168,20 +201,14 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
} );
//start in the background
- subscription = consumer.subscribe();
- }
- }
+ final Subscription subscription = consumer.subscribe();
- public void stop() {
- synchronized ( mutex ) {
- //stop consuming
- if(subscription != null) {
- subscription.unsubscribe();
- }
+ subscriptions.add(subscription );
}
}
+
/**
* Execute the request, check for errors, then re-init the batch for future use
*/
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9aec790b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
index 7d7a18d..57c2fab 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
@@ -38,8 +38,8 @@ public class TestIndexModule extends TestModule {
install( new IndexModule() {
@Override
public void wireBufferQueue() {
-// bind( BufferQueue.class).to( BufferQueueInMemoryImpl.class );
- bind( BufferQueue.class).to( BufferQueueSQSImpl.class );
+ bind( BufferQueue.class).to( BufferQueueInMemoryImpl.class );
+// bind( BufferQueue.class).to( BufferQueueSQSImpl.class );
}
} );
}
[2/3] incubator-usergrid git commit: Fixes worker issue in cloud
formation
Posted by to...@apache.org.
Fixes worker issue in cloud formation
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/c319d07d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/c319d07d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/c319d07d
Branch: refs/heads/USERGRID-466
Commit: c319d07d8c09d2e5988f58faad1dd42cff85cbe8
Parents: 9aec790
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Mar 11 19:33:54 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed Mar 11 19:33:54 2015 -0600
----------------------------------------------------------------------
.../src/main/groovy/configure_usergrid.groovy | 8 +-
stack/awscluster/ugcluster-cf.json | 150 ++-----------------
2 files changed, 23 insertions(+), 135 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c319d07d/stack/awscluster/src/main/groovy/configure_usergrid.groovy
----------------------------------------------------------------------
diff --git a/stack/awscluster/src/main/groovy/configure_usergrid.groovy b/stack/awscluster/src/main/groovy/configure_usergrid.groovy
index 8af5884..fb98368 100644
--- a/stack/awscluster/src/main/groovy/configure_usergrid.groovy
+++ b/stack/awscluster/src/main/groovy/configure_usergrid.groovy
@@ -51,6 +51,9 @@ def esShards = numEsNodes*2;
def esReplicas = 1;
def tomcatThreads = System.getenv().get("TOMCAT_THREADS")
+
+def workerCount = System.getenv().get("INDEX_WORKER_COUNT")
+
//temporarily set to equal since we now have a sane tomcat thread calculation
def hystrixThreads = tomcatThreads
@@ -58,7 +61,6 @@ def hystrixThreads = tomcatThreads
def ec2Region = System.getenv().get("EC2_REGION")
def cassEc2Region = ec2Region.replace("-1", "")
-
NodeRegistry registry = new NodeRegistry();
def selectResult = registry.searchNode('cassandra')
@@ -195,6 +197,10 @@ usergrid.queue.region=${ec2Region}
usergrid.scheduler.enabled=true
usergrid.scheduler.job.workers=1
+
+#Set our ingest rate
+elasticsearch.worker_count=${workerCount}
+
"""
println usergridConfig
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c319d07d/stack/awscluster/ugcluster-cf.json
----------------------------------------------------------------------
diff --git a/stack/awscluster/ugcluster-cf.json b/stack/awscluster/ugcluster-cf.json
index 0c2a9d2..e9db671 100644
--- a/stack/awscluster/ugcluster-cf.json
+++ b/stack/awscluster/ugcluster-cf.json
@@ -46,6 +46,12 @@
],
"ConstraintDescription": "must be valid instance type."
},
+ "RestIndexWorkers":{
+ "Description": "The number of index workers to ingest ElasticSearch batch operations per tomcat",
+ "Type": "Number",
+ "Default": "8",
+ "MinValue": "3"
+ },
"TomcatThreadsPerCore": {
"Description": "Number of threads to configure tomcat for per core",
"Type": "Number",
@@ -289,7 +295,7 @@
}
}
},
- "GraphiteAutoScalingLaunchConfiguration":{
+ "GraphiteAutoScalingLaunchConfiguration":{
"Type":"AWS::AutoScaling::LaunchConfiguration",
"Properties":{
"UserData":{
@@ -464,7 +470,7 @@
}
}
},
- "OpsCenterUser": {
+ "OpsCenterUser": {
"Type": "AWS::IAM::User",
"Properties": {
"Path": "/",
@@ -485,7 +491,7 @@
]
}
},
- "OpsCenterKey": {
+ "OpsCenterKey": {
"Type": "AWS::IAM::AccessKey",
"Properties": {
"UserName": {
@@ -493,7 +499,7 @@
}
}
},
- "OpsCenterAutoScalingLaunchConfiguration":{
+ "OpsCenterAutoScalingLaunchConfiguration":{
"Type":"AWS::AutoScaling::LaunchConfiguration",
"Properties":{
"UserData":{
@@ -619,7 +625,7 @@
}
},
- "OpsCenterAutoScalingGroup": {
+ "OpsCenterAutoScalingGroup": {
"Type": "AWS::AutoScaling::AutoScalingGroup",
"Version": "2014-07-24",
"Properties": {
@@ -767,7 +773,7 @@
}
}
},
- "CassAutoScalingLaunchConfiguration":{
+ "CassAutoScalingLaunchConfiguration":{
"Type":"AWS::AutoScaling::LaunchConfiguration",
"Properties":{
"UserData":{
@@ -955,7 +961,7 @@
}
}
},
- "ESMasterAutoScalingLaunchConfiguration":{
+ "ESMasterAutoScalingLaunchConfiguration":{
"Type":"AWS::AutoScaling::LaunchConfiguration",
"Properties":{
"UserData":{
@@ -1137,131 +1143,6 @@
}
}
},
- "RestAutoScalingLaunchConfiguration":{
- "Type":"AWS::AutoScaling::LaunchConfiguration",
- "Properties":{
- "UserData":{
- "Fn::Base64":{
- "Fn::Join":[
- "",
- [
- "#!/bin/bash -ex\n",
- "# REST SERVER STARTUP \n",
- "exec >/var/log/usergrid-bootstrap.log 2>&1\n",
- "\n",
- "mkdir -p /usr/share/usergrid\n",
- "\n",
- "# create script that sets our environment variables\n",
- "cat >/etc/profile.d/usergrid-env.sh <<EOF\n",
- "alias sudo='sudo -E'\n",
- "\n",
- "export TYPE=rest\n",
- "export STACK_NAME=", { "Ref":"AWS::StackName" }, "\n",
- "export YOURKIT=", { "Ref":"InstallYourkit" }, "\n",
- "export DNS_NAME=", { "Ref":"DnsSubDomain" }, "\n",
- "export DNS_DOMAIN=", { "Ref":"DnsDomain" }, "\n",
- "export PUBLIC_HOSTNAME=`(curl -s http://169.254.169.254/latest/meta-data/public-hostname)`\n",
- "export INTERNAL_HOSTNAME=`(curl http://169.254.169.254/latest/meta-data/local-ipv4)`\n",
- "export ELB_NAME=", { "Ref":"RestElasticLoadBalancer" }, "\n",
- "\n",
- "export EC2_INSTANCE_ID=`ec2metadata --instance-id`\n",
- "export EC2_REGION=", { "Ref":"AWS::Region" }, "\n",
- "export EC2_URL=https://ec2.amazonaws.com/\n", "\n",
- "export REST_SECURITY_GROUP_NAME=", { "Ref":"RestSecurityGroup" }, "\n",
- "export DB_SECURITY_GROUP_NAME=", { "Ref":"CassSecurityGroup" }, "\n",
- "\n",
- "export CASSANDRA_CLUSTER_NAME=", { "Ref":"CassClusterName" }, "\n",
- "export CASSANDRA_KEYSPACE_NAME=usergrid", "\n",
- "export CASSANDRA_NUM_SERVERS=", { "Ref":"CassNumServers" }, "\n",
- "export GRAPHITE_NUM_SERVERS=", { "Ref":"GraphiteNumServers" }, "\n",
- "export TOMCAT_NUM_SERVERS=", { "Ref":"RestMinServers" }, "\n",
- "\n",
- "export CASSANDRA_REPLICATION_FACTOR=", { "Ref":"CassReplicationFactor" }, "\n",
- "\n",
- "export CASSANDRA_READ_CONSISTENCY=", { "Ref":"CassReadConsistency" }, "\n",
- "\n",
- "export CASSANDRA_WRITE_CONSISTENCY=", { "Ref":"CassWriteConsistency" }, "\n",
- "\n",
-
- "export ES_CLUSTER_NAME=", { "Ref":"ESClusterName" }, "\n",
- "export ES_NUM_SERVERS=", { "Ref":"ESNumServers" }, "\n",
- "\n",
- "export RELEASE_BUCKET=", { "Ref":"ReleaseBucket" }, "\n",
- "\n",
- "export NUM_THREAD_PROC=", { "Ref":"TomcatThreadsPerCore" }, "\n",
- "\n",
- "export SUPER_USER_EMAIL=", { "Ref":"SuperUserEmail" }, "\n",
- "export TEST_ADMIN_USER_EMAIL=", { "Ref":"TestAdminUserEmail" }, "\n",
- "\n",
- "EOF\n",
- "\n",
- "# put AWS creds in environment\n",
- "cat >/etc/profile.d/aws-credentials.sh <<EOF\n",
- "export AWS_ACCESS_KEY=", { "Ref":"RestKey" }, "\n",
- "export AWS_SECRET_KEY=", { "Fn::GetAtt":[ "RestKey", "SecretAccessKey" ] }, "\n",
- "EOF\n",
- "\n",
- "# setup s3cmd (will be installed by init script) \n",
- "cat >/etc/s3cfg <<EOF\n",
- "access_key=", { "Ref":"RestKey" }, "\n",
- "secret_key=", { "Fn::GetAtt":[ "RestKey", "SecretAccessKey" ] }, "\n",
- "EOF\n",
- "chmod 644 /etc/s3cfg\n",
- "ln -s /etc/s3cfg ~ubuntu/.s3cfg\n",
- "ln -s /etc/s3cfg ~root/.s3cfg\n",
- "\n",
- "# download usergrid and init script bundle from S3\n",
- "wget -O- -q http://s3tools.org/repo/deb-all/stable/s3tools.key | apt-key add -\n",
- "wget -O/etc/apt/sources.list.d/s3tools.list http://s3tools.org/repo/deb-all/stable/s3tools.list\n",
- "apt-get update\n",
- "apt-get -y install s3cmd\n",
- "cd /usr/share/usergrid\n",
- "s3cmd --config=/etc/s3cfg get s3://", {"Ref": "ReleaseBucket"}, "/awscluster-1.0-SNAPSHOT-any.tar.gz\n",
- "s3cmd --config=/etc/s3cfg get s3://", {"Ref": "ReleaseBucket"}, "/ROOT.war\n",
- "tar xvf awscluster-1.0-SNAPSHOT-any.tar.gz\n",
- "rm -fr awscluster-1.0-SNAPSHOT-any.tar.gz\n",
- "mv ROOT.war webapps/ROOT.war\n",
- "chmod 755 ./init_instance/*.sh\n",
- "cd ./init_instance\n",
- "# Init as a REST intance \n",
- "sh ./init_rest_server.sh\n"
- ]
- ]
- }
- },
- "KeyName":{
- "Ref":"KeyPair"
- },
- "ImageId":{
- "Fn::FindInMap":[
- "AWSRegionArch2AMI",
- {
- "Ref":"AWS::Region"
- },
- {
- "Fn::FindInMap":[
- "AWSInstanceType2Arch",
- {
- "Ref":"RestInstanceType"
- },
- "Arch"
- ]
- }
- ]
- },
- "InstanceType":{
- "Ref":"RestInstanceType"
- },
- "IamInstanceProfile":{
- "Ref":"RootInstanceProfile"
- },
- "SecurityGroups":[
- {
- "Ref":"RestSecurityGroup"
- }
- ]
- }
- },
"ESAutoScalingLaunchConfiguration":{
"Type":"AWS::AutoScaling::LaunchConfiguration",
"Properties":{
@@ -1378,7 +1259,7 @@
]
}
},
- "ESAutoScalingGroup": {
+ "ESAutoScalingGroup": {
"Type": "AWS::AutoScaling::AutoScalingGroup",
"Version": "2009-05-15",
"Properties": {
@@ -1442,7 +1323,7 @@
}
}
},
- "RestAutoScalingLaunchConfiguration":{
+ "RestAutoScalingLaunchConfiguration":{
"Type":"AWS::AutoScaling::LaunchConfiguration",
"Properties":{
"UserData":{
@@ -1487,6 +1368,7 @@
"\n",
"export CASSANDRA_WRITE_CONSISTENCY=", { "Ref":"CassWriteConsistency" }, "\n",
"\n",
+ "export INDEX_WORKER_COUNT=", { "Ref":"RestIndexWorkers" }, "\n",
"export ES_CLUSTER_NAME=", { "Ref":"ESClusterName" }, "\n",
"export ES_NUM_SERVERS=", { "Ref":"ESNumServers" }, "\n",
[3/3] incubator-usergrid git commit: Merge branch 'USERGRID-466' of
https://git-wip-us.apache.org/repos/asf/incubator-usergrid into USERGRID-466
Posted by to...@apache.org.
Merge branch 'USERGRID-466' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into USERGRID-466
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/19c6ad0b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/19c6ad0b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/19c6ad0b
Branch: refs/heads/USERGRID-466
Commit: 19c6ad0bc01d79719079985b4930956c1ed17b48
Parents: c319d07 e8dc17d
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Mar 11 19:34:13 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed Mar 11 19:34:13 2015 -0600
----------------------------------------------------------------------
.../usergrid/persistence/core/metrics/MetricsFactoryImpl.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------