You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/03/12 16:23:41 UTC

[1/3] incubator-usergrid git commit: Added the ability to specify the number of workers

Repository: incubator-usergrid
Updated Branches:
  refs/heads/USERGRID-466 e8dc17de7 -> 19c6ad0bc


Added the ability to specify the number of workers


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/9aec790b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/9aec790b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/9aec790b

Branch: refs/heads/USERGRID-466
Commit: 9aec790b3fd8486ce6dba67125f94a5d912e8596
Parents: d81dfaa
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Mar 11 18:27:16 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed Mar 11 18:27:16 2015 -0600

----------------------------------------------------------------------
 .../usergrid/persistence/index/IndexFig.java    |  9 ++++
 .../index/impl/EsIndexBufferConsumerImpl.java   | 55 +++++++++++++++-----
 .../index/guice/TestIndexModule.java            |  4 +-
 3 files changed, 52 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9aec790b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
index cde86fd..445789f 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
@@ -81,6 +81,11 @@ public interface IndexFig extends GuicyFig {
      */
     public static final String FAILURE_REJECTED_RETRY_WAIT_TIME = "elasticsearch.rejected_retry_wait";
 
+    /**
+     * The number of worker threads to consume from the queue
+     */
+    public static final String ELASTICSEARCH_WORKER_COUNT = "elasticsearch.worker_count";
+
     public static final String QUERY_LIMIT_DEFAULT = "index.query.limit.default";
 
     @Default( "127.0.0.1" )
@@ -181,4 +186,8 @@ public interface IndexFig extends GuicyFig {
     @Key( INDEX_QUEUE_READ_TIMEOUT )
     int getIndexQueueTimeout();
 
+    @Default("2")
+    @Key( ELASTICSEARCH_WORKER_COUNT )
+    int getWorkerCount();
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9aec790b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index 862b1ae..836ec3d 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -49,9 +49,12 @@ import rx.functions.Func1;
 import rx.functions.Func2;
 import rx.schedulers.Schedulers;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
 
 /**
  * Consumer for IndexOperationMessages
@@ -69,15 +72,18 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
     private final Meter flushMeter;
     private final Timer produceTimer;
     private final BufferQueue bufferQueue;
+    private final IndexFig indexFig;
+    private final AtomicLong counter = new AtomicLong(  );
 
     //the actively running subscription
-    private Subscription subscription;
+    private List<Subscription> subscriptions;
 
     private Object mutex = new Object();
 
     @Inject
-    public EsIndexBufferConsumerImpl( final IndexFig config,  final EsProvider
-        provider, final MetricsFactory metricsFactory,   final BufferQueue bufferQueue ){
+    public EsIndexBufferConsumerImpl( final IndexFig config, final EsProvider provider, final MetricsFactory
+        metricsFactory, final BufferQueue bufferQueue, final IndexFig indexFig ){
+
         this.flushTimer = metricsFactory.getTimer(EsIndexBufferConsumerImpl.class, "index.buffer.flush");
         this.flushMeter = metricsFactory.getMeter(EsIndexBufferConsumerImpl.class, "index.buffer.meter");
         this.indexSizeCounter =  metricsFactory.getCounter(EsIndexBufferConsumerImpl.class, "index.buffer.size");
@@ -86,15 +92,42 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
         this.client = provider.getClient();
         this.produceTimer = metricsFactory.getTimer(EsIndexBufferConsumerImpl.class,"index.buffer.consumer.messageFetch");
         this.bufferQueue = bufferQueue;
+        this.indexFig = indexFig;
 
-
+        subscriptions = new ArrayList<>( indexFig.getWorkerCount() );
 
         //batch up sets of some size and send them in batch
           start();
     }
 
 
+    /**
+     * Loop through and start the workers
+     */
     public void start() {
+        final int count = indexFig.getWorkerCount();
+
+        for(int i = 0; i < count; i ++){
+            startWorker();
+        }
+    }
+
+
+    /**
+     * Stop the workers
+     */
+    public void stop() {
+        synchronized ( mutex ) {
+            //stop consuming
+
+            for(final Subscription subscription: subscriptions){
+                subscription.unsubscribe();
+            }
+        }
+    }
+
+
+    private void startWorker(){
         synchronized ( mutex) {
 
             final AtomicInteger countFail = new AtomicInteger();
@@ -104,7 +137,7 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
                 public void call( final Subscriber<? super List<IndexOperationMessage>> subscriber ) {
 
                     //name our thread so it's easy to see
-                    Thread.currentThread().setName( "QueueConsumer_" + Thread.currentThread().getId() );
+                    Thread.currentThread().setName( "QueueConsumer_" + counter.incrementAndGet() );
 
                     List<IndexOperationMessage> drainList;
                     do {
@@ -168,20 +201,14 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
                 } );
 
             //start in the background
-            subscription = consumer.subscribe();
-        }
-    }
 
+           final Subscription subscription = consumer.subscribe();
 
-    public void stop() {
-        synchronized ( mutex ) {
-            //stop consuming
-            if(subscription != null) {
-                subscription.unsubscribe();
-            }
+            subscriptions.add(subscription );
         }
     }
 
+
     /**
      * Execute the request, check for errors, then re-init the batch for future use
      */

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9aec790b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
index 7d7a18d..57c2fab 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
@@ -38,8 +38,8 @@ public class TestIndexModule extends TestModule {
         install( new IndexModule() {
             @Override
             public void wireBufferQueue() {
-//                bind( BufferQueue.class).to( BufferQueueInMemoryImpl.class );
-                bind( BufferQueue.class).to( BufferQueueSQSImpl.class );
+                bind( BufferQueue.class).to( BufferQueueInMemoryImpl.class );
+//                bind( BufferQueue.class).to( BufferQueueSQSImpl.class );
             }
         } );
     }


[2/3] incubator-usergrid git commit: Fixes worker issue in cloud formation

Posted by to...@apache.org.
Fixes worker issue in cloud formation


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/c319d07d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/c319d07d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/c319d07d

Branch: refs/heads/USERGRID-466
Commit: c319d07d8c09d2e5988f58faad1dd42cff85cbe8
Parents: 9aec790
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Mar 11 19:33:54 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed Mar 11 19:33:54 2015 -0600

----------------------------------------------------------------------
 .../src/main/groovy/configure_usergrid.groovy   |   8 +-
 stack/awscluster/ugcluster-cf.json              | 150 ++-----------------
 2 files changed, 23 insertions(+), 135 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c319d07d/stack/awscluster/src/main/groovy/configure_usergrid.groovy
----------------------------------------------------------------------
diff --git a/stack/awscluster/src/main/groovy/configure_usergrid.groovy b/stack/awscluster/src/main/groovy/configure_usergrid.groovy
index 8af5884..fb98368 100644
--- a/stack/awscluster/src/main/groovy/configure_usergrid.groovy
+++ b/stack/awscluster/src/main/groovy/configure_usergrid.groovy
@@ -51,6 +51,9 @@ def esShards = numEsNodes*2;
 def esReplicas = 1;
 
 def tomcatThreads = System.getenv().get("TOMCAT_THREADS")
+
+def workerCount = System.getenv().get("INDEX_WORKER_COUNT")
+
 //temporarily set to equal since we now have a sane tomcat thread calculation
 def hystrixThreads = tomcatThreads
 
@@ -58,7 +61,6 @@ def hystrixThreads = tomcatThreads
 def ec2Region = System.getenv().get("EC2_REGION")
 def cassEc2Region = ec2Region.replace("-1", "")
 
-
 NodeRegistry registry = new NodeRegistry();
 
 def selectResult = registry.searchNode('cassandra')
@@ -195,6 +197,10 @@ usergrid.queue.region=${ec2Region}
 usergrid.scheduler.enabled=true
 usergrid.scheduler.job.workers=1
 
+
+#Set our ingest rate
+elasticsearch.worker_count=${workerCount}
+
 """
 
 println usergridConfig

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c319d07d/stack/awscluster/ugcluster-cf.json
----------------------------------------------------------------------
diff --git a/stack/awscluster/ugcluster-cf.json b/stack/awscluster/ugcluster-cf.json
index 0c2a9d2..e9db671 100644
--- a/stack/awscluster/ugcluster-cf.json
+++ b/stack/awscluster/ugcluster-cf.json
@@ -46,6 +46,12 @@
             ],
             "ConstraintDescription": "must be valid instance type."
         },
+      "RestIndexWorkers":{
+        "Description": "The number of index workers to ingest ElasticSearch batch operations per tomcat",
+        "Type": "Number",
+        "Default": "8",
+        "MinValue": "3"
+      },
       "TomcatThreadsPerCore": {
         "Description": "Number of threads to configure tomcat for per core",
         "Type": "Number",
@@ -289,7 +295,7 @@
                 }
             }
         },
-       "GraphiteAutoScalingLaunchConfiguration":{
+        "GraphiteAutoScalingLaunchConfiguration":{
          "Type":"AWS::AutoScaling::LaunchConfiguration",
          "Properties":{
             "UserData":{
@@ -464,7 +470,7 @@
                 }
             }
         },
-            "OpsCenterUser": {
+        "OpsCenterUser": {
                         "Type": "AWS::IAM::User",
                         "Properties": {
                             "Path": "/",
@@ -485,7 +491,7 @@
                             ]
                         }
                     },
-                    "OpsCenterKey": {
+        "OpsCenterKey": {
                         "Type": "AWS::IAM::AccessKey",
                         "Properties": {
                             "UserName": {
@@ -493,7 +499,7 @@
                             }
                         }
                     },
-                   "OpsCenterAutoScalingLaunchConfiguration":{
+        "OpsCenterAutoScalingLaunchConfiguration":{
                      "Type":"AWS::AutoScaling::LaunchConfiguration",
                      "Properties":{
                         "UserData":{
@@ -619,7 +625,7 @@
 
                      }
                   },
-                    "OpsCenterAutoScalingGroup": {
+        "OpsCenterAutoScalingGroup": {
                         "Type": "AWS::AutoScaling::AutoScalingGroup",
                         "Version": "2014-07-24",
                         "Properties": {
@@ -767,7 +773,7 @@
                 }
             }
         },
-      "CassAutoScalingLaunchConfiguration":{
+        "CassAutoScalingLaunchConfiguration":{
          "Type":"AWS::AutoScaling::LaunchConfiguration",
          "Properties":{
             "UserData":{
@@ -955,7 +961,7 @@
                 }
             }
         },
-      "ESMasterAutoScalingLaunchConfiguration":{
+        "ESMasterAutoScalingLaunchConfiguration":{
          "Type":"AWS::AutoScaling::LaunchConfiguration",
          "Properties":{
             "UserData":{
@@ -1137,131 +1143,6 @@
                 }
             }
         },
-      "RestAutoScalingLaunchConfiguration":{
-         "Type":"AWS::AutoScaling::LaunchConfiguration",
-         "Properties":{
-            "UserData":{
-               "Fn::Base64":{
-                  "Fn::Join":[
-                     "",
-                     [
-                        "#!/bin/bash -ex\n",
-                        "# REST SERVER STARTUP \n",
-                        "exec >/var/log/usergrid-bootstrap.log 2>&1\n",
-                        "\n",
-                        "mkdir -p /usr/share/usergrid\n",
-                        "\n",
-                        "# create script that sets our environment variables\n",
-                        "cat >/etc/profile.d/usergrid-env.sh <<EOF\n",
-                        "alias sudo='sudo -E'\n",
-                        "\n",
-                        "export TYPE=rest\n",
-                        "export STACK_NAME=", { "Ref":"AWS::StackName" }, "\n",
-                        "export YOURKIT=", { "Ref":"InstallYourkit" }, "\n",
-                        "export DNS_NAME=", { "Ref":"DnsSubDomain" }, "\n",
-                        "export DNS_DOMAIN=", { "Ref":"DnsDomain" }, "\n",
-                        "export PUBLIC_HOSTNAME=`(curl -s http://169.254.169.254/latest/meta-data/public-hostname)`\n",
-                        "export INTERNAL_HOSTNAME=`(curl http://169.254.169.254/latest/meta-data/local-ipv4)`\n",
-                        "export ELB_NAME=", { "Ref":"RestElasticLoadBalancer" }, "\n",
-                        "\n",
-                        "export EC2_INSTANCE_ID=`ec2metadata --instance-id`\n",
-                        "export EC2_REGION=", { "Ref":"AWS::Region" }, "\n",
-                        "export EC2_URL=https://ec2.amazonaws.com/\n", "\n",
-                        "export REST_SECURITY_GROUP_NAME=", { "Ref":"RestSecurityGroup" }, "\n",
-                        "export DB_SECURITY_GROUP_NAME=", { "Ref":"CassSecurityGroup" }, "\n",
-                        "\n",
-                        "export CASSANDRA_CLUSTER_NAME=", { "Ref":"CassClusterName" }, "\n",
-                        "export CASSANDRA_KEYSPACE_NAME=usergrid", "\n",
-                        "export CASSANDRA_NUM_SERVERS=", { "Ref":"CassNumServers" }, "\n",
-                        "export GRAPHITE_NUM_SERVERS=", { "Ref":"GraphiteNumServers" }, "\n",
-                        "export TOMCAT_NUM_SERVERS=", { "Ref":"RestMinServers" }, "\n",
-                        "\n",
-                        "export CASSANDRA_REPLICATION_FACTOR=", { "Ref":"CassReplicationFactor" }, "\n",
-                        "\n",
-                        "export CASSANDRA_READ_CONSISTENCY=", { "Ref":"CassReadConsistency" }, "\n",
-                        "\n",
-                        "export CASSANDRA_WRITE_CONSISTENCY=", { "Ref":"CassWriteConsistency" }, "\n",
-                        "\n",
-
-                        "export ES_CLUSTER_NAME=", { "Ref":"ESClusterName" }, "\n",
-                        "export ES_NUM_SERVERS=", { "Ref":"ESNumServers" }, "\n",
-                         "\n",
-                        "export RELEASE_BUCKET=", { "Ref":"ReleaseBucket" }, "\n",
-                        "\n",
-                        "export NUM_THREAD_PROC=", { "Ref":"TomcatThreadsPerCore" }, "\n",
-                        "\n",
-                        "export SUPER_USER_EMAIL=", { "Ref":"SuperUserEmail" }, "\n",
-                        "export TEST_ADMIN_USER_EMAIL=", { "Ref":"TestAdminUserEmail" }, "\n",
-                        "\n",
-                        "EOF\n",
-                        "\n",
-                        "# put AWS creds in environment\n",
-                        "cat >/etc/profile.d/aws-credentials.sh <<EOF\n",
-                        "export AWS_ACCESS_KEY=", { "Ref":"RestKey" }, "\n",
-                        "export AWS_SECRET_KEY=", { "Fn::GetAtt":[ "RestKey", "SecretAccessKey" ] }, "\n",
-                        "EOF\n",
-                        "\n",
-                        "# setup s3cmd (will be installed by init script) \n",
-                        "cat >/etc/s3cfg <<EOF\n",
-                        "access_key=", { "Ref":"RestKey" }, "\n",
-                        "secret_key=", { "Fn::GetAtt":[ "RestKey", "SecretAccessKey" ] }, "\n",
-                        "EOF\n",
-                        "chmod 644 /etc/s3cfg\n",
-                        "ln -s /etc/s3cfg ~ubuntu/.s3cfg\n",
-                        "ln -s /etc/s3cfg ~root/.s3cfg\n",
-                        "\n",
-                        "# download usergrid and init script bundle from S3\n",
-                        "wget -O- -q http://s3tools.org/repo/deb-all/stable/s3tools.key | apt-key add -\n",
-                        "wget -O/etc/apt/sources.list.d/s3tools.list http://s3tools.org/repo/deb-all/stable/s3tools.list\n",
-                        "apt-get update\n",
-                        "apt-get -y install s3cmd\n",
-                        "cd /usr/share/usergrid\n",
-                        "s3cmd --config=/etc/s3cfg get s3://", {"Ref": "ReleaseBucket"}, "/awscluster-1.0-SNAPSHOT-any.tar.gz\n",
-                        "s3cmd --config=/etc/s3cfg get s3://", {"Ref": "ReleaseBucket"}, "/ROOT.war\n",
-                        "tar xvf awscluster-1.0-SNAPSHOT-any.tar.gz\n",
-                        "rm -fr awscluster-1.0-SNAPSHOT-any.tar.gz\n",
-                        "mv ROOT.war webapps/ROOT.war\n",
-                        "chmod 755 ./init_instance/*.sh\n",
-                        "cd ./init_instance\n",
-                        "# Init as a REST intance \n",
-                        "sh ./init_rest_server.sh\n"
-                     ]
-                  ]
-               }
-            },
-            "KeyName":{
-               "Ref":"KeyPair"
-            },
-            "ImageId":{
-               "Fn::FindInMap":[
-                  "AWSRegionArch2AMI",
-                  {
-                     "Ref":"AWS::Region"
-                  },
-                  {
-                     "Fn::FindInMap":[
-                        "AWSInstanceType2Arch",
-                        {
-                           "Ref":"RestInstanceType"
-                        },
-                        "Arch"
-                     ]
-                  }
-               ]
-            },
-            "InstanceType":{
-               "Ref":"RestInstanceType"
-            },
-            "IamInstanceProfile":{
-               "Ref":"RootInstanceProfile"
-            },
-            "SecurityGroups":[
-               {
-                  "Ref":"RestSecurityGroup"
-               }
-            ]
-         }
-      },
         "ESAutoScalingLaunchConfiguration":{
                  "Type":"AWS::AutoScaling::LaunchConfiguration",
                  "Properties":{
@@ -1378,7 +1259,7 @@
                             ]
                     }
               },
-                "ESAutoScalingGroup": {
+        "ESAutoScalingGroup": {
                     "Type": "AWS::AutoScaling::AutoScalingGroup",
                     "Version": "2009-05-15",
                     "Properties": {
@@ -1442,7 +1323,7 @@
                         }
                     }
                 },
-              "RestAutoScalingLaunchConfiguration":{
+        "RestAutoScalingLaunchConfiguration":{
                  "Type":"AWS::AutoScaling::LaunchConfiguration",
                  "Properties":{
                     "UserData":{
@@ -1487,6 +1368,7 @@
                                 "\n",
                                 "export CASSANDRA_WRITE_CONSISTENCY=", { "Ref":"CassWriteConsistency" }, "\n",
                                 "\n",
+                               "export INDEX_WORKER_COUNT=", { "Ref":"RestIndexWorkers" }, "\n",
 
                                 "export ES_CLUSTER_NAME=", { "Ref":"ESClusterName" }, "\n",
                                 "export ES_NUM_SERVERS=", { "Ref":"ESNumServers" }, "\n",


[3/3] incubator-usergrid git commit: Merge branch 'USERGRID-466' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into USERGRID-466

Posted by to...@apache.org.
Merge branch 'USERGRID-466' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into USERGRID-466


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/19c6ad0b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/19c6ad0b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/19c6ad0b

Branch: refs/heads/USERGRID-466
Commit: 19c6ad0bc01d79719079985b4930956c1ed17b48
Parents: c319d07 e8dc17d
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Mar 11 19:34:13 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed Mar 11 19:34:13 2015 -0600

----------------------------------------------------------------------
 .../usergrid/persistence/core/metrics/MetricsFactoryImpl.java      | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------