You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/10/01 18:27:36 UTC

[01/36] usergrid git commit: fix time_to_live gcm

Repository: usergrid
Updated Branches:
  refs/heads/master 60acf84a1 -> a56c8dddf


fix time_to_live gcm


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/32fcbb5b
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/32fcbb5b
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/32fcbb5b

Branch: refs/heads/master
Commit: 32fcbb5be966d464a40c644752827dc6996e507a
Parents: 0c12abb
Author: Shawn Feldman <sf...@apache.org>
Authored: Sat Sep 19 15:01:06 2015 -0400
Committer: Shawn Feldman <sf...@apache.org>
Committed: Tue Sep 22 14:34:15 2015 -0400

----------------------------------------------------------------------
 .../usergrid/services/notifications/gcm/GCMAdapter.java       | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/32fcbb5b/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java
index bdd7737..ba404e8 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java
@@ -74,7 +74,7 @@ public class GCMAdapter implements ProviderAdapter {
             long ttlSeconds = notification.getExpireTTLSeconds();
             // max ttl for gcm is 4 weeks - https://developers.google.com/cloud-messaging/http-server-ref
             ttlSeconds = ttlSeconds <= 2419200 ? ttlSeconds : 2419200;
-            map.put(expiresKey, ttlSeconds);
+            map.put(expiresKey, (int)ttlSeconds);//needs to be int
         }
         Batch batch = getBatch( map);
         batch.add(providerId, tracker);
@@ -204,8 +204,13 @@ public class GCMAdapter implements ProviderAdapter {
                 Sender sender = new Sender(notifier.getApiKey());
                 Message.Builder builder = new Message.Builder();
                 builder.setData(payload);
+                if(payload.containsKey("time_to_live")){
+                    int ttl = (int)payload.get("time_to_live");
+                    builder.timeToLive(ttl);
+                }
                 Message message = builder.build();
 
+
                 MulticastResult multicastResult = sender.send(message, ids, SEND_RETRIES);
                 LOG.debug("sendNotification result: {}", multicastResult);
 


[11/36] usergrid git commit: upgrade was sdk

Posted by sf...@apache.org.
upgrade was sdk


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/da6afb1f
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/da6afb1f
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/da6afb1f

Branch: refs/heads/master
Commit: da6afb1fd3097acc0af68323b34a98d04a8d9012
Parents: 5218cda
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Sep 25 09:55:10 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Sep 25 09:55:10 2015 -0600

----------------------------------------------------------------------
 stack/pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/da6afb1f/stack/pom.xml
----------------------------------------------------------------------
diff --git a/stack/pom.xml b/stack/pom.xml
index 40a56fa..dfa5045 100644
--- a/stack/pom.xml
+++ b/stack/pom.xml
@@ -97,7 +97,7 @@
       <!-- =================================================================== -->
 
       <amber-version>0.22-incubating</amber-version>
-      <aws.version>1.10.6</aws.version>
+      <aws.version>1.10.20</aws.version>
       <cassandra-version>1.2.18</cassandra-version>
       <guava.version>18.0</guava.version>
       <guice.version>4.0-beta5</guice.version>


[23/36] usergrid git commit: fix the ingest method

Posted by sf...@apache.org.
fix the ingest method


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/19ae15bd
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/19ae15bd
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/19ae15bd

Branch: refs/heads/master
Commit: 19ae15bdeb41af154a4770b600d106acdb71dc88
Parents: 3ed0848
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Sep 28 11:14:28 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Sep 28 11:14:28 2015 -0600

----------------------------------------------------------------------
 .../corepersistence/asyncevents/AmazonAsyncEventService.java   | 2 +-
 .../usergrid/persistence/index/impl/IndexOperationMessage.java | 6 ++++--
 2 files changed, 5 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/19ae15bd/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index e1c6886..82e6d19 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -268,7 +268,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
             .flatMap(indexEventResults -> {
                 IndexOperationMessage combined = new IndexOperationMessage();
                 Observable.from(indexEventResults)
-                    .doOnNext(indexEventResult -> combined.injest(indexEventResult.getIndexOperationMessage().get())).subscribe();
+                    .doOnNext(indexEventResult -> combined.ingest(indexEventResult.getIndexOperationMessage().get())).subscribe();
                 indexProducer.put(combined).subscribe();
                 return Observable.from(indexEventResults);
             })

http://git-wip-us.apache.org/repos/asf/usergrid/blob/19ae15bd/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
index 0676314..12df390 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
@@ -23,6 +23,7 @@ package org.apache.usergrid.persistence.index.impl;
 import java.io.Serializable;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.google.common.base.Optional;
@@ -108,7 +109,8 @@ public class IndexOperationMessage implements Serializable {
         return creationTime;
     }
 
-    public void injest(IndexOperationMessage singleMessage) {
-        si
+    public void ingest(IndexOperationMessage singleMessage) {
+        this.indexRequests.addAll(singleMessage.getIndexRequests().stream().collect(Collectors.toList()));
+        this.deIndexRequests.addAll(singleMessage.getDeIndexRequests().stream().collect(Collectors.toList()));
     }
 }


[20/36] usergrid git commit: remove properties

Posted by sf...@apache.org.
remove properties


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/a7700248
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/a7700248
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/a7700248

Branch: refs/heads/master
Commit: a770024818e478bceae39d8535e593cf56dcb2f4
Parents: e6c6c36
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Sep 25 17:33:35 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Sep 25 17:33:35 2015 -0600

----------------------------------------------------------------------
 .../assets/data/AwsSdkS3BinaryStore.java        | 42 ++++++++------------
 1 file changed, 16 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/a7700248/stack/services/src/main/java/org/apache/usergrid/services/assets/data/AwsSdkS3BinaryStore.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/assets/data/AwsSdkS3BinaryStore.java b/stack/services/src/main/java/org/apache/usergrid/services/assets/data/AwsSdkS3BinaryStore.java
index 688e7bf..6cf5149 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/assets/data/AwsSdkS3BinaryStore.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/assets/data/AwsSdkS3BinaryStore.java
@@ -93,6 +93,9 @@ public class  AwsSdkS3BinaryStore implements BinaryStore {
     @Autowired
     private EntityManagerFactory emf;
 
+    @Autowired
+    private Properties properties;
+
     public AwsSdkS3BinaryStore( ) {
     }
 
@@ -100,38 +103,25 @@ public class  AwsSdkS3BinaryStore implements BinaryStore {
     //ideally it should only do one. and the client should be initlized at the beginning of the run.
     private AmazonS3 getS3Client() throws Exception{
 
-            this.accessId = System.getProperty( SDKGlobalConfiguration.ACCESS_KEY_ENV_VAR );
-            if(accessId == null){
-                logger.error( SDKGlobalConfiguration.ACCESS_KEY_ENV_VAR + " not properly set so amazon access key is null" );
-                throw new AwsPropertiesNotFoundException( SDKGlobalConfiguration.ACCESS_KEY_ENV_VAR );
-
-            }
-            this.secretKey = System.getProperty( SDKGlobalConfiguration.SECRET_KEY_ENV_VAR );
-
-            if(secretKey == null){
-                logger.error( SDKGlobalConfiguration.SECRET_KEY_ENV_VAR + " not properly set so amazon secret key is null" );
-                throw new AwsPropertiesNotFoundException( SDKGlobalConfiguration.SECRET_KEY_ENV_VAR );
+        this.bucketName = properties.getProperty( "usergrid.binary.bucketname" );
+        if(bucketName == null){
+            logger.error( "usergrid.binary.bucketname  not properly set so amazon bucket is null" );
+            throw new AwsPropertiesNotFoundException( "usergrid.binary.bucketname" );
 
-            }
-            this.bucketName = System.getProperty( "usergrid.binary.bucketname" );
-            if(bucketName == null){
-                logger.error( "usergrid.binary.bucketname  not properly set so amazon bucket is null" );
-                throw new AwsPropertiesNotFoundException( "usergrid.binary.bucketname" );
-
-            }
+        }
 
-            AWSCredentials credentials = new BasicAWSCredentials(accessId, secretKey);
-            ClientConfiguration clientConfig = new ClientConfiguration();
-            clientConfig.setProtocol(Protocol.HTTP);
+        final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
+        AWSCredentials credentials = ugProvider.getCredentials();
+        ClientConfiguration clientConfig = new ClientConfiguration();
+        clientConfig.setProtocol(Protocol.HTTP);
 
-            s3Client = new AmazonS3Client(credentials, clientConfig);
-            if(regionName != null)
-                s3Client.setRegion( Region.getRegion(Regions.fromName(regionName)) );
+        s3Client = new AmazonS3Client(credentials, clientConfig);
+        if(regionName != null)
+            s3Client.setRegion( Region.getRegion(Regions.fromName(regionName)) );
 
         return s3Client;
     }
 
-
     @Override
     public void write( final UUID appId, final Entity entity, InputStream inputStream ) throws Exception {
 
@@ -186,7 +176,7 @@ public class  AwsSdkS3BinaryStore implements BinaryStore {
 
             // determine max size file allowed, default to 50mb
             long maxSizeBytes = 50 * FileUtils.ONE_MB;
-            String maxSizeMbString = System.getProperty( "usergrid.binary.max-size-mb", "50" );
+            String maxSizeMbString = properties.getProperty( "usergrid.binary.max-size-mb", "50" );
             if ( StringUtils.isNumeric( maxSizeMbString )) {
                 maxSizeBytes = Long.parseLong( maxSizeMbString ) * FileUtils.ONE_MB;
             }


[24/36] usergrid git commit: test fix

Posted by sf...@apache.org.
test fix


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/0428cd63
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/0428cd63
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/0428cd63

Branch: refs/heads/master
Commit: 0428cd632f58c08bfcd9dca489f1f746558e3ee4
Parents: 19ae15b
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Sep 28 11:31:27 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Sep 28 11:31:27 2015 -0600

----------------------------------------------------------------------
 .../corepersistence/index/InMemoryAsycIndexServiceTest.java     | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/0428cd63/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/InMemoryAsycIndexServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/InMemoryAsycIndexServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/InMemoryAsycIndexServiceTest.java
index 77d7cab..4666b4c 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/InMemoryAsycIndexServiceTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/InMemoryAsycIndexServiceTest.java
@@ -20,6 +20,7 @@
 package org.apache.usergrid.corepersistence.index;
 
 
+import org.apache.usergrid.persistence.index.impl.IndexProducer;
 import org.junit.Rule;
 import org.junit.runner.RunWith;
 
@@ -53,9 +54,11 @@ public class InMemoryAsycIndexServiceTest extends AsyncIndexServiceTest {
     public RxTaskScheduler rxTaskScheduler;
 
 
+    @Inject
+    public IndexProducer indexProducer;
     @Override
     protected AsyncEventService getAsyncEventService() {
-        return  new InMemoryAsyncEventService( eventBuilder, rxTaskScheduler, false  );
+        return  new InMemoryAsyncEventService( eventBuilder, rxTaskScheduler,indexProducer, false  );
     }
 
 


[16/36] usergrid git commit: remove system properities

Posted by sf...@apache.org.
remove system properities


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/cd623e60
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/cd623e60
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/cd623e60

Branch: refs/heads/master
Commit: cd623e60d1257bdd132f43f9b6ea1834172e3f81
Parents: 5233567
Author: Shawn Feldman <sh...@gmail.com>
Authored: Fri Sep 25 13:47:02 2015 -0600
Committer: Shawn Feldman <sh...@gmail.com>
Committed: Fri Sep 25 13:47:02 2015 -0600

----------------------------------------------------------------------
 .../services/assets/data/AwsSdkS3BinaryStore.java        | 11 ++++-------
 1 file changed, 4 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/cd623e60/stack/services/src/main/java/org/apache/usergrid/services/assets/data/AwsSdkS3BinaryStore.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/assets/data/AwsSdkS3BinaryStore.java b/stack/services/src/main/java/org/apache/usergrid/services/assets/data/AwsSdkS3BinaryStore.java
index a0fd361..688e7bf 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/assets/data/AwsSdkS3BinaryStore.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/assets/data/AwsSdkS3BinaryStore.java
@@ -93,9 +93,6 @@ public class  AwsSdkS3BinaryStore implements BinaryStore {
     @Autowired
     private EntityManagerFactory emf;
 
-    @Autowired
-    private Properties properties;
-
     public AwsSdkS3BinaryStore( ) {
     }
 
@@ -103,20 +100,20 @@ public class  AwsSdkS3BinaryStore implements BinaryStore {
     //ideally it should only do one. and the client should be initlized at the beginning of the run.
     private AmazonS3 getS3Client() throws Exception{
 
-            this.accessId = properties.getProperty( SDKGlobalConfiguration.ACCESS_KEY_ENV_VAR );
+            this.accessId = System.getProperty( SDKGlobalConfiguration.ACCESS_KEY_ENV_VAR );
             if(accessId == null){
                 logger.error( SDKGlobalConfiguration.ACCESS_KEY_ENV_VAR + " not properly set so amazon access key is null" );
                 throw new AwsPropertiesNotFoundException( SDKGlobalConfiguration.ACCESS_KEY_ENV_VAR );
 
             }
-            this.secretKey = properties.getProperty( SDKGlobalConfiguration.SECRET_KEY_ENV_VAR );
+            this.secretKey = System.getProperty( SDKGlobalConfiguration.SECRET_KEY_ENV_VAR );
 
             if(secretKey == null){
                 logger.error( SDKGlobalConfiguration.SECRET_KEY_ENV_VAR + " not properly set so amazon secret key is null" );
                 throw new AwsPropertiesNotFoundException( SDKGlobalConfiguration.SECRET_KEY_ENV_VAR );
 
             }
-            this.bucketName = properties.getProperty( "usergrid.binary.bucketname" );
+            this.bucketName = System.getProperty( "usergrid.binary.bucketname" );
             if(bucketName == null){
                 logger.error( "usergrid.binary.bucketname  not properly set so amazon bucket is null" );
                 throw new AwsPropertiesNotFoundException( "usergrid.binary.bucketname" );
@@ -189,7 +186,7 @@ public class  AwsSdkS3BinaryStore implements BinaryStore {
 
             // determine max size file allowed, default to 50mb
             long maxSizeBytes = 50 * FileUtils.ONE_MB;
-            String maxSizeMbString = properties.getProperty( "usergrid.binary.max-size-mb", "50" );
+            String maxSizeMbString = System.getProperty( "usergrid.binary.max-size-mb", "50" );
             if ( StringUtils.isNumeric( maxSizeMbString )) {
                 maxSizeBytes = Long.parseLong( maxSizeMbString ) * FileUtils.ONE_MB;
             }


[04/36] usergrid git commit: remove batch consumer

Posted by sf...@apache.org.
remove batch consumer


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/76816007
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/76816007
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/76816007

Branch: refs/heads/master
Commit: 76816007f34317e54659aaad3ac5c1bf59e8580d
Parents: ce5a96f
Author: Shawn Feldman <sf...@apache.org>
Authored: Tue Sep 15 13:11:12 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu Sep 24 15:00:13 2015 -0600

----------------------------------------------------------------------
 .../usergrid/persistence/index/IndexFig.java    | 14 ----
 .../index/impl/EsIndexBufferConsumerImpl.java   | 87 +++++++++-----------
 .../index/impl/IndexBufferConsumer.java         |  2 +
 .../index/impl/IndexRefreshCommandImpl.java     |  1 +
 4 files changed, 44 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/76816007/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
index 5997029..db1ef3d 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
@@ -153,21 +153,7 @@ public interface IndexFig extends GuicyFig {
     @Default( "2" )
     int getIndexCacheMaxWorkers();
 
-    /**
-     * The maximum time to wait before the buffer flushes and sends index write requests to Elasticsearch.
-     * This is used so the application doesn't wait forever for the buffer to reach its size before writing
-     * data to Elasticsearch.
-     */
-    @Default( "250" )
-    @Key( INDEX_BUFFER_TIMEOUT )
-    long getIndexBufferTimeout();
 
-    /**
-     * The maximum buffer size to use before sending index write requests to Elasticsearch.
-     */
-    @Default( "1000" )
-    @Key( INDEX_BUFFER_SIZE )
-    int getIndexBufferSize();
 
     /**
      * The number of worker threads used for flushing batches of index write requests

http://git-wip-us.apache.org/repos/asf/usergrid/blob/76816007/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index 14f2b6f..91fab2f 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -100,12 +100,12 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
 
     public Observable<IndexOperationMessage>  put( IndexOperationMessage message ) {
         Preconditions.checkNotNull(message, "Message cannot be null");
-        indexSizeCounter.inc( message.getDeIndexRequests().size() );
-        indexSizeCounter.inc( message.getIndexRequests().size() );
+        indexSizeCounter.inc(message.getDeIndexRequests().size());
+        indexSizeCounter.inc(message.getIndexRequests().size());
         Timer.Context time = offerTimer.time();
-        bufferProducer.send( message );
+        bufferProducer.send(message);
         time.stop();
-        return message.observable();
+        return  message.observable();
     }
 
 
@@ -114,22 +114,20 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
      */
     private void startSubscription() {
 
-
+        //buffer on our new thread with a timeout
         final Observable<IndexOperationMessage> observable = Observable.create(bufferProducer);
 
-        //buffer on our new thread with a timeout
-        observable.buffer( indexFig.getIndexBufferTimeout(), TimeUnit.MILLISECONDS, indexFig.getIndexBufferSize(),
-            Schedulers.io() ).flatMap( indexOpBuffer -> {
+        observable.subscribeOn(Schedulers.io()).flatMap(indexOpBuffer -> {
 
             //hand off to processor in new observable thread so we can continue to buffer faster
-            return Observable.just( indexOpBuffer ).flatMap(
-                indexOpBufferObservable -> ObservableTimer.time( processBatch( indexOpBufferObservable ),flushTimer )
+            return Observable.just(indexOpBuffer).flatMap(
+                indexOpBufferObservable -> ObservableTimer.time(processBatch(indexOpBufferObservable), flushTimer)
             )
 
                 //use the I/O scheduler for thread re-use and efficiency in context switching then use our concurrent
                 // flatmap count or higher throughput of batches once buffered
-                .subscribeOn( Schedulers.io() );
-        }, indexFig.getIndexFlushWorkerCount() )
+                .subscribeOn(Schedulers.io());
+        }, indexFig.getIndexFlushWorkerCount())
             //start in the background
             .subscribe();
     }
@@ -137,63 +135,60 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
 
     /**
      * Process the buffer of batches
-     * @param batches
+     * @param batch
      * @return
      */
-    private Observable<IndexOperationMessage> processBatch( final List<IndexOperationMessage> batches ) {
+    private Observable<IndexOperationMessage> processBatch( final IndexOperationMessage batch ) {
 
 
-        final Observable<IndexOperationMessage> indexOps = Observable.from( batches );
-
         //take our stream of batches, then stream then into individual ops for consumption on ES
-        final Observable<BatchOperation> batchOps = indexOps.flatMap( batch -> {
 
-            final Set<IndexOperation> indexOperationSet = batch.getIndexRequests();
-            final Set<DeIndexOperation> deIndexOperationSet = batch.getDeIndexRequests();
 
-            final int indexOperationSetSize = indexOperationSet.size();
-            final int deIndexOperationSetSize = deIndexOperationSet.size();
+        final Set<IndexOperation> indexOperationSet = batch.getIndexRequests();
+        final Set<DeIndexOperation> deIndexOperationSet = batch.getDeIndexRequests();
+
+        final int indexOperationSetSize = indexOperationSet.size();
+        final int deIndexOperationSetSize = deIndexOperationSet.size();
 
-            log.debug( "Emitting {} add and {} remove operations", indexOperationSetSize, deIndexOperationSetSize );
+        log.debug("Emitting {} add and {} remove operations", indexOperationSetSize, deIndexOperationSetSize);
 
-            indexSizeCounter.dec( indexOperationSetSize );
-            indexSizeCounter.dec( deIndexOperationSetSize );
+        indexSizeCounter.dec(indexOperationSetSize);
+        indexSizeCounter.dec(deIndexOperationSetSize);
 
-            final Observable<IndexOperation> index = Observable.from( batch.getIndexRequests() );
-            final Observable<DeIndexOperation> deIndex = Observable.from( batch.getDeIndexRequests() );
+        final Observable<IndexOperation> index = Observable.from(batch.getIndexRequests());
+        final Observable<DeIndexOperation> deIndex = Observable.from(batch.getDeIndexRequests());
 
-            return Observable.merge( index, deIndex );
-        } );
+        final Observable<BatchOperation> batchOps = Observable.merge(index, deIndex);
 
         //buffer into the max size we can send ES and fire them all off until we're completed
-        final Observable<BulkRequestBuilder> requests = batchOps.buffer( indexFig.getIndexBatchSize() )
+        final Observable<BulkRequestBuilder> requests = batchOps.buffer(indexFig.getIndexBatchSize())
             //flatten the buffer into a single batch execution
-            .flatMap( individualOps -> Observable.from( individualOps )
+            .flatMap(individualOps -> Observable.from(individualOps)
                 //collect them
-                .collect( () -> initRequest(), ( bulkRequestBuilder, batchOperation ) -> {
-                    log.debug( "adding operation {} to bulkRequestBuilder {}", batchOperation, bulkRequestBuilder );
-                    batchOperation.doOperation( client, bulkRequestBuilder );
-                } ) )
+                .collect(() -> initRequest(), (bulkRequestBuilder, batchOperation) -> {
+                    log.debug("adding operation {} to bulkRequestBuilder {}", batchOperation, bulkRequestBuilder);
+                    batchOperation.doOperation(client, bulkRequestBuilder);
+                }))
                 //write them
-            .doOnNext( bulkRequestBuilder -> sendRequest( bulkRequestBuilder ) );
+            .doOnNext(bulkRequestBuilder -> sendRequest(bulkRequestBuilder));
 
 
         //now that we've processed them all, ack the futures after our last batch comes through
         final Observable<IndexOperationMessage> processedIndexOperations =
-            requests.lastOrDefault(null).flatMap( lastRequest ->{
-                if(lastRequest!=null){
-                    return Observable.from( batches ) ;
-                }else{
+            requests.lastOrDefault(null).flatMap(lastRequest -> {
+                if (lastRequest != null) {
+                    return Observable.just(batch);
+                } else {
                     return Observable.empty();
                 }
             });
 
         //subscribe to the operations that generate requests on a new thread so that we can execute them quickly
         //mark this as done
-        return processedIndexOperations.doOnNext( processedIndexOp -> {
-                processedIndexOp.done();
-                roundtripTimer.update( System.currentTimeMillis() - processedIndexOp.getCreationTime() );
-            } );
+        return processedIndexOperations.doOnNext(processedIndexOp -> {
+            processedIndexOp.done();
+            roundtripTimer.update(System.currentTimeMillis() - processedIndexOp.getCreationTime());
+        });
     }
 
 
@@ -266,12 +261,12 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
         /**
          * Send the data through the buffer
          */
-        public void send( final IndexOperationMessage indexOp ) {
+        public void send( final IndexOperationMessage indexOps ) {
             try {
-                subscriber.onNext( indexOp );
+                subscriber.onNext( indexOps );
             }catch(Exception e){
                 //re-throws so the caller can determine failover
-                log.error( "Unable to process message for indexOp {}, error follows.", indexOp, e );
+                log.error( "Unable to process message for indexOp {}, error follows.", indexOps, e );
                 throw e;
             }
         }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/76816007/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java
index e769455..df2119c 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java
@@ -20,6 +20,8 @@ package org.apache.usergrid.persistence.index.impl;
 
 import rx.Observable;
 
+import java.util.List;
+
 
 /**
  *  Buffer index requests

http://git-wip-us.apache.org/repos/asf/usergrid/blob/76816007/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
index 70220d0..7b9bc5d 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
@@ -20,6 +20,7 @@
 package org.apache.usergrid.persistence.index.impl;
 
 
+import java.util.Collections;
 import java.util.Map;
 import java.util.UUID;
 


[22/36] usergrid git commit: index will merge all batches

Posted by sf...@apache.org.
index will merge all batches


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/3ed08483
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/3ed08483
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/3ed08483

Branch: refs/heads/master
Commit: 3ed0848352e2507ad1c3ba886c433f95657a5c24
Parents: 4c263b8
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Sep 28 11:01:19 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Sep 28 11:01:19 2015 -0600

----------------------------------------------------------------------
 .../asyncevents/AmazonAsyncEventService.java    | 126 +++++++++++++------
 .../asyncevents/AsyncIndexProvider.java         |  12 +-
 .../asyncevents/InMemoryAsyncEventService.java  |  16 ++-
 .../index/AmazonAsyncEventServiceTest.java      |   5 +-
 .../index/impl/IndexOperationMessage.java       |   5 +
 5 files changed, 115 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/3ed08483/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index fcb93c9..e1c6886 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -25,6 +25,8 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 
+import com.google.common.base.Optional;
+import org.apache.usergrid.persistence.index.impl.IndexProducer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -69,6 +71,7 @@ import com.google.inject.Singleton;
 import rx.Observable;
 import rx.Subscriber;
 import rx.Subscription;
+import rx.functions.Func1;
 import rx.schedulers.Schedulers;
 
 
@@ -85,6 +88,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
     private final QueueManager queue;
     private final QueueScope queueScope;
     private final IndexProcessorFig indexProcessorFig;
+    private final IndexProducer indexProducer;
     private final EntityCollectionManagerFactory entityCollectionManagerFactory;
     private final IndexLocationStrategyFactory indexLocationStrategyFactory;
     private final EntityIndexFactory entityIndexFactory;
@@ -110,11 +114,16 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
 
     @Inject
-    public AmazonAsyncEventService( final QueueManagerFactory queueManagerFactory, final IndexProcessorFig indexProcessorFig,
-                                    final MetricsFactory metricsFactory,  final EntityCollectionManagerFactory entityCollectionManagerFactory,
-                                    final IndexLocationStrategyFactory indexLocationStrategyFactory, final EntityIndexFactory entityIndexFactory,
+    public AmazonAsyncEventService( final QueueManagerFactory queueManagerFactory,
+                                    final IndexProcessorFig indexProcessorFig,
+                                    final IndexProducer indexProducer,
+                                    final MetricsFactory metricsFactory,
+                                    final EntityCollectionManagerFactory entityCollectionManagerFactory,
+                                    final IndexLocationStrategyFactory indexLocationStrategyFactory,
+                                    final EntityIndexFactory entityIndexFactory,
                                     final EventBuilder eventBuilder,
                                     final RxTaskScheduler rxTaskScheduler ) {
+        this.indexProducer = indexProducer;
 
         this.entityCollectionManagerFactory = entityCollectionManagerFactory;
         this.indexLocationStrategyFactory = indexLocationStrategyFactory;
@@ -219,43 +228,60 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
 
     private void handleMessages( final List<QueueMessage> messages ) {
-        if ( logger.isDebugEnabled() ) {
-            logger.debug( "handleMessages with {} message", messages.size() );
+        if (logger.isDebugEnabled()) {
+            logger.debug("handleMessages with {} message", messages.size());
         }
 
-        for ( QueueMessage message : messages ) {
-            final AsyncEvent event = ( AsyncEvent ) message.getBody();
+        Observable<IndexEventResult> merged = Observable.empty();
+        for (QueueMessage message : messages) {
+            final AsyncEvent event = (AsyncEvent) message.getBody();
 
-            logger.debug( "Processing {} event", event );
+            logger.debug("Processing {} event", event);
 
-            if ( event == null ) {
-                logger.error( "AsyncEvent type or event is null!" );
+            if (event == null) {
+                logger.error("AsyncEvent type or event is null!");
                 continue;
             }
 
 
-            if ( event instanceof EdgeDeleteEvent ) {
-                handleEdgeDelete( message );
-            }
-            else if ( event instanceof EdgeIndexEvent ) {
-                handleEdgeIndex( message );
+            if (event instanceof EdgeDeleteEvent) {
+               merged = merged.mergeWith(callHandleIndex(queueMessage -> handleEdgeDelete(queueMessage), message));
+            } else if (event instanceof EdgeIndexEvent) {
+               merged = merged.mergeWith(callHandleIndex(queueMessage -> handleEdgeIndex(queueMessage),message));
+            } else if (event instanceof EntityDeleteEvent) {
+                merged = merged.mergeWith( callHandleIndex(queueMessage -> handleEntityDelete(queueMessage),message));
+            } else if (event instanceof EntityIndexEvent) {
+                merged = merged.mergeWith(callHandleIndex(queueMessage -> handleEntityIndexUpdate(queueMessage),message));
+            } else if (event instanceof InitializeApplicationIndexEvent) {
+                //does not return observable
+                handleInitializeApplicationIndex(message);
+            } else {
+                logger.error("Unknown EventType: {}", event);
             }
 
-            else if ( event instanceof EntityDeleteEvent ) {
-                handleEntityDelete( message );
-            }
-            else if ( event instanceof EntityIndexEvent ) {
-                handleEntityIndexUpdate( message );
-            }
+            messageCycle.update(System.currentTimeMillis() - event.getCreationTime());
+        }
 
-            else if ( event instanceof InitializeApplicationIndexEvent ) {
-                handleInitializeApplicationIndex( message );
-            }
-            else {
-                logger.error( "Unknown EventType: {}", event );
-            }
+        merged
+            .filter(indexEventResult -> indexEventResult.success() && indexEventResult.getIndexOperationMessage().isPresent())
+            .buffer(MAX_TAKE)
+            .flatMap(indexEventResults -> {
+                IndexOperationMessage combined = new IndexOperationMessage();
+                Observable.from(indexEventResults)
+                    .doOnNext(indexEventResult -> combined.injest(indexEventResult.getIndexOperationMessage().get())).subscribe();
+                indexProducer.put(combined).subscribe();
+                return Observable.from(indexEventResults);
+            })
+            .doOnNext(indexEventResult ->ack(indexEventResult.queueMessage));
+    }
 
-            messageCycle.update( System.currentTimeMillis() - event.getCreationTime() );
+    private Observable<IndexEventResult> callHandleIndex(Func1<QueueMessage,Observable<IndexOperationMessage>> toCall, QueueMessage message){
+        try{
+            IndexOperationMessage indexOperationMessage =  toCall.call(message).toBlocking().lastOrDefault(null);
+            return Observable.just(new IndexEventResult(message,Optional.fromNullable(indexOperationMessage),true));
+        }catch (Exception e){
+            logger.error("failed to run index",e);
+            return Observable.just( new IndexEventResult(message, Optional.<IndexOperationMessage>absent(),false));
         }
     }
 
@@ -276,7 +302,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
     }
 
 
-    public void handleEntityIndexUpdate(final QueueMessage message) {
+    public Observable<IndexOperationMessage> handleEntityIndexUpdate(final QueueMessage message) {
 
         Preconditions.checkNotNull( message, "Queue Message cannot be null for handleEntityIndexUpdate" );
 
@@ -298,8 +324,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
         final EntityIndexOperation entityIndexOperation = new EntityIndexOperation( applicationScope, entityId, updatedAfter);
 
         final Observable<IndexOperationMessage> observable = eventBuilder.buildEntityIndex( entityIndexOperation );
-
-        subscribeAndAck( observable, message );
+        return observable;
     }
 
 
@@ -313,7 +338,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
         offer( operation );
     }
 
-    public void handleEdgeIndex(final QueueMessage message) {
+    public Observable<IndexOperationMessage> handleEdgeIndex(final QueueMessage message) {
 
         Preconditions.checkNotNull(message, "Queue Message cannot be null for handleEdgeIndex");
 
@@ -333,8 +358,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
         final Observable<IndexOperationMessage> edgeIndexObservable = ecm.load(edgeIndexEvent.getEntityId()).flatMap( entity -> eventBuilder.buildNewEdge(
             applicationScope, entity, edge ) );
-
-        subscribeAndAck( edgeIndexObservable, message );
+        return edgeIndexObservable;
     }
 
     @Override
@@ -344,7 +368,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
         offer( new EdgeDeleteEvent( applicationScope, edge ) );
     }
 
-    public void handleEdgeDelete(final QueueMessage message) {
+    public Observable<IndexOperationMessage> handleEdgeDelete(final QueueMessage message) {
 
         Preconditions.checkNotNull(message, "Queue Message cannot be null for handleEdgeDelete");
 
@@ -362,8 +386,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
         if (logger.isDebugEnabled()) logger.debug("Deleting in app scope {} with edge {}", applicationScope, edge);
 
         final Observable<IndexOperationMessage> observable = eventBuilder.buildDeleteEdge( applicationScope, edge );
-
-        subscribeAndAck( observable, message );
+        return observable;
     }
 
 
@@ -378,7 +401,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
         return queue.getQueueDepth();
     }
 
-    public void handleEntityDelete(final QueueMessage message) {
+    public Observable<IndexOperationMessage> handleEntityDelete(final QueueMessage message) {
 
         Preconditions.checkNotNull(message, "Queue Message cannot be null for handleEntityDelete");
 
@@ -401,8 +424,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
         final Observable merged = Observable.merge( entityDeleteResults.getEntitiesCompacted(),
             entityDeleteResults.getIndexObservable() );
-
-        subscribeAndAck( merged, message );
+        return merged;
     }
 
 
@@ -526,4 +548,28 @@ public class AmazonAsyncEventService implements AsyncEventService {
     private void subscribeAndAck( final Observable<?> observable, final QueueMessage message ){
        observable.doOnCompleted( ()-> ack(message)  ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ).subscribe();
     }
+    public static class IndexEventResult{
+        private final QueueMessage queueMessage;
+        private final Optional<IndexOperationMessage> indexOperationMessage;
+        private final boolean success;
+
+        public IndexEventResult(QueueMessage queueMessage, Optional<IndexOperationMessage> indexOperationMessage ,boolean success){
+
+            this.queueMessage = queueMessage;
+            this.indexOperationMessage = indexOperationMessage;
+            this.success = success;
+        }
+
+        public QueueMessage getQueueMessage() {
+            return queueMessage;
+        }
+
+        public boolean success() {
+            return success;
+        }
+
+        public Optional<IndexOperationMessage> getIndexOperationMessage() {
+            return indexOperationMessage;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3ed08483/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
index 0e773cf..e9e36f0 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
@@ -27,6 +27,7 @@ import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory
 import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.apache.usergrid.persistence.index.impl.IndexProducer;
 import org.apache.usergrid.persistence.queue.QueueManagerFactory;
 
 import com.google.inject.Inject;
@@ -49,6 +50,7 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
     private final EventBuilder eventBuilder;
     private final IndexLocationStrategyFactory indexLocationStrategyFactory;
     private final EntityIndexFactory entityIndexFactory;
+    private final IndexProducer indexProducer;
 
     private AsyncEventService asyncEventService;
 
@@ -61,7 +63,8 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
                               final EntityCollectionManagerFactory entityCollectionManagerFactory,
                               final EventBuilder eventBuilder,
                               final IndexLocationStrategyFactory indexLocationStrategyFactory,
-                              final EntityIndexFactory entityIndexFactory) {
+                              final EntityIndexFactory entityIndexFactory,
+                              final IndexProducer indexProducer) {
 
         this.indexProcessorFig = indexProcessorFig;
         this.queueManagerFactory = queueManagerFactory;
@@ -71,6 +74,7 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
         this.eventBuilder = eventBuilder;
         this.indexLocationStrategyFactory = indexLocationStrategyFactory;
         this.entityIndexFactory = entityIndexFactory;
+        this.indexProducer = indexProducer;
     }
 
 
@@ -92,12 +96,12 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
 
         switch (impl) {
             case LOCAL:
-                return new InMemoryAsyncEventService(eventBuilder, rxTaskScheduler, indexProcessorFig.resolveSynchronously());
+                return new InMemoryAsyncEventService(eventBuilder, rxTaskScheduler, indexProducer,indexProcessorFig.resolveSynchronously());
             case SQS:
-                return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, metricsFactory,
+                return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory,
                     entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, rxTaskScheduler );
             case SNS:
-                return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, metricsFactory,
+                return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory,
                     entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, rxTaskScheduler );
             default:
                 throw new IllegalArgumentException("Configuration value of " + getErrorValues() + " are allowed");

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3ed08483/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
index 6a71b3e..fad6e48 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
@@ -20,6 +20,8 @@
 package org.apache.usergrid.corepersistence.asyncevents;
 
 
+import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
+import org.apache.usergrid.persistence.index.impl.IndexProducer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,14 +50,19 @@ public class InMemoryAsyncEventService implements AsyncEventService {
 
     private final EventBuilder eventBuilder;
     private final RxTaskScheduler rxTaskScheduler;
+    private final IndexProducer indexProducer;
     private final boolean resolveSynchronously;
 
 
     @Inject
-    public InMemoryAsyncEventService( final EventBuilder eventBuilder, final RxTaskScheduler rxTaskScheduler, boolean
-        resolveSynchronously ) {
+    public InMemoryAsyncEventService( final EventBuilder eventBuilder,
+                                      final RxTaskScheduler rxTaskScheduler,
+                                      final IndexProducer indexProducer,
+                                      boolean resolveSynchronously
+    ) {
         this.eventBuilder = eventBuilder;
         this.rxTaskScheduler = rxTaskScheduler;
+        this.indexProducer = indexProducer;
         this.resolveSynchronously = resolveSynchronously;
     }
 
@@ -117,12 +124,13 @@ public class InMemoryAsyncEventService implements AsyncEventService {
     }
 
     public void run( Observable<?> observable ) {
+        Observable mapped = observable.map(message -> message instanceof IndexOperationMessage ? indexProducer.put((IndexOperationMessage)message) : Observable.just(message));
         //start it in the background on an i/o thread
         if ( !resolveSynchronously ) {
-            observable.subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ).subscribe();
+            mapped.subscribeOn(rxTaskScheduler.getAsyncIOScheduler()).subscribe();
         }
         else {
-            observable.toBlocking().lastOrDefault(null);
+            mapped.subscribe();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3ed08483/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
index 4660389..a14437c 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
@@ -22,6 +22,7 @@ package org.apache.usergrid.corepersistence.index;
 
 import org.apache.usergrid.corepersistence.asyncevents.EventBuilder;
 import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.apache.usergrid.persistence.index.impl.IndexProducer;
 import org.junit.Rule;
 import org.junit.runner.RunWith;
 
@@ -72,6 +73,8 @@ public class AmazonAsyncEventServiceTest extends AsyncIndexServiceTest {
     @Inject
     public EventBuilder eventBuilder;
 
+    @Inject
+    public IndexProducer indexProducer;
 
     @Inject
     public IndexLocationStrategyFactory indexLocationStrategyFactory;
@@ -82,7 +85,7 @@ public class AmazonAsyncEventServiceTest extends AsyncIndexServiceTest {
 
     @Override
     protected AsyncEventService getAsyncEventService() {
-        return  new AmazonAsyncEventService( queueManagerFactory, indexProcessorFig, metricsFactory,  entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, rxTaskScheduler );
+        return  new AmazonAsyncEventService( queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory,  entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, rxTaskScheduler );
     }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3ed08483/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
index bd2bec8..0676314 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
@@ -25,6 +25,7 @@ import java.util.HashSet;
 import java.util.Set;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.google.common.base.Optional;
 
 
 /**
@@ -106,4 +107,8 @@ public class IndexOperationMessage implements Serializable {
     public long getCreationTime() {
         return creationTime;
     }
+
+    public void injest(IndexOperationMessage singleMessage) {
+        si
+    }
 }


[29/36] usergrid git commit: remove job scheduler log

Posted by sf...@apache.org.
remove job scheduler log


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/4e98ed79
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/4e98ed79
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/4e98ed79

Branch: refs/heads/master
Commit: 4e98ed7935d380009269255497e4431dddbb9497
Parents: a770024
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Sep 28 16:13:40 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Sep 28 16:37:25 2015 -0600

----------------------------------------------------------------------
 .../org/apache/usergrid/batch/service/JobSchedulerService.java     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/4e98ed79/stack/core/src/main/java/org/apache/usergrid/batch/service/JobSchedulerService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/batch/service/JobSchedulerService.java b/stack/core/src/main/java/org/apache/usergrid/batch/service/JobSchedulerService.java
index d76be6f..284e1db 100644
--- a/stack/core/src/main/java/org/apache/usergrid/batch/service/JobSchedulerService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/batch/service/JobSchedulerService.java
@@ -131,7 +131,7 @@ public class JobSchedulerService extends AbstractScheduledService {
             }
         }
         catch ( Throwable t ) {
-            LOG.error( "Scheduler run failed, error is", t );
+            LOG.debug( "Scheduler run failed, error is", t );
         }
     }
 


[31/36] usergrid git commit: Merge commit 'refs/pull/391/head' of github.com:apache/usergrid into 2.1-release

Posted by sf...@apache.org.
Merge commit 'refs/pull/391/head' of github.com:apache/usergrid into 2.1-release


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/6626b83e
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/6626b83e
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/6626b83e

Branch: refs/heads/master
Commit: 6626b83e35fda96d2601581affcf2b920df7a322
Parents: 4e98ed7 c19adb8
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Sep 29 10:09:14 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Sep 29 10:09:14 2015 -0600

----------------------------------------------------------------------
 .../corepersistence/CpManagerCache.java         |  12 +-
 .../corepersistence/CpRelationManager.java      |   4 +-
 .../usergrid/corepersistence/ManagerCache.java  |   7 +
 .../asyncevents/AmazonAsyncEventService.java    | 151 +++++++++++++------
 .../asyncevents/AsyncIndexProvider.java         |  12 +-
 .../asyncevents/InMemoryAsyncEventService.java  |  26 +++-
 .../corepersistence/index/IndexServiceImpl.java |   8 +-
 .../read/search/CandidateEntityFilter.java      |  16 +-
 .../pipeline/read/search/CandidateIdFilter.java |  16 +-
 .../index/AmazonAsyncEventServiceTest.java      |   5 +-
 .../index/InMemoryAsycIndexServiceTest.java     |   5 +-
 .../corepersistence/index/IndexServiceTest.java |  18 ++-
 .../persistence/index/EntityIndexBatch.java     |   7 +-
 .../index/impl/EsEntityIndexBatchImpl.java      |   8 +-
 .../index/impl/EsEntityIndexFactoryImpl.java    |   4 -
 .../index/impl/EsEntityIndexImpl.java           |   5 +-
 .../index/impl/IndexOperationMessage.java       |   7 +
 .../index/impl/IndexRefreshCommandImpl.java     |   2 +-
 .../persistence/index/impl/EntityIndexTest.java |  48 +++---
 .../persistence/index/impl/GeoPagingTest.java   |   4 +-
 .../index/impl/IndexLoadTestsIT.java            |   5 +-
 .../exceptions/AbstractExceptionMapper.java     |   2 +-
 22 files changed, 254 insertions(+), 118 deletions(-)
----------------------------------------------------------------------



[35/36] usergrid git commit: merge from 2.1-release

Posted by sf...@apache.org.
merge from 2.1-release


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/bce1359b
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/bce1359b
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/bce1359b

Branch: refs/heads/master
Commit: bce1359b8874406e9d03da03eea4fe19e91dafcd
Parents: 3b838c8 9ceae77
Author: Shawn Feldman <sf...@apache.org>
Authored: Thu Oct 1 09:59:48 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu Oct 1 09:59:48 2015 -0600

----------------------------------------------------------------------
 stack/awscluster/gatling-cluster-cf.json        |   2 +-
 .../batch/service/JobSchedulerService.java      |   2 +-
 .../corepersistence/CpManagerCache.java         |  12 +-
 .../usergrid/corepersistence/ManagerCache.java  |   7 +
 .../asyncevents/AmazonAsyncEventService.java    | 151 +++++++++++++------
 .../asyncevents/AsyncIndexProvider.java         |  12 +-
 .../asyncevents/InMemoryAsyncEventService.java  |  26 +++-
 .../corepersistence/index/IndexServiceImpl.java |  22 +--
 .../read/search/CandidateEntityFilter.java      |  21 +--
 .../pipeline/read/search/CandidateIdFilter.java |  15 +-
 .../index/AmazonAsyncEventServiceTest.java      |   5 +-
 .../index/InMemoryAsycIndexServiceTest.java     |   5 +-
 .../corepersistence/index/IndexServiceTest.java |  18 ++-
 .../persistence/index/EntityIndexBatch.java     |   8 +-
 .../index/impl/EsEntityIndexBatchImpl.java      |   6 +-
 .../index/impl/EsEntityIndexFactoryImpl.java    |   4 -
 .../index/impl/EsEntityIndexImpl.java           |   5 +-
 .../index/impl/EsIndexProducerImpl.java         |  39 +++--
 .../index/impl/IndexOperationMessage.java       |   7 +
 .../index/impl/IndexRefreshCommandImpl.java     |   2 +-
 .../persistence/index/impl/EntityIndexTest.java |  54 +++----
 .../persistence/index/impl/GeoPagingTest.java   |   3 +-
 .../index/impl/IndexLoadTestsIT.java            |   9 +-
 stack/pom.xml                                   |   2 +-
 .../exceptions/AbstractExceptionMapper.java     |   2 +-
 stack/rest_integration_tests/lib/entities.js    |  41 +++++
 .../test/entities/create.js                     |  24 ++-
 .../management/AppInfoMigrationPlugin.java      |  11 ++
 .../assets/data/AwsSdkS3BinaryStore.java        |  37 ++---
 29 files changed, 360 insertions(+), 192 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/bce1359b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
----------------------------------------------------------------------
diff --cc stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
index ac2962a,ceb18ae..551d083
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
@@@ -177,7 -178,8 +178,7 @@@ public class CandidateEntityFilter exte
                  validate( candidateResult );
              }
  
-             producer.put(batch).subscribe();
+             indexProducer.put(batch.build()).subscribe();
 -
          }
  
  

http://git-wip-us.apache.org/repos/asf/usergrid/blob/bce1359b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateIdFilter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/usergrid/blob/bce1359b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index 3679887,68830ca..1ffcd02
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@@ -120,8 -118,9 +118,8 @@@ public class EsEntityIndexBatchImpl imp
          return deindex( searchEdge, entity.getId(), entity.getVersion() );
      }
  
 -
      @Override
-     public IndexOperationMessage build(){
+     public IndexOperationMessage build() {
          return container;
      }
  

http://git-wip-us.apache.org/repos/asf/usergrid/blob/bce1359b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
index 9223293,2b36fc8..b784c2d
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
@@@ -24,7 -22,6 +22,9 @@@ import java.util.Set
  import java.util.concurrent.atomic.AtomicLong;
  
  import com.codahale.metrics.Histogram;
++
++
 +import org.apache.usergrid.persistence.index.EntityIndexBatch;
  import org.elasticsearch.action.WriteConsistencyLevel;
  import org.elasticsearch.action.bulk.BulkItemResponse;
  import org.elasticsearch.action.bulk.BulkRequestBuilder;
@@@ -88,14 -85,8 +88,13 @@@ public class EsIndexProducerImpl implem
  
      }
  
-     public Observable<IndexOperationMessage>  put( EntityIndexBatch batch ) {
-         Preconditions.checkNotNull(batch, "Batch cannot be null");
-         return put(batch.build());
++    @Override
++    public Observable<IndexOperationMessage> put(EntityIndexBatch message) {
++        return put(message.build());
 +    }
 +
      public Observable<IndexOperationMessage>  put( IndexOperationMessage message ) {
- 
-             Preconditions.checkNotNull(message, "Message cannot be null");
+         Preconditions.checkNotNull(message, "Message cannot be null");
          indexSizeCounter.inc(message.getDeIndexRequests().size());
          indexSizeCounter.inc(message.getIndexRequests().size());
          return  processBatch(message);

http://git-wip-us.apache.org/repos/asf/usergrid/blob/bce1359b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
index b8136fa,5243d5a..3a91f12
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
@@@ -135,11 -136,11 +136,11 @@@ public class EntityIndexTest extends Ba
          entity1.setField(new UUIDField(IndexingUtils.ENTITY_ID_FIELDNAME, UUID.randomUUID()));
          entity1.setField( new StringField( "testfield", "test" ) );
          entity1.setField(new IntegerField("ordinal", 0));
 -        entity1.setField(new UUIDField("testuuid",uuid));
 +        entity1.setField(new UUIDField("testuuid", uuid));
  
  
-         batch.index(indexEdge, entity1);
-         indexProducer.put(batch).subscribe();
+         batch.index( indexEdge, entity1 );
+         indexProducer.put(batch.build()).subscribe();
  
  
          Entity entity2 = new Entity( entityType );
@@@ -147,13 -148,13 +148,13 @@@
  
  
          List<String> list = new ArrayList<>();
 -        list.add( "test" );
 -        entity2.setField( new ArrayField<>( "testfield", list ) );
 -        entity2.setField( new IntegerField( "ordinal", 1 ) );
 +        list.add("test");
 +        entity2.setField(new ArrayField<>("testfield", list));
 +        entity2.setField(new IntegerField("ordinal", 1));
  
  
-         batch.index(indexEdge, entity2);
-         indexProducer.put(batch).subscribe();
+         batch.index( indexEdge, entity2 );
+         indexProducer.put(batch.build()).subscribe();;
  
          entityIndex.refreshAsync().toBlocking().first();
  
@@@ -362,11 -363,11 +363,11 @@@
  
  
          Entity entity = EntityIndexMapUtils.fromMap( entityMap );
 -        EntityUtils.setId( entity, new SimpleId( "fastcar" ) );
 -        EntityUtils.setVersion( entity, UUIDGenerator.newTimeUUID() );
 -        entity.setField( new UUIDField( IndexingUtils.ENTITY_ID_FIELDNAME, UUID.randomUUID() ) );
 +        EntityUtils.setId(entity, new SimpleId( "fastcar" ) );
 +        EntityUtils.setVersion(entity, UUIDGenerator.newTimeUUID() );
 +        entity.setField(new UUIDField(IndexingUtils.ENTITY_ID_FIELDNAME, UUID.randomUUID() ) );
  
-         indexProducer.put(entityIndex.createBatch().index( searchEdge, entity )).toBlocking().last();
+         indexProducer.put(entityIndex.createBatch().index( searchEdge, entity ).build()).subscribe();
          entityIndex.refreshAsync().toBlocking().first();
  
          CandidateResults candidateResults = entityIndex

http://git-wip-us.apache.org/repos/asf/usergrid/blob/bce1359b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/GeoPagingTest.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/GeoPagingTest.java
index 4d727c3,98b85f1..e74e95e
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/GeoPagingTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/GeoPagingTest.java
@@@ -70,8 -70,10 +70,7 @@@ public class GeoPagingTest extends Base
      @Inject
      public EntityIndexFactory eif;
  
- 
      @Inject
 -    IndexProducer indexProducer;
 -
 -    @Inject
      @Rule
      public MigrationManagerRule migrationManagerRule;
  

http://git-wip-us.apache.org/repos/asf/usergrid/blob/bce1359b/stack/pom.xml
----------------------------------------------------------------------


[06/36] usergrid git commit: remove threading

Posted by sf...@apache.org.
remove threading


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/2a168278
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/2a168278
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/2a168278

Branch: refs/heads/master
Commit: 2a16827831f65b61c7de25be262551fe87928425
Parents: 805113c
Author: Shawn Feldman <sf...@apache.org>
Authored: Tue Sep 15 13:45:08 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu Sep 24 15:00:45 2015 -0600

----------------------------------------------------------------------
 .../core/future/FutureObservable.java           | 44 -------------
 .../persistence/index/EntityIndexBatch.java     |  1 -
 .../index/impl/EsIndexBufferConsumerImpl.java   | 65 +-------------------
 .../index/impl/IndexOperationMessage.java       | 18 ------
 4 files changed, 1 insertion(+), 127 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/2a168278/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/FutureObservable.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/FutureObservable.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/FutureObservable.java
deleted file mode 100644
index 06eed4d..0000000
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/FutureObservable.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.usergrid.persistence.core.future;
-
-
-import rx.Observable;
-
-import java.util.concurrent.FutureTask;
-
-
-/**
- * Future without the exception nastiness
- */
-public class FutureObservable<T> {
-
-    private final FutureTask<T> future;
-
-
-    public FutureObservable(final T returnVal) {
-        future = new FutureTask<>( () -> returnVal );
-    }
-
-    public void done() {
-        future.run();
-    }
-
-    public Observable<T> observable() {
-        return  Observable.from(future);
-    }
-}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/2a168278/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
index 85b234a..d1b076d 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
@@ -20,7 +20,6 @@ package org.apache.usergrid.persistence.index;/*
 
 import java.util.UUID;
 
-import org.apache.usergrid.persistence.core.future.FutureObservable;
 import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;

http://git-wip-us.apache.org/repos/asf/usergrid/blob/2a168278/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index 91fab2f..93c0d85 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -33,7 +33,6 @@ import org.elasticsearch.client.Client;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.usergrid.persistence.core.future.FutureObservable;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.index.IndexFig;
 
@@ -62,8 +61,6 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
     private final Timer flushTimer;
     private final IndexFig indexFig;
     private final Counter indexSizeCounter;
-    private final Timer offerTimer;
-    private final BufferProducer bufferProducer;
     private final Histogram roundtripTimer;
     private final Timer indexTimer;
 
@@ -76,7 +73,6 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
                                       final MetricsFactory metricsFactory, final IndexFig indexFig ) {
         this.flushTimer = metricsFactory.getTimer(EsIndexBufferConsumerImpl.class, "index_buffer.flush");
         this.indexSizeCounter = metricsFactory.getCounter(EsIndexBufferConsumerImpl.class, "index_buffer.size");
-        this.offerTimer = metricsFactory.getTimer(EsIndexBufferConsumerImpl.class, "index_buffer.producer");
         this.roundtripTimer = metricsFactory.getHistogram(EsIndexBufferConsumerImpl.class, "index_buffer.message_cycle");
 
         //wire up the gauge of inflight messages
@@ -90,11 +86,9 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
         this.client = provider.getClient();
         this.indexFig = indexFig;
 
-        this.bufferProducer = new BufferProducer();
 
         //batch up sets of some size and send them in batch
 
-        startSubscription();
     }
 
 
@@ -102,34 +96,8 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
         Preconditions.checkNotNull(message, "Message cannot be null");
         indexSizeCounter.inc(message.getDeIndexRequests().size());
         indexSizeCounter.inc(message.getIndexRequests().size());
-        Timer.Context time = offerTimer.time();
-        bufferProducer.send(message);
-        time.stop();
-        return  message.observable();
-    }
-
-
-    /**
-     * Start the subscription
-     */
-    private void startSubscription() {
-
-        //buffer on our new thread with a timeout
-        final Observable<IndexOperationMessage> observable = Observable.create(bufferProducer);
 
-        observable.subscribeOn(Schedulers.io()).flatMap(indexOpBuffer -> {
-
-            //hand off to processor in new observable thread so we can continue to buffer faster
-            return Observable.just(indexOpBuffer).flatMap(
-                indexOpBufferObservable -> ObservableTimer.time(processBatch(indexOpBufferObservable), flushTimer)
-            )
-
-                //use the I/O scheduler for thread re-use and efficiency in context switching then use our concurrent
-                // flatmap count or higher throughput of batches once buffered
-                .subscribeOn(Schedulers.io());
-        }, indexFig.getIndexFlushWorkerCount())
-            //start in the background
-            .subscribe();
+        return  processBatch(message);
     }
 
 
@@ -140,10 +108,7 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
      */
     private Observable<IndexOperationMessage> processBatch( final IndexOperationMessage batch ) {
 
-
         //take our stream of batches, then stream then into individual ops for consumption on ES
-
-
         final Set<IndexOperation> indexOperationSet = batch.getIndexRequests();
         final Set<DeIndexOperation> deIndexOperationSet = batch.getDeIndexRequests();
 
@@ -186,7 +151,6 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
         //subscribe to the operations that generate requests on a new thread so that we can execute them quickly
         //mark this as done
         return processedIndexOperations.doOnNext(processedIndexOp -> {
-            processedIndexOp.done();
             roundtripTimer.update(System.currentTimeMillis() - processedIndexOp.getCreationTime());
         });
     }
@@ -251,31 +215,4 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
                     + "entries" );
         }
     }
-
-
-    public static class BufferProducer implements Observable.OnSubscribe<IndexOperationMessage> {
-
-        private Subscriber<? super IndexOperationMessage> subscriber;
-
-
-        /**
-         * Send the data through the buffer
-         */
-        public void send( final IndexOperationMessage indexOps ) {
-            try {
-                subscriber.onNext( indexOps );
-            }catch(Exception e){
-                //re-throws so the caller can determine failover
-                log.error( "Unable to process message for indexOp {}, error follows.", indexOps, e );
-                throw e;
-            }
-        }
-
-
-        @Override
-        public void call( final Subscriber<? super IndexOperationMessage> subscriber ) {
-            //just assigns for later use, doesn't do anything else
-            this.subscriber = subscriber;
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/2a168278/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
index 0a49626..bd2bec8 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
@@ -24,10 +24,7 @@ import java.io.Serializable;
 import java.util.HashSet;
 import java.util.Set;
 
-import org.apache.usergrid.persistence.core.future.FutureObservable;
-
 import com.fasterxml.jackson.annotation.JsonIgnore;
-import rx.Observable;
 
 
 /**
@@ -40,13 +37,11 @@ public class IndexOperationMessage implements Serializable {
     private long creationTime;
 
 
-    private final FutureObservable<IndexOperationMessage> containerFuture;
 
 
     public IndexOperationMessage() {
         this.indexRequests = new HashSet<>();
         this.deIndexRequests = new HashSet<>();
-        this.containerFuture = new FutureObservable<>( this );
         this.creationTime = System.currentTimeMillis();
     }
 
@@ -78,15 +73,6 @@ public class IndexOperationMessage implements Serializable {
         return indexRequests.isEmpty() && deIndexRequests.isEmpty();
     }
 
-    /**
-     * return the promise
-     */
-    @JsonIgnore
-    public Observable<IndexOperationMessage> observable() {
-        return containerFuture.observable();
-    }
-
-
     @Override
     public boolean equals( final Object o ) {
         if ( this == o ) {
@@ -116,10 +102,6 @@ public class IndexOperationMessage implements Serializable {
         return result;
     }
 
-    public void done() {
-        //if this has been serialized, it could be null. don't NPE if it is, there's nothing to ack
-        containerFuture.done();
-    }
 
     public long getCreationTime() {
         return creationTime;


[18/36] usergrid git commit: Merge branch '2.1-release' of https://git-wip-us.apache.org/repos/asf/usergrid into 2.1-release

Posted by sf...@apache.org.
Merge branch '2.1-release' of https://git-wip-us.apache.org/repos/asf/usergrid into 2.1-release


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/175d5268
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/175d5268
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/175d5268

Branch: refs/heads/master
Commit: 175d5268fe5c74c4df7ad16fa6e2d2d370cfb77b
Parents: 5218cda cd623e6
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Sep 25 14:18:27 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Sep 25 14:18:27 2015 -0600

----------------------------------------------------------------------
 .../usergrid/management/AppInfoMigrationPlugin.java      | 11 +++++++++++
 .../services/assets/data/AwsSdkS3BinaryStore.java        | 11 ++++-------
 2 files changed, 15 insertions(+), 7 deletions(-)
----------------------------------------------------------------------



[36/36] usergrid git commit: fix rest tests

Posted by sf...@apache.org.
fix rest tests


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/a56c8ddd
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/a56c8ddd
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/a56c8ddd

Branch: refs/heads/master
Commit: a56c8dddfcde2d17ec03f0b5c6cb8fc6e0b34700
Parents: bce1359
Author: Shawn Feldman <sf...@apache.org>
Authored: Thu Oct 1 10:27:22 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu Oct 1 10:27:22 2015 -0600

----------------------------------------------------------------------
 .../collection/users/ConnectionResourceTest.java        | 12 ++++++------
 stack/rest_integration_tests/config/default.js          |  1 +
 stack/rest_integration_tests/test/entities/create.js    |  2 +-
 3 files changed, 8 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/a56c8ddd/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/ConnectionResourceTest.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/ConnectionResourceTest.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/ConnectionResourceTest.java
index 777b04e..893703e 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/ConnectionResourceTest.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/ConnectionResourceTest.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.usergrid.services.exceptions.ServiceResourceNotFoundException;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,6 +35,8 @@ import org.apache.usergrid.rest.test.resource.model.QueryParameters;
 
 import com.sun.jersey.api.client.UniformInterfaceException;
 
+import javax.ws.rs.NotFoundException;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
@@ -82,9 +85,8 @@ public class ConnectionResourceTest extends AbstractRestIT {
                 .get();
             fail( "This should throw an exception" );
         }
-        catch ( UniformInterfaceException uie ) {
+        catch ( NotFoundException uie ) {
             // Should return a 404 Not Found
-            assertEquals( 404, uie.getResponse().getStatus() );
         }
     }
 
@@ -172,9 +174,8 @@ public class ConnectionResourceTest extends AbstractRestIT {
             thing2 = this.app().collection( "things" ).entity( thing2 ).get();
             fail( "This should throw an exception" );
         }
-        catch ( UniformInterfaceException uie ) {
+        catch ( NotFoundException uie ) {
             // Should return a 404 Not Found
-            assertEquals( 404, uie.getResponse().getStatus() );
         }
     }
 
@@ -208,9 +209,8 @@ public class ConnectionResourceTest extends AbstractRestIT {
             thing1 = this.app().collection( "things" ).entity( thing1 ).get();
             fail( "This should throw an exception" );
         }
-        catch ( UniformInterfaceException uie ) {
+        catch ( NotFoundException uie ) {
             // Should return a 404 Not Found
-            assertEquals( 404, uie.getResponse().getStatus() );
         }
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/a56c8ddd/stack/rest_integration_tests/config/default.js
----------------------------------------------------------------------
diff --git a/stack/rest_integration_tests/config/default.js b/stack/rest_integration_tests/config/default.js
index 5140638..666f3ea 100644
--- a/stack/rest_integration_tests/config/default.js
+++ b/stack/rest_integration_tests/config/default.js
@@ -20,6 +20,7 @@ module.exports = {
     appName: "test-app", //must pre create app
     numberOfUsers: 5,
     numberOfEntities: 20,
+    numberOfEntitiesConsistency: 100,
     org: {
         clientId: "",
         clientSecret: ""

http://git-wip-us.apache.org/repos/asf/usergrid/blob/a56c8ddd/stack/rest_integration_tests/test/entities/create.js
----------------------------------------------------------------------
diff --git a/stack/rest_integration_tests/test/entities/create.js b/stack/rest_integration_tests/test/entities/create.js
index c7e6f42..3fa6831 100644
--- a/stack/rest_integration_tests/test/entities/create.js
+++ b/stack/rest_integration_tests/test/entities/create.js
@@ -20,7 +20,7 @@ var config = require('../../config');
 
 module.exports = {
     test: function() {
-        var numberOfRecords = 30;
+        var numberOfRecords = config.numberOfEntitiesConsistency;
         var uuid = require("uuid");
         var id = "resttest_"+ uuid.v1().toString().replace("-", "");
 


[25/36] usergrid git commit: fix observables

Posted by sf...@apache.org.
fix observables


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/a9dc6ba2
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/a9dc6ba2
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/a9dc6ba2

Branch: refs/heads/master
Commit: a9dc6ba22e028dcae00cb03b210f12f1a598583b
Parents: 0428cd6
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Sep 28 14:51:29 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Sep 28 14:51:29 2015 -0600

----------------------------------------------------------------------
 .../asyncevents/InMemoryAsyncEventService.java    | 16 +++++++++++-----
 .../corepersistence/index/IndexServiceTest.java   | 18 +++++++++++++-----
 .../index/impl/IndexRefreshCommandImpl.java       |  2 +-
 .../rest/exceptions/AbstractExceptionMapper.java  |  2 +-
 4 files changed, 26 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/a9dc6ba2/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
index fad6e48..b29c39e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
@@ -124,14 +124,20 @@ public class InMemoryAsyncEventService implements AsyncEventService {
     }
 
     public void run( Observable<?> observable ) {
-        Observable mapped = observable.map(message -> message instanceof IndexOperationMessage ? indexProducer.put((IndexOperationMessage)message) : Observable.just(message));
+
         //start it in the background on an i/o thread
         if ( !resolveSynchronously ) {
-            mapped.subscribeOn(rxTaskScheduler.getAsyncIOScheduler()).subscribe();
-        }
-        else {
-            mapped.subscribe();
+            observable = observable.subscribeOn(rxTaskScheduler.getAsyncIOScheduler());
         }
+
+        Observable mapped = observable.flatMap(message ->{
+            if(message instanceof IndexOperationMessage) {
+                return indexProducer.put((IndexOperationMessage)message);
+            } else{
+                return Observable.just(message);
+            }
+        });
+        mapped.subscribe();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/usergrid/blob/a9dc6ba2/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java
index 50c5501..6001dd4 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.UUID;
 
+import org.apache.usergrid.persistence.index.impl.IndexProducer;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -80,6 +81,9 @@ public class IndexServiceTest {
     public GraphManagerFactory graphManagerFactory;
 
     @Inject
+    public IndexProducer indexProducer;
+
+    @Inject
     public EntityCollectionManagerFactory entityCollectionManagerFactory;
 
     @Inject
@@ -121,6 +125,7 @@ public class IndexServiceTest {
 
         //real users should never call to blocking, we're not sure what we'll get
         final IndexOperationMessage results = indexed.toBlocking().last();
+        indexProducer.put(results).subscribe();
 
         final Set<IndexOperation> indexRequests = results.getIndexRequests();
 
@@ -170,7 +175,8 @@ public class IndexServiceTest {
 
 
         //now index
-        final int batches = indexService.indexEntity( applicationScope, testEntity ).count().toBlocking().last();
+        final int batches = indexService.indexEntity( applicationScope, testEntity )
+            .flatMap(mesage -> indexProducer.put(mesage)).count().toBlocking().last();
 
 
         assertEquals(1, batches);
@@ -255,7 +261,8 @@ public class IndexServiceTest {
 
 
         //now index
-        final int batches = indexService.indexEntity( applicationScope, testEntity ).count().toBlocking().last();
+        final int batches = indexService.indexEntity( applicationScope, testEntity )
+            .flatMap(mesage -> indexProducer.put(mesage)).count().toBlocking().last();
 
         //take our edge count + 1 and divided by batch sizes
         final int expectedSize = ( int ) Math.ceil( ( (double)edgeCount + 1 ) / indexFig.getIndexBatchSize() );
@@ -372,7 +379,7 @@ public class IndexServiceTest {
 
         final List<Edge> connectionSearchEdges = createConnectionSearchEdges( testEntity, graphManager, edgeCount );
 
-        indexService.indexEntity( applicationScope, testEntity ).toBlocking().getIterator();
+        indexService.indexEntity( applicationScope, testEntity ).flatMap(mesage -> indexProducer.put(mesage)).toBlocking().getIterator();
 
         //query until results are available for collections
         final SearchEdge collectionSearchEdge = CpNamingUtils.createSearchEdgeFromSource( collectionEdge );
@@ -396,7 +403,7 @@ public class IndexServiceTest {
 
             //step 2
             IndexOperationMessage indexOperationMessage =
-                indexService.deleteIndexEdge( applicationScope, toBeDeletedEdge ).toBlocking().lastOrDefault( null );
+                indexService.deleteIndexEdge( applicationScope, toBeDeletedEdge ) .flatMap(mesage ->indexProducer.put(mesage)).toBlocking().lastOrDefault( null );
 
             //not sure if this is still valid.
             assertEquals( 1, indexOperationMessage.getDeIndexRequests().size() );
@@ -436,7 +443,8 @@ public class IndexServiceTest {
         final Edge connectionSearch = graphManager.writeEdge( connectionEdge ).toBlocking().last();
 
         //now index
-        indexService.indexEntity( applicationScope, testEntity ).count().toBlocking().last();
+        indexService.indexEntity( applicationScope, testEntity)
+            .flatMap(mesage ->indexProducer.put(mesage)).count().toBlocking().last();
 
         //query until results are available for collections
         final SearchEdge collectionSearchEdge = CpNamingUtils.createSearchEdgeFromSource( collectionEdge );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/a9dc6ba2/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
index 01942a8..087eefe 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
@@ -193,7 +193,7 @@ public class IndexRefreshCommandImpl implements IndexRefreshCommand {
                 //delete the item
                 IndexOperationMessage indexOperationMessage = new IndexOperationMessage();
                 indexOperationMessage.addDeIndexRequest( deIndexRequest );
-                producer.put( indexOperationMessage );
+                producer.put( indexOperationMessage ).subscribe();
             } );
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/a9dc6ba2/stack/rest/src/main/java/org/apache/usergrid/rest/exceptions/AbstractExceptionMapper.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/exceptions/AbstractExceptionMapper.java b/stack/rest/src/main/java/org/apache/usergrid/rest/exceptions/AbstractExceptionMapper.java
index 1dbffbd..a359618 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/exceptions/AbstractExceptionMapper.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/exceptions/AbstractExceptionMapper.java
@@ -76,7 +76,7 @@ public abstract class AbstractExceptionMapper<E extends java.lang.Throwable> imp
                 logger.debug(e.getClass().getCanonicalName() + " Server Error (" + status + ")", e);
             }
             switch (status){
-                case 200 : logger.info("Uncaught Exception", e); break;
+                case 200 : logger.debug("Uncaught Exception", e); break;
                 default: logger.error("Uncaught Exception", e);
             }
         }


[13/36] usergrid git commit: Merge branch '2.1-release' of https://git-wip-us.apache.org/repos/asf/usergrid into remove-buffer

Posted by sf...@apache.org.
Merge branch '2.1-release' of https://git-wip-us.apache.org/repos/asf/usergrid into remove-buffer


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/bdea1ff6
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/bdea1ff6
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/bdea1ff6

Branch: refs/heads/master
Commit: bdea1ff65c6dfdce3408afeae9f43be9cbd31149
Parents: c7a6ebf 5233567
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Sep 25 10:54:56 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Sep 25 10:54:56 2015 -0600

----------------------------------------------------------------------
 .../usergrid/management/AppInfoMigrationPlugin.java      | 11 +++++++++++
 1 file changed, 11 insertions(+)
----------------------------------------------------------------------



[02/36] usergrid git commit: Merge branch '2.1-release' of https://git-wip-us.apache.org/repos/asf/usergrid into 2.1-release

Posted by sf...@apache.org.
Merge branch '2.1-release' of https://git-wip-us.apache.org/repos/asf/usergrid into 2.1-release


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/cdfc575d
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/cdfc575d
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/cdfc575d

Branch: refs/heads/master
Commit: cdfc575dba453121fae080fd265ecf5c666a5ec4
Parents: 32fcbb5 8281430
Author: Shawn Feldman <sf...@apache.org>
Authored: Wed Sep 23 17:16:06 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Wed Sep 23 17:16:06 2015 -0600

----------------------------------------------------------------------
 .../corepersistence/CpRelationManager.java      |  35 ++-
 .../service/ConnectionServiceImpl.java          |   5 +-
 .../service/ConnectionServiceImplTest.java      |   4 +-
 .../users/ConnectionResourceTest.java           | 302 ++++++++++++++-----
 .../services/AbstractConnectionsService.java    |  43 ++-
 5 files changed, 298 insertions(+), 91 deletions(-)
----------------------------------------------------------------------



[30/36] usergrid git commit: return consistent observable

Posted by sf...@apache.org.
return consistent observable


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/c19adb80
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/c19adb80
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/c19adb80

Branch: refs/heads/master
Commit: c19adb809712eafd3ca4a5b28c109cd050f678e2
Parents: 456f80d
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Sep 28 17:06:46 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Sep 28 17:06:46 2015 -0600

----------------------------------------------------------------------
 .../corepersistence/asyncevents/AmazonAsyncEventService.java       | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/c19adb80/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index 50b210e..d1670ed 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -239,7 +239,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
             if (event == null) {
                 logger.error("AsyncEvent type or event is null!");
-                return Observable.empty();
+                return Observable.just(new IndexEventResult(message, Optional.<IndexOperationMessage>absent(), false));
             }
             try {
                 //merge each operation to a master observable;


[09/36] usergrid git commit: change consumer to producer

Posted by sf...@apache.org.
change consumer to producer


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/5218cda9
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/5218cda9
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/5218cda9

Branch: refs/heads/master
Commit: 5218cda9d5732ae4ee2ee2ce1e7ae297975a7fad
Parents: a5ece51
Author: Shawn Feldman <sf...@apache.org>
Authored: Tue Sep 15 16:17:13 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu Sep 24 15:01:20 2015 -0600

----------------------------------------------------------------------
 .../persistence/index/guice/IndexModule.java    |   2 +-
 .../index/impl/EsEntityIndexBatchImpl.java      |   4 +-
 .../index/impl/EsEntityIndexFactoryImpl.java    |   8 +-
 .../index/impl/EsEntityIndexImpl.java           |   6 +-
 .../index/impl/EsIndexBufferConsumerImpl.java   | 216 -------------------
 .../index/impl/EsIndexProducerImpl.java         | 209 ++++++++++++++++++
 .../index/impl/IndexBufferConsumer.java         |  40 ----
 .../persistence/index/impl/IndexProducer.java   |  37 ++++
 .../index/impl/IndexRefreshCommandImpl.java     |   5 +-
 9 files changed, 257 insertions(+), 270 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/5218cda9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
index 7279174..46559ad 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
@@ -52,7 +52,7 @@ public abstract class IndexModule extends AbstractModule {
         bind(IndexCache.class).to(EsIndexCacheImpl.class);
         bind(IndexRefreshCommand.class).to(IndexRefreshCommandImpl.class);
 
-        bind(IndexBufferConsumer.class).to(EsIndexBufferConsumerImpl.class).asEagerSingleton();
+        bind(IndexProducer.class).to(EsIndexProducerImpl.class).asEagerSingleton();
 
 
         //wire up the edg migration. A no-op ATM, but retained for future development

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5218cda9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index c11feed..64a1c6a 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@ -41,7 +41,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
     private final IndexAlias alias;
 
     private final IndexLocationStrategy indexLocationStrategy;
-    private final IndexBufferConsumer indexBatchBufferProducer;
+    private final IndexProducer indexBatchBufferProducer;
 
     private final EntityIndex entityIndex;
     private final ApplicationScope applicationScope;
@@ -49,7 +49,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
 
 
     public EsEntityIndexBatchImpl( final IndexLocationStrategy locationStrategy,
-                                   final IndexBufferConsumer indexBatchBufferProducer,
+                                   final IndexProducer indexBatchBufferProducer,
                                    final EntityIndex entityIndex
     ) {
         this.indexLocationStrategy = locationStrategy;

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5218cda9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexFactoryImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexFactoryImpl.java
index 869d079..b66fd40 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexFactoryImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexFactoryImpl.java
@@ -37,7 +37,7 @@ public class EsEntityIndexFactoryImpl implements EntityIndexFactory{
     private final IndexFig config;
     private final IndexCache indexCache;
     private final EsProvider provider;
-    private final IndexBufferConsumer indexBufferConsumer;
+    private final IndexProducer indexProducer;
     private final MetricsFactory metricsFactory;
     private final IndexRefreshCommand refreshCommand;
 
@@ -50,7 +50,7 @@ public class EsEntityIndexFactoryImpl implements EntityIndexFactory{
                     config,
                     refreshCommand,
                     metricsFactory,
-                    indexBufferConsumer,
+                    indexProducer,
                     locationStrategy
                 );
                 index.initialize();
@@ -62,7 +62,7 @@ public class EsEntityIndexFactoryImpl implements EntityIndexFactory{
     public EsEntityIndexFactoryImpl( final IndexFig indexFig,
                                      final IndexCache indexCache,
                                      final EsProvider provider,
-                                     final IndexBufferConsumer indexBufferConsumer,
+                                     final IndexProducer indexProducer,
                                      final MetricsFactory metricsFactory,
                                      final IndexRefreshCommand refreshCommand
 
@@ -70,7 +70,7 @@ public class EsEntityIndexFactoryImpl implements EntityIndexFactory{
         this.config = indexFig;
         this.indexCache = indexCache;
         this.provider = provider;
-        this.indexBufferConsumer = indexBufferConsumer;
+        this.indexProducer = indexProducer;
         this.metricsFactory = metricsFactory;
         this.refreshCommand = refreshCommand;
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5218cda9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index 1c63a7b..6317a69 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -63,8 +63,6 @@ import org.elasticsearch.index.query.*;
 import org.elasticsearch.indices.IndexAlreadyExistsException;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.SearchHits;
-import org.elasticsearch.search.aggregations.Aggregation;
-import org.elasticsearch.search.aggregations.AggregationBuilder;
 import org.elasticsearch.search.aggregations.metrics.sum.Sum;
 import org.elasticsearch.search.aggregations.metrics.sum.SumBuilder;
 import org.slf4j.Logger;
@@ -118,7 +116,7 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData {
     private final SearchRequestBuilderStrategyV2 searchRequestBuilderStrategyV2;
     private final int cursorTimeout;
     private final long queryTimeout;
-    private final IndexBufferConsumer indexBatchBufferProducer;
+    private final IndexProducer indexBatchBufferProducer;
     private final FailureMonitorImpl failureMonitor;
     private final Timer aggregationTimer;
 
@@ -133,7 +131,7 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData {
                               final IndexFig indexFig,
                               final IndexRefreshCommand indexRefreshCommand,
                               final MetricsFactory metricsFactory,
-                              final IndexBufferConsumer indexBatchBufferProducer,
+                              final IndexProducer indexBatchBufferProducer,
                               final IndexLocationStrategy indexLocationStrategy
     ) {
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5218cda9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
deleted file mode 100644
index d126b5d..0000000
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements.  The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.  For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.index.impl;
-
-
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import com.codahale.metrics.Histogram;
-import org.apache.usergrid.persistence.core.metrics.ObservableTimer;
-import org.apache.usergrid.persistence.index.EntityIndexBatch;
-import org.elasticsearch.action.WriteConsistencyLevel;
-import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.client.Client;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
-import org.apache.usergrid.persistence.index.IndexFig;
-
-import com.codahale.metrics.Counter;
-import com.codahale.metrics.Meter;
-import com.codahale.metrics.Timer;
-import com.google.common.base.Preconditions;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-
-import rx.Observable;
-import rx.Subscriber;
-import rx.schedulers.Schedulers;
-
-
-/**
- * Consumer for IndexOperationMessages
- */
-@Singleton
-public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
-    private static final Logger log = LoggerFactory.getLogger( EsIndexBufferConsumerImpl.class );
-
-    private final IndexFig config;
-    private final FailureMonitorImpl failureMonitor;
-    private final Client client;
-    private final Timer flushTimer;
-    private final IndexFig indexFig;
-    private final Counter indexSizeCounter;
-    private final Histogram roundtripTimer;
-    private final Timer indexTimer;
-
-
-    private AtomicLong inFlight = new AtomicLong();
-
-
-    @Inject
-    public EsIndexBufferConsumerImpl( final IndexFig config, final EsProvider provider,
-                                      final MetricsFactory metricsFactory, final IndexFig indexFig ) {
-        this.flushTimer = metricsFactory.getTimer(EsIndexBufferConsumerImpl.class, "index_buffer.flush");
-        this.indexSizeCounter = metricsFactory.getCounter(EsIndexBufferConsumerImpl.class, "index_buffer.size");
-        this.roundtripTimer = metricsFactory.getHistogram(EsIndexBufferConsumerImpl.class, "index_buffer.message_cycle");
-
-        //wire up the gauge of inflight messages
-        metricsFactory.addGauge(EsIndexBufferConsumerImpl.class, "index_buffer.inflight", () -> inFlight.longValue());
-
-
-        this.indexTimer = metricsFactory.getTimer( EsIndexBufferConsumerImpl.class, "index" );
-
-        this.config = config;
-        this.failureMonitor = new FailureMonitorImpl(config, provider);
-        this.client = provider.getClient();
-        this.indexFig = indexFig;
-
-
-        //batch up sets of some size and send them in batch
-
-    }
-
-    public Observable<IndexOperationMessage>  put( IndexOperationMessage message ) {
-        Preconditions.checkNotNull(message, "Message cannot be null");
-        indexSizeCounter.inc(message.getDeIndexRequests().size());
-        indexSizeCounter.inc(message.getIndexRequests().size());
-        return  processBatch(message);
-    }
-
-
-    /**
-     * Process the buffer of batches
-     * @param batch
-     * @return
-     */
-    private Observable<IndexOperationMessage> processBatch( final IndexOperationMessage batch ) {
-
-        //take our stream of batches, then stream then into individual ops for consumption on ES
-        final Set<IndexOperation> indexOperationSet = batch.getIndexRequests();
-        final Set<DeIndexOperation> deIndexOperationSet = batch.getDeIndexRequests();
-
-        final int indexOperationSetSize = indexOperationSet.size();
-        final int deIndexOperationSetSize = deIndexOperationSet.size();
-
-        log.debug("Emitting {} add and {} remove operations", indexOperationSetSize, deIndexOperationSetSize);
-
-        indexSizeCounter.dec(indexOperationSetSize);
-        indexSizeCounter.dec(deIndexOperationSetSize);
-
-        final Observable<IndexOperation> index = Observable.from(batch.getIndexRequests());
-        final Observable<DeIndexOperation> deIndex = Observable.from(batch.getDeIndexRequests());
-
-        final Observable<BatchOperation> batchOps = Observable.merge(index, deIndex);
-
-        //buffer into the max size we can send ES and fire them all off until we're completed
-        final Observable<BulkRequestBuilder> requests = batchOps.buffer(indexFig.getIndexBatchSize())
-            //flatten the buffer into a single batch execution
-            .flatMap(individualOps -> Observable.from(individualOps)
-                //collect them
-                .collect(() -> initRequest(), (bulkRequestBuilder, batchOperation) -> {
-                    log.debug("adding operation {} to bulkRequestBuilder {}", batchOperation, bulkRequestBuilder);
-                    batchOperation.doOperation(client, bulkRequestBuilder);
-                }))
-                //write them
-            .doOnNext(bulkRequestBuilder -> sendRequest(bulkRequestBuilder));
-
-
-        //now that we've processed them all, ack the futures after our last batch comes through
-        final Observable<IndexOperationMessage> processedIndexOperations =
-            requests.lastOrDefault(null).flatMap(lastRequest -> {
-                if (lastRequest != null) {
-                    return Observable.just(batch);
-                } else {
-                    return Observable.empty();
-                }
-            });
-
-        //subscribe to the operations that generate requests on a new thread so that we can execute them quickly
-        //mark this as done
-        return processedIndexOperations.doOnNext(processedIndexOp -> {
-            roundtripTimer.update(System.currentTimeMillis() - processedIndexOp.getCreationTime());
-        });
-    }
-
-
-    /*
-
-    /**
-     * initialize request
-     */
-    private BulkRequestBuilder initRequest() {
-        BulkRequestBuilder bulkRequest = client.prepareBulk();
-        bulkRequest.setConsistencyLevel( WriteConsistencyLevel.fromString( config.getWriteConsistencyLevel() ) );
-        bulkRequest.setRefresh( config.isForcedRefresh() );
-        return bulkRequest;
-    }
-
-
-    /**
-     * send bulk request
-     */
-    private void sendRequest( BulkRequestBuilder bulkRequest ) {
-        //nothing to do, we haven't added anything to the index
-        if ( bulkRequest.numberOfActions() == 0 ) {
-            return;
-        }
-
-        final BulkResponse responses;
-
-
-        final Timer.Context timer = indexTimer.time();
-
-        try {
-            responses = bulkRequest.execute().actionGet( );
-        } catch ( Throwable t ) {
-            log.error( "Unable to communicate with elasticsearch" );
-            failureMonitor.fail( "Unable to execute batch", t );
-            throw t;
-        }finally{
-            timer.stop();
-        }
-
-        failureMonitor.success();
-
-        boolean error = false;
-
-        for ( BulkItemResponse response : responses ) {
-
-            if ( response.isFailed() ) {
-                // log error and continue processing
-                log.error( "Unable to index id={}, type={}, index={}, failureMessage={} ", response.getId(),
-                    response.getType(), response.getIndex(), response.getFailureMessage() );
-
-                error = true;
-            }
-        }
-
-        if ( error ) {
-            throw new RuntimeException(
-                "Error during processing of bulk index operations one of the responses failed.  Check previous log "
-                    + "entries" );
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5218cda9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
new file mode 100644
index 0000000..a2c8663
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.usergrid.persistence.index.impl;
+
+
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.codahale.metrics.Histogram;
+import org.elasticsearch.action.WriteConsistencyLevel;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.client.Client;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.index.IndexFig;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Timer;
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+import rx.Observable;
+
+
+/**
+ * Consumer for IndexOperationMessages
+ */
+@Singleton
+public class EsIndexProducerImpl implements IndexProducer {
+    private static final Logger log = LoggerFactory.getLogger( EsIndexProducerImpl.class );
+
+    private final IndexFig config;
+    private final FailureMonitorImpl failureMonitor;
+    private final Client client;
+    private final Timer flushTimer;
+    private final IndexFig indexFig;
+    private final Counter indexSizeCounter;
+    private final Histogram roundtripTimer;
+    private final Timer indexTimer;
+
+
+    private AtomicLong inFlight = new AtomicLong();
+
+
+    @Inject
+    public EsIndexProducerImpl(final IndexFig config, final EsProvider provider,
+                               final MetricsFactory metricsFactory, final IndexFig indexFig) {
+        this.flushTimer = metricsFactory.getTimer(EsIndexProducerImpl.class, "index_buffer.flush");
+        this.indexSizeCounter = metricsFactory.getCounter(EsIndexProducerImpl.class, "index_buffer.size");
+        this.roundtripTimer = metricsFactory.getHistogram(EsIndexProducerImpl.class, "index_buffer.message_cycle");
+
+        //wire up the gauge of inflight messages
+        metricsFactory.addGauge(EsIndexProducerImpl.class, "index_buffer.inflight", () -> inFlight.longValue());
+
+
+        this.indexTimer = metricsFactory.getTimer( EsIndexProducerImpl.class, "index" );
+
+        this.config = config;
+        this.failureMonitor = new FailureMonitorImpl(config, provider);
+        this.client = provider.getClient();
+        this.indexFig = indexFig;
+
+
+        //batch up sets of some size and send them in batch
+
+    }
+
+    public Observable<IndexOperationMessage>  put( IndexOperationMessage message ) {
+        Preconditions.checkNotNull(message, "Message cannot be null");
+        indexSizeCounter.inc(message.getDeIndexRequests().size());
+        indexSizeCounter.inc(message.getIndexRequests().size());
+        return  processBatch(message);
+    }
+
+
+    /**
+     * Process the buffer of batches
+     * @param batch
+     * @return
+     */
+    private Observable<IndexOperationMessage> processBatch( final IndexOperationMessage batch ) {
+
+        //take our stream of batches, then stream then into individual ops for consumption on ES
+        final Set<IndexOperation> indexOperationSet = batch.getIndexRequests();
+        final Set<DeIndexOperation> deIndexOperationSet = batch.getDeIndexRequests();
+
+        final int indexOperationSetSize = indexOperationSet.size();
+        final int deIndexOperationSetSize = deIndexOperationSet.size();
+
+        log.debug("Emitting {} add and {} remove operations", indexOperationSetSize, deIndexOperationSetSize);
+
+        indexSizeCounter.dec(indexOperationSetSize);
+        indexSizeCounter.dec(deIndexOperationSetSize);
+
+        final Observable<IndexOperation> index = Observable.from(batch.getIndexRequests());
+        final Observable<DeIndexOperation> deIndex = Observable.from(batch.getDeIndexRequests());
+
+        final Observable<BatchOperation> batchOps = Observable.merge(index, deIndex);
+
+        //buffer into the max size we can send ES and fire them all off until we're completed
+        final Observable<BulkRequestBuilder> requests = batchOps.buffer(indexFig.getIndexBatchSize())
+            //flatten the buffer into a single batch execution
+            .flatMap(individualOps -> Observable.from(individualOps)
+                //collect them
+                .collect(() -> initRequest(), (bulkRequestBuilder, batchOperation) -> {
+                    log.debug("adding operation {} to bulkRequestBuilder {}", batchOperation, bulkRequestBuilder);
+                    batchOperation.doOperation(client, bulkRequestBuilder);
+                }))
+                //write them
+            .doOnNext(bulkRequestBuilder -> sendRequest(bulkRequestBuilder));
+
+
+        //now that we've processed them all, ack the futures after our last batch comes through
+        final Observable<IndexOperationMessage> processedIndexOperations =
+            requests.lastOrDefault(null).flatMap(lastRequest -> {
+                if (lastRequest != null) {
+                    return Observable.just(batch);
+                } else {
+                    return Observable.empty();
+                }
+            });
+
+        //subscribe to the operations that generate requests on a new thread so that we can execute them quickly
+        //mark this as done
+        return processedIndexOperations.doOnNext(processedIndexOp -> {
+            roundtripTimer.update(System.currentTimeMillis() - processedIndexOp.getCreationTime());
+        });
+    }
+
+
+    /*
+
+    /**
+     * initialize request
+     */
+    private BulkRequestBuilder initRequest() {
+        BulkRequestBuilder bulkRequest = client.prepareBulk();
+        bulkRequest.setConsistencyLevel( WriteConsistencyLevel.fromString( config.getWriteConsistencyLevel() ) );
+        bulkRequest.setRefresh( config.isForcedRefresh() );
+        return bulkRequest;
+    }
+
+
+    /**
+     * send bulk request
+     */
+    private void sendRequest( BulkRequestBuilder bulkRequest ) {
+        //nothing to do, we haven't added anything to the index
+        if ( bulkRequest.numberOfActions() == 0 ) {
+            return;
+        }
+
+        final BulkResponse responses;
+
+
+        final Timer.Context timer = indexTimer.time();
+
+        try {
+            responses = bulkRequest.execute().actionGet( );
+        } catch ( Throwable t ) {
+            log.error( "Unable to communicate with elasticsearch" );
+            failureMonitor.fail( "Unable to execute batch", t );
+            throw t;
+        }finally{
+            timer.stop();
+        }
+
+        failureMonitor.success();
+
+        boolean error = false;
+
+        for ( BulkItemResponse response : responses ) {
+
+            if ( response.isFailed() ) {
+                // log error and continue processing
+                log.error( "Unable to index id={}, type={}, index={}, failureMessage={} ", response.getId(),
+                    response.getType(), response.getIndex(), response.getFailureMessage() );
+
+                error = true;
+            }
+        }
+
+        if ( error ) {
+            throw new RuntimeException(
+                "Error during processing of bulk index operations one of the responses failed.  Check previous log "
+                    + "entries" );
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5218cda9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java
deleted file mode 100644
index cfeb505..0000000
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements.  The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.  For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.index.impl;
-
-
-import org.apache.usergrid.persistence.index.EntityIndexBatch;
-import rx.Observable;
-
-import java.util.List;
-
-
-/**
- *  Buffer index requests
- */
-public interface IndexBufferConsumer {
-
-
-    /**
-     * Put this operation into our collapsing bufer
-     * @param message
-     * @return
-     */
-    Observable<IndexOperationMessage>  put(IndexOperationMessage message);
-
-}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5218cda9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexProducer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexProducer.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexProducer.java
new file mode 100644
index 0000000..ba7027e
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexProducer.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.usergrid.persistence.index.impl;
+
+
+import rx.Observable;
+
+
+/**
+ *  Buffer index requests
+ */
+public interface IndexProducer {
+
+
+    /**
+     * Put this operation into our collapsing bufer
+     * @param message
+     * @return
+     */
+    Observable<IndexOperationMessage>  put(IndexOperationMessage message);
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5218cda9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
index 7b9bc5d..01942a8 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
@@ -20,7 +20,6 @@
 package org.apache.usergrid.persistence.index.impl;
 
 
-import java.util.Collections;
 import java.util.Map;
 import java.util.UUID;
 
@@ -58,7 +57,7 @@ public class IndexRefreshCommandImpl implements IndexRefreshCommand {
 
     private final IndexCache indexCache;
     private final EsProvider esProvider;
-    private final IndexBufferConsumer producer;
+    private final IndexProducer producer;
     private final IndexFig indexFig;
     private final Timer timer;
 
@@ -66,7 +65,7 @@ public class IndexRefreshCommandImpl implements IndexRefreshCommand {
     @Inject
     public IndexRefreshCommandImpl(
                                     final EsProvider esProvider,
-                                    final IndexBufferConsumer producer,
+                                    final IndexProducer producer,
                                     final IndexFig indexFig,
                                     final MetricsFactory metricsFactory,
                                     final IndexCache indexCache ) {


[17/36] usergrid git commit: remove rx

Posted by sf...@apache.org.
remove rx


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/28919aeb
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/28919aeb
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/28919aeb

Branch: refs/heads/master
Commit: 28919aebea35c027a5b720ebeebb2200c1d214c3
Parents: bdea1ff
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Sep 25 13:47:29 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Sep 25 13:47:29 2015 -0600

----------------------------------------------------------------------
 .../usergrid/persistence/index/impl/EsIndexProducerImpl.java       | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/28919aeb/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
index a2c8663..2b36fc8 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
@@ -132,7 +132,7 @@ public class EsIndexProducerImpl implements IndexProducer {
 
         //now that we've processed them all, ack the futures after our last batch comes through
         final Observable<IndexOperationMessage> processedIndexOperations =
-            requests.lastOrDefault(null).flatMap(lastRequest -> {
+            requests.flatMap(lastRequest -> {
                 if (lastRequest != null) {
                     return Observable.just(batch);
                 } else {


[03/36] usergrid git commit: Merge branch '2.1-release' of https://git-wip-us.apache.org/repos/asf/usergrid into 2.1-release

Posted by sf...@apache.org.
Merge branch '2.1-release' of https://git-wip-us.apache.org/repos/asf/usergrid into 2.1-release


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/ce5a96ff
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/ce5a96ff
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/ce5a96ff

Branch: refs/heads/master
Commit: ce5a96ffe8f919358f6b39b2d4ffe2dcb564a27a
Parents: cdfc575 0b243c4
Author: Shawn Feldman <sf...@apache.org>
Authored: Thu Sep 24 14:58:30 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu Sep 24 14:58:30 2015 -0600

----------------------------------------------------------------------
 stack/scripts/migrate_entity_data.py            | 66 ++++++++++++++-----
 .../management/AppInfoMigrationPlugin.java      | 69 +++++++++-----------
 2 files changed, 79 insertions(+), 56 deletions(-)
----------------------------------------------------------------------



[34/36] usergrid git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/usergrid

Posted by sf...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/usergrid


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/3b838c82
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/3b838c82
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/3b838c82

Branch: refs/heads/master
Commit: 3b838c8265abde705ea3b70734aa7de24a8a42d0
Parents: 407ebe0 60acf84
Author: Shawn Feldman <sf...@apache.org>
Authored: Thu Oct 1 09:41:09 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu Oct 1 09:41:09 2015 -0600

----------------------------------------------------------------------
 README-Docs.md                                  |    13 +
 README.md                                       |    10 +-
 content/bootstrap/bootstrap.min.css             |     9 +
 content/community/index.html                    |    22 +-
 content/css/bootflat-extensions.css             |   356 +
 content/css/bootflat-square.css                 |    69 +
 content/css/bootflat.css                        |  1560 +
 content/css/font-awesome.min.css                |   405 +
 content/css/usergrid-site.css                   |  1553 +
 .../docs/_sources/jersey2skeleton/README.txt    |     9 +
 .../creating-and-managing-notifications.txt     |     2 +
 .../docs/_sources/rest-endpoints/api-docs.txt   |   653 +-
 content/docs/jersey2skeleton/README.html        |   303 +
 .../creating-and-managing-notifications.html    |     1 +
 content/docs/rest-endpoints/api-docs.html       |   533 +-
 content/docs/searchindex.js                     |     2 +-
 content/favicon.ico                             |   Bin 0 -> 3989 bytes
 content/font/FontAwesome.otf                    |   Bin 0 -> 61896 bytes
 content/font/fontawesome-webfont-eot.eot        |   Bin 0 -> 37405 bytes
 content/font/fontawesome-webfont-svg.svg        |   399 +
 content/font/fontawesome-webfont-ttf.ttf        |   Bin 0 -> 79076 bytes
 content/font/fontawesome-webfont-woff.woff      |   Bin 0 -> 43572 bytes
 content/img/alberto.jpg                         |   Bin 0 -> 16137 bytes
 content/img/alex.png                            |   Bin 0 -> 40842 bytes
 content/img/apache_usergrid_favicon.png         |   Bin 0 -> 10735 bytes
 content/img/apache_usergrid_logo_white.png      |   Bin 0 -> 26418 bytes
 .../img/apache_usergrid_logo_white_small.png    |   Bin 0 -> 11905 bytes
 content/img/check_flat/default.png              |   Bin 0 -> 25851 bytes
 content/img/dave.jpg                            |   Bin 0 -> 14005 bytes
 content/img/ed.jpg                              |   Bin 0 -> 20460 bytes
 content/img/egg-logo.png                        |   Bin 0 -> 9938 bytes
 content/img/github.png                          |   Bin 0 -> 8936 bytes
 content/img/grey.png                            |   Bin 0 -> 37896 bytes
 content/img/intellij.png                        |   Bin 0 -> 9199 bytes
 content/img/jeff.jpg                            |   Bin 0 -> 13857 bytes
 content/img/michael_r.jpg                       |   Bin 0 -> 10244 bytes
 content/img/mike_d.JPG                          |   Bin 0 -> 36443 bytes
 content/img/nate.jpg                            |   Bin 0 -> 4291 bytes
 content/img/rod.jpg                             |   Bin 0 -> 40313 bytes
 content/img/scott.jpg                           |   Bin 0 -> 8555 bytes
 content/img/shawn.jpg                           |   Bin 0 -> 69304 bytes
 content/img/stliu.jpg                           |   Bin 0 -> 51303 bytes
 content/img/strong.jpg                          |   Bin 0 -> 7434 bytes
 content/img/structure101.png                    |   Bin 0 -> 6475 bytes
 content/img/sungju.jpg                          |   Bin 0 -> 11440 bytes
 content/img/tim.jpg                             |   Bin 0 -> 7611 bytes
 content/img/todd.jpg                            |   Bin 0 -> 18142 bytes
 content/img/usergrid-logo.pdf                   |   398 +
 content/img/usergrid.png                        |   Bin 0 -> 21994 bytes
 content/img/usergrid_160.png                    |   Bin 0 -> 2126 bytes
 content/img/usergrid_200.png                    |   Bin 0 -> 6397 bytes
 content/img/usergrid_300.png                    |   Bin 0 -> 16330 bytes
 content/img/usergrid_300_transparent.png        |   Bin 0 -> 16308 bytes
 content/img/usergrid_400.png                    |   Bin 0 -> 8746 bytes
 content/img/usergrid_800.png                    |   Bin 0 -> 14452 bytes
 content/img/usergrid_card.png                   |   Bin 0 -> 23295 bytes
 content/img/usergrid_logo.png                   |   Bin 0 -> 118086 bytes
 content/img/usergrid_logo_205_50.png            |   Bin 0 -> 7058 bytes
 content/img/usergrid_logo_260_50.png            |   Bin 0 -> 8682 bytes
 content/img/usergrid_logo_720.png               |   Bin 0 -> 27610 bytes
 content/img/usergrid_logo_720p.png              |   Bin 0 -> 27608 bytes
 content/img/usergrid_logo_900_200.png           |   Bin 0 -> 12273 bytes
 content/img/usergrid_logo_white.png             |   Bin 0 -> 16900 bytes
 content/img/usergrid_profile_128.png            |   Bin 0 -> 6689 bytes
 content/img/usergrid_profile_256.png            |   Bin 0 -> 10470 bytes
 content/img/usergrid_profile_256_white.png      |   Bin 0 -> 10724 bytes
 content/img/usergrid_profile_512_margins.png    |   Bin 0 -> 19112 bytes
 content/img/usergrid_profile_64_white.png       |   Bin 0 -> 4839 bytes
 content/img/usergrid_profile_background.png     |   Bin 0 -> 6710 bytes
 content/img/usergrid_screencast_bg.png          |   Bin 0 -> 20478 bytes
 content/img/usergrid_small_100.png              |   Bin 0 -> 6681 bytes
 content/img/usergrid_wiki.png                   |   Bin 0 -> 7135 bytes
 content/img/yourkit.jpeg                        |   Bin 0 -> 7763 bytes
 content/js/bootstrap.min.js                     |     8 +
 content/js/head.js                              |   708 +
 content/js/html5shiv.js                         |     8 +
 content/js/jquery-1.10.1.min.js                 |     6 +
 content/js/jquery.icheck.js                     |   397 +
 content/js/respond.min.js                       |     6 +
 content/js/usergrid-site.js                     |    50 +
 content/static/github-btn.html                  |     2 +
 content/v101-portal-demo/config.js              |   129 +
 content/v101-portal-demo/css/entypo/entypo.eot  |   Bin 0 -> 35540 bytes
 content/v101-portal-demo/css/entypo/entypo.svg  |    13 +
 content/v101-portal-demo/css/entypo/entypo.ttf  |   Bin 0 -> 35392 bytes
 content/v101-portal-demo/css/entypo/entypo.woff |   Bin 0 -> 21916 bytes
 content/v101-portal-demo/css/main.css           |  1990 +
 content/v101-portal-demo/css/main.min.css       |     1 +
 content/v101-portal-demo/favicon.ico            |   Bin 0 -> 3989 bytes
 content/v101-portal-demo/helpJson.json          |    47 +
 .../img/appswitcher/apiPlatform_lg.png          |   Bin 0 -> 2397 bytes
 .../img/appswitcher/appServices_lg.png          |   Bin 0 -> 2295 bytes
 .../img/appswitcher/console_lg.png              |   Bin 0 -> 1453 bytes
 .../img/appswitcher/home_lg.png                 |   Bin 0 -> 1522 bytes
 .../img/appswitcher/logo_color.png              |   Bin 0 -> 3459 bytes
 .../v101-portal-demo/img/appswitcher/max_lg.png |   Bin 0 -> 1970 bytes
 .../img/appswitcher/triangleMenuItem_right.png  |   Bin 0 -> 1158 bytes
 .../triangleMenuItem_right_hover.png            |   Bin 0 -> 1169 bytes
 content/v101-portal-demo/img/blue-bars.png      |   Bin 0 -> 3635 bytes
 content/v101-portal-demo/img/blue-bolt.png      |   Bin 0 -> 3942 bytes
 content/v101-portal-demo/img/blue-carat.png     |   Bin 0 -> 1006 bytes
 content/v101-portal-demo/img/green_dot.png      |   Bin 0 -> 3472 bytes
 .../img/introjs_arrow_step_next.png             |   Bin 0 -> 219 bytes
 .../img/introjs_arrow_step_next_disabled.png    |   Bin 0 -> 220 bytes
 .../img/introjs_arrow_step_prev.png             |   Bin 0 -> 217 bytes
 .../img/introjs_arrow_step_prev_disabled.png    |   Bin 0 -> 218 bytes
 content/v101-portal-demo/img/introjs_close.png  |   Bin 0 -> 274 bytes
 content/v101-portal-demo/img/logo.png           |   Bin 0 -> 7758 bytes
 content/v101-portal-demo/img/nav-device.gif     |   Bin 0 -> 2184 bytes
 content/v101-portal-demo/img/nav-sprites.png    |   Bin 0 -> 7953 bytes
 content/v101-portal-demo/img/no-data1.png       |   Bin 0 -> 45300 bytes
 content/v101-portal-demo/img/phone-small.gif    |   Bin 0 -> 1300 bytes
 .../img/push/APNS_cert_upload.png               |   Bin 0 -> 33956 bytes
 .../img/push/APNS_certification.png             |   Bin 0 -> 16855 bytes
 .../img/push/android-notification.png           |   Bin 0 -> 41629 bytes
 .../img/push/google_api_key.png                 |   Bin 0 -> 98118 bytes
 .../img/push/iphone_message.png                 |   Bin 0 -> 90307 bytes
 content/v101-portal-demo/img/push/step_1.png    |   Bin 0 -> 1953 bytes
 content/v101-portal-demo/img/push/step_2.png    |   Bin 0 -> 2117 bytes
 content/v101-portal-demo/img/push/step_3.png    |   Bin 0 -> 2162 bytes
 content/v101-portal-demo/img/red_dot.png        |   Bin 0 -> 3482 bytes
 .../v101-portal-demo/img/sdk-sprites-large.png  |   Bin 0 -> 15115 bytes
 content/v101-portal-demo/img/sdk-sprites.png    |   Bin 0 -> 4401 bytes
 content/v101-portal-demo/img/tablet-small.gif   |   Bin 0 -> 1390 bytes
 content/v101-portal-demo/img/user-photo.png     |   Bin 0 -> 3849 bytes
 content/v101-portal-demo/img/user_profile.png   |   Bin 0 -> 3775 bytes
 content/v101-portal-demo/img/verify.png         |   Bin 0 -> 22934 bytes
 content/v101-portal-demo/img/yellow_dot.png     |   Bin 0 -> 3475 bytes
 content/v101-portal-demo/index-debug.html       |   151 +
 content/v101-portal-demo/index-template.html    |   156 +
 content/v101-portal-demo/index.html             |   151 +
 .../js/generated/usergrid-dev.js                |  4886 +
 .../js/generated/usergrid-libs.min.js           |    38 +
 .../js/generated/usergrid.min.js                |    25 +
 content/v101-portal-demo/js/libs/MD5.min.js     |     1 +
 .../js/libs/angular-1.2.5/angular-animate.js    |  1323 +
 .../libs/angular-1.2.5/angular-animate.min.js   |    23 +
 .../angular-1.2.5/angular-animate.min.js.map    |     8 +
 .../js/libs/angular-1.2.5/angular-cookies.js    |   202 +
 .../libs/angular-1.2.5/angular-cookies.min.js   |     8 +
 .../angular-1.2.5/angular-cookies.min.js.map    |     8 +
 .../js/libs/angular-1.2.5/angular-csp.css       |    24 +
 .../js/libs/angular-1.2.5/angular-loader.js     |   410 +
 .../js/libs/angular-1.2.5/angular-loader.min.js |     9 +
 .../angular-1.2.5/angular-loader.min.js.map     |     8 +
 .../js/libs/angular-1.2.5/angular-mocks.js      |  2116 +
 .../js/libs/angular-1.2.5/angular-resource.js   |   565 +
 .../libs/angular-1.2.5/angular-resource.min.js  |    13 +
 .../angular-1.2.5/angular-resource.min.js.map   |     8 +
 .../js/libs/angular-1.2.5/angular-route.js      |   911 +
 .../js/libs/angular-1.2.5/angular-route.min.js  |    14 +
 .../libs/angular-1.2.5/angular-route.min.js.map |     8 +
 .../js/libs/angular-1.2.5/angular-sanitize.js   |   622 +
 .../libs/angular-1.2.5/angular-sanitize.min.js  |    14 +
 .../angular-1.2.5/angular-sanitize.min.js.map   |     8 +
 .../js/libs/angular-1.2.5/angular-scenario.js   | 32374 ++++++
 .../js/libs/angular-1.2.5/angular-touch.js      |   563 +
 .../js/libs/angular-1.2.5/angular-touch.min.js  |    13 +
 .../libs/angular-1.2.5/angular-touch.min.js.map |     8 +
 .../js/libs/angular-1.2.5/angular.js            | 20369 ++++
 .../js/libs/angular-1.2.5/angular.min.js        |   201 +
 .../js/libs/angular-1.2.5/angular.min.js.map    |     8 +
 .../js/libs/angular-1.2.5/errors.json           |     1 +
 .../js/libs/angular-1.2.5/version.json          |     1 +
 .../js/libs/angular-1.2.5/version.txt           |     1 +
 .../libs/bootstrap/css/bootstrap-responsive.css |  1345 +
 .../bootstrap/css/bootstrap-responsive.min.css  |  1245 +
 .../js/libs/bootstrap/css/bootstrap.css         |  6169 ++
 .../js/libs/bootstrap/css/bootstrap.min.css     |  5469 ++
 .../js/libs/bootstrap/custom/css/bootstrap.css  |  6316 ++
 .../libs/bootstrap/custom/css/bootstrap.min.css |     9 +
 .../custom/img/glyphicons-halflings-white.png   |   Bin 0 -> 8777 bytes
 .../custom/img/glyphicons-halflings.png         |   Bin 0 -> 12799 bytes
 .../js/libs/bootstrap/custom/js/bootstrap.js    |  2291 +
 .../libs/bootstrap/custom/js/bootstrap.min.js   |     7 +
 .../img/glyphicons-halflings-white.png          |   Bin 0 -> 8777 bytes
 .../libs/bootstrap/img/glyphicons-halflings.png |   Bin 0 -> 12799 bytes
 .../js/libs/bootstrap/js/bootstrap.js           |  2117 +
 .../js/libs/bootstrap/js/bootstrap.min.js       |   644 +
 .../v101-portal-demo/js/libs/google-viz-api.js  |    49 +
 .../js/libs/jquery/jquery-1.9.1.min.js          |     5 +
 .../js/libs/jquery/jquery-migrate-1.1.1.min.js  |     3 +
 .../js/libs/jquery/jquery.sparkline.min.js      |     5 +
 .../js/libs/jqueryui/date.min.js                |     2 +
 .../ui-bg_diagonals-thick_90_eeeeee_40x40.png   |   Bin 0 -> 251 bytes
 .../images/ui-bg_flat_100_deedf7_40x100.png     |   Bin 0 -> 182 bytes
 .../images/ui-bg_flat_100_e4f1fb_40x100.png     |   Bin 0 -> 213 bytes
 .../images/ui-bg_flat_100_f2f5f7_40x100.png     |   Bin 0 -> 212 bytes
 .../images/ui-bg_flat_15_cd0a0a_40x100.png      |   Bin 0 -> 181 bytes
 .../images/ui-bg_flat_50_3baae3_40x100.png      |   Bin 0 -> 182 bytes
 .../images/ui-bg_flat_80_d7ebf9_40x100.png      |   Bin 0 -> 183 bytes
 .../ui-bg_highlight-hard_70_000000_1x100.png    |   Bin 0 -> 118 bytes
 .../ui-bg_highlight-soft_25_ffef8f_1x100.png    |   Bin 0 -> 153 bytes
 .../jqueryui/images/ui-icons_000000_256x240.png |   Bin 0 -> 4369 bytes
 .../jqueryui/images/ui-icons_2694e8_256x240.png |   Bin 0 -> 4369 bytes
 .../jqueryui/images/ui-icons_2e83ff_256x240.png |   Bin 0 -> 4369 bytes
 .../jqueryui/images/ui-icons_3d80b3_256x240.png |   Bin 0 -> 4369 bytes
 .../jqueryui/images/ui-icons_72a7cf_256x240.png |   Bin 0 -> 4369 bytes
 .../jqueryui/images/ui-icons_ffffff_256x240.png |   Bin 0 -> 4369 bytes
 .../js/libs/jqueryui/jquery-ui-1.8.18.min.js    |    15 +
 .../js/libs/jqueryui/jquery-ui-1.8.9.custom.css |     1 +
 .../js/libs/jqueryui/jquery-ui-timepicker.css   |     1 +
 .../libs/jqueryui/jquery.ui.timepicker.min.js   |     1 +
 .../ui-bootstrap-custom-0.3.0.min.js            |     1 +
 .../ui-bootstrap-custom-tpls-0.3.0.min.js       |     1 +
 .../v101-portal-demo/js/libs/usergrid.sdk.js    |  2568 +
 docs/rest-endpoints/api-docs.html               |   750 +-
 docs/rest-endpoints/api-docs.md                 |   142 +-
 website/README.md                               |     3 +-
 website/Rules                                   |    52 +
 website/build.sh                                |     1 +
 website/content/bootstrap/bootstrap.min.css     |     9 +
 website/content/community/index.html            |    22 +-
 website/content/css/bootflat-extensions.css     |   356 +
 website/content/css/bootflat-square.css         |    69 +
 website/content/css/bootflat.css                |  1560 +
 website/content/css/font-awesome.min.css        |   405 +
 website/content/css/usergrid-site.css           |  1553 +
 website/content/favicon.ico                     |   Bin 0 -> 3989 bytes
 website/content/font/FontAwesome.otf            |   Bin 0 -> 61896 bytes
 .../content/font/fontawesome-webfont-eot.eot    |   Bin 0 -> 37405 bytes
 .../content/font/fontawesome-webfont-svg.svg    |   399 +
 .../content/font/fontawesome-webfont-ttf.ttf    |   Bin 0 -> 79076 bytes
 .../content/font/fontawesome-webfont-woff.woff  |   Bin 0 -> 43572 bytes
 website/content/img/alberto.jpg                 |   Bin 0 -> 16137 bytes
 website/content/img/alex.png                    |   Bin 0 -> 40842 bytes
 website/content/img/apache_usergrid_favicon.png |   Bin 0 -> 10735 bytes
 .../content/img/apache_usergrid_logo_white.png  |   Bin 0 -> 26418 bytes
 .../img/apache_usergrid_logo_white_small.png    |   Bin 0 -> 11905 bytes
 website/content/img/check_flat/default.png      |   Bin 0 -> 25851 bytes
 website/content/img/dave.jpg                    |   Bin 0 -> 14005 bytes
 website/content/img/ed.jpg                      |   Bin 0 -> 20460 bytes
 website/content/img/egg-logo.png                |   Bin 0 -> 9938 bytes
 website/content/img/github.png                  |   Bin 0 -> 8936 bytes
 website/content/img/grey.png                    |   Bin 0 -> 37896 bytes
 website/content/img/intellij.png                |   Bin 0 -> 9199 bytes
 website/content/img/jeff.jpg                    |   Bin 0 -> 13857 bytes
 website/content/img/michael_r.jpg               |   Bin 0 -> 10244 bytes
 website/content/img/mike_d.JPG                  |   Bin 0 -> 36443 bytes
 website/content/img/nate.jpg                    |   Bin 0 -> 4291 bytes
 website/content/img/rod.jpg                     |   Bin 0 -> 40313 bytes
 website/content/img/scott.jpg                   |   Bin 0 -> 8555 bytes
 website/content/img/shawn.jpg                   |   Bin 0 -> 69304 bytes
 website/content/img/stliu.jpg                   |   Bin 0 -> 51303 bytes
 website/content/img/strong.jpg                  |   Bin 0 -> 7434 bytes
 website/content/img/structure101.png            |   Bin 0 -> 6475 bytes
 website/content/img/sungju.jpg                  |   Bin 0 -> 11440 bytes
 website/content/img/tim.jpg                     |   Bin 0 -> 7611 bytes
 website/content/img/todd.jpg                    |   Bin 0 -> 18142 bytes
 website/content/img/usergrid-logo.pdf           |   398 +
 website/content/img/usergrid.png                |   Bin 0 -> 21994 bytes
 website/content/img/usergrid_160.png            |   Bin 0 -> 2126 bytes
 website/content/img/usergrid_200.png            |   Bin 0 -> 6397 bytes
 website/content/img/usergrid_300.png            |   Bin 0 -> 16330 bytes
 .../content/img/usergrid_300_transparent.png    |   Bin 0 -> 16308 bytes
 website/content/img/usergrid_400.png            |   Bin 0 -> 8746 bytes
 website/content/img/usergrid_800.png            |   Bin 0 -> 14452 bytes
 website/content/img/usergrid_card.png           |   Bin 0 -> 23295 bytes
 website/content/img/usergrid_logo.png           |   Bin 0 -> 118086 bytes
 website/content/img/usergrid_logo_205_50.png    |   Bin 0 -> 7058 bytes
 website/content/img/usergrid_logo_260_50.png    |   Bin 0 -> 8682 bytes
 website/content/img/usergrid_logo_720.png       |   Bin 0 -> 27610 bytes
 website/content/img/usergrid_logo_720p.png      |   Bin 0 -> 27608 bytes
 website/content/img/usergrid_logo_900_200.png   |   Bin 0 -> 12273 bytes
 website/content/img/usergrid_logo_white.png     |   Bin 0 -> 16900 bytes
 website/content/img/usergrid_profile_128.png    |   Bin 0 -> 6689 bytes
 website/content/img/usergrid_profile_256.png    |   Bin 0 -> 10470 bytes
 .../content/img/usergrid_profile_256_white.png  |   Bin 0 -> 10724 bytes
 .../img/usergrid_profile_512_margins.png        |   Bin 0 -> 19112 bytes
 .../content/img/usergrid_profile_64_white.png   |   Bin 0 -> 4839 bytes
 .../content/img/usergrid_profile_background.png |   Bin 0 -> 6710 bytes
 website/content/img/usergrid_screencast_bg.png  |   Bin 0 -> 20478 bytes
 website/content/img/usergrid_small_100.png      |   Bin 0 -> 6681 bytes
 website/content/img/usergrid_wiki.png           |   Bin 0 -> 7135 bytes
 website/content/img/yourkit.jpeg                |   Bin 0 -> 7763 bytes
 website/content/js/bootstrap.min.js             |     8 +
 website/content/js/head.js                      |   708 +
 website/content/js/html5shiv.js                 |     8 +
 website/content/js/jquery-1.10.1.min.js         |     6 +
 website/content/js/jquery.icheck.js             |   397 +
 website/content/js/respond.min.js               |     6 +
 website/content/js/usergrid-site.js             |    50 +
 website/content/static/github-btn.html          |     2 +
 website/crash.log                               |   143 +
 website/layouts/community.html                  |     1 +
 website/layouts/docs.html                       |     1 +
 website/lib/default.rb                          |    43 +
 website/lib/helpers_.rb                         |     0
 website/lib/pandoc.template                     |     4 +
 website/nanoc.yaml                              |    77 +
 website/run.sh                                  |     1 +
 website/tmp/checksums                           |     4 +-
 website/tmp/compiled_content                    | 87099 +----------------
 website/tmp/dependencies                        |   Bin 2372 -> 2397 bytes
 website/tmp/rule_memory                         |   Bin 0 -> 5163 bytes
 website/utilities/map-markers.rb                |    62 +
 website/utilities/markers.txt                   |   440 +
 website/utilities/snapshot-apigee.rb            |    71 +
 website/utilities/usergrid.csv                  |   290 +
 299 files changed, 111298 insertions(+), 87017 deletions(-)
----------------------------------------------------------------------



[05/36] usergrid git commit: remove batch consumer

Posted by sf...@apache.org.
remove batch consumer


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/805113c7
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/805113c7
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/805113c7

Branch: refs/heads/master
Commit: 805113c76695fe415b99ebd6f8aba47a6e0aa781
Parents: 7681600
Author: Shawn Feldman <sf...@apache.org>
Authored: Tue Sep 15 13:14:08 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu Sep 24 15:00:33 2015 -0600

----------------------------------------------------------------------
 .../java/org/apache/usergrid/persistence/index/IndexFig.java     | 4 ----
 1 file changed, 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/805113c7/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
index db1ef3d..4f35730 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
@@ -48,10 +48,6 @@ public interface IndexFig extends GuicyFig {
 
     String ELASTICSEARCH_FORCE_REFRESH = "elasticsearch.force_refresh";
 
-    String INDEX_BUFFER_SIZE = "elasticsearch.buffer_size";
-
-    String INDEX_BUFFER_TIMEOUT = "elasticsearch.buffer_timeout";
-
     String INDEX_BATCH_SIZE = "elasticsearch.batch_size";
 
     String INDEX_WRITE_CONSISTENCY_LEVEL = "elasticsearch.write_consistency_level";


[28/36] usergrid git commit: remove job scheduler log

Posted by sf...@apache.org.
remove job scheduler log


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/456f80d4
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/456f80d4
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/456f80d4

Branch: refs/heads/master
Commit: 456f80d4a7a94ac8b67261cbdfc1f3af828fde61
Parents: d73f98d
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Sep 28 16:13:40 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Sep 28 16:13:40 2015 -0600

----------------------------------------------------------------------
 .../org/apache/usergrid/batch/service/JobSchedulerService.java     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/456f80d4/stack/core/src/main/java/org/apache/usergrid/batch/service/JobSchedulerService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/batch/service/JobSchedulerService.java b/stack/core/src/main/java/org/apache/usergrid/batch/service/JobSchedulerService.java
index d76be6f..284e1db 100644
--- a/stack/core/src/main/java/org/apache/usergrid/batch/service/JobSchedulerService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/batch/service/JobSchedulerService.java
@@ -131,7 +131,7 @@ public class JobSchedulerService extends AbstractScheduledService {
             }
         }
         catch ( Throwable t ) {
-            LOG.error( "Scheduler run failed, error is", t );
+            LOG.debug( "Scheduler run failed, error is", t );
         }
     }
 


[33/36] usergrid git commit: add consistency check for create

Posted by sf...@apache.org.
add consistency check for create


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/9ceae774
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/9ceae774
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/9ceae774

Branch: refs/heads/master
Commit: 9ceae7748ce522778fbd0d9a9bbbd6b240cf9b9b
Parents: 2e51d06
Author: Shawn Feldman <sf...@apache.org>
Authored: Thu Oct 1 09:40:14 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu Oct 1 09:40:14 2015 -0600

----------------------------------------------------------------------
 stack/rest_integration_tests/lib/entities.js    | 41 ++++++++++++++++++++
 .../test/entities/create.js                     | 24 +++++++++++-
 2 files changed, 64 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/9ceae774/stack/rest_integration_tests/lib/entities.js
----------------------------------------------------------------------
diff --git a/stack/rest_integration_tests/lib/entities.js b/stack/rest_integration_tests/lib/entities.js
index c62ca0d..3693dcb 100644
--- a/stack/rest_integration_tests/lib/entities.js
+++ b/stack/rest_integration_tests/lib/entities.js
@@ -46,6 +46,37 @@ module.exports = {
             cb(error, error ? error : body);
         });
     },
+    createEach: function(collection, numberOfEntities, cb) {
+        var url = urls.appendOrgCredentials(urls.appUrl() + collection);
+        var requestArray = []
+        geos = random.geo(config.location, 2000, numberOfEntities);
+        // console.log(geos);
+        for (var i = 0; i < numberOfEntities; i++) {
+            requestArray.push({
+                consistentProperty: "somethingConsistent",
+                randomProperty: "somethingRandom - " + random.randomString(10),
+                intProperty: random.randomNumber(5),
+                optionsProperty: random.abc(),
+                location: geos[i],
+                title: "A Tale of Two Cities"
+            });
+        }
+        var returnBody = [];
+        async.each(requestArray, function(options, cb) {
+            request.post({
+                url: url,
+                json: true,
+                body: options
+            }, function(e, r, body) {
+                var error = responseLib.getError(e, r);
+                returnBody.push(body.entities[0]);
+                cb(error, error ? error : body.entities[0]);
+            });
+        }, function(err,bodies) {
+           cb(err,returnBody);
+        });
+
+    },
     deleteAll: function(collection, cb) {
         var url = urls.appendOrgCredentials(urls.appUrl() + collection);
         deleteAllEntities(collection, function(e) {
@@ -79,6 +110,16 @@ module.exports = {
             cb(error, error ? error : body);
         })
     },
+    getByUuid: function(collection, uuid, cb) {
+        var url = urls.appendOrgCredentials(urls.appUrl() + collection + "/"+uuid);
+        request.get({
+            url: url,
+            json: true
+        }, function(e, r, body) {
+            var error = responseLib.getError(e, r);
+            cb(error, error ? error : body);
+        })
+    },
     getWithQuery: function(collection, query, numberOfEntities, cb) {
         var url = urls.appendOrgCredentials(urls.appUrl() + collection + "?ql=" + encodeURIComponent(query) + "&limit=" + numberOfEntities.toString());
         request.get({

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9ceae774/stack/rest_integration_tests/test/entities/create.js
----------------------------------------------------------------------
diff --git a/stack/rest_integration_tests/test/entities/create.js b/stack/rest_integration_tests/test/entities/create.js
index 97e820c..c7e6f42 100644
--- a/stack/rest_integration_tests/test/entities/create.js
+++ b/stack/rest_integration_tests/test/entities/create.js
@@ -21,6 +21,9 @@ var config = require('../../config');
 module.exports = {
     test: function() {
         var numberOfRecords = 30;
+        var uuid = require("uuid");
+        var id = "resttest_"+ uuid.v1().toString().replace("-", "");
+
         describe("create entities", function() {
             it("should create " + numberOfRecords.toString() + " entities in the " + config.entitiesTestCollection + " collection", function(done) {
                 this.slow(numberOfRecords * 500);
@@ -29,10 +32,29 @@ module.exports = {
                     body.entities.should.be.an.instanceOf(Array).and.have.lengthOf(numberOfRecords);
                     body.entities.forEach(function(entity) {
                         entity.should.have.property("uuid").and.match(/(\w{8}(-\w{4}){3}-\w{12}?)/);
-                    })
+                    });
                     done();
                 })
             });
+            it("should create " + numberOfRecords.toString() + " entities in the " + id + " collection and check for consistency", function(done) {
+                this.slow(numberOfRecords * 500);
+                entities.createEach(id, numberOfRecords, function(err, bodies) {
+                    should(err).be.null;
+                    bodies.should.be.an.instanceOf(Array).and.have.lengthOf(numberOfRecords);
+                    bodyMap = {};
+                    bodies.forEach(function(body){
+                        bodyMap[body.uuid] = body;
+                    });
+                    entities.get(id, numberOfRecords, function (err,entityArray) {
+                        should(err).be.null;
+                        entityArray.entities.forEach(function(entity){
+                            delete(bodyMap[entity.uuid]);
+                        });
+                        should(Object.keys(bodyMap)).have.lengthOf(0);
+                        done();
+                    });
+                });
+            });
         });
     }
 };


[32/36] usergrid git commit: change branch in cf config

Posted by sf...@apache.org.
change branch in cf config


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/2e51d06d
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/2e51d06d
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/2e51d06d

Branch: refs/heads/master
Commit: 2e51d06da9a8234a28354ba8769457696ed8f159
Parents: 6626b83
Author: Shawn Feldman <sf...@apache.org>
Authored: Tue Sep 29 16:40:29 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Tue Sep 29 16:40:29 2015 -0600

----------------------------------------------------------------------
 stack/awscluster/gatling-cluster-cf.json | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/2e51d06d/stack/awscluster/gatling-cluster-cf.json
----------------------------------------------------------------------
diff --git a/stack/awscluster/gatling-cluster-cf.json b/stack/awscluster/gatling-cluster-cf.json
index 4cc4ab0..5f2f61b 100644
--- a/stack/awscluster/gatling-cluster-cf.json
+++ b/stack/awscluster/gatling-cluster-cf.json
@@ -77,7 +77,7 @@
         "Branch": {
             "Description": "The branch of usergrid to check out",
             "Type": "String",
-            "Default": "two-dot-o-dev"
+            "Default": "master"
         }
     },
 


[19/36] usergrid git commit: Merge branch '2.1-release' into remove-buffer

Posted by sf...@apache.org.
Merge branch '2.1-release' into remove-buffer


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/e6c6c36f
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/e6c6c36f
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/e6c6c36f

Branch: refs/heads/master
Commit: e6c6c36fa3e00514a76d0e690c961f3d73ad2e60
Parents: 28919ae 175d526
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Sep 25 14:18:48 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Sep 25 14:18:48 2015 -0600

----------------------------------------------------------------------
 .../services/assets/data/AwsSdkS3BinaryStore.java        | 11 ++++-------
 1 file changed, 4 insertions(+), 7 deletions(-)
----------------------------------------------------------------------



[14/36] usergrid git commit: add delete queue for cleanup and poll for messages

Posted by sf...@apache.org.
add delete queue for cleanup and poll for messages


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/a5b51c9a
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/a5b51c9a
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/a5b51c9a

Branch: refs/heads/master
Commit: a5b51c9a8bdc5249fdae1ac618da4c6cd3e0a70d
Parents: 109ce2d
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Sep 25 10:36:40 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Sep 25 11:30:12 2015 -0600

----------------------------------------------------------------------
 .../persistence/queue/DefaultQueueManager.java  |  5 +++
 .../persistence/queue/QueueManager.java         |  5 +++
 .../queue/impl/SNSQueueManagerImpl.java         |  9 +++++
 .../queue/impl/SQSQueueManagerImpl.java         |  7 ++++
 .../persistence/queue/QueueManagerTest.java     | 37 ++++++++++++++++----
 .../services/queues/ImportQueueManager.java     |  5 +++
 6 files changed, 61 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/a5b51c9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
index c72e109..d974529 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
@@ -72,4 +72,9 @@ public class DefaultQueueManager implements QueueManager {
         String uuid = UUID.randomUUID().toString();
         queue.add(new QueueMessage(uuid,"handle_"+uuid,body,"put type here"));
     }
+
+    @Override
+    public void deleteQueue() {
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/a5b51c9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
index 0ec2337..027abb2 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
@@ -69,4 +69,9 @@ public interface QueueManager {
      * @throws IOException
      */
     void sendMessage(Object body)throws IOException;
+
+    /**
+     * purge messages
+     */
+    void deleteQueue();
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/a5b51c9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index 8fb0f52..bc63f53 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -449,6 +449,15 @@ public class SNSQueueManagerImpl implements QueueManager {
 
     }
 
+    @Override
+    public void deleteQueue() {
+        logger.warn("Deleting queue: "+getReadQueue().getUrl());
+        sqs.deleteQueue(new DeleteQueueRequest().withQueueUrl(getReadQueue().getUrl()));
+        logger.warn("Deleting queue: "+getReadQueue().getUrl()+"_dead");
+        sqs.deleteQueue(new DeleteQueueRequest().withQueueUrl(getReadQueue().getUrl()+"_dead"));
+
+    }
+
 
     @Override
     public void commitMessage(final QueueMessage queueMessage) {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/a5b51c9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
index 075e90c..daa1cb5 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
@@ -327,6 +327,13 @@ public class SQSQueueManagerImpl implements QueueManager {
         return region;
     }
 
+    @Override
+    public void deleteQueue() {
+        logger.warn("Deleting queue: "+getQueue().getUrl());
+        sqs.deleteQueue(new DeleteQueueRequest().withQueueUrl(getQueue().getUrl()));
+    }
+
+
 
     /**
      * Create the SQS client for the specified settings

http://git-wip-us.apache.org/repos/asf/usergrid/blob/a5b51c9a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
index 29e88ce..ac70af6 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
@@ -20,7 +20,6 @@
 package org.apache.usergrid.persistence.queue;
 
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -63,16 +62,24 @@ public class QueueManagerTest {
     protected QueueScope scope;
     private QueueManager qm;
 
+    public static long queueSeed = System.currentTimeMillis();
+
 
     @Before
     public void mockApp() {
-        this.scope = new QueueScopeImpl( "testQueue", QueueScope.RegionImplementation.LOCAL);
+
+        this.scope = new QueueScopeImpl( "testQueue"+queueSeed++, QueueScope.RegionImplementation.LOCAL);
         qm = qmf.getQueueManager(scope);
     }
 
+    @org.junit.After
+    public void cleanup(){
+        qm.deleteQueue();
+    }
+
 
     @Test
-    public void send() throws IOException,ClassNotFoundException{
+    public void send() throws Exception{
         String value = "bodytest";
         qm.sendMessage(value);
         List<QueueMessage> messageList = qm.getMessages(1,5000,5000,String.class).toList().toBlocking().last();
@@ -81,13 +88,14 @@ public class QueueManagerTest {
             assertTrue(message.getBody().equals(value));
             qm.commitMessage(message);
         }
+
         messageList = qm.getMessages(1,5000,5000,String.class).toList().toBlocking().last();
         assertTrue(messageList.size() <= 0);
 
     }
 
     @Test
-    public void sendMore() throws IOException,ClassNotFoundException{
+    public void sendMore() throws Exception{
         HashMap<String,String> values = new HashMap<>();
         values.put("test","Test");
 
@@ -107,7 +115,7 @@ public class QueueManagerTest {
     }
 
     @Test
-    public void queueSize() throws IOException,ClassNotFoundException{
+    public void queueSize() throws Exception{
         HashMap<String,String> values = new HashMap<>();
         values.put("test", "Test");
 
@@ -115,8 +123,16 @@ public class QueueManagerTest {
         bodies.add(values);
         long initialDepth = qm.getQueueDepth();
         qm.sendMessages(bodies);
-        long depth = qm.getQueueDepth();
+        long depth = 0;
+        for(int i=0; i<10;i++){
+             depth = qm.getQueueDepth();
+            if(depth>0){
+                break;
+            }
+            Thread.sleep(1000);
+        }
         assertTrue(depth>0);
+
         List<QueueMessage> messageList = qm.getMessages(10,5000,5000,values.getClass()).toList().toBlocking().last();
         assertTrue(messageList.size() <= 500);
         for(QueueMessage message : messageList){
@@ -125,9 +141,16 @@ public class QueueManagerTest {
         if(messageList.size()>0) {
             qm.commitMessages(messageList);
         }
-        depth = qm.getQueueDepth();
+        for(int i=0; i<10;i++){
+            depth = qm.getQueueDepth();
+            if(depth==initialDepth){
+                break;
+            }
+            Thread.sleep(1000);
+        }
         assertEquals(initialDepth, depth);
     }
 
 
+
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/a5b51c9a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
index d74f688..bca9a49 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
@@ -68,4 +68,9 @@ public class ImportQueueManager implements QueueManager {
     public void sendMessage( final Object body ) throws IOException {
 
     }
+
+    @Override
+    public void deleteQueue() {
+
+    }
 }


[15/36] usergrid git commit: remove observable

Posted by sf...@apache.org.
remove observable


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/407ebe03
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/407ebe03
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/407ebe03

Branch: refs/heads/master
Commit: 407ebe037beb275c92183b450b0885d2b421a27b
Parents: a5b51c9
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Sep 25 13:35:59 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Sep 25 13:35:59 2015 -0600

----------------------------------------------------------------------
 .../usergrid/persistence/index/impl/EsIndexProducerImpl.java       | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/407ebe03/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
index ef59abb..9223293 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
@@ -141,7 +141,7 @@ public class EsIndexProducerImpl implements IndexProducer {
 
         //now that we've processed them all, ack the futures after our last batch comes through
         final Observable<IndexOperationMessage> processedIndexOperations =
-            requests.lastOrDefault(null).flatMap(lastRequest -> {
+            requests.flatMap(lastRequest -> {
                 if (lastRequest != null) {
                     return Observable.just(batch);
                 } else {


[08/36] usergrid git commit: add subscribe to entity verifier

Posted by sf...@apache.org.
add subscribe to entity verifier


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/a5ece510
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/a5ece510
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/a5ece510

Branch: refs/heads/master
Commit: a5ece510eca65dfd3606825d72975b90c17ea826
Parents: 3ece30f
Author: Shawn Feldman <sf...@apache.org>
Authored: Tue Sep 15 15:39:20 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu Sep 24 15:01:10 2015 -0600

----------------------------------------------------------------------
 .../pipeline/read/search/CandidateEntityFilter.java             | 5 +++--
 .../persistence/index/impl/EsIndexBufferConsumerImpl.java       | 2 +-
 .../usergrid/persistence/index/impl/IndexBufferConsumer.java    | 2 ++
 3 files changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/a5ece510/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
index c056b33..14c880f 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
@@ -61,7 +61,8 @@ public class CandidateEntityFilter extends AbstractFilter<FilterResult<Candidate
     @Inject
     public CandidateEntityFilter( final EntityCollectionManagerFactory entityCollectionManagerFactory,
                                   final EntityIndexFactory entityIndexFactory,
-                                  final IndexLocationStrategyFactory indexLocationStrategyFactory) {
+                                  final IndexLocationStrategyFactory indexLocationStrategyFactory
+                                  ) {
         this.entityCollectionManagerFactory = entityCollectionManagerFactory;
         this.entityIndexFactory = entityIndexFactory;
         this.indexLocationStrategyFactory = indexLocationStrategyFactory;
@@ -170,7 +171,7 @@ public class CandidateEntityFilter extends AbstractFilter<FilterResult<Candidate
                 validate( candidateResult );
             }
 
-            batch.execute();
+           batch.execute().subscribe();
         }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/a5ece510/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index 514e6eb..d126b5d 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import com.codahale.metrics.Histogram;
 import org.apache.usergrid.persistence.core.metrics.ObservableTimer;
+import org.apache.usergrid.persistence.index.EntityIndexBatch;
 import org.elasticsearch.action.WriteConsistencyLevel;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
@@ -91,7 +92,6 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
 
     }
 
-
     public Observable<IndexOperationMessage>  put( IndexOperationMessage message ) {
         Preconditions.checkNotNull(message, "Message cannot be null");
         indexSizeCounter.inc(message.getDeIndexRequests().size());

http://git-wip-us.apache.org/repos/asf/usergrid/blob/a5ece510/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java
index df2119c..cfeb505 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java
@@ -18,6 +18,7 @@
 package org.apache.usergrid.persistence.index.impl;
 
 
+import org.apache.usergrid.persistence.index.EntityIndexBatch;
 import rx.Observable;
 
 import java.util.List;
@@ -35,4 +36,5 @@ public interface IndexBufferConsumer {
      * @return
      */
     Observable<IndexOperationMessage>  put(IndexOperationMessage message);
+
 }


[10/36] usergrid git commit: Fixes NPE from missing org during testing

Posted by sf...@apache.org.
Fixes NPE from missing org during testing


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/52335673
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/52335673
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/52335673

Branch: refs/heads/master
Commit: 52335673b3f9b1dd63300654785a28c9278d22ea
Parents: 0b243c4
Author: Todd Nine <tn...@apigee.com>
Authored: Thu Sep 24 18:32:04 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu Sep 24 18:34:14 2015 -0600

----------------------------------------------------------------------
 .../usergrid/management/AppInfoMigrationPlugin.java      | 11 +++++++++++
 1 file changed, 11 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/52335673/stack/services/src/main/java/org/apache/usergrid/management/AppInfoMigrationPlugin.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/AppInfoMigrationPlugin.java b/stack/services/src/main/java/org/apache/usergrid/management/AppInfoMigrationPlugin.java
index ff6bbe3..b78b646 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/AppInfoMigrationPlugin.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/AppInfoMigrationPlugin.java
@@ -178,6 +178,8 @@ public class AppInfoMigrationPlugin implements MigrationPlugin {
 
         final String name = ( String ) oldAppInfoMap.get( PROPERTY_NAME );
 
+        logger.info( "Attempting to migrate app {}", name );
+
         try {
             final String orgName = name.split( "/" )[0];
             final String appName = name.split( "/" )[1];
@@ -190,6 +192,15 @@ public class AppInfoMigrationPlugin implements MigrationPlugin {
             //avoid management org
 
             EntityRef orgRef = managementEm.getAlias( Group.ENTITY_TYPE, orgName );
+
+            /**
+             * No op, we couldn't find the org, so we can't roll the app forward
+             */
+            if(orgRef == null){
+                logger.error( "Unable to retrieve ref for org {}.  Not migrating app {}", orgName, appName );
+                return;
+            }
+
             // create and connect new APPLICATION_INFO oldAppInfo to Organization
             managementService.createApplication( orgRef.getUuid(), name, applicationId, null );
 


[27/36] usergrid git commit: rewrite observable

Posted by sf...@apache.org.
rewrite observable


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/d73f98db
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/d73f98db
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/d73f98db

Branch: refs/heads/master
Commit: d73f98dbfaec511c74c7801ff046755f29da3a69
Parents: 0c4c1ab
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Sep 28 15:59:48 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Sep 28 15:59:48 2015 -0600

----------------------------------------------------------------------
 .../asyncevents/AmazonAsyncEventService.java    | 81 ++++++++++++--------
 1 file changed, 50 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/d73f98db/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index 5f681e7..50b210e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -232,56 +232,75 @@ public class AmazonAsyncEventService implements AsyncEventService {
             logger.debug("handleMessages with {} message", messages.size());
         }
 
-        Observable<IndexEventResult> merged = Observable.empty();
-        for (QueueMessage message : messages) {
+        Observable<IndexEventResult> masterObservable = Observable.from(messages).flatMap(message -> {
             final AsyncEvent event = (AsyncEvent) message.getBody();
 
             logger.debug("Processing {} event", event);
 
             if (event == null) {
                 logger.error("AsyncEvent type or event is null!");
-                continue;
+                return Observable.empty();
             }
+            try {
+                //merge each operation to a master observable;
+                if (event instanceof EdgeDeleteEvent) {
+                    return handleIndexOperation(message, queueMessage -> handleEdgeDelete(queueMessage));
+                } else if (event instanceof EdgeIndexEvent) {
+                    return handleIndexOperation(message, queueMessage -> handleEdgeIndex(queueMessage));
+                } else if (event instanceof EntityDeleteEvent) {
+                    return handleIndexOperation(message, queueMessage -> handleEntityDelete(queueMessage));
+                } else if (event instanceof EntityIndexEvent) {
+                    return handleIndexOperation(message, queueMessage -> handleEntityIndexUpdate(queueMessage));
+                } else if (event instanceof InitializeApplicationIndexEvent) {
+                    //does not return observable
+                    handleInitializeApplicationIndex(message);
+                    return Observable.just(new IndexEventResult(message, Optional.<IndexOperationMessage>absent(), true));
+                } else {
+                    logger.error("Unknown EventType: {}", event);
+                    return Observable.just(new IndexEventResult(message, Optional.<IndexOperationMessage>absent(), false));
+                }
+            }catch (Exception e){
+                logger.error("Failed to index entity", e,message);
+                return Observable.just(new IndexEventResult(message, Optional.<IndexOperationMessage>absent(), false));
+            }finally {
+                messageCycle.update(System.currentTimeMillis() - event.getCreationTime());
 
-            if (event instanceof EdgeDeleteEvent) {
-               merged = merged.mergeWith(callHandleIndex(message, queueMessage -> handleEdgeDelete(queueMessage)));
-            } else if (event instanceof EdgeIndexEvent) {
-               merged = merged.mergeWith(callHandleIndex(message, queueMessage -> handleEdgeIndex(queueMessage)));
-            } else if (event instanceof EntityDeleteEvent) {
-                merged = merged.mergeWith( callHandleIndex(message, queueMessage -> handleEntityDelete(queueMessage)));
-            } else if (event instanceof EntityIndexEvent) {
-                merged = merged.mergeWith(callHandleIndex(message, queueMessage -> handleEntityIndexUpdate(queueMessage)));
-            } else if (event instanceof InitializeApplicationIndexEvent) {
-                //does not return observable
-                handleInitializeApplicationIndex(message);
-            } else {
-                logger.error("Unknown EventType: {}", event);
             }
+        });
 
-            messageCycle.update(System.currentTimeMillis() - event.getCreationTime());
-        }
-
-        merged
+        masterObservable
+            //remove unsuccessful
             .filter(indexEventResult -> indexEventResult.success() && indexEventResult.getIndexOperationMessage().isPresent())
+            //take the max
             .buffer(MAX_TAKE)
-            .flatMap(indexEventResults -> {
+            //map them to index results and return them
+            .map(indexEventResults -> {
                 IndexOperationMessage combined = new IndexOperationMessage();
-                Observable.from(indexEventResults)
-                    .doOnNext(indexEventResult -> combined.ingest(indexEventResult.getIndexOperationMessage().get())).subscribe();
-                indexProducer.put(combined).subscribe();
-                return Observable.from(indexEventResults);
+                indexEventResults.stream()
+                    .forEach(indexEventResult -> combined.ingest(indexEventResult.getIndexOperationMessage().get()));
+                indexProducer.put(combined).subscribe();//execute the index operation
+                return indexEventResults;
+            })
+                //flat map the ops so they are back to individual
+            .flatMap(indexEventResults -> Observable.from(indexEventResults))
+            //ack each message
+            .map(indexEventResult -> {
+                ack(indexEventResult.queueMessage);
+                return indexEventResult;
             })
-            .doOnNext(indexEventResult ->ack(indexEventResult.queueMessage))
             .subscribe();
     }
 
-    private Observable<IndexEventResult> callHandleIndex(QueueMessage message, Func1<QueueMessage, Observable<IndexOperationMessage>> toCall){
+    //transform index operation to
+    private Observable<IndexEventResult> handleIndexOperation(QueueMessage queueMessage,
+                                                              Func1<QueueMessage, Observable<IndexOperationMessage>> operation
+    ){
         try{
-            IndexOperationMessage indexOperationMessage =  toCall.call(message).toBlocking().lastOrDefault(null);
-            return Observable.just(new IndexEventResult(message,Optional.fromNullable(indexOperationMessage),true));
+            return operation.call(queueMessage)
+                .map(indexOperationMessage -> new IndexEventResult(queueMessage, Optional.fromNullable(indexOperationMessage), true));
         }catch (Exception e){
             logger.error("failed to run index",e);
-            return Observable.just( new IndexEventResult(message, Optional.<IndexOperationMessage>absent(),false));
+            return Observable.just( new IndexEventResult(queueMessage, Optional.<IndexOperationMessage>absent(),false));
         }
     }
 
@@ -548,7 +567,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
     private void subscribeAndAck( final Observable<?> observable, final QueueMessage message ){
        observable.doOnCompleted( ()-> ack(message)  ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ).subscribe();
     }
-    public static class IndexEventResult{
+    public class IndexEventResult{
         private final QueueMessage queueMessage;
         private final Optional<IndexOperationMessage> indexOperationMessage;
         private final boolean success;


[07/36] usergrid git commit: removing async processing

Posted by sf...@apache.org.
removing async processing


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/3ece30f0
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/3ece30f0
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/3ece30f0

Branch: refs/heads/master
Commit: 3ece30f018bbd28feea53537b62724b247904bfa
Parents: 2a16827
Author: Shawn Feldman <sf...@apache.org>
Authored: Tue Sep 15 15:09:28 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu Sep 24 15:00:59 2015 -0600

----------------------------------------------------------------------
 .../apache/usergrid/corepersistence/CoreModule.java   |  4 ----
 .../usergrid/corepersistence/CpEntityManager.java     |  9 ---------
 .../corepersistence/index/IndexServiceImpl.java       |  4 ----
 .../corepersistence/StaleIndexCleanupTest.java        | 14 +++++---------
 .../index/impl/EsEntityIndexBatchImpl.java            |  5 +----
 .../index/impl/EsIndexBufferConsumerImpl.java         |  4 +---
 6 files changed, 7 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/3ece30f0/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
index 486aa6f..959edec 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
@@ -77,14 +77,10 @@ public class CoreModule  extends AbstractModule {
 
 
 
-    public static final String EVENTS_DISABLED = "corepersistence.events.disabled";
-
-
 
     @Override
     protected void configure() {
 
-
         install( new CommonModule());
         install( new CollectionModule() {
             /**

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3ece30f0/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index dc3d6a2..d46c112 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -2627,15 +2627,6 @@ public class CpEntityManager implements EntityManager {
             handleWriteUniqueVerifyException( entity, wuve );
         }
 
-
-        // Index CP entity into default collection scope
-        //        IndexScope defaultIndexScope = new IndexScopeImpl(
-        //            applicationScope.getApplication(),
-        //            applicationScope.getApplication(),
-        //            CpEntityManager.getCollectionScopeNameFromEntityType( entity.getType() ) );
-        //        EntityIndex ei = managerCache.getEntityIndex( applicationScope );
-        //        ei.createBatch().index( defaultIndexScope, cpEntity ).execute();
-
         // reflect changes in the legacy Entity
         entity.setUuid( cpEntity.getId().getUuid() );
         entity.setProperties( cpEntity );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3ece30f0/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
index 3ca33a3..8a8dba1 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
@@ -104,10 +104,6 @@ public class IndexServiceImpl implements IndexService {
 
         //do our observable for batching
         //try to send a whole batch if we can
-
-
-        //do our observable for batching
-        //try to send a whole batch if we can
         final Observable<IndexOperationMessage>  batches =  sourceEdgesToIndex.buffer( indexFig.getIndexBatchSize() )
 
             //map into batches based on our buffer size

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3ece30f0/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
index 832409b..366d41c 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
@@ -60,7 +60,6 @@ import com.google.inject.Injector;
 
 import net.jcip.annotations.NotThreadSafe;
 
-import static org.apache.usergrid.corepersistence.CoreModule.EVENTS_DISABLED;
 import static org.apache.usergrid.persistence.Schema.TYPE_APPLICATION;
 import static org.apache.usergrid.persistence.core.util.IdGenerator.createId;
 import static org.junit.Assert.assertEquals;
@@ -73,6 +72,7 @@ import static org.junit.Assert.assertTrue;
 @NotThreadSafe
 public class StaleIndexCleanupTest extends AbstractCoreIT {
     private static final Logger logger = LoggerFactory.getLogger( StaleIndexCleanupTest.class );
+    public static final String EVENTS_DISABLED = "corepersistence.events.disabled";
 
     // take it easy on Cassandra
     private static final long writeDelayMs = 0;
@@ -379,7 +379,7 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
         app.refreshIndex();
 
         Thread.sleep(250); // refresh happens asynchronously, wait for some time
-        
+
 
         //we can't use our candidate result sets here.  The repair won't happen since we now have orphaned documents in our index
         //us the EM so the repair process happens
@@ -401,7 +401,7 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
      * Test that the EntityDeleteImpl cleans up stale indexes on update. Ensures that when an
      * entity is updated its old indexes are cleared from ElasticSearch.
      */
-    @Test(timeout=30000)
+    @Test()
     public void testCleanupOnUpdate() throws Exception {
 
         logger.info( "Started testCleanupOnUpdate()" );
@@ -425,20 +425,17 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
         app.refreshIndex();
 
         CandidateResults crs = queryCollectionCp( "dogs", "dog", "select *");
-        Assert.assertEquals( "Expect no stale candidates yet", numEntities, crs.size() );
+        Assert.assertEquals("Expect no stale candidates yet", numEntities, crs.size());
 
         // turn off post processing stuff that cleans up stale entities
-        System.setProperty( EVENTS_DISABLED, "false" );
 
         // update each entity a bunch of times
 
-        List<Entity> maxVersions = new ArrayList<>(numEntities);
         int count = 0;
         for ( Entity dog : dogs ) {
-            Entity toUpdate = null;
 
             for ( int j=0; j<numUpdates; j++) {
-                toUpdate = em.get( dog.getUuid() );
+                Entity toUpdate = em.get( dog.getUuid() );
                 toUpdate.setProperty( "property", RandomStringUtils.randomAlphanumeric(10));
                 em.update(toUpdate);
                 count++;
@@ -447,7 +444,6 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
                 }
             }
 
-            maxVersions.add( toUpdate );
         }
         app.refreshIndex();
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3ece30f0/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index 87b2368..c11feed 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@ -123,10 +123,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
 
     @Override
     public Observable execute() {
-        IndexOperationMessage tempContainer = container;
-        container = new IndexOperationMessage();
-
-        return indexBatchBufferProducer.put( tempContainer );
+        return indexBatchBufferProducer.put( container );
     }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3ece30f0/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index 93c0d85..514e6eb 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -96,7 +96,6 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
         Preconditions.checkNotNull(message, "Message cannot be null");
         indexSizeCounter.inc(message.getDeIndexRequests().size());
         indexSizeCounter.inc(message.getIndexRequests().size());
-
         return  processBatch(message);
     }
 
@@ -185,8 +184,7 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
 
         try {
             responses = bulkRequest.execute().actionGet( );
-        }
-        catch ( Throwable t ) {
+        } catch ( Throwable t ) {
             log.error( "Unable to communicate with elasticsearch" );
             failureMonitor.fail( "Unable to execute batch", t );
             throw t;


[12/36] usergrid git commit: add delete queue for cleanup and poll for messages

Posted by sf...@apache.org.
add delete queue for cleanup and poll for messages


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/c7a6ebf0
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/c7a6ebf0
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/c7a6ebf0

Branch: refs/heads/master
Commit: c7a6ebf0329a95e6813667d81fa387ecf0240d14
Parents: da6afb1
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Sep 25 10:36:40 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Sep 25 10:36:40 2015 -0600

----------------------------------------------------------------------
 .../persistence/queue/DefaultQueueManager.java  |  5 +++
 .../persistence/queue/QueueManager.java         |  5 +++
 .../queue/impl/SNSQueueManagerImpl.java         |  9 +++++
 .../queue/impl/SQSQueueManagerImpl.java         |  7 ++++
 .../persistence/queue/QueueManagerTest.java     | 37 ++++++++++++++++----
 .../services/queues/ImportQueueManager.java     |  5 +++
 6 files changed, 61 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/c7a6ebf0/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
index c72e109..d974529 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
@@ -72,4 +72,9 @@ public class DefaultQueueManager implements QueueManager {
         String uuid = UUID.randomUUID().toString();
         queue.add(new QueueMessage(uuid,"handle_"+uuid,body,"put type here"));
     }
+
+    @Override
+    public void deleteQueue() {
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/c7a6ebf0/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
index 0ec2337..027abb2 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
@@ -69,4 +69,9 @@ public interface QueueManager {
      * @throws IOException
      */
     void sendMessage(Object body)throws IOException;
+
+    /**
+     * purge messages
+     */
+    void deleteQueue();
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/c7a6ebf0/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index 8fb0f52..bc63f53 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -449,6 +449,15 @@ public class SNSQueueManagerImpl implements QueueManager {
 
     }
 
+    @Override
+    public void deleteQueue() {
+        logger.warn("Deleting queue: "+getReadQueue().getUrl());
+        sqs.deleteQueue(new DeleteQueueRequest().withQueueUrl(getReadQueue().getUrl()));
+        logger.warn("Deleting queue: "+getReadQueue().getUrl()+"_dead");
+        sqs.deleteQueue(new DeleteQueueRequest().withQueueUrl(getReadQueue().getUrl()+"_dead"));
+
+    }
+
 
     @Override
     public void commitMessage(final QueueMessage queueMessage) {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/c7a6ebf0/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
index 075e90c..daa1cb5 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
@@ -327,6 +327,13 @@ public class SQSQueueManagerImpl implements QueueManager {
         return region;
     }
 
+    @Override
+    public void deleteQueue() {
+        logger.warn("Deleting queue: "+getQueue().getUrl());
+        sqs.deleteQueue(new DeleteQueueRequest().withQueueUrl(getQueue().getUrl()));
+    }
+
+
 
     /**
      * Create the SQS client for the specified settings

http://git-wip-us.apache.org/repos/asf/usergrid/blob/c7a6ebf0/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
index 29e88ce..ac70af6 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
@@ -20,7 +20,6 @@
 package org.apache.usergrid.persistence.queue;
 
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -63,16 +62,24 @@ public class QueueManagerTest {
     protected QueueScope scope;
     private QueueManager qm;
 
+    public static long queueSeed = System.currentTimeMillis();
+
 
     @Before
     public void mockApp() {
-        this.scope = new QueueScopeImpl( "testQueue", QueueScope.RegionImplementation.LOCAL);
+
+        this.scope = new QueueScopeImpl( "testQueue"+queueSeed++, QueueScope.RegionImplementation.LOCAL);
         qm = qmf.getQueueManager(scope);
     }
 
+    @org.junit.After
+    public void cleanup(){
+        qm.deleteQueue();
+    }
+
 
     @Test
-    public void send() throws IOException,ClassNotFoundException{
+    public void send() throws Exception{
         String value = "bodytest";
         qm.sendMessage(value);
         List<QueueMessage> messageList = qm.getMessages(1,5000,5000,String.class).toList().toBlocking().last();
@@ -81,13 +88,14 @@ public class QueueManagerTest {
             assertTrue(message.getBody().equals(value));
             qm.commitMessage(message);
         }
+
         messageList = qm.getMessages(1,5000,5000,String.class).toList().toBlocking().last();
         assertTrue(messageList.size() <= 0);
 
     }
 
     @Test
-    public void sendMore() throws IOException,ClassNotFoundException{
+    public void sendMore() throws Exception{
         HashMap<String,String> values = new HashMap<>();
         values.put("test","Test");
 
@@ -107,7 +115,7 @@ public class QueueManagerTest {
     }
 
     @Test
-    public void queueSize() throws IOException,ClassNotFoundException{
+    public void queueSize() throws Exception{
         HashMap<String,String> values = new HashMap<>();
         values.put("test", "Test");
 
@@ -115,8 +123,16 @@ public class QueueManagerTest {
         bodies.add(values);
         long initialDepth = qm.getQueueDepth();
         qm.sendMessages(bodies);
-        long depth = qm.getQueueDepth();
+        long depth = 0;
+        for(int i=0; i<10;i++){
+             depth = qm.getQueueDepth();
+            if(depth>0){
+                break;
+            }
+            Thread.sleep(1000);
+        }
         assertTrue(depth>0);
+
         List<QueueMessage> messageList = qm.getMessages(10,5000,5000,values.getClass()).toList().toBlocking().last();
         assertTrue(messageList.size() <= 500);
         for(QueueMessage message : messageList){
@@ -125,9 +141,16 @@ public class QueueManagerTest {
         if(messageList.size()>0) {
             qm.commitMessages(messageList);
         }
-        depth = qm.getQueueDepth();
+        for(int i=0; i<10;i++){
+            depth = qm.getQueueDepth();
+            if(depth==initialDepth){
+                break;
+            }
+            Thread.sleep(1000);
+        }
         assertEquals(initialDepth, depth);
     }
 
 
+
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/c7a6ebf0/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
index d74f688..bca9a49 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
@@ -68,4 +68,9 @@ public class ImportQueueManager implements QueueManager {
     public void sendMessage( final Object body ) throws IOException {
 
     }
+
+    @Override
+    public void deleteQueue() {
+
+    }
 }


[21/36] usergrid git commit: remove index batch execute

Posted by sf...@apache.org.
remove index batch execute


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/4c263b87
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/4c263b87
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/4c263b87

Branch: refs/heads/master
Commit: 4c263b870d0b45f53e198cad876373485c188669
Parents: a770024
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Sep 28 10:12:54 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Sep 28 10:12:54 2015 -0600

----------------------------------------------------------------------
 .../corepersistence/CpManagerCache.java         | 12 ++++-
 .../corepersistence/CpRelationManager.java      |  4 +-
 .../usergrid/corepersistence/ManagerCache.java  |  7 +++
 .../corepersistence/index/IndexServiceImpl.java |  8 ++--
 .../read/search/CandidateEntityFilter.java      | 16 +++++--
 .../pipeline/read/search/CandidateIdFilter.java | 16 +++++--
 .../persistence/index/EntityIndexBatch.java     |  7 ++-
 .../index/impl/EsEntityIndexBatchImpl.java      |  8 ++--
 .../index/impl/EsEntityIndexFactoryImpl.java    |  4 --
 .../index/impl/EsEntityIndexImpl.java           |  5 +-
 .../persistence/index/impl/EntityIndexTest.java | 48 +++++++++++---------
 .../persistence/index/impl/GeoPagingTest.java   |  4 +-
 .../index/impl/IndexLoadTestsIT.java            |  5 +-
 13 files changed, 88 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/4c263b87/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java
index 89f2ab2..0408bbd 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java
@@ -26,6 +26,7 @@ import org.apache.usergrid.persistence.index.EntityIndex;
 import org.apache.usergrid.persistence.index.EntityIndex;
 import org.apache.usergrid.persistence.index.EntityIndexFactory;
 import org.apache.usergrid.persistence.index.IndexLocationStrategy;
+import org.apache.usergrid.persistence.index.impl.IndexProducer;
 import org.apache.usergrid.persistence.map.MapManager;
 import org.apache.usergrid.persistence.map.MapManagerFactory;
 import org.apache.usergrid.persistence.map.MapScope;
@@ -43,6 +44,7 @@ public class CpManagerCache implements ManagerCache {
     private final GraphManagerFactory gmf;
     private final MapManagerFactory mmf;
     private final IndexLocationStrategyFactory indexLocationStrategyFactory;
+    private final IndexProducer indexProducer;
 
     // TODO: consider making these cache sizes and timeouts configurable
 
@@ -52,13 +54,16 @@ public class CpManagerCache implements ManagerCache {
                            final EntityIndexFactory eif,
                            final GraphManagerFactory gmf,
                            final MapManagerFactory mmf,
-                           final IndexLocationStrategyFactory indexLocationStrategyFactory) {
+                           final IndexLocationStrategyFactory indexLocationStrategyFactory,
+                           final IndexProducer indexProducer
+    ) {
 
         this.ecmf = ecmf;
         this.eif = eif;
         this.gmf = gmf;
         this.mmf = mmf;
         this.indexLocationStrategyFactory = indexLocationStrategyFactory;
+        this.indexProducer = indexProducer;
     }
 
 
@@ -88,6 +93,11 @@ public class CpManagerCache implements ManagerCache {
         return mmf.createMapManager( mapScope );
     }
 
+    @Override
+    public IndexProducer getIndexProducer() {
+        return indexProducer;
+    }
+
 
     @Override
     public void invalidate() {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/4c263b87/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index de687b3..b4b39a0 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -535,9 +535,7 @@ public class CpRelationManager implements RelationManager {
 
         batch.deindex( indexScope, memberEntity );
 
-
-        batch.execute();
-
+        managerCache.getIndexProducer().put( batch.build()).subscribe();
 
         // special handling for roles collection of a group
         if ( headEntity.getType().equals( Group.ENTITY_TYPE ) ) {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/4c263b87/stack/core/src/main/java/org/apache/usergrid/corepersistence/ManagerCache.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/ManagerCache.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/ManagerCache.java
index 6425b61..1dee80a 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/ManagerCache.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/ManagerCache.java
@@ -25,6 +25,7 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.index.EntityIndex;
 import org.apache.usergrid.persistence.index.IndexLocationStrategy;
+import org.apache.usergrid.persistence.index.impl.IndexProducer;
 import org.apache.usergrid.persistence.map.MapManager;
 import org.apache.usergrid.persistence.map.MapScope;
 
@@ -66,6 +67,12 @@ public interface ManagerCache {
     MapManager getMapManager(MapScope mapScope);
 
     /**
+     * gets index producer
+     * @return
+     */
+    IndexProducer getIndexProducer();
+
+    /**
      * invalidate the cache
      */
     void invalidate();

http://git-wip-us.apache.org/repos/asf/usergrid/blob/4c263b87/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
index 8a8dba1..d160aac 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
@@ -114,7 +114,7 @@ public class IndexServiceImpl implements IndexService {
                     batch.index( indexEdge, entity );
                 } )
                     //return the future from the batch execution
-                .flatMap( batch -> batch.execute() ) );
+                .flatMap( batch -> Observable.just(batch.build()) ) );
 
         return ObservableTimer.time( batches, indexTimer );
     }
@@ -142,7 +142,7 @@ public class IndexServiceImpl implements IndexService {
 
             batch.index( indexEdge, entity );
 
-            return batch.execute();
+            return Observable.just(batch.build());
         } );
 
         return ObservableTimer.time( batches, addTimer  );
@@ -185,7 +185,7 @@ public class IndexServiceImpl implements IndexService {
 
                 batch = deindexBatchIteratorResolver( fromTarget, sourceEdgesToBeDeindexed, batch );
 
-                return batch.execute();
+                return Observable.just(batch.build());
             } );
 
         return ObservableTimer.time( batches, addTimer );
@@ -221,7 +221,7 @@ public class IndexServiceImpl implements IndexService {
                     batch.deindex( searchEdge, candidateResult );
                 } )
                     //return the future from the batch execution
-                .flatMap( batch -> batch.execute() );
+                .flatMap( batch ->Observable.just(batch.build()) );
 
         return ObservableTimer.time(batches, indexTimer);
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/4c263b87/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
index 14c880f..ceb18ae 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
@@ -24,6 +24,7 @@ import java.util.*;
 
 import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
 import org.apache.usergrid.persistence.index.*;
+import org.apache.usergrid.persistence.index.impl.IndexProducer;
 import org.apache.usergrid.persistence.model.field.Field;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,16 +57,19 @@ public class CandidateEntityFilter extends AbstractFilter<FilterResult<Candidate
     private final EntityCollectionManagerFactory entityCollectionManagerFactory;
     private final EntityIndexFactory entityIndexFactory;
     private final IndexLocationStrategyFactory indexLocationStrategyFactory;
+    private final IndexProducer indexProducer;
 
 
     @Inject
     public CandidateEntityFilter( final EntityCollectionManagerFactory entityCollectionManagerFactory,
                                   final EntityIndexFactory entityIndexFactory,
-                                  final IndexLocationStrategyFactory indexLocationStrategyFactory
+                                  final IndexLocationStrategyFactory indexLocationStrategyFactory,
+                                  final IndexProducer indexProducer
                                   ) {
         this.entityCollectionManagerFactory = entityCollectionManagerFactory;
         this.entityIndexFactory = entityIndexFactory;
         this.indexLocationStrategyFactory = indexLocationStrategyFactory;
+        this.indexProducer = indexProducer;
     }
 
 
@@ -108,7 +112,7 @@ public class CandidateEntityFilter extends AbstractFilter<FilterResult<Candidate
                         //now we have a collection, validate our canidate set is correct.
                         return entitySets.map(
                             entitySet -> new EntityVerifier(
-                                applicationIndex.createBatch(), entitySet, candidateResults)
+                                applicationIndex.createBatch(), entitySet, candidateResults,indexProducer)
                         )
                             .doOnNext(entityCollector -> entityCollector.merge())
                             .flatMap(entityCollector -> Observable.from(entityCollector.getResults()))
@@ -150,14 +154,17 @@ public class CandidateEntityFilter extends AbstractFilter<FilterResult<Candidate
 
         private final EntityIndexBatch batch;
         private final List<FilterResult<Candidate>> candidateResults;
+        private final IndexProducer indexProducer;
         private final EntitySet entitySet;
 
 
         public EntityVerifier( final EntityIndexBatch batch, final EntitySet entitySet,
-                               final List<FilterResult<Candidate>> candidateResults ) {
+                               final List<FilterResult<Candidate>> candidateResults,
+                               final IndexProducer indexProducer) {
             this.batch = batch;
             this.entitySet = entitySet;
             this.candidateResults = candidateResults;
+            this.indexProducer = indexProducer;
             this.results = new ArrayList<>( entitySet.size() );
         }
 
@@ -171,7 +178,8 @@ public class CandidateEntityFilter extends AbstractFilter<FilterResult<Candidate
                 validate( candidateResult );
             }
 
-           batch.execute().subscribe();
+            indexProducer.put(batch.build()).subscribe();
+
         }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/4c263b87/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateIdFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateIdFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateIdFilter.java
index 3b1c102..b2fd675 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateIdFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateIdFilter.java
@@ -26,6 +26,7 @@ import java.util.UUID;
 
 import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
 import org.apache.usergrid.persistence.index.*;
+import org.apache.usergrid.persistence.index.impl.IndexProducer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,15 +54,19 @@ public class CandidateIdFilter extends AbstractFilter<FilterResult<Candidate>, F
     private final EntityCollectionManagerFactory entityCollectionManagerFactory;
     private final EntityIndexFactory entityIndexFactory;
     private final IndexLocationStrategyFactory indexLocationStrategyFactory;
+    private final IndexProducer indexProducer;
 
 
     @Inject
     public CandidateIdFilter( final EntityCollectionManagerFactory entityCollectionManagerFactory,
                               final EntityIndexFactory entityIndexFactory,
-                              final IndexLocationStrategyFactory indexLocationStrategyFactory) {
+                              final IndexLocationStrategyFactory indexLocationStrategyFactory,
+                              final IndexProducer indexProducer
+                              ) {
         this.entityCollectionManagerFactory = entityCollectionManagerFactory;
         this.entityIndexFactory = entityIndexFactory;
         this.indexLocationStrategyFactory = indexLocationStrategyFactory;
+        this.indexProducer = indexProducer;
     }
 
 
@@ -97,7 +102,7 @@ public class CandidateIdFilter extends AbstractFilter<FilterResult<Candidate>, F
 
                     return versionSetObservable.map(
                         entitySet -> new EntityCollector( applicationIndex.createBatch(), entitySet,
-                            candidateResults ) ).doOnNext( entityCollector -> entityCollector.merge() ).flatMap(
+                            candidateResults, indexProducer ) ).doOnNext( entityCollector -> entityCollector.merge() ).flatMap(
                         entityCollector -> Observable.from( entityCollector.collectResults() ) );
                 } );
 
@@ -115,14 +120,16 @@ public class CandidateIdFilter extends AbstractFilter<FilterResult<Candidate>, F
 
         private final EntityIndexBatch batch;
         private final List<FilterResult<Candidate>> candidateResults;
+        private final IndexProducer indexProducer;
         private final VersionSet versionSet;
 
 
         public EntityCollector( final EntityIndexBatch batch, final VersionSet versionSet,
-                                final List<FilterResult<Candidate>> candidateResults ) {
+                                final List<FilterResult<Candidate>> candidateResults, final IndexProducer indexProducer ) {
             this.batch = batch;
             this.versionSet = versionSet;
             this.candidateResults = candidateResults;
+            this.indexProducer = indexProducer;
             this.results = new ArrayList<>( versionSet.size() );
         }
 
@@ -136,7 +143,8 @@ public class CandidateIdFilter extends AbstractFilter<FilterResult<Candidate>, F
                 validate( candidateResult );
             }
 
-            batch.execute();
+            indexProducer.put( batch.build()).subscribe();
+
         }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/4c263b87/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
index d1b076d..98652c1 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
@@ -18,6 +18,7 @@ package org.apache.usergrid.persistence.index;/*
  */
 
 
+import java.util.List;
 import java.util.UUID;
 
 import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
@@ -62,11 +63,13 @@ public interface EntityIndexBatch {
     EntityIndexBatch deindex( final SearchEdge searchEdge, final Id id, final UUID version );
 
 
+
+
     /**
-     * Execute the batch
+     * get the batches
      * @return future to guarantee execution
      */
-    Observable<IndexOperationMessage> execute();
+    IndexOperationMessage build();
 
     /**
      * Get the number of operations in the batch

http://git-wip-us.apache.org/repos/asf/usergrid/blob/4c263b87/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index 64a1c6a..68830ca 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@ -19,6 +19,7 @@
 package org.apache.usergrid.persistence.index.impl;
 
 
+import java.util.List;
 import java.util.UUID;
 
 import org.apache.usergrid.persistence.index.*;
@@ -41,7 +42,6 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
     private final IndexAlias alias;
 
     private final IndexLocationStrategy indexLocationStrategy;
-    private final IndexProducer indexBatchBufferProducer;
 
     private final EntityIndex entityIndex;
     private final ApplicationScope applicationScope;
@@ -49,12 +49,10 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
 
 
     public EsEntityIndexBatchImpl( final IndexLocationStrategy locationStrategy,
-                                   final IndexProducer indexBatchBufferProducer,
                                    final EntityIndex entityIndex
     ) {
         this.indexLocationStrategy = locationStrategy;
 
-        this.indexBatchBufferProducer = indexBatchBufferProducer;
         this.entityIndex = entityIndex;
         this.applicationScope = indexLocationStrategy.getApplicationScope();
 
@@ -122,8 +120,8 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
 
 
     @Override
-    public Observable execute() {
-        return indexBatchBufferProducer.put( container );
+    public IndexOperationMessage build() {
+        return container;
     }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/4c263b87/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexFactoryImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexFactoryImpl.java
index b66fd40..c91057b 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexFactoryImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexFactoryImpl.java
@@ -37,7 +37,6 @@ public class EsEntityIndexFactoryImpl implements EntityIndexFactory{
     private final IndexFig config;
     private final IndexCache indexCache;
     private final EsProvider provider;
-    private final IndexProducer indexProducer;
     private final MetricsFactory metricsFactory;
     private final IndexRefreshCommand refreshCommand;
 
@@ -50,7 +49,6 @@ public class EsEntityIndexFactoryImpl implements EntityIndexFactory{
                     config,
                     refreshCommand,
                     metricsFactory,
-                    indexProducer,
                     locationStrategy
                 );
                 index.initialize();
@@ -62,7 +60,6 @@ public class EsEntityIndexFactoryImpl implements EntityIndexFactory{
     public EsEntityIndexFactoryImpl( final IndexFig indexFig,
                                      final IndexCache indexCache,
                                      final EsProvider provider,
-                                     final IndexProducer indexProducer,
                                      final MetricsFactory metricsFactory,
                                      final IndexRefreshCommand refreshCommand
 
@@ -70,7 +67,6 @@ public class EsEntityIndexFactoryImpl implements EntityIndexFactory{
         this.config = indexFig;
         this.indexCache = indexCache;
         this.provider = provider;
-        this.indexProducer = indexProducer;
         this.metricsFactory = metricsFactory;
         this.refreshCommand = refreshCommand;
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/4c263b87/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index 6317a69..f6ebce2 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -116,7 +116,6 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData {
     private final SearchRequestBuilderStrategyV2 searchRequestBuilderStrategyV2;
     private final int cursorTimeout;
     private final long queryTimeout;
-    private final IndexProducer indexBatchBufferProducer;
     private final FailureMonitorImpl failureMonitor;
     private final Timer aggregationTimer;
 
@@ -131,13 +130,11 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData {
                               final IndexFig indexFig,
                               final IndexRefreshCommand indexRefreshCommand,
                               final MetricsFactory metricsFactory,
-                              final IndexProducer indexBatchBufferProducer,
                               final IndexLocationStrategy indexLocationStrategy
     ) {
 
         this.indexFig = indexFig;
         this.indexLocationStrategy = indexLocationStrategy;
-        this.indexBatchBufferProducer = indexBatchBufferProducer;
         this.failureMonitor = new FailureMonitorImpl( indexFig, provider );
         this.esProvider = provider;
         this.indexRefreshCommand = indexRefreshCommand;
@@ -374,7 +371,7 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData {
     @Override
     public EntityIndexBatch createBatch() {
         EntityIndexBatch batch =
-            new EsEntityIndexBatchImpl(indexLocationStrategy , indexBatchBufferProducer, this );
+            new EsEntityIndexBatchImpl(indexLocationStrategy, this );
         return batch;
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/4c263b87/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
index 6348a44..5243d5a 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
@@ -82,6 +82,9 @@ public class EntityIndexTest extends BaseIT {
     public IndexFig fig;
 
     @Inject
+    public IndexProducer indexProducer;
+
+    @Inject
     public CassandraFig cassandraFig;
 
     @Inject
@@ -137,7 +140,7 @@ public class EntityIndexTest extends BaseIT {
 
 
         batch.index( indexEdge, entity1 );
-        batch.execute().toBlocking().last();
+        indexProducer.put(batch.build()).subscribe();
 
 
         Entity entity2 = new Entity( entityType );
@@ -151,7 +154,7 @@ public class EntityIndexTest extends BaseIT {
 
 
         batch.index( indexEdge, entity2 );
-        batch.execute().toBlocking().last();
+        indexProducer.put(batch.build()).subscribe();;
 
         entityIndex.refreshAsync().toBlocking().first();
 
@@ -215,7 +218,7 @@ public class EntityIndexTest extends BaseIT {
 
                     EntityIndexBatch batch = entityIndex.createBatch();
                     insertJsonBlob( sampleJson, batch, entityType, indexEdge, size, 0 );
-                    batch.execute().toBlocking().last();
+                    indexProducer.put(batch.build()).subscribe();;
                 }
                 catch ( Exception e ) {
                     synchronized ( failTime ) {
@@ -290,7 +293,7 @@ public class EntityIndexTest extends BaseIT {
 
         EntityIndexBatch entityIndexBatch = entityIndex.createBatch();
         entityIndexBatch.deindex(searchEdge, crs.get(0));
-        entityIndexBatch.execute().toBlocking().last();
+        indexProducer.put(entityIndexBatch.build()).subscribe();
         entityIndex.refreshAsync().toBlocking().first();
 
         //Hilda Youn
@@ -306,7 +309,7 @@ public class EntityIndexTest extends BaseIT {
         });
         EntityIndexBatch batch = entityIndex.createBatch();
         insertJsonBlob(sampleJson, batch, entityType, indexEdge, max, startIndex);
-        batch.execute().toBlocking().last();
+        indexProducer.put(batch.build()).subscribe();;
         IndexRefreshCommandImpl.IndexRefreshCommandInfo info =  entityIndex.refreshAsync().toBlocking().first();
         long time = info.getExecutionTime();
         log.info("refresh took ms:" + time);
@@ -364,7 +367,7 @@ public class EntityIndexTest extends BaseIT {
         EntityUtils.setVersion( entity, UUIDGenerator.newTimeUUID() );
         entity.setField( new UUIDField( IndexingUtils.ENTITY_ID_FIELDNAME, UUID.randomUUID() ) );
 
-        entityIndex.createBatch().index( searchEdge, entity ).execute().toBlocking().last();
+        indexProducer.put(entityIndex.createBatch().index( searchEdge, entity ).build()).subscribe();
         entityIndex.refreshAsync().toBlocking().first();
 
         CandidateResults candidateResults = entityIndex
@@ -373,7 +376,7 @@ public class EntityIndexTest extends BaseIT {
 
         EntityIndexBatch batch = entityIndex.createBatch();
         batch.deindex( searchEdge, entity );
-        batch.execute().toBlocking().last();
+        indexProducer.put(batch.build()).subscribe();;
         entityIndex.refreshAsync().toBlocking().first();
 
         candidateResults = entityIndex
@@ -410,7 +413,8 @@ public class EntityIndexTest extends BaseIT {
             entity[i].setField( new UUIDField( IndexingUtils.ENTITY_ID_FIELDNAME, entityUUID ) );
 
             //index the new entity. This is where the loop will be set to create like 100 entities.
-            entityIndex.createBatch().index( searchEdge, entity[i] ).execute().toBlocking().last();
+            indexProducer.put(entityIndex.createBatch().index( searchEdge, entity[i]  ).build()).subscribe();
+
         }
         entityIndex.refreshAsync().toBlocking().first();
 
@@ -541,16 +545,16 @@ public class EntityIndexTest extends BaseIT {
         EntityIndexBatch batch = entityIndex.createBatch();
 
         batch.index( indexSCope, user );
-        batch.execute().toBlocking().last();
+        indexProducer.put(batch.build()).subscribe();;
         entityIndex.refreshAsync().toBlocking().first();
 
         final String query = "where username = 'edanuff'";
 
         CandidateResults r = entityIndex.search( indexSCope, SearchTypes.fromTypes( "edanuff" ), query, 10, 0);
-        assertEquals( user.getId(), r.get( 0 ).getId() );
+        assertEquals( user.getId(), r.get( 0 ).getId());
 
         batch.deindex( indexSCope, user.getId(), user.getVersion() );
-        batch.execute().toBlocking().last();
+        indexProducer.put(batch.build()).subscribe();;
         entityIndex.refreshAsync().toBlocking().first();
 
         // EntityRef
@@ -611,7 +615,7 @@ public class EntityIndexTest extends BaseIT {
         EntityUtils.setVersion( fred, UUIDGenerator.newTimeUUID() );
         batch.index( indexScope, fred);
 
-        batch.execute().toBlocking().last();
+        indexProducer.put(batch.build()).subscribe();;
         entityIndex.refreshAsync().toBlocking().first();
 
         final SearchTypes searchTypes = SearchTypes.fromTypes( "user" );
@@ -681,14 +685,14 @@ public class EntityIndexTest extends BaseIT {
             EntityUtils.setId( user, userId );
             EntityUtils.setVersion( user, UUIDGenerator.newTimeUUID() );
 
-            entityIds.add( userId );
+            entityIds.add(userId );
 
 
             batch.index( indexEdge, user );
         }
 
 
-        batch.execute().toBlocking().last();
+        indexProducer.put(batch.build()).subscribe();;
 
         entityIndex.refreshAsync().toBlocking().first();
 
@@ -755,7 +759,7 @@ public class EntityIndexTest extends BaseIT {
         EntityIndexBatch batch = entityIndex.createBatch();
 
         batch.index( indexSCope, user );
-        batch.execute().toBlocking().last();
+        indexProducer.put(batch.build()).subscribe();;
         entityIndex.refreshAsync().toBlocking().first();
 
         final String query = "where searchUUID = " + searchUUID;
@@ -794,7 +798,7 @@ public class EntityIndexTest extends BaseIT {
         EntityIndexBatch batch = entityIndex.createBatch();
 
         batch.index(indexSCope, user);
-        batch.execute().toBlocking().last();
+        indexProducer.put(batch.build()).subscribe();;
         entityIndex.refreshAsync().toBlocking().first();
 
         final String query = "where string = 'I am*'";
@@ -849,9 +853,9 @@ public class EntityIndexTest extends BaseIT {
 
 
         EntityIndexBatch batch = entityIndex.createBatch();
-        batch.index( indexSCope, first );
+        batch.index(indexSCope, first );
         batch.index( indexSCope, second );
-        batch.execute().toBlocking().last();
+        indexProducer.put(batch.build()).subscribe();;
         entityIndex.refreshAsync().toBlocking().first();
 
 
@@ -916,7 +920,7 @@ public class EntityIndexTest extends BaseIT {
         batch.index(indexScope2, second);
 
 
-        batch.execute().toBlocking().last();
+        indexProducer.put(batch.build()).subscribe();;
         entityIndex.refreshAsync().toBlocking().first();
 
 
@@ -997,7 +1001,7 @@ public class EntityIndexTest extends BaseIT {
         batch.index( indexScope2, second);
 
 
-        batch.execute().toBlocking().last();
+        indexProducer.put(batch.build()).subscribe();;
         entityIndex.refreshAsync().toBlocking().first();
 
 
@@ -1107,7 +1111,7 @@ public class EntityIndexTest extends BaseIT {
         batch.index( indexScope2, second);
 
 
-        batch.execute().toBlocking().last();
+        indexProducer.put(batch.build()).subscribe();;
         entityIndex.refreshAsync().toBlocking().first();
 
 
@@ -1262,7 +1266,7 @@ public class EntityIndexTest extends BaseIT {
         batch.index( indexScope2, second);
 
 
-        batch.execute().toBlocking().last();
+        indexProducer.put(batch.build()).subscribe();;
         entityIndex.refreshAsync().toBlocking().first();
         long size = entityIndex.getEntitySize(new SearchEdgeImpl(ownerId,type, SearchEdge.NodeType.SOURCE));
         assertTrue( size == 100 );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/4c263b87/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/GeoPagingTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/GeoPagingTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/GeoPagingTest.java
index dce69f9..98b85f1 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/GeoPagingTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/GeoPagingTest.java
@@ -70,6 +70,8 @@ public class GeoPagingTest extends BaseIT {
     @Inject
     public EntityIndexFactory eif;
 
+    @Inject
+    IndexProducer indexProducer;
 
     @Inject
     @Rule
@@ -128,7 +130,7 @@ public class GeoPagingTest extends BaseIT {
 
         }
 
-        batch.execute().toBlocking().last();
+        indexProducer.put( batch.build()).subscribe();
 
         entityIndex.refreshAsync().toBlocking().last();
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/4c263b87/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexLoadTestsIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexLoadTestsIT.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexLoadTestsIT.java
index cca3f0f..1be1195 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexLoadTestsIT.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexLoadTestsIT.java
@@ -92,7 +92,7 @@ public class IndexLoadTestsIT extends BaseIT {
     public IndexTestFig indexTestFig;
 
     @Inject
-    public EntityIndexFactory entityIndexFactory;
+    public IndexProducer indexProducer;
 
     @Inject
     public MetricsFactory metricsFactory;
@@ -347,7 +347,8 @@ public class IndexLoadTestsIT extends BaseIT {
 
 
                     //execute
-                    entityIndexBatch.execute();
+                    IndexOperationMessage message = entityIndexBatch.build();
+                    indexProducer.put(message);
                     //stop
                     time.close();
                 } ).toBlocking().last();


[26/36] usergrid git commit: add missing subscribe

Posted by sf...@apache.org.
add missing subscribe


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/0c4c1abf
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/0c4c1abf
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/0c4c1abf

Branch: refs/heads/master
Commit: 0c4c1abf3bcbdb1a62d1ec74d892698e7e211d40
Parents: a9dc6ba
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Sep 28 15:13:00 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Sep 28 15:13:00 2015 -0600

----------------------------------------------------------------------
 .../asyncevents/AmazonAsyncEventService.java          | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/0c4c1abf/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index 82e6d19..5f681e7 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -243,15 +243,14 @@ public class AmazonAsyncEventService implements AsyncEventService {
                 continue;
             }
 
-
             if (event instanceof EdgeDeleteEvent) {
-               merged = merged.mergeWith(callHandleIndex(queueMessage -> handleEdgeDelete(queueMessage), message));
+               merged = merged.mergeWith(callHandleIndex(message, queueMessage -> handleEdgeDelete(queueMessage)));
             } else if (event instanceof EdgeIndexEvent) {
-               merged = merged.mergeWith(callHandleIndex(queueMessage -> handleEdgeIndex(queueMessage),message));
+               merged = merged.mergeWith(callHandleIndex(message, queueMessage -> handleEdgeIndex(queueMessage)));
             } else if (event instanceof EntityDeleteEvent) {
-                merged = merged.mergeWith( callHandleIndex(queueMessage -> handleEntityDelete(queueMessage),message));
+                merged = merged.mergeWith( callHandleIndex(message, queueMessage -> handleEntityDelete(queueMessage)));
             } else if (event instanceof EntityIndexEvent) {
-                merged = merged.mergeWith(callHandleIndex(queueMessage -> handleEntityIndexUpdate(queueMessage),message));
+                merged = merged.mergeWith(callHandleIndex(message, queueMessage -> handleEntityIndexUpdate(queueMessage)));
             } else if (event instanceof InitializeApplicationIndexEvent) {
                 //does not return observable
                 handleInitializeApplicationIndex(message);
@@ -272,10 +271,11 @@ public class AmazonAsyncEventService implements AsyncEventService {
                 indexProducer.put(combined).subscribe();
                 return Observable.from(indexEventResults);
             })
-            .doOnNext(indexEventResult ->ack(indexEventResult.queueMessage));
+            .doOnNext(indexEventResult ->ack(indexEventResult.queueMessage))
+            .subscribe();
     }
 
-    private Observable<IndexEventResult> callHandleIndex(Func1<QueueMessage,Observable<IndexOperationMessage>> toCall, QueueMessage message){
+    private Observable<IndexEventResult> callHandleIndex(QueueMessage message, Func1<QueueMessage, Observable<IndexOperationMessage>> toCall){
         try{
             IndexOperationMessage indexOperationMessage =  toCall.call(message).toBlocking().lastOrDefault(null);
             return Observable.just(new IndexEventResult(message,Optional.fromNullable(indexOperationMessage),true));