You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/10/01 18:27:36 UTC
[01/36] usergrid git commit: fix time_to_live gcm
Repository: usergrid
Updated Branches:
refs/heads/master 60acf84a1 -> a56c8dddf
fix time_to_live gcm
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/32fcbb5b
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/32fcbb5b
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/32fcbb5b
Branch: refs/heads/master
Commit: 32fcbb5be966d464a40c644752827dc6996e507a
Parents: 0c12abb
Author: Shawn Feldman <sf...@apache.org>
Authored: Sat Sep 19 15:01:06 2015 -0400
Committer: Shawn Feldman <sf...@apache.org>
Committed: Tue Sep 22 14:34:15 2015 -0400
----------------------------------------------------------------------
.../usergrid/services/notifications/gcm/GCMAdapter.java | 7 ++++++-
1 file changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/32fcbb5b/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java
index bdd7737..ba404e8 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java
@@ -74,7 +74,7 @@ public class GCMAdapter implements ProviderAdapter {
long ttlSeconds = notification.getExpireTTLSeconds();
// max ttl for gcm is 4 weeks - https://developers.google.com/cloud-messaging/http-server-ref
ttlSeconds = ttlSeconds <= 2419200 ? ttlSeconds : 2419200;
- map.put(expiresKey, ttlSeconds);
+ map.put(expiresKey, (int)ttlSeconds);//needs to be int
}
Batch batch = getBatch( map);
batch.add(providerId, tracker);
@@ -204,8 +204,13 @@ public class GCMAdapter implements ProviderAdapter {
Sender sender = new Sender(notifier.getApiKey());
Message.Builder builder = new Message.Builder();
builder.setData(payload);
+ if(payload.containsKey("time_to_live")){
+ int ttl = (int)payload.get("time_to_live");
+ builder.timeToLive(ttl);
+ }
Message message = builder.build();
+
MulticastResult multicastResult = sender.send(message, ids, SEND_RETRIES);
LOG.debug("sendNotification result: {}", multicastResult);
[11/36] usergrid git commit: upgrade aws sdk
Posted by sf...@apache.org.
upgrade aws sdk
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/da6afb1f
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/da6afb1f
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/da6afb1f
Branch: refs/heads/master
Commit: da6afb1fd3097acc0af68323b34a98d04a8d9012
Parents: 5218cda
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Sep 25 09:55:10 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Sep 25 09:55:10 2015 -0600
----------------------------------------------------------------------
stack/pom.xml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/da6afb1f/stack/pom.xml
----------------------------------------------------------------------
diff --git a/stack/pom.xml b/stack/pom.xml
index 40a56fa..dfa5045 100644
--- a/stack/pom.xml
+++ b/stack/pom.xml
@@ -97,7 +97,7 @@
<!-- =================================================================== -->
<amber-version>0.22-incubating</amber-version>
- <aws.version>1.10.6</aws.version>
+ <aws.version>1.10.20</aws.version>
<cassandra-version>1.2.18</cassandra-version>
<guava.version>18.0</guava.version>
<guice.version>4.0-beta5</guice.version>
[23/36] usergrid git commit: fix the ingest method
Posted by sf...@apache.org.
fix the ingest method
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/19ae15bd
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/19ae15bd
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/19ae15bd
Branch: refs/heads/master
Commit: 19ae15bdeb41af154a4770b600d106acdb71dc88
Parents: 3ed0848
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Sep 28 11:14:28 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Sep 28 11:14:28 2015 -0600
----------------------------------------------------------------------
.../corepersistence/asyncevents/AmazonAsyncEventService.java | 2 +-
.../usergrid/persistence/index/impl/IndexOperationMessage.java | 6 ++++--
2 files changed, 5 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/19ae15bd/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index e1c6886..82e6d19 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -268,7 +268,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
.flatMap(indexEventResults -> {
IndexOperationMessage combined = new IndexOperationMessage();
Observable.from(indexEventResults)
- .doOnNext(indexEventResult -> combined.injest(indexEventResult.getIndexOperationMessage().get())).subscribe();
+ .doOnNext(indexEventResult -> combined.ingest(indexEventResult.getIndexOperationMessage().get())).subscribe();
indexProducer.put(combined).subscribe();
return Observable.from(indexEventResults);
})
http://git-wip-us.apache.org/repos/asf/usergrid/blob/19ae15bd/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
index 0676314..12df390 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
@@ -23,6 +23,7 @@ package org.apache.usergrid.persistence.index.impl;
import java.io.Serializable;
import java.util.HashSet;
import java.util.Set;
+import java.util.stream.Collectors;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.common.base.Optional;
@@ -108,7 +109,8 @@ public class IndexOperationMessage implements Serializable {
return creationTime;
}
- public void injest(IndexOperationMessage singleMessage) {
- si
+ public void ingest(IndexOperationMessage singleMessage) {
+ this.indexRequests.addAll(singleMessage.getIndexRequests().stream().collect(Collectors.toList()));
+ this.deIndexRequests.addAll(singleMessage.getDeIndexRequests().stream().collect(Collectors.toList()));
}
}
[20/36] usergrid git commit: remove properties
Posted by sf...@apache.org.
remove properties
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/a7700248
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/a7700248
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/a7700248
Branch: refs/heads/master
Commit: a770024818e478bceae39d8535e593cf56dcb2f4
Parents: e6c6c36
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Sep 25 17:33:35 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Sep 25 17:33:35 2015 -0600
----------------------------------------------------------------------
.../assets/data/AwsSdkS3BinaryStore.java | 42 ++++++++------------
1 file changed, 16 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/a7700248/stack/services/src/main/java/org/apache/usergrid/services/assets/data/AwsSdkS3BinaryStore.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/assets/data/AwsSdkS3BinaryStore.java b/stack/services/src/main/java/org/apache/usergrid/services/assets/data/AwsSdkS3BinaryStore.java
index 688e7bf..6cf5149 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/assets/data/AwsSdkS3BinaryStore.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/assets/data/AwsSdkS3BinaryStore.java
@@ -93,6 +93,9 @@ public class AwsSdkS3BinaryStore implements BinaryStore {
@Autowired
private EntityManagerFactory emf;
+ @Autowired
+ private Properties properties;
+
public AwsSdkS3BinaryStore( ) {
}
@@ -100,38 +103,25 @@ public class AwsSdkS3BinaryStore implements BinaryStore {
//ideally it should only do one. and the client should be initlized at the beginning of the run.
private AmazonS3 getS3Client() throws Exception{
- this.accessId = System.getProperty( SDKGlobalConfiguration.ACCESS_KEY_ENV_VAR );
- if(accessId == null){
- logger.error( SDKGlobalConfiguration.ACCESS_KEY_ENV_VAR + " not properly set so amazon access key is null" );
- throw new AwsPropertiesNotFoundException( SDKGlobalConfiguration.ACCESS_KEY_ENV_VAR );
-
- }
- this.secretKey = System.getProperty( SDKGlobalConfiguration.SECRET_KEY_ENV_VAR );
-
- if(secretKey == null){
- logger.error( SDKGlobalConfiguration.SECRET_KEY_ENV_VAR + " not properly set so amazon secret key is null" );
- throw new AwsPropertiesNotFoundException( SDKGlobalConfiguration.SECRET_KEY_ENV_VAR );
+ this.bucketName = properties.getProperty( "usergrid.binary.bucketname" );
+ if(bucketName == null){
+ logger.error( "usergrid.binary.bucketname not properly set so amazon bucket is null" );
+ throw new AwsPropertiesNotFoundException( "usergrid.binary.bucketname" );
- }
- this.bucketName = System.getProperty( "usergrid.binary.bucketname" );
- if(bucketName == null){
- logger.error( "usergrid.binary.bucketname not properly set so amazon bucket is null" );
- throw new AwsPropertiesNotFoundException( "usergrid.binary.bucketname" );
-
- }
+ }
- AWSCredentials credentials = new BasicAWSCredentials(accessId, secretKey);
- ClientConfiguration clientConfig = new ClientConfiguration();
- clientConfig.setProtocol(Protocol.HTTP);
+ final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
+ AWSCredentials credentials = ugProvider.getCredentials();
+ ClientConfiguration clientConfig = new ClientConfiguration();
+ clientConfig.setProtocol(Protocol.HTTP);
- s3Client = new AmazonS3Client(credentials, clientConfig);
- if(regionName != null)
- s3Client.setRegion( Region.getRegion(Regions.fromName(regionName)) );
+ s3Client = new AmazonS3Client(credentials, clientConfig);
+ if(regionName != null)
+ s3Client.setRegion( Region.getRegion(Regions.fromName(regionName)) );
return s3Client;
}
-
@Override
public void write( final UUID appId, final Entity entity, InputStream inputStream ) throws Exception {
@@ -186,7 +176,7 @@ public class AwsSdkS3BinaryStore implements BinaryStore {
// determine max size file allowed, default to 50mb
long maxSizeBytes = 50 * FileUtils.ONE_MB;
- String maxSizeMbString = System.getProperty( "usergrid.binary.max-size-mb", "50" );
+ String maxSizeMbString = properties.getProperty( "usergrid.binary.max-size-mb", "50" );
if ( StringUtils.isNumeric( maxSizeMbString )) {
maxSizeBytes = Long.parseLong( maxSizeMbString ) * FileUtils.ONE_MB;
}
[24/36] usergrid git commit: test fix
Posted by sf...@apache.org.
test fix
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/0428cd63
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/0428cd63
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/0428cd63
Branch: refs/heads/master
Commit: 0428cd632f58c08bfcd9dca489f1f746558e3ee4
Parents: 19ae15b
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Sep 28 11:31:27 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Sep 28 11:31:27 2015 -0600
----------------------------------------------------------------------
.../corepersistence/index/InMemoryAsycIndexServiceTest.java | 5 ++++-
1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0428cd63/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/InMemoryAsycIndexServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/InMemoryAsycIndexServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/InMemoryAsycIndexServiceTest.java
index 77d7cab..4666b4c 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/InMemoryAsycIndexServiceTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/InMemoryAsycIndexServiceTest.java
@@ -20,6 +20,7 @@
package org.apache.usergrid.corepersistence.index;
+import org.apache.usergrid.persistence.index.impl.IndexProducer;
import org.junit.Rule;
import org.junit.runner.RunWith;
@@ -53,9 +54,11 @@ public class InMemoryAsycIndexServiceTest extends AsyncIndexServiceTest {
public RxTaskScheduler rxTaskScheduler;
+ @Inject
+ public IndexProducer indexProducer;
@Override
protected AsyncEventService getAsyncEventService() {
- return new InMemoryAsyncEventService( eventBuilder, rxTaskScheduler, false );
+ return new InMemoryAsyncEventService( eventBuilder, rxTaskScheduler,indexProducer, false );
}
[16/36] usergrid git commit: remove system properties
Posted by sf...@apache.org.
remove system properties
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/cd623e60
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/cd623e60
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/cd623e60
Branch: refs/heads/master
Commit: cd623e60d1257bdd132f43f9b6ea1834172e3f81
Parents: 5233567
Author: Shawn Feldman <sh...@gmail.com>
Authored: Fri Sep 25 13:47:02 2015 -0600
Committer: Shawn Feldman <sh...@gmail.com>
Committed: Fri Sep 25 13:47:02 2015 -0600
----------------------------------------------------------------------
.../services/assets/data/AwsSdkS3BinaryStore.java | 11 ++++-------
1 file changed, 4 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/cd623e60/stack/services/src/main/java/org/apache/usergrid/services/assets/data/AwsSdkS3BinaryStore.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/assets/data/AwsSdkS3BinaryStore.java b/stack/services/src/main/java/org/apache/usergrid/services/assets/data/AwsSdkS3BinaryStore.java
index a0fd361..688e7bf 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/assets/data/AwsSdkS3BinaryStore.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/assets/data/AwsSdkS3BinaryStore.java
@@ -93,9 +93,6 @@ public class AwsSdkS3BinaryStore implements BinaryStore {
@Autowired
private EntityManagerFactory emf;
- @Autowired
- private Properties properties;
-
public AwsSdkS3BinaryStore( ) {
}
@@ -103,20 +100,20 @@ public class AwsSdkS3BinaryStore implements BinaryStore {
//ideally it should only do one. and the client should be initlized at the beginning of the run.
private AmazonS3 getS3Client() throws Exception{
- this.accessId = properties.getProperty( SDKGlobalConfiguration.ACCESS_KEY_ENV_VAR );
+ this.accessId = System.getProperty( SDKGlobalConfiguration.ACCESS_KEY_ENV_VAR );
if(accessId == null){
logger.error( SDKGlobalConfiguration.ACCESS_KEY_ENV_VAR + " not properly set so amazon access key is null" );
throw new AwsPropertiesNotFoundException( SDKGlobalConfiguration.ACCESS_KEY_ENV_VAR );
}
- this.secretKey = properties.getProperty( SDKGlobalConfiguration.SECRET_KEY_ENV_VAR );
+ this.secretKey = System.getProperty( SDKGlobalConfiguration.SECRET_KEY_ENV_VAR );
if(secretKey == null){
logger.error( SDKGlobalConfiguration.SECRET_KEY_ENV_VAR + " not properly set so amazon secret key is null" );
throw new AwsPropertiesNotFoundException( SDKGlobalConfiguration.SECRET_KEY_ENV_VAR );
}
- this.bucketName = properties.getProperty( "usergrid.binary.bucketname" );
+ this.bucketName = System.getProperty( "usergrid.binary.bucketname" );
if(bucketName == null){
logger.error( "usergrid.binary.bucketname not properly set so amazon bucket is null" );
throw new AwsPropertiesNotFoundException( "usergrid.binary.bucketname" );
@@ -189,7 +186,7 @@ public class AwsSdkS3BinaryStore implements BinaryStore {
// determine max size file allowed, default to 50mb
long maxSizeBytes = 50 * FileUtils.ONE_MB;
- String maxSizeMbString = properties.getProperty( "usergrid.binary.max-size-mb", "50" );
+ String maxSizeMbString = System.getProperty( "usergrid.binary.max-size-mb", "50" );
if ( StringUtils.isNumeric( maxSizeMbString )) {
maxSizeBytes = Long.parseLong( maxSizeMbString ) * FileUtils.ONE_MB;
}
[04/36] usergrid git commit: remove batch consumer
Posted by sf...@apache.org.
remove batch consumer
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/76816007
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/76816007
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/76816007
Branch: refs/heads/master
Commit: 76816007f34317e54659aaad3ac5c1bf59e8580d
Parents: ce5a96f
Author: Shawn Feldman <sf...@apache.org>
Authored: Tue Sep 15 13:11:12 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu Sep 24 15:00:13 2015 -0600
----------------------------------------------------------------------
.../usergrid/persistence/index/IndexFig.java | 14 ----
.../index/impl/EsIndexBufferConsumerImpl.java | 87 +++++++++-----------
.../index/impl/IndexBufferConsumer.java | 2 +
.../index/impl/IndexRefreshCommandImpl.java | 1 +
4 files changed, 44 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/76816007/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
index 5997029..db1ef3d 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
@@ -153,21 +153,7 @@ public interface IndexFig extends GuicyFig {
@Default( "2" )
int getIndexCacheMaxWorkers();
- /**
- * The maximum time to wait before the buffer flushes and sends index write requests to Elasticsearch.
- * This is used so the application doesn't wait forever for the buffer to reach its size before writing
- * data to Elasticsearch.
- */
- @Default( "250" )
- @Key( INDEX_BUFFER_TIMEOUT )
- long getIndexBufferTimeout();
- /**
- * The maximum buffer size to use before sending index write requests to Elasticsearch.
- */
- @Default( "1000" )
- @Key( INDEX_BUFFER_SIZE )
- int getIndexBufferSize();
/**
* The number of worker threads used for flushing batches of index write requests
http://git-wip-us.apache.org/repos/asf/usergrid/blob/76816007/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index 14f2b6f..91fab2f 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -100,12 +100,12 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
public Observable<IndexOperationMessage> put( IndexOperationMessage message ) {
Preconditions.checkNotNull(message, "Message cannot be null");
- indexSizeCounter.inc( message.getDeIndexRequests().size() );
- indexSizeCounter.inc( message.getIndexRequests().size() );
+ indexSizeCounter.inc(message.getDeIndexRequests().size());
+ indexSizeCounter.inc(message.getIndexRequests().size());
Timer.Context time = offerTimer.time();
- bufferProducer.send( message );
+ bufferProducer.send(message);
time.stop();
- return message.observable();
+ return message.observable();
}
@@ -114,22 +114,20 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
*/
private void startSubscription() {
-
+ //buffer on our new thread with a timeout
final Observable<IndexOperationMessage> observable = Observable.create(bufferProducer);
- //buffer on our new thread with a timeout
- observable.buffer( indexFig.getIndexBufferTimeout(), TimeUnit.MILLISECONDS, indexFig.getIndexBufferSize(),
- Schedulers.io() ).flatMap( indexOpBuffer -> {
+ observable.subscribeOn(Schedulers.io()).flatMap(indexOpBuffer -> {
//hand off to processor in new observable thread so we can continue to buffer faster
- return Observable.just( indexOpBuffer ).flatMap(
- indexOpBufferObservable -> ObservableTimer.time( processBatch( indexOpBufferObservable ),flushTimer )
+ return Observable.just(indexOpBuffer).flatMap(
+ indexOpBufferObservable -> ObservableTimer.time(processBatch(indexOpBufferObservable), flushTimer)
)
//use the I/O scheduler for thread re-use and efficiency in context switching then use our concurrent
// flatmap count or higher throughput of batches once buffered
- .subscribeOn( Schedulers.io() );
- }, indexFig.getIndexFlushWorkerCount() )
+ .subscribeOn(Schedulers.io());
+ }, indexFig.getIndexFlushWorkerCount())
//start in the background
.subscribe();
}
@@ -137,63 +135,60 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
/**
* Process the buffer of batches
- * @param batches
+ * @param batch
* @return
*/
- private Observable<IndexOperationMessage> processBatch( final List<IndexOperationMessage> batches ) {
+ private Observable<IndexOperationMessage> processBatch( final IndexOperationMessage batch ) {
- final Observable<IndexOperationMessage> indexOps = Observable.from( batches );
-
//take our stream of batches, then stream then into individual ops for consumption on ES
- final Observable<BatchOperation> batchOps = indexOps.flatMap( batch -> {
- final Set<IndexOperation> indexOperationSet = batch.getIndexRequests();
- final Set<DeIndexOperation> deIndexOperationSet = batch.getDeIndexRequests();
- final int indexOperationSetSize = indexOperationSet.size();
- final int deIndexOperationSetSize = deIndexOperationSet.size();
+ final Set<IndexOperation> indexOperationSet = batch.getIndexRequests();
+ final Set<DeIndexOperation> deIndexOperationSet = batch.getDeIndexRequests();
+
+ final int indexOperationSetSize = indexOperationSet.size();
+ final int deIndexOperationSetSize = deIndexOperationSet.size();
- log.debug( "Emitting {} add and {} remove operations", indexOperationSetSize, deIndexOperationSetSize );
+ log.debug("Emitting {} add and {} remove operations", indexOperationSetSize, deIndexOperationSetSize);
- indexSizeCounter.dec( indexOperationSetSize );
- indexSizeCounter.dec( deIndexOperationSetSize );
+ indexSizeCounter.dec(indexOperationSetSize);
+ indexSizeCounter.dec(deIndexOperationSetSize);
- final Observable<IndexOperation> index = Observable.from( batch.getIndexRequests() );
- final Observable<DeIndexOperation> deIndex = Observable.from( batch.getDeIndexRequests() );
+ final Observable<IndexOperation> index = Observable.from(batch.getIndexRequests());
+ final Observable<DeIndexOperation> deIndex = Observable.from(batch.getDeIndexRequests());
- return Observable.merge( index, deIndex );
- } );
+ final Observable<BatchOperation> batchOps = Observable.merge(index, deIndex);
//buffer into the max size we can send ES and fire them all off until we're completed
- final Observable<BulkRequestBuilder> requests = batchOps.buffer( indexFig.getIndexBatchSize() )
+ final Observable<BulkRequestBuilder> requests = batchOps.buffer(indexFig.getIndexBatchSize())
//flatten the buffer into a single batch execution
- .flatMap( individualOps -> Observable.from( individualOps )
+ .flatMap(individualOps -> Observable.from(individualOps)
//collect them
- .collect( () -> initRequest(), ( bulkRequestBuilder, batchOperation ) -> {
- log.debug( "adding operation {} to bulkRequestBuilder {}", batchOperation, bulkRequestBuilder );
- batchOperation.doOperation( client, bulkRequestBuilder );
- } ) )
+ .collect(() -> initRequest(), (bulkRequestBuilder, batchOperation) -> {
+ log.debug("adding operation {} to bulkRequestBuilder {}", batchOperation, bulkRequestBuilder);
+ batchOperation.doOperation(client, bulkRequestBuilder);
+ }))
//write them
- .doOnNext( bulkRequestBuilder -> sendRequest( bulkRequestBuilder ) );
+ .doOnNext(bulkRequestBuilder -> sendRequest(bulkRequestBuilder));
//now that we've processed them all, ack the futures after our last batch comes through
final Observable<IndexOperationMessage> processedIndexOperations =
- requests.lastOrDefault(null).flatMap( lastRequest ->{
- if(lastRequest!=null){
- return Observable.from( batches ) ;
- }else{
+ requests.lastOrDefault(null).flatMap(lastRequest -> {
+ if (lastRequest != null) {
+ return Observable.just(batch);
+ } else {
return Observable.empty();
}
});
//subscribe to the operations that generate requests on a new thread so that we can execute them quickly
//mark this as done
- return processedIndexOperations.doOnNext( processedIndexOp -> {
- processedIndexOp.done();
- roundtripTimer.update( System.currentTimeMillis() - processedIndexOp.getCreationTime() );
- } );
+ return processedIndexOperations.doOnNext(processedIndexOp -> {
+ processedIndexOp.done();
+ roundtripTimer.update(System.currentTimeMillis() - processedIndexOp.getCreationTime());
+ });
}
@@ -266,12 +261,12 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
/**
* Send the data through the buffer
*/
- public void send( final IndexOperationMessage indexOp ) {
+ public void send( final IndexOperationMessage indexOps ) {
try {
- subscriber.onNext( indexOp );
+ subscriber.onNext( indexOps );
}catch(Exception e){
//re-throws so the caller can determine failover
- log.error( "Unable to process message for indexOp {}, error follows.", indexOp, e );
+ log.error( "Unable to process message for indexOp {}, error follows.", indexOps, e );
throw e;
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/76816007/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java
index e769455..df2119c 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java
@@ -20,6 +20,8 @@ package org.apache.usergrid.persistence.index.impl;
import rx.Observable;
+import java.util.List;
+
/**
* Buffer index requests
http://git-wip-us.apache.org/repos/asf/usergrid/blob/76816007/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
index 70220d0..7b9bc5d 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
@@ -20,6 +20,7 @@
package org.apache.usergrid.persistence.index.impl;
+import java.util.Collections;
import java.util.Map;
import java.util.UUID;
[22/36] usergrid git commit: index will merge all batches
Posted by sf...@apache.org.
index will merge all batches
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/3ed08483
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/3ed08483
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/3ed08483
Branch: refs/heads/master
Commit: 3ed0848352e2507ad1c3ba886c433f95657a5c24
Parents: 4c263b8
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Sep 28 11:01:19 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Sep 28 11:01:19 2015 -0600
----------------------------------------------------------------------
.../asyncevents/AmazonAsyncEventService.java | 126 +++++++++++++------
.../asyncevents/AsyncIndexProvider.java | 12 +-
.../asyncevents/InMemoryAsyncEventService.java | 16 ++-
.../index/AmazonAsyncEventServiceTest.java | 5 +-
.../index/impl/IndexOperationMessage.java | 5 +
5 files changed, 115 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3ed08483/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index fcb93c9..e1c6886 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -25,6 +25,8 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
+import com.google.common.base.Optional;
+import org.apache.usergrid.persistence.index.impl.IndexProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,6 +71,7 @@ import com.google.inject.Singleton;
import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
+import rx.functions.Func1;
import rx.schedulers.Schedulers;
@@ -85,6 +88,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
private final QueueManager queue;
private final QueueScope queueScope;
private final IndexProcessorFig indexProcessorFig;
+ private final IndexProducer indexProducer;
private final EntityCollectionManagerFactory entityCollectionManagerFactory;
private final IndexLocationStrategyFactory indexLocationStrategyFactory;
private final EntityIndexFactory entityIndexFactory;
@@ -110,11 +114,16 @@ public class AmazonAsyncEventService implements AsyncEventService {
@Inject
- public AmazonAsyncEventService( final QueueManagerFactory queueManagerFactory, final IndexProcessorFig indexProcessorFig,
- final MetricsFactory metricsFactory, final EntityCollectionManagerFactory entityCollectionManagerFactory,
- final IndexLocationStrategyFactory indexLocationStrategyFactory, final EntityIndexFactory entityIndexFactory,
+ public AmazonAsyncEventService( final QueueManagerFactory queueManagerFactory,
+ final IndexProcessorFig indexProcessorFig,
+ final IndexProducer indexProducer,
+ final MetricsFactory metricsFactory,
+ final EntityCollectionManagerFactory entityCollectionManagerFactory,
+ final IndexLocationStrategyFactory indexLocationStrategyFactory,
+ final EntityIndexFactory entityIndexFactory,
final EventBuilder eventBuilder,
final RxTaskScheduler rxTaskScheduler ) {
+ this.indexProducer = indexProducer;
this.entityCollectionManagerFactory = entityCollectionManagerFactory;
this.indexLocationStrategyFactory = indexLocationStrategyFactory;
@@ -219,43 +228,60 @@ public class AmazonAsyncEventService implements AsyncEventService {
private void handleMessages( final List<QueueMessage> messages ) {
- if ( logger.isDebugEnabled() ) {
- logger.debug( "handleMessages with {} message", messages.size() );
+ if (logger.isDebugEnabled()) {
+ logger.debug("handleMessages with {} message", messages.size());
}
- for ( QueueMessage message : messages ) {
- final AsyncEvent event = ( AsyncEvent ) message.getBody();
+ Observable<IndexEventResult> merged = Observable.empty();
+ for (QueueMessage message : messages) {
+ final AsyncEvent event = (AsyncEvent) message.getBody();
- logger.debug( "Processing {} event", event );
+ logger.debug("Processing {} event", event);
- if ( event == null ) {
- logger.error( "AsyncEvent type or event is null!" );
+ if (event == null) {
+ logger.error("AsyncEvent type or event is null!");
continue;
}
- if ( event instanceof EdgeDeleteEvent ) {
- handleEdgeDelete( message );
- }
- else if ( event instanceof EdgeIndexEvent ) {
- handleEdgeIndex( message );
+ if (event instanceof EdgeDeleteEvent) {
+ merged = merged.mergeWith(callHandleIndex(queueMessage -> handleEdgeDelete(queueMessage), message));
+ } else if (event instanceof EdgeIndexEvent) {
+ merged = merged.mergeWith(callHandleIndex(queueMessage -> handleEdgeIndex(queueMessage),message));
+ } else if (event instanceof EntityDeleteEvent) {
+ merged = merged.mergeWith( callHandleIndex(queueMessage -> handleEntityDelete(queueMessage),message));
+ } else if (event instanceof EntityIndexEvent) {
+ merged = merged.mergeWith(callHandleIndex(queueMessage -> handleEntityIndexUpdate(queueMessage),message));
+ } else if (event instanceof InitializeApplicationIndexEvent) {
+ //does not return observable
+ handleInitializeApplicationIndex(message);
+ } else {
+ logger.error("Unknown EventType: {}", event);
}
- else if ( event instanceof EntityDeleteEvent ) {
- handleEntityDelete( message );
- }
- else if ( event instanceof EntityIndexEvent ) {
- handleEntityIndexUpdate( message );
- }
+ messageCycle.update(System.currentTimeMillis() - event.getCreationTime());
+ }
- else if ( event instanceof InitializeApplicationIndexEvent ) {
- handleInitializeApplicationIndex( message );
- }
- else {
- logger.error( "Unknown EventType: {}", event );
- }
+ merged
+ .filter(indexEventResult -> indexEventResult.success() && indexEventResult.getIndexOperationMessage().isPresent())
+ .buffer(MAX_TAKE)
+ .flatMap(indexEventResults -> {
+ IndexOperationMessage combined = new IndexOperationMessage();
+ Observable.from(indexEventResults)
+ .doOnNext(indexEventResult -> combined.injest(indexEventResult.getIndexOperationMessage().get())).subscribe();
+ indexProducer.put(combined).subscribe();
+ return Observable.from(indexEventResults);
+ })
+ .doOnNext(indexEventResult ->ack(indexEventResult.queueMessage));
+ }
- messageCycle.update( System.currentTimeMillis() - event.getCreationTime() );
+ private Observable<IndexEventResult> callHandleIndex(Func1<QueueMessage,Observable<IndexOperationMessage>> toCall, QueueMessage message){
+ try{
+ IndexOperationMessage indexOperationMessage = toCall.call(message).toBlocking().lastOrDefault(null);
+ return Observable.just(new IndexEventResult(message,Optional.fromNullable(indexOperationMessage),true));
+ }catch (Exception e){
+ logger.error("failed to run index",e);
+ return Observable.just( new IndexEventResult(message, Optional.<IndexOperationMessage>absent(),false));
}
}
@@ -276,7 +302,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
}
- public void handleEntityIndexUpdate(final QueueMessage message) {
+ public Observable<IndexOperationMessage> handleEntityIndexUpdate(final QueueMessage message) {
Preconditions.checkNotNull( message, "Queue Message cannot be null for handleEntityIndexUpdate" );
@@ -298,8 +324,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
final EntityIndexOperation entityIndexOperation = new EntityIndexOperation( applicationScope, entityId, updatedAfter);
final Observable<IndexOperationMessage> observable = eventBuilder.buildEntityIndex( entityIndexOperation );
-
- subscribeAndAck( observable, message );
+ return observable;
}
@@ -313,7 +338,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
offer( operation );
}
- public void handleEdgeIndex(final QueueMessage message) {
+ public Observable<IndexOperationMessage> handleEdgeIndex(final QueueMessage message) {
Preconditions.checkNotNull(message, "Queue Message cannot be null for handleEdgeIndex");
@@ -333,8 +358,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
final Observable<IndexOperationMessage> edgeIndexObservable = ecm.load(edgeIndexEvent.getEntityId()).flatMap( entity -> eventBuilder.buildNewEdge(
applicationScope, entity, edge ) );
-
- subscribeAndAck( edgeIndexObservable, message );
+ return edgeIndexObservable;
}
@Override
@@ -344,7 +368,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
offer( new EdgeDeleteEvent( applicationScope, edge ) );
}
- public void handleEdgeDelete(final QueueMessage message) {
+ public Observable<IndexOperationMessage> handleEdgeDelete(final QueueMessage message) {
Preconditions.checkNotNull(message, "Queue Message cannot be null for handleEdgeDelete");
@@ -362,8 +386,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
if (logger.isDebugEnabled()) logger.debug("Deleting in app scope {} with edge {}", applicationScope, edge);
final Observable<IndexOperationMessage> observable = eventBuilder.buildDeleteEdge( applicationScope, edge );
-
- subscribeAndAck( observable, message );
+ return observable;
}
@@ -378,7 +401,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
return queue.getQueueDepth();
}
- public void handleEntityDelete(final QueueMessage message) {
+ public Observable<IndexOperationMessage> handleEntityDelete(final QueueMessage message) {
Preconditions.checkNotNull(message, "Queue Message cannot be null for handleEntityDelete");
@@ -401,8 +424,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
final Observable merged = Observable.merge( entityDeleteResults.getEntitiesCompacted(),
entityDeleteResults.getIndexObservable() );
-
- subscribeAndAck( merged, message );
+ return merged;
}
@@ -526,4 +548,28 @@ public class AmazonAsyncEventService implements AsyncEventService {
private void subscribeAndAck( final Observable<?> observable, final QueueMessage message ){
observable.doOnCompleted( ()-> ack(message) ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ).subscribe();
}
+ public static class IndexEventResult{
+ private final QueueMessage queueMessage;
+ private final Optional<IndexOperationMessage> indexOperationMessage;
+ private final boolean success;
+
+ public IndexEventResult(QueueMessage queueMessage, Optional<IndexOperationMessage> indexOperationMessage ,boolean success){
+
+ this.queueMessage = queueMessage;
+ this.indexOperationMessage = indexOperationMessage;
+ this.success = success;
+ }
+
+ public QueueMessage getQueueMessage() {
+ return queueMessage;
+ }
+
+ public boolean success() {
+ return success;
+ }
+
+ public Optional<IndexOperationMessage> getIndexOperationMessage() {
+ return indexOperationMessage;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3ed08483/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
index 0e773cf..e9e36f0 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
@@ -27,6 +27,7 @@ import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory
import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.apache.usergrid.persistence.index.impl.IndexProducer;
import org.apache.usergrid.persistence.queue.QueueManagerFactory;
import com.google.inject.Inject;
@@ -49,6 +50,7 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
private final EventBuilder eventBuilder;
private final IndexLocationStrategyFactory indexLocationStrategyFactory;
private final EntityIndexFactory entityIndexFactory;
+ private final IndexProducer indexProducer;
private AsyncEventService asyncEventService;
@@ -61,7 +63,8 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
final EntityCollectionManagerFactory entityCollectionManagerFactory,
final EventBuilder eventBuilder,
final IndexLocationStrategyFactory indexLocationStrategyFactory,
- final EntityIndexFactory entityIndexFactory) {
+ final EntityIndexFactory entityIndexFactory,
+ final IndexProducer indexProducer) {
this.indexProcessorFig = indexProcessorFig;
this.queueManagerFactory = queueManagerFactory;
@@ -71,6 +74,7 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
this.eventBuilder = eventBuilder;
this.indexLocationStrategyFactory = indexLocationStrategyFactory;
this.entityIndexFactory = entityIndexFactory;
+ this.indexProducer = indexProducer;
}
@@ -92,12 +96,12 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
switch (impl) {
case LOCAL:
- return new InMemoryAsyncEventService(eventBuilder, rxTaskScheduler, indexProcessorFig.resolveSynchronously());
+ return new InMemoryAsyncEventService(eventBuilder, rxTaskScheduler, indexProducer,indexProcessorFig.resolveSynchronously());
case SQS:
- return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, metricsFactory,
+ return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory,
entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, rxTaskScheduler );
case SNS:
- return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, metricsFactory,
+ return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory,
entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, rxTaskScheduler );
default:
throw new IllegalArgumentException("Configuration value of " + getErrorValues() + " are allowed");
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3ed08483/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
index 6a71b3e..fad6e48 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
@@ -20,6 +20,8 @@
package org.apache.usergrid.corepersistence.asyncevents;
+import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
+import org.apache.usergrid.persistence.index.impl.IndexProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,14 +50,19 @@ public class InMemoryAsyncEventService implements AsyncEventService {
private final EventBuilder eventBuilder;
private final RxTaskScheduler rxTaskScheduler;
+ private final IndexProducer indexProducer;
private final boolean resolveSynchronously;
@Inject
- public InMemoryAsyncEventService( final EventBuilder eventBuilder, final RxTaskScheduler rxTaskScheduler, boolean
- resolveSynchronously ) {
+ public InMemoryAsyncEventService( final EventBuilder eventBuilder,
+ final RxTaskScheduler rxTaskScheduler,
+ final IndexProducer indexProducer,
+ boolean resolveSynchronously
+ ) {
this.eventBuilder = eventBuilder;
this.rxTaskScheduler = rxTaskScheduler;
+ this.indexProducer = indexProducer;
this.resolveSynchronously = resolveSynchronously;
}
@@ -117,12 +124,13 @@ public class InMemoryAsyncEventService implements AsyncEventService {
}
public void run( Observable<?> observable ) {
+ Observable mapped = observable.map(message -> message instanceof IndexOperationMessage ? indexProducer.put((IndexOperationMessage)message) : Observable.just(message));
//start it in the background on an i/o thread
if ( !resolveSynchronously ) {
- observable.subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ).subscribe();
+ mapped.subscribeOn(rxTaskScheduler.getAsyncIOScheduler()).subscribe();
}
else {
- observable.toBlocking().lastOrDefault(null);
+ mapped.subscribe();
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3ed08483/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
index 4660389..a14437c 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
@@ -22,6 +22,7 @@ package org.apache.usergrid.corepersistence.index;
import org.apache.usergrid.corepersistence.asyncevents.EventBuilder;
import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.apache.usergrid.persistence.index.impl.IndexProducer;
import org.junit.Rule;
import org.junit.runner.RunWith;
@@ -72,6 +73,8 @@ public class AmazonAsyncEventServiceTest extends AsyncIndexServiceTest {
@Inject
public EventBuilder eventBuilder;
+ @Inject
+ public IndexProducer indexProducer;
@Inject
public IndexLocationStrategyFactory indexLocationStrategyFactory;
@@ -82,7 +85,7 @@ public class AmazonAsyncEventServiceTest extends AsyncIndexServiceTest {
@Override
protected AsyncEventService getAsyncEventService() {
- return new AmazonAsyncEventService( queueManagerFactory, indexProcessorFig, metricsFactory, entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, rxTaskScheduler );
+ return new AmazonAsyncEventService( queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory, entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, rxTaskScheduler );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3ed08483/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
index bd2bec8..0676314 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
@@ -25,6 +25,7 @@ import java.util.HashSet;
import java.util.Set;
import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.google.common.base.Optional;
/**
@@ -106,4 +107,8 @@ public class IndexOperationMessage implements Serializable {
public long getCreationTime() {
return creationTime;
}
+
+ public void injest(IndexOperationMessage singleMessage) {
+ si
+ }
}
[29/36] usergrid git commit: remove job scheduler log
Posted by sf...@apache.org.
remove job scheduler log
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/4e98ed79
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/4e98ed79
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/4e98ed79
Branch: refs/heads/master
Commit: 4e98ed7935d380009269255497e4431dddbb9497
Parents: a770024
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Sep 28 16:13:40 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Sep 28 16:37:25 2015 -0600
----------------------------------------------------------------------
.../org/apache/usergrid/batch/service/JobSchedulerService.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/4e98ed79/stack/core/src/main/java/org/apache/usergrid/batch/service/JobSchedulerService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/batch/service/JobSchedulerService.java b/stack/core/src/main/java/org/apache/usergrid/batch/service/JobSchedulerService.java
index d76be6f..284e1db 100644
--- a/stack/core/src/main/java/org/apache/usergrid/batch/service/JobSchedulerService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/batch/service/JobSchedulerService.java
@@ -131,7 +131,7 @@ public class JobSchedulerService extends AbstractScheduledService {
}
}
catch ( Throwable t ) {
- LOG.error( "Scheduler run failed, error is", t );
+ LOG.debug( "Scheduler run failed, error is", t );
}
}
[31/36] usergrid git commit: Merge commit 'refs/pull/391/head' of
github.com:apache/usergrid into 2.1-release
Posted by sf...@apache.org.
Merge commit 'refs/pull/391/head' of github.com:apache/usergrid into 2.1-release
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/6626b83e
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/6626b83e
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/6626b83e
Branch: refs/heads/master
Commit: 6626b83e35fda96d2601581affcf2b920df7a322
Parents: 4e98ed7 c19adb8
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Sep 29 10:09:14 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Sep 29 10:09:14 2015 -0600
----------------------------------------------------------------------
.../corepersistence/CpManagerCache.java | 12 +-
.../corepersistence/CpRelationManager.java | 4 +-
.../usergrid/corepersistence/ManagerCache.java | 7 +
.../asyncevents/AmazonAsyncEventService.java | 151 +++++++++++++------
.../asyncevents/AsyncIndexProvider.java | 12 +-
.../asyncevents/InMemoryAsyncEventService.java | 26 +++-
.../corepersistence/index/IndexServiceImpl.java | 8 +-
.../read/search/CandidateEntityFilter.java | 16 +-
.../pipeline/read/search/CandidateIdFilter.java | 16 +-
.../index/AmazonAsyncEventServiceTest.java | 5 +-
.../index/InMemoryAsycIndexServiceTest.java | 5 +-
.../corepersistence/index/IndexServiceTest.java | 18 ++-
.../persistence/index/EntityIndexBatch.java | 7 +-
.../index/impl/EsEntityIndexBatchImpl.java | 8 +-
.../index/impl/EsEntityIndexFactoryImpl.java | 4 -
.../index/impl/EsEntityIndexImpl.java | 5 +-
.../index/impl/IndexOperationMessage.java | 7 +
.../index/impl/IndexRefreshCommandImpl.java | 2 +-
.../persistence/index/impl/EntityIndexTest.java | 48 +++---
.../persistence/index/impl/GeoPagingTest.java | 4 +-
.../index/impl/IndexLoadTestsIT.java | 5 +-
.../exceptions/AbstractExceptionMapper.java | 2 +-
22 files changed, 254 insertions(+), 118 deletions(-)
----------------------------------------------------------------------
[35/36] usergrid git commit: merge from 2.1-release
Posted by sf...@apache.org.
merge from 2.1-release
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/bce1359b
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/bce1359b
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/bce1359b
Branch: refs/heads/master
Commit: bce1359b8874406e9d03da03eea4fe19e91dafcd
Parents: 3b838c8 9ceae77
Author: Shawn Feldman <sf...@apache.org>
Authored: Thu Oct 1 09:59:48 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu Oct 1 09:59:48 2015 -0600
----------------------------------------------------------------------
stack/awscluster/gatling-cluster-cf.json | 2 +-
.../batch/service/JobSchedulerService.java | 2 +-
.../corepersistence/CpManagerCache.java | 12 +-
.../usergrid/corepersistence/ManagerCache.java | 7 +
.../asyncevents/AmazonAsyncEventService.java | 151 +++++++++++++------
.../asyncevents/AsyncIndexProvider.java | 12 +-
.../asyncevents/InMemoryAsyncEventService.java | 26 +++-
.../corepersistence/index/IndexServiceImpl.java | 22 +--
.../read/search/CandidateEntityFilter.java | 21 +--
.../pipeline/read/search/CandidateIdFilter.java | 15 +-
.../index/AmazonAsyncEventServiceTest.java | 5 +-
.../index/InMemoryAsycIndexServiceTest.java | 5 +-
.../corepersistence/index/IndexServiceTest.java | 18 ++-
.../persistence/index/EntityIndexBatch.java | 8 +-
.../index/impl/EsEntityIndexBatchImpl.java | 6 +-
.../index/impl/EsEntityIndexFactoryImpl.java | 4 -
.../index/impl/EsEntityIndexImpl.java | 5 +-
.../index/impl/EsIndexProducerImpl.java | 39 +++--
.../index/impl/IndexOperationMessage.java | 7 +
.../index/impl/IndexRefreshCommandImpl.java | 2 +-
.../persistence/index/impl/EntityIndexTest.java | 54 +++----
.../persistence/index/impl/GeoPagingTest.java | 3 +-
.../index/impl/IndexLoadTestsIT.java | 9 +-
stack/pom.xml | 2 +-
.../exceptions/AbstractExceptionMapper.java | 2 +-
stack/rest_integration_tests/lib/entities.js | 41 +++++
.../test/entities/create.js | 24 ++-
.../management/AppInfoMigrationPlugin.java | 11 ++
.../assets/data/AwsSdkS3BinaryStore.java | 37 ++---
29 files changed, 360 insertions(+), 192 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/bce1359b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
----------------------------------------------------------------------
diff --cc stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
index ac2962a,ceb18ae..551d083
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
@@@ -177,7 -178,8 +178,7 @@@ public class CandidateEntityFilter exte
validate( candidateResult );
}
- producer.put(batch).subscribe();
+ indexProducer.put(batch.build()).subscribe();
-
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/bce1359b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateIdFilter.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/bce1359b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index 3679887,68830ca..1ffcd02
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@@ -120,8 -118,9 +118,8 @@@ public class EsEntityIndexBatchImpl imp
return deindex( searchEdge, entity.getId(), entity.getVersion() );
}
-
@Override
- public IndexOperationMessage build(){
+ public IndexOperationMessage build() {
return container;
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/bce1359b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
index 9223293,2b36fc8..b784c2d
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
@@@ -24,7 -22,6 +22,9 @@@ import java.util.Set
import java.util.concurrent.atomic.AtomicLong;
import com.codahale.metrics.Histogram;
++
++
+import org.apache.usergrid.persistence.index.EntityIndexBatch;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
@@@ -88,14 -85,8 +88,13 @@@ public class EsIndexProducerImpl implem
}
- public Observable<IndexOperationMessage> put( EntityIndexBatch batch ) {
- Preconditions.checkNotNull(batch, "Batch cannot be null");
- return put(batch.build());
++ @Override
++ public Observable<IndexOperationMessage> put(EntityIndexBatch message) {
++ return put(message.build());
+ }
+
public Observable<IndexOperationMessage> put( IndexOperationMessage message ) {
-
- Preconditions.checkNotNull(message, "Message cannot be null");
+ Preconditions.checkNotNull(message, "Message cannot be null");
indexSizeCounter.inc(message.getDeIndexRequests().size());
indexSizeCounter.inc(message.getIndexRequests().size());
return processBatch(message);
http://git-wip-us.apache.org/repos/asf/usergrid/blob/bce1359b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
index b8136fa,5243d5a..3a91f12
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
@@@ -135,11 -136,11 +136,11 @@@ public class EntityIndexTest extends Ba
entity1.setField(new UUIDField(IndexingUtils.ENTITY_ID_FIELDNAME, UUID.randomUUID()));
entity1.setField( new StringField( "testfield", "test" ) );
entity1.setField(new IntegerField("ordinal", 0));
- entity1.setField(new UUIDField("testuuid",uuid));
+ entity1.setField(new UUIDField("testuuid", uuid));
- batch.index(indexEdge, entity1);
- indexProducer.put(batch).subscribe();
+ batch.index( indexEdge, entity1 );
+ indexProducer.put(batch.build()).subscribe();
Entity entity2 = new Entity( entityType );
@@@ -147,13 -148,13 +148,13 @@@
List<String> list = new ArrayList<>();
- list.add( "test" );
- entity2.setField( new ArrayField<>( "testfield", list ) );
- entity2.setField( new IntegerField( "ordinal", 1 ) );
+ list.add("test");
+ entity2.setField(new ArrayField<>("testfield", list));
+ entity2.setField(new IntegerField("ordinal", 1));
- batch.index(indexEdge, entity2);
- indexProducer.put(batch).subscribe();
+ batch.index( indexEdge, entity2 );
+ indexProducer.put(batch.build()).subscribe();;
entityIndex.refreshAsync().toBlocking().first();
@@@ -362,11 -363,11 +363,11 @@@
Entity entity = EntityIndexMapUtils.fromMap( entityMap );
- EntityUtils.setId( entity, new SimpleId( "fastcar" ) );
- EntityUtils.setVersion( entity, UUIDGenerator.newTimeUUID() );
- entity.setField( new UUIDField( IndexingUtils.ENTITY_ID_FIELDNAME, UUID.randomUUID() ) );
+ EntityUtils.setId(entity, new SimpleId( "fastcar" ) );
+ EntityUtils.setVersion(entity, UUIDGenerator.newTimeUUID() );
+ entity.setField(new UUIDField(IndexingUtils.ENTITY_ID_FIELDNAME, UUID.randomUUID() ) );
- indexProducer.put(entityIndex.createBatch().index( searchEdge, entity )).toBlocking().last();
+ indexProducer.put(entityIndex.createBatch().index( searchEdge, entity ).build()).subscribe();
entityIndex.refreshAsync().toBlocking().first();
CandidateResults candidateResults = entityIndex
http://git-wip-us.apache.org/repos/asf/usergrid/blob/bce1359b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/GeoPagingTest.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/GeoPagingTest.java
index 4d727c3,98b85f1..e74e95e
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/GeoPagingTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/GeoPagingTest.java
@@@ -70,8 -70,10 +70,7 @@@ public class GeoPagingTest extends Base
@Inject
public EntityIndexFactory eif;
-
@Inject
- IndexProducer indexProducer;
-
- @Inject
@Rule
public MigrationManagerRule migrationManagerRule;
http://git-wip-us.apache.org/repos/asf/usergrid/blob/bce1359b/stack/pom.xml
----------------------------------------------------------------------
[06/36] usergrid git commit: remove threading
Posted by sf...@apache.org.
remove threading
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/2a168278
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/2a168278
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/2a168278
Branch: refs/heads/master
Commit: 2a16827831f65b61c7de25be262551fe87928425
Parents: 805113c
Author: Shawn Feldman <sf...@apache.org>
Authored: Tue Sep 15 13:45:08 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu Sep 24 15:00:45 2015 -0600
----------------------------------------------------------------------
.../core/future/FutureObservable.java | 44 -------------
.../persistence/index/EntityIndexBatch.java | 1 -
.../index/impl/EsIndexBufferConsumerImpl.java | 65 +-------------------
.../index/impl/IndexOperationMessage.java | 18 ------
4 files changed, 1 insertion(+), 127 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/2a168278/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/FutureObservable.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/FutureObservable.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/FutureObservable.java
deleted file mode 100644
index 06eed4d..0000000
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/FutureObservable.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.usergrid.persistence.core.future;
-
-
-import rx.Observable;
-
-import java.util.concurrent.FutureTask;
-
-
-/**
- * Future without the exception nastiness
- */
-public class FutureObservable<T> {
-
- private final FutureTask<T> future;
-
-
- public FutureObservable(final T returnVal) {
- future = new FutureTask<>( () -> returnVal );
- }
-
- public void done() {
- future.run();
- }
-
- public Observable<T> observable() {
- return Observable.from(future);
- }
-}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/2a168278/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
index 85b234a..d1b076d 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
@@ -20,7 +20,6 @@ package org.apache.usergrid.persistence.index;/*
import java.util.UUID;
-import org.apache.usergrid.persistence.core.future.FutureObservable;
import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
http://git-wip-us.apache.org/repos/asf/usergrid/blob/2a168278/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index 91fab2f..93c0d85 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -33,7 +33,6 @@ import org.elasticsearch.client.Client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.usergrid.persistence.core.future.FutureObservable;
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.index.IndexFig;
@@ -62,8 +61,6 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
private final Timer flushTimer;
private final IndexFig indexFig;
private final Counter indexSizeCounter;
- private final Timer offerTimer;
- private final BufferProducer bufferProducer;
private final Histogram roundtripTimer;
private final Timer indexTimer;
@@ -76,7 +73,6 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
final MetricsFactory metricsFactory, final IndexFig indexFig ) {
this.flushTimer = metricsFactory.getTimer(EsIndexBufferConsumerImpl.class, "index_buffer.flush");
this.indexSizeCounter = metricsFactory.getCounter(EsIndexBufferConsumerImpl.class, "index_buffer.size");
- this.offerTimer = metricsFactory.getTimer(EsIndexBufferConsumerImpl.class, "index_buffer.producer");
this.roundtripTimer = metricsFactory.getHistogram(EsIndexBufferConsumerImpl.class, "index_buffer.message_cycle");
//wire up the gauge of inflight messages
@@ -90,11 +86,9 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
this.client = provider.getClient();
this.indexFig = indexFig;
- this.bufferProducer = new BufferProducer();
//batch up sets of some size and send them in batch
- startSubscription();
}
@@ -102,34 +96,8 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
Preconditions.checkNotNull(message, "Message cannot be null");
indexSizeCounter.inc(message.getDeIndexRequests().size());
indexSizeCounter.inc(message.getIndexRequests().size());
- Timer.Context time = offerTimer.time();
- bufferProducer.send(message);
- time.stop();
- return message.observable();
- }
-
-
- /**
- * Start the subscription
- */
- private void startSubscription() {
-
- //buffer on our new thread with a timeout
- final Observable<IndexOperationMessage> observable = Observable.create(bufferProducer);
- observable.subscribeOn(Schedulers.io()).flatMap(indexOpBuffer -> {
-
- //hand off to processor in new observable thread so we can continue to buffer faster
- return Observable.just(indexOpBuffer).flatMap(
- indexOpBufferObservable -> ObservableTimer.time(processBatch(indexOpBufferObservable), flushTimer)
- )
-
- //use the I/O scheduler for thread re-use and efficiency in context switching then use our concurrent
- // flatmap count or higher throughput of batches once buffered
- .subscribeOn(Schedulers.io());
- }, indexFig.getIndexFlushWorkerCount())
- //start in the background
- .subscribe();
+ return processBatch(message);
}
@@ -140,10 +108,7 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
*/
private Observable<IndexOperationMessage> processBatch( final IndexOperationMessage batch ) {
-
//take our stream of batches, then stream then into individual ops for consumption on ES
-
-
final Set<IndexOperation> indexOperationSet = batch.getIndexRequests();
final Set<DeIndexOperation> deIndexOperationSet = batch.getDeIndexRequests();
@@ -186,7 +151,6 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
//subscribe to the operations that generate requests on a new thread so that we can execute them quickly
//mark this as done
return processedIndexOperations.doOnNext(processedIndexOp -> {
- processedIndexOp.done();
roundtripTimer.update(System.currentTimeMillis() - processedIndexOp.getCreationTime());
});
}
@@ -251,31 +215,4 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
+ "entries" );
}
}
-
-
- public static class BufferProducer implements Observable.OnSubscribe<IndexOperationMessage> {
-
- private Subscriber<? super IndexOperationMessage> subscriber;
-
-
- /**
- * Send the data through the buffer
- */
- public void send( final IndexOperationMessage indexOps ) {
- try {
- subscriber.onNext( indexOps );
- }catch(Exception e){
- //re-throws so the caller can determine failover
- log.error( "Unable to process message for indexOp {}, error follows.", indexOps, e );
- throw e;
- }
- }
-
-
- @Override
- public void call( final Subscriber<? super IndexOperationMessage> subscriber ) {
- //just assigns for later use, doesn't do anything else
- this.subscriber = subscriber;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/2a168278/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
index 0a49626..bd2bec8 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
@@ -24,10 +24,7 @@ import java.io.Serializable;
import java.util.HashSet;
import java.util.Set;
-import org.apache.usergrid.persistence.core.future.FutureObservable;
-
import com.fasterxml.jackson.annotation.JsonIgnore;
-import rx.Observable;
/**
@@ -40,13 +37,11 @@ public class IndexOperationMessage implements Serializable {
private long creationTime;
- private final FutureObservable<IndexOperationMessage> containerFuture;
public IndexOperationMessage() {
this.indexRequests = new HashSet<>();
this.deIndexRequests = new HashSet<>();
- this.containerFuture = new FutureObservable<>( this );
this.creationTime = System.currentTimeMillis();
}
@@ -78,15 +73,6 @@ public class IndexOperationMessage implements Serializable {
return indexRequests.isEmpty() && deIndexRequests.isEmpty();
}
- /**
- * return the promise
- */
- @JsonIgnore
- public Observable<IndexOperationMessage> observable() {
- return containerFuture.observable();
- }
-
-
@Override
public boolean equals( final Object o ) {
if ( this == o ) {
@@ -116,10 +102,6 @@ public class IndexOperationMessage implements Serializable {
return result;
}
- public void done() {
- //if this has been serialized, it could be null. don't NPE if it is, there's nothing to ack
- containerFuture.done();
- }
public long getCreationTime() {
return creationTime;
[18/36] usergrid git commit: Merge branch '2.1-release' of
https://git-wip-us.apache.org/repos/asf/usergrid into 2.1-release
Posted by sf...@apache.org.
Merge branch '2.1-release' of https://git-wip-us.apache.org/repos/asf/usergrid into 2.1-release
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/175d5268
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/175d5268
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/175d5268
Branch: refs/heads/master
Commit: 175d5268fe5c74c4df7ad16fa6e2d2d370cfb77b
Parents: 5218cda cd623e6
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Sep 25 14:18:27 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Sep 25 14:18:27 2015 -0600
----------------------------------------------------------------------
.../usergrid/management/AppInfoMigrationPlugin.java | 11 +++++++++++
.../services/assets/data/AwsSdkS3BinaryStore.java | 11 ++++-------
2 files changed, 15 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
[36/36] usergrid git commit: fix rest tests
Posted by sf...@apache.org.
fix rest tests
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/a56c8ddd
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/a56c8ddd
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/a56c8ddd
Branch: refs/heads/master
Commit: a56c8dddfcde2d17ec03f0b5c6cb8fc6e0b34700
Parents: bce1359
Author: Shawn Feldman <sf...@apache.org>
Authored: Thu Oct 1 10:27:22 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu Oct 1 10:27:22 2015 -0600
----------------------------------------------------------------------
.../collection/users/ConnectionResourceTest.java | 12 ++++++------
stack/rest_integration_tests/config/default.js | 1 +
stack/rest_integration_tests/test/entities/create.js | 2 +-
3 files changed, 8 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/a56c8ddd/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/ConnectionResourceTest.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/ConnectionResourceTest.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/ConnectionResourceTest.java
index 777b04e..893703e 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/ConnectionResourceTest.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/ConnectionResourceTest.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
+import org.apache.usergrid.services.exceptions.ServiceResourceNotFoundException;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,6 +35,8 @@ import org.apache.usergrid.rest.test.resource.model.QueryParameters;
import com.sun.jersey.api.client.UniformInterfaceException;
+import javax.ws.rs.NotFoundException;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
@@ -82,9 +85,8 @@ public class ConnectionResourceTest extends AbstractRestIT {
.get();
fail( "This should throw an exception" );
}
- catch ( UniformInterfaceException uie ) {
+ catch ( NotFoundException uie ) {
// Should return a 404 Not Found
- assertEquals( 404, uie.getResponse().getStatus() );
}
}
@@ -172,9 +174,8 @@ public class ConnectionResourceTest extends AbstractRestIT {
thing2 = this.app().collection( "things" ).entity( thing2 ).get();
fail( "This should throw an exception" );
}
- catch ( UniformInterfaceException uie ) {
+ catch ( NotFoundException uie ) {
// Should return a 404 Not Found
- assertEquals( 404, uie.getResponse().getStatus() );
}
}
@@ -208,9 +209,8 @@ public class ConnectionResourceTest extends AbstractRestIT {
thing1 = this.app().collection( "things" ).entity( thing1 ).get();
fail( "This should throw an exception" );
}
- catch ( UniformInterfaceException uie ) {
+ catch ( NotFoundException uie ) {
// Should return a 404 Not Found
- assertEquals( 404, uie.getResponse().getStatus() );
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/a56c8ddd/stack/rest_integration_tests/config/default.js
----------------------------------------------------------------------
diff --git a/stack/rest_integration_tests/config/default.js b/stack/rest_integration_tests/config/default.js
index 5140638..666f3ea 100644
--- a/stack/rest_integration_tests/config/default.js
+++ b/stack/rest_integration_tests/config/default.js
@@ -20,6 +20,7 @@ module.exports = {
appName: "test-app", //must pre create app
numberOfUsers: 5,
numberOfEntities: 20,
+ numberOfEntitiesConsistency: 100,
org: {
clientId: "",
clientSecret: ""
http://git-wip-us.apache.org/repos/asf/usergrid/blob/a56c8ddd/stack/rest_integration_tests/test/entities/create.js
----------------------------------------------------------------------
diff --git a/stack/rest_integration_tests/test/entities/create.js b/stack/rest_integration_tests/test/entities/create.js
index c7e6f42..3fa6831 100644
--- a/stack/rest_integration_tests/test/entities/create.js
+++ b/stack/rest_integration_tests/test/entities/create.js
@@ -20,7 +20,7 @@ var config = require('../../config');
module.exports = {
test: function() {
- var numberOfRecords = 30;
+ var numberOfRecords = config.numberOfEntitiesConsistency;
var uuid = require("uuid");
var id = "resttest_"+ uuid.v1().toString().replace("-", "");
[25/36] usergrid git commit: fix observables
Posted by sf...@apache.org.
fix observables
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/a9dc6ba2
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/a9dc6ba2
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/a9dc6ba2
Branch: refs/heads/master
Commit: a9dc6ba22e028dcae00cb03b210f12f1a598583b
Parents: 0428cd6
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Sep 28 14:51:29 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Sep 28 14:51:29 2015 -0600
----------------------------------------------------------------------
.../asyncevents/InMemoryAsyncEventService.java | 16 +++++++++++-----
.../corepersistence/index/IndexServiceTest.java | 18 +++++++++++++-----
.../index/impl/IndexRefreshCommandImpl.java | 2 +-
.../rest/exceptions/AbstractExceptionMapper.java | 2 +-
4 files changed, 26 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/a9dc6ba2/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
index fad6e48..b29c39e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
@@ -124,14 +124,20 @@ public class InMemoryAsyncEventService implements AsyncEventService {
}
public void run( Observable<?> observable ) {
- Observable mapped = observable.map(message -> message instanceof IndexOperationMessage ? indexProducer.put((IndexOperationMessage)message) : Observable.just(message));
+
//start it in the background on an i/o thread
if ( !resolveSynchronously ) {
- mapped.subscribeOn(rxTaskScheduler.getAsyncIOScheduler()).subscribe();
- }
- else {
- mapped.subscribe();
+ observable = observable.subscribeOn(rxTaskScheduler.getAsyncIOScheduler());
}
+
+ Observable mapped = observable.flatMap(message ->{
+ if(message instanceof IndexOperationMessage) {
+ return indexProducer.put((IndexOperationMessage)message);
+ } else{
+ return Observable.just(message);
+ }
+ });
+ mapped.subscribe();
}
@Override
http://git-wip-us.apache.org/repos/asf/usergrid/blob/a9dc6ba2/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java
index 50c5501..6001dd4 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.Set;
import java.util.UUID;
+import org.apache.usergrid.persistence.index.impl.IndexProducer;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -80,6 +81,9 @@ public class IndexServiceTest {
public GraphManagerFactory graphManagerFactory;
@Inject
+ public IndexProducer indexProducer;
+
+ @Inject
public EntityCollectionManagerFactory entityCollectionManagerFactory;
@Inject
@@ -121,6 +125,7 @@ public class IndexServiceTest {
//real users should never call to blocking, we're not sure what we'll get
final IndexOperationMessage results = indexed.toBlocking().last();
+ indexProducer.put(results).subscribe();
final Set<IndexOperation> indexRequests = results.getIndexRequests();
@@ -170,7 +175,8 @@ public class IndexServiceTest {
//now index
- final int batches = indexService.indexEntity( applicationScope, testEntity ).count().toBlocking().last();
+ final int batches = indexService.indexEntity( applicationScope, testEntity )
+ .flatMap(mesage -> indexProducer.put(mesage)).count().toBlocking().last();
assertEquals(1, batches);
@@ -255,7 +261,8 @@ public class IndexServiceTest {
//now index
- final int batches = indexService.indexEntity( applicationScope, testEntity ).count().toBlocking().last();
+ final int batches = indexService.indexEntity( applicationScope, testEntity )
+ .flatMap(mesage -> indexProducer.put(mesage)).count().toBlocking().last();
//take our edge count + 1 and divided by batch sizes
final int expectedSize = ( int ) Math.ceil( ( (double)edgeCount + 1 ) / indexFig.getIndexBatchSize() );
@@ -372,7 +379,7 @@ public class IndexServiceTest {
final List<Edge> connectionSearchEdges = createConnectionSearchEdges( testEntity, graphManager, edgeCount );
- indexService.indexEntity( applicationScope, testEntity ).toBlocking().getIterator();
+ indexService.indexEntity( applicationScope, testEntity ).flatMap(mesage -> indexProducer.put(mesage)).toBlocking().getIterator();
//query until results are available for collections
final SearchEdge collectionSearchEdge = CpNamingUtils.createSearchEdgeFromSource( collectionEdge );
@@ -396,7 +403,7 @@ public class IndexServiceTest {
//step 2
IndexOperationMessage indexOperationMessage =
- indexService.deleteIndexEdge( applicationScope, toBeDeletedEdge ).toBlocking().lastOrDefault( null );
+ indexService.deleteIndexEdge( applicationScope, toBeDeletedEdge ) .flatMap(mesage ->indexProducer.put(mesage)).toBlocking().lastOrDefault( null );
//not sure if this is still valid.
assertEquals( 1, indexOperationMessage.getDeIndexRequests().size() );
@@ -436,7 +443,8 @@ public class IndexServiceTest {
final Edge connectionSearch = graphManager.writeEdge( connectionEdge ).toBlocking().last();
//now index
- indexService.indexEntity( applicationScope, testEntity ).count().toBlocking().last();
+ indexService.indexEntity( applicationScope, testEntity)
+ .flatMap(mesage ->indexProducer.put(mesage)).count().toBlocking().last();
//query until results are available for collections
final SearchEdge collectionSearchEdge = CpNamingUtils.createSearchEdgeFromSource( collectionEdge );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/a9dc6ba2/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
index 01942a8..087eefe 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
@@ -193,7 +193,7 @@ public class IndexRefreshCommandImpl implements IndexRefreshCommand {
//delete the item
IndexOperationMessage indexOperationMessage = new IndexOperationMessage();
indexOperationMessage.addDeIndexRequest( deIndexRequest );
- producer.put( indexOperationMessage );
+ producer.put( indexOperationMessage ).subscribe();
} );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/a9dc6ba2/stack/rest/src/main/java/org/apache/usergrid/rest/exceptions/AbstractExceptionMapper.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/exceptions/AbstractExceptionMapper.java b/stack/rest/src/main/java/org/apache/usergrid/rest/exceptions/AbstractExceptionMapper.java
index 1dbffbd..a359618 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/exceptions/AbstractExceptionMapper.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/exceptions/AbstractExceptionMapper.java
@@ -76,7 +76,7 @@ public abstract class AbstractExceptionMapper<E extends java.lang.Throwable> imp
logger.debug(e.getClass().getCanonicalName() + " Server Error (" + status + ")", e);
}
switch (status){
- case 200 : logger.info("Uncaught Exception", e); break;
+ case 200 : logger.debug("Uncaught Exception", e); break;
default: logger.error("Uncaught Exception", e);
}
}
[13/36] usergrid git commit: Merge branch '2.1-release' of
https://git-wip-us.apache.org/repos/asf/usergrid into remove-buffer
Posted by sf...@apache.org.
Merge branch '2.1-release' of https://git-wip-us.apache.org/repos/asf/usergrid into remove-buffer
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/bdea1ff6
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/bdea1ff6
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/bdea1ff6
Branch: refs/heads/master
Commit: bdea1ff65c6dfdce3408afeae9f43be9cbd31149
Parents: c7a6ebf 5233567
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Sep 25 10:54:56 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Sep 25 10:54:56 2015 -0600
----------------------------------------------------------------------
.../usergrid/management/AppInfoMigrationPlugin.java | 11 +++++++++++
1 file changed, 11 insertions(+)
----------------------------------------------------------------------
[02/36] usergrid git commit: Merge branch '2.1-release' of
https://git-wip-us.apache.org/repos/asf/usergrid into 2.1-release
Posted by sf...@apache.org.
Merge branch '2.1-release' of https://git-wip-us.apache.org/repos/asf/usergrid into 2.1-release
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/cdfc575d
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/cdfc575d
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/cdfc575d
Branch: refs/heads/master
Commit: cdfc575dba453121fae080fd265ecf5c666a5ec4
Parents: 32fcbb5 8281430
Author: Shawn Feldman <sf...@apache.org>
Authored: Wed Sep 23 17:16:06 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Wed Sep 23 17:16:06 2015 -0600
----------------------------------------------------------------------
.../corepersistence/CpRelationManager.java | 35 ++-
.../service/ConnectionServiceImpl.java | 5 +-
.../service/ConnectionServiceImplTest.java | 4 +-
.../users/ConnectionResourceTest.java | 302 ++++++++++++++-----
.../services/AbstractConnectionsService.java | 43 ++-
5 files changed, 298 insertions(+), 91 deletions(-)
----------------------------------------------------------------------
[30/36] usergrid git commit: return consistent observable
Posted by sf...@apache.org.
return consistent observable
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/c19adb80
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/c19adb80
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/c19adb80
Branch: refs/heads/master
Commit: c19adb809712eafd3ca4a5b28c109cd050f678e2
Parents: 456f80d
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Sep 28 17:06:46 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Sep 28 17:06:46 2015 -0600
----------------------------------------------------------------------
.../corepersistence/asyncevents/AmazonAsyncEventService.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/c19adb80/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index 50b210e..d1670ed 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -239,7 +239,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
if (event == null) {
logger.error("AsyncEvent type or event is null!");
- return Observable.empty();
+ return Observable.just(new IndexEventResult(message, Optional.<IndexOperationMessage>absent(), false));
}
try {
//merge each operation to a master observable;
[09/36] usergrid git commit: change consumer to producer
Posted by sf...@apache.org.
change consumer to producer
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/5218cda9
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/5218cda9
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/5218cda9
Branch: refs/heads/master
Commit: 5218cda9d5732ae4ee2ee2ce1e7ae297975a7fad
Parents: a5ece51
Author: Shawn Feldman <sf...@apache.org>
Authored: Tue Sep 15 16:17:13 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu Sep 24 15:01:20 2015 -0600
----------------------------------------------------------------------
.../persistence/index/guice/IndexModule.java | 2 +-
.../index/impl/EsEntityIndexBatchImpl.java | 4 +-
.../index/impl/EsEntityIndexFactoryImpl.java | 8 +-
.../index/impl/EsEntityIndexImpl.java | 6 +-
.../index/impl/EsIndexBufferConsumerImpl.java | 216 -------------------
.../index/impl/EsIndexProducerImpl.java | 209 ++++++++++++++++++
.../index/impl/IndexBufferConsumer.java | 40 ----
.../persistence/index/impl/IndexProducer.java | 37 ++++
.../index/impl/IndexRefreshCommandImpl.java | 5 +-
9 files changed, 257 insertions(+), 270 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5218cda9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
index 7279174..46559ad 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
@@ -52,7 +52,7 @@ public abstract class IndexModule extends AbstractModule {
bind(IndexCache.class).to(EsIndexCacheImpl.class);
bind(IndexRefreshCommand.class).to(IndexRefreshCommandImpl.class);
- bind(IndexBufferConsumer.class).to(EsIndexBufferConsumerImpl.class).asEagerSingleton();
+ bind(IndexProducer.class).to(EsIndexProducerImpl.class).asEagerSingleton();
//wire up the edg migration. A no-op ATM, but retained for future development
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5218cda9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index c11feed..64a1c6a 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@ -41,7 +41,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
private final IndexAlias alias;
private final IndexLocationStrategy indexLocationStrategy;
- private final IndexBufferConsumer indexBatchBufferProducer;
+ private final IndexProducer indexBatchBufferProducer;
private final EntityIndex entityIndex;
private final ApplicationScope applicationScope;
@@ -49,7 +49,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
public EsEntityIndexBatchImpl( final IndexLocationStrategy locationStrategy,
- final IndexBufferConsumer indexBatchBufferProducer,
+ final IndexProducer indexBatchBufferProducer,
final EntityIndex entityIndex
) {
this.indexLocationStrategy = locationStrategy;
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5218cda9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexFactoryImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexFactoryImpl.java
index 869d079..b66fd40 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexFactoryImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexFactoryImpl.java
@@ -37,7 +37,7 @@ public class EsEntityIndexFactoryImpl implements EntityIndexFactory{
private final IndexFig config;
private final IndexCache indexCache;
private final EsProvider provider;
- private final IndexBufferConsumer indexBufferConsumer;
+ private final IndexProducer indexProducer;
private final MetricsFactory metricsFactory;
private final IndexRefreshCommand refreshCommand;
@@ -50,7 +50,7 @@ public class EsEntityIndexFactoryImpl implements EntityIndexFactory{
config,
refreshCommand,
metricsFactory,
- indexBufferConsumer,
+ indexProducer,
locationStrategy
);
index.initialize();
@@ -62,7 +62,7 @@ public class EsEntityIndexFactoryImpl implements EntityIndexFactory{
public EsEntityIndexFactoryImpl( final IndexFig indexFig,
final IndexCache indexCache,
final EsProvider provider,
- final IndexBufferConsumer indexBufferConsumer,
+ final IndexProducer indexProducer,
final MetricsFactory metricsFactory,
final IndexRefreshCommand refreshCommand
@@ -70,7 +70,7 @@ public class EsEntityIndexFactoryImpl implements EntityIndexFactory{
this.config = indexFig;
this.indexCache = indexCache;
this.provider = provider;
- this.indexBufferConsumer = indexBufferConsumer;
+ this.indexProducer = indexProducer;
this.metricsFactory = metricsFactory;
this.refreshCommand = refreshCommand;
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5218cda9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index 1c63a7b..6317a69 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -63,8 +63,6 @@ import org.elasticsearch.index.query.*;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
-import org.elasticsearch.search.aggregations.Aggregation;
-import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.sum.Sum;
import org.elasticsearch.search.aggregations.metrics.sum.SumBuilder;
import org.slf4j.Logger;
@@ -118,7 +116,7 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData {
private final SearchRequestBuilderStrategyV2 searchRequestBuilderStrategyV2;
private final int cursorTimeout;
private final long queryTimeout;
- private final IndexBufferConsumer indexBatchBufferProducer;
+ private final IndexProducer indexBatchBufferProducer;
private final FailureMonitorImpl failureMonitor;
private final Timer aggregationTimer;
@@ -133,7 +131,7 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData {
final IndexFig indexFig,
final IndexRefreshCommand indexRefreshCommand,
final MetricsFactory metricsFactory,
- final IndexBufferConsumer indexBatchBufferProducer,
+ final IndexProducer indexBatchBufferProducer,
final IndexLocationStrategy indexLocationStrategy
) {
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5218cda9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
deleted file mode 100644
index d126b5d..0000000
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License. For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.index.impl;
-
-
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import com.codahale.metrics.Histogram;
-import org.apache.usergrid.persistence.core.metrics.ObservableTimer;
-import org.apache.usergrid.persistence.index.EntityIndexBatch;
-import org.elasticsearch.action.WriteConsistencyLevel;
-import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.client.Client;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
-import org.apache.usergrid.persistence.index.IndexFig;
-
-import com.codahale.metrics.Counter;
-import com.codahale.metrics.Meter;
-import com.codahale.metrics.Timer;
-import com.google.common.base.Preconditions;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-
-import rx.Observable;
-import rx.Subscriber;
-import rx.schedulers.Schedulers;
-
-
-/**
- * Consumer for IndexOperationMessages
- */
-@Singleton
-public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
- private static final Logger log = LoggerFactory.getLogger( EsIndexBufferConsumerImpl.class );
-
- private final IndexFig config;
- private final FailureMonitorImpl failureMonitor;
- private final Client client;
- private final Timer flushTimer;
- private final IndexFig indexFig;
- private final Counter indexSizeCounter;
- private final Histogram roundtripTimer;
- private final Timer indexTimer;
-
-
- private AtomicLong inFlight = new AtomicLong();
-
-
- @Inject
- public EsIndexBufferConsumerImpl( final IndexFig config, final EsProvider provider,
- final MetricsFactory metricsFactory, final IndexFig indexFig ) {
- this.flushTimer = metricsFactory.getTimer(EsIndexBufferConsumerImpl.class, "index_buffer.flush");
- this.indexSizeCounter = metricsFactory.getCounter(EsIndexBufferConsumerImpl.class, "index_buffer.size");
- this.roundtripTimer = metricsFactory.getHistogram(EsIndexBufferConsumerImpl.class, "index_buffer.message_cycle");
-
- //wire up the gauge of inflight messages
- metricsFactory.addGauge(EsIndexBufferConsumerImpl.class, "index_buffer.inflight", () -> inFlight.longValue());
-
-
- this.indexTimer = metricsFactory.getTimer( EsIndexBufferConsumerImpl.class, "index" );
-
- this.config = config;
- this.failureMonitor = new FailureMonitorImpl(config, provider);
- this.client = provider.getClient();
- this.indexFig = indexFig;
-
-
- //batch up sets of some size and send them in batch
-
- }
-
- public Observable<IndexOperationMessage> put( IndexOperationMessage message ) {
- Preconditions.checkNotNull(message, "Message cannot be null");
- indexSizeCounter.inc(message.getDeIndexRequests().size());
- indexSizeCounter.inc(message.getIndexRequests().size());
- return processBatch(message);
- }
-
-
- /**
- * Process the buffer of batches
- * @param batch
- * @return
- */
- private Observable<IndexOperationMessage> processBatch( final IndexOperationMessage batch ) {
-
- //take our stream of batches, then stream then into individual ops for consumption on ES
- final Set<IndexOperation> indexOperationSet = batch.getIndexRequests();
- final Set<DeIndexOperation> deIndexOperationSet = batch.getDeIndexRequests();
-
- final int indexOperationSetSize = indexOperationSet.size();
- final int deIndexOperationSetSize = deIndexOperationSet.size();
-
- log.debug("Emitting {} add and {} remove operations", indexOperationSetSize, deIndexOperationSetSize);
-
- indexSizeCounter.dec(indexOperationSetSize);
- indexSizeCounter.dec(deIndexOperationSetSize);
-
- final Observable<IndexOperation> index = Observable.from(batch.getIndexRequests());
- final Observable<DeIndexOperation> deIndex = Observable.from(batch.getDeIndexRequests());
-
- final Observable<BatchOperation> batchOps = Observable.merge(index, deIndex);
-
- //buffer into the max size we can send ES and fire them all off until we're completed
- final Observable<BulkRequestBuilder> requests = batchOps.buffer(indexFig.getIndexBatchSize())
- //flatten the buffer into a single batch execution
- .flatMap(individualOps -> Observable.from(individualOps)
- //collect them
- .collect(() -> initRequest(), (bulkRequestBuilder, batchOperation) -> {
- log.debug("adding operation {} to bulkRequestBuilder {}", batchOperation, bulkRequestBuilder);
- batchOperation.doOperation(client, bulkRequestBuilder);
- }))
- //write them
- .doOnNext(bulkRequestBuilder -> sendRequest(bulkRequestBuilder));
-
-
- //now that we've processed them all, ack the futures after our last batch comes through
- final Observable<IndexOperationMessage> processedIndexOperations =
- requests.lastOrDefault(null).flatMap(lastRequest -> {
- if (lastRequest != null) {
- return Observable.just(batch);
- } else {
- return Observable.empty();
- }
- });
-
- //subscribe to the operations that generate requests on a new thread so that we can execute them quickly
- //mark this as done
- return processedIndexOperations.doOnNext(processedIndexOp -> {
- roundtripTimer.update(System.currentTimeMillis() - processedIndexOp.getCreationTime());
- });
- }
-
-
- /*
-
- /**
- * initialize request
- */
- private BulkRequestBuilder initRequest() {
- BulkRequestBuilder bulkRequest = client.prepareBulk();
- bulkRequest.setConsistencyLevel( WriteConsistencyLevel.fromString( config.getWriteConsistencyLevel() ) );
- bulkRequest.setRefresh( config.isForcedRefresh() );
- return bulkRequest;
- }
-
-
- /**
- * send bulk request
- */
- private void sendRequest( BulkRequestBuilder bulkRequest ) {
- //nothing to do, we haven't added anything to the index
- if ( bulkRequest.numberOfActions() == 0 ) {
- return;
- }
-
- final BulkResponse responses;
-
-
- final Timer.Context timer = indexTimer.time();
-
- try {
- responses = bulkRequest.execute().actionGet( );
- } catch ( Throwable t ) {
- log.error( "Unable to communicate with elasticsearch" );
- failureMonitor.fail( "Unable to execute batch", t );
- throw t;
- }finally{
- timer.stop();
- }
-
- failureMonitor.success();
-
- boolean error = false;
-
- for ( BulkItemResponse response : responses ) {
-
- if ( response.isFailed() ) {
- // log error and continue processing
- log.error( "Unable to index id={}, type={}, index={}, failureMessage={} ", response.getId(),
- response.getType(), response.getIndex(), response.getFailureMessage() );
-
- error = true;
- }
- }
-
- if ( error ) {
- throw new RuntimeException(
- "Error during processing of bulk index operations one of the responses failed. Check previous log "
- + "entries" );
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5218cda9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
new file mode 100644
index 0000000..a2c8663
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.usergrid.persistence.index.impl;
+
+
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.codahale.metrics.Histogram;
+import org.elasticsearch.action.WriteConsistencyLevel;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.client.Client;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.index.IndexFig;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Timer;
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+import rx.Observable;
+
+
+/**
+ * Producer that sends IndexOperationMessages to Elasticsearch as bulk requests
+ */
+@Singleton
+public class EsIndexProducerImpl implements IndexProducer {
+ private static final Logger log = LoggerFactory.getLogger( EsIndexProducerImpl.class );
+
+ private final IndexFig config;
+ private final FailureMonitorImpl failureMonitor;
+ private final Client client;
+ private final Timer flushTimer;
+ private final IndexFig indexFig;
+ private final Counter indexSizeCounter;
+ private final Histogram roundtripTimer;
+ private final Timer indexTimer;
+
+
+ private AtomicLong inFlight = new AtomicLong();
+
+
+ @Inject
+ public EsIndexProducerImpl(final IndexFig config, final EsProvider provider,
+ final MetricsFactory metricsFactory, final IndexFig indexFig) {
+ this.flushTimer = metricsFactory.getTimer(EsIndexProducerImpl.class, "index_buffer.flush");
+ this.indexSizeCounter = metricsFactory.getCounter(EsIndexProducerImpl.class, "index_buffer.size");
+ this.roundtripTimer = metricsFactory.getHistogram(EsIndexProducerImpl.class, "index_buffer.message_cycle");
+
+ //wire up the gauge of inflight messages
+ metricsFactory.addGauge(EsIndexProducerImpl.class, "index_buffer.inflight", () -> inFlight.longValue());
+
+
+ this.indexTimer = metricsFactory.getTimer( EsIndexProducerImpl.class, "index" );
+
+ this.config = config;
+ this.failureMonitor = new FailureMonitorImpl(config, provider);
+ this.client = provider.getClient();
+ this.indexFig = indexFig;
+
+
+ //batch up sets of some size and send them in batch
+
+ }
+
+ public Observable<IndexOperationMessage> put( IndexOperationMessage message ) {
+ Preconditions.checkNotNull(message, "Message cannot be null");
+ indexSizeCounter.inc(message.getDeIndexRequests().size());
+ indexSizeCounter.inc(message.getIndexRequests().size());
+ return processBatch(message);
+ }
+
+
+ /**
+ * Process the buffer of batches
+ * @param batch
+ * @return
+ */
+ private Observable<IndexOperationMessage> processBatch( final IndexOperationMessage batch ) {
+
+ //take our stream of batches, then stream them into individual ops for consumption on ES
+ final Set<IndexOperation> indexOperationSet = batch.getIndexRequests();
+ final Set<DeIndexOperation> deIndexOperationSet = batch.getDeIndexRequests();
+
+ final int indexOperationSetSize = indexOperationSet.size();
+ final int deIndexOperationSetSize = deIndexOperationSet.size();
+
+ log.debug("Emitting {} add and {} remove operations", indexOperationSetSize, deIndexOperationSetSize);
+
+ indexSizeCounter.dec(indexOperationSetSize);
+ indexSizeCounter.dec(deIndexOperationSetSize);
+
+ final Observable<IndexOperation> index = Observable.from(batch.getIndexRequests());
+ final Observable<DeIndexOperation> deIndex = Observable.from(batch.getDeIndexRequests());
+
+ final Observable<BatchOperation> batchOps = Observable.merge(index, deIndex);
+
+ //buffer into the max size we can send ES and fire them all off until we're completed
+ final Observable<BulkRequestBuilder> requests = batchOps.buffer(indexFig.getIndexBatchSize())
+ //flatten the buffer into a single batch execution
+ .flatMap(individualOps -> Observable.from(individualOps)
+ //collect them
+ .collect(() -> initRequest(), (bulkRequestBuilder, batchOperation) -> {
+ log.debug("adding operation {} to bulkRequestBuilder {}", batchOperation, bulkRequestBuilder);
+ batchOperation.doOperation(client, bulkRequestBuilder);
+ }))
+ //write them
+ .doOnNext(bulkRequestBuilder -> sendRequest(bulkRequestBuilder));
+
+
+ //now that we've processed them all, ack the futures after our last batch comes through
+ final Observable<IndexOperationMessage> processedIndexOperations =
+ requests.lastOrDefault(null).flatMap(lastRequest -> {
+ if (lastRequest != null) {
+ return Observable.just(batch);
+ } else {
+ return Observable.empty();
+ }
+ });
+
+ //subscribe to the operations that generate requests on a new thread so that we can execute them quickly
+ //mark this as done
+ return processedIndexOperations.doOnNext(processedIndexOp -> {
+ roundtripTimer.update(System.currentTimeMillis() - processedIndexOp.getCreationTime());
+ });
+ }
+
+
+
+
+ /**
+ * initialize request
+ */
+ private BulkRequestBuilder initRequest() {
+ BulkRequestBuilder bulkRequest = client.prepareBulk();
+ bulkRequest.setConsistencyLevel( WriteConsistencyLevel.fromString( config.getWriteConsistencyLevel() ) );
+ bulkRequest.setRefresh( config.isForcedRefresh() );
+ return bulkRequest;
+ }
+
+
+ /**
+ * send bulk request
+ */
+ private void sendRequest( BulkRequestBuilder bulkRequest ) {
+ //nothing to do, we haven't added anything to the index
+ if ( bulkRequest.numberOfActions() == 0 ) {
+ return;
+ }
+
+ final BulkResponse responses;
+
+
+ final Timer.Context timer = indexTimer.time();
+
+ try {
+ responses = bulkRequest.execute().actionGet( );
+ } catch ( Throwable t ) {
+ log.error( "Unable to communicate with elasticsearch" );
+ failureMonitor.fail( "Unable to execute batch", t );
+ throw t;
+ }finally{
+ timer.stop();
+ }
+
+ failureMonitor.success();
+
+ boolean error = false;
+
+ for ( BulkItemResponse response : responses ) {
+
+ if ( response.isFailed() ) {
+ // log error and continue processing
+ log.error( "Unable to index id={}, type={}, index={}, failureMessage={} ", response.getId(),
+ response.getType(), response.getIndex(), response.getFailureMessage() );
+
+ error = true;
+ }
+ }
+
+ if ( error ) {
+ throw new RuntimeException(
+ "Error during processing of bulk index operations one of the responses failed. Check previous log "
+ + "entries" );
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5218cda9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java
deleted file mode 100644
index cfeb505..0000000
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License. For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.index.impl;
-
-
-import org.apache.usergrid.persistence.index.EntityIndexBatch;
-import rx.Observable;
-
-import java.util.List;
-
-
-/**
- * Buffer index requests
- */
-public interface IndexBufferConsumer {
-
-
- /**
- * Put this operation into our collapsing bufer
- * @param message
- * @return
- */
- Observable<IndexOperationMessage> put(IndexOperationMessage message);
-
-}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5218cda9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexProducer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexProducer.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexProducer.java
new file mode 100644
index 0000000..ba7027e
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexProducer.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.usergrid.persistence.index.impl;
+
+
+import rx.Observable;
+
+
+/**
+ * Buffer index requests
+ */
+public interface IndexProducer {
+
+
+ /**
+ * Put this operation into our collapsing buffer
+ * @param message
+ * @return
+ */
+ Observable<IndexOperationMessage> put(IndexOperationMessage message);
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5218cda9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
index 7b9bc5d..01942a8 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
@@ -20,7 +20,6 @@
package org.apache.usergrid.persistence.index.impl;
-import java.util.Collections;
import java.util.Map;
import java.util.UUID;
@@ -58,7 +57,7 @@ public class IndexRefreshCommandImpl implements IndexRefreshCommand {
private final IndexCache indexCache;
private final EsProvider esProvider;
- private final IndexBufferConsumer producer;
+ private final IndexProducer producer;
private final IndexFig indexFig;
private final Timer timer;
@@ -66,7 +65,7 @@ public class IndexRefreshCommandImpl implements IndexRefreshCommand {
@Inject
public IndexRefreshCommandImpl(
final EsProvider esProvider,
- final IndexBufferConsumer producer,
+ final IndexProducer producer,
final IndexFig indexFig,
final MetricsFactory metricsFactory,
final IndexCache indexCache ) {
[17/36] usergrid git commit: remove rx
Posted by sf...@apache.org.
remove rx
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/28919aeb
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/28919aeb
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/28919aeb
Branch: refs/heads/master
Commit: 28919aebea35c027a5b720ebeebb2200c1d214c3
Parents: bdea1ff
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Sep 25 13:47:29 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Sep 25 13:47:29 2015 -0600
----------------------------------------------------------------------
.../usergrid/persistence/index/impl/EsIndexProducerImpl.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/28919aeb/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
index a2c8663..2b36fc8 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
@@ -132,7 +132,7 @@ public class EsIndexProducerImpl implements IndexProducer {
//now that we've processed them all, ack the futures after our last batch comes through
final Observable<IndexOperationMessage> processedIndexOperations =
- requests.lastOrDefault(null).flatMap(lastRequest -> {
+ requests.flatMap(lastRequest -> {
if (lastRequest != null) {
return Observable.just(batch);
} else {
[03/36] usergrid git commit: Merge branch '2.1-release' of
https://git-wip-us.apache.org/repos/asf/usergrid into 2.1-release
Posted by sf...@apache.org.
Merge branch '2.1-release' of https://git-wip-us.apache.org/repos/asf/usergrid into 2.1-release
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/ce5a96ff
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/ce5a96ff
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/ce5a96ff
Branch: refs/heads/master
Commit: ce5a96ffe8f919358f6b39b2d4ffe2dcb564a27a
Parents: cdfc575 0b243c4
Author: Shawn Feldman <sf...@apache.org>
Authored: Thu Sep 24 14:58:30 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu Sep 24 14:58:30 2015 -0600
----------------------------------------------------------------------
stack/scripts/migrate_entity_data.py | 66 ++++++++++++++-----
.../management/AppInfoMigrationPlugin.java | 69 +++++++++-----------
2 files changed, 79 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
[34/36] usergrid git commit: Merge branch 'master' of
https://git-wip-us.apache.org/repos/asf/usergrid
Posted by sf...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/usergrid
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/3b838c82
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/3b838c82
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/3b838c82
Branch: refs/heads/master
Commit: 3b838c8265abde705ea3b70734aa7de24a8a42d0
Parents: 407ebe0 60acf84
Author: Shawn Feldman <sf...@apache.org>
Authored: Thu Oct 1 09:41:09 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu Oct 1 09:41:09 2015 -0600
----------------------------------------------------------------------
README-Docs.md | 13 +
README.md | 10 +-
content/bootstrap/bootstrap.min.css | 9 +
content/community/index.html | 22 +-
content/css/bootflat-extensions.css | 356 +
content/css/bootflat-square.css | 69 +
content/css/bootflat.css | 1560 +
content/css/font-awesome.min.css | 405 +
content/css/usergrid-site.css | 1553 +
.../docs/_sources/jersey2skeleton/README.txt | 9 +
.../creating-and-managing-notifications.txt | 2 +
.../docs/_sources/rest-endpoints/api-docs.txt | 653 +-
content/docs/jersey2skeleton/README.html | 303 +
.../creating-and-managing-notifications.html | 1 +
content/docs/rest-endpoints/api-docs.html | 533 +-
content/docs/searchindex.js | 2 +-
content/favicon.ico | Bin 0 -> 3989 bytes
content/font/FontAwesome.otf | Bin 0 -> 61896 bytes
content/font/fontawesome-webfont-eot.eot | Bin 0 -> 37405 bytes
content/font/fontawesome-webfont-svg.svg | 399 +
content/font/fontawesome-webfont-ttf.ttf | Bin 0 -> 79076 bytes
content/font/fontawesome-webfont-woff.woff | Bin 0 -> 43572 bytes
content/img/alberto.jpg | Bin 0 -> 16137 bytes
content/img/alex.png | Bin 0 -> 40842 bytes
content/img/apache_usergrid_favicon.png | Bin 0 -> 10735 bytes
content/img/apache_usergrid_logo_white.png | Bin 0 -> 26418 bytes
.../img/apache_usergrid_logo_white_small.png | Bin 0 -> 11905 bytes
content/img/check_flat/default.png | Bin 0 -> 25851 bytes
content/img/dave.jpg | Bin 0 -> 14005 bytes
content/img/ed.jpg | Bin 0 -> 20460 bytes
content/img/egg-logo.png | Bin 0 -> 9938 bytes
content/img/github.png | Bin 0 -> 8936 bytes
content/img/grey.png | Bin 0 -> 37896 bytes
content/img/intellij.png | Bin 0 -> 9199 bytes
content/img/jeff.jpg | Bin 0 -> 13857 bytes
content/img/michael_r.jpg | Bin 0 -> 10244 bytes
content/img/mike_d.JPG | Bin 0 -> 36443 bytes
content/img/nate.jpg | Bin 0 -> 4291 bytes
content/img/rod.jpg | Bin 0 -> 40313 bytes
content/img/scott.jpg | Bin 0 -> 8555 bytes
content/img/shawn.jpg | Bin 0 -> 69304 bytes
content/img/stliu.jpg | Bin 0 -> 51303 bytes
content/img/strong.jpg | Bin 0 -> 7434 bytes
content/img/structure101.png | Bin 0 -> 6475 bytes
content/img/sungju.jpg | Bin 0 -> 11440 bytes
content/img/tim.jpg | Bin 0 -> 7611 bytes
content/img/todd.jpg | Bin 0 -> 18142 bytes
content/img/usergrid-logo.pdf | 398 +
content/img/usergrid.png | Bin 0 -> 21994 bytes
content/img/usergrid_160.png | Bin 0 -> 2126 bytes
content/img/usergrid_200.png | Bin 0 -> 6397 bytes
content/img/usergrid_300.png | Bin 0 -> 16330 bytes
content/img/usergrid_300_transparent.png | Bin 0 -> 16308 bytes
content/img/usergrid_400.png | Bin 0 -> 8746 bytes
content/img/usergrid_800.png | Bin 0 -> 14452 bytes
content/img/usergrid_card.png | Bin 0 -> 23295 bytes
content/img/usergrid_logo.png | Bin 0 -> 118086 bytes
content/img/usergrid_logo_205_50.png | Bin 0 -> 7058 bytes
content/img/usergrid_logo_260_50.png | Bin 0 -> 8682 bytes
content/img/usergrid_logo_720.png | Bin 0 -> 27610 bytes
content/img/usergrid_logo_720p.png | Bin 0 -> 27608 bytes
content/img/usergrid_logo_900_200.png | Bin 0 -> 12273 bytes
content/img/usergrid_logo_white.png | Bin 0 -> 16900 bytes
content/img/usergrid_profile_128.png | Bin 0 -> 6689 bytes
content/img/usergrid_profile_256.png | Bin 0 -> 10470 bytes
content/img/usergrid_profile_256_white.png | Bin 0 -> 10724 bytes
content/img/usergrid_profile_512_margins.png | Bin 0 -> 19112 bytes
content/img/usergrid_profile_64_white.png | Bin 0 -> 4839 bytes
content/img/usergrid_profile_background.png | Bin 0 -> 6710 bytes
content/img/usergrid_screencast_bg.png | Bin 0 -> 20478 bytes
content/img/usergrid_small_100.png | Bin 0 -> 6681 bytes
content/img/usergrid_wiki.png | Bin 0 -> 7135 bytes
content/img/yourkit.jpeg | Bin 0 -> 7763 bytes
content/js/bootstrap.min.js | 8 +
content/js/head.js | 708 +
content/js/html5shiv.js | 8 +
content/js/jquery-1.10.1.min.js | 6 +
content/js/jquery.icheck.js | 397 +
content/js/respond.min.js | 6 +
content/js/usergrid-site.js | 50 +
content/static/github-btn.html | 2 +
content/v101-portal-demo/config.js | 129 +
content/v101-portal-demo/css/entypo/entypo.eot | Bin 0 -> 35540 bytes
content/v101-portal-demo/css/entypo/entypo.svg | 13 +
content/v101-portal-demo/css/entypo/entypo.ttf | Bin 0 -> 35392 bytes
content/v101-portal-demo/css/entypo/entypo.woff | Bin 0 -> 21916 bytes
content/v101-portal-demo/css/main.css | 1990 +
content/v101-portal-demo/css/main.min.css | 1 +
content/v101-portal-demo/favicon.ico | Bin 0 -> 3989 bytes
content/v101-portal-demo/helpJson.json | 47 +
.../img/appswitcher/apiPlatform_lg.png | Bin 0 -> 2397 bytes
.../img/appswitcher/appServices_lg.png | Bin 0 -> 2295 bytes
.../img/appswitcher/console_lg.png | Bin 0 -> 1453 bytes
.../img/appswitcher/home_lg.png | Bin 0 -> 1522 bytes
.../img/appswitcher/logo_color.png | Bin 0 -> 3459 bytes
.../v101-portal-demo/img/appswitcher/max_lg.png | Bin 0 -> 1970 bytes
.../img/appswitcher/triangleMenuItem_right.png | Bin 0 -> 1158 bytes
.../triangleMenuItem_right_hover.png | Bin 0 -> 1169 bytes
content/v101-portal-demo/img/blue-bars.png | Bin 0 -> 3635 bytes
content/v101-portal-demo/img/blue-bolt.png | Bin 0 -> 3942 bytes
content/v101-portal-demo/img/blue-carat.png | Bin 0 -> 1006 bytes
content/v101-portal-demo/img/green_dot.png | Bin 0 -> 3472 bytes
.../img/introjs_arrow_step_next.png | Bin 0 -> 219 bytes
.../img/introjs_arrow_step_next_disabled.png | Bin 0 -> 220 bytes
.../img/introjs_arrow_step_prev.png | Bin 0 -> 217 bytes
.../img/introjs_arrow_step_prev_disabled.png | Bin 0 -> 218 bytes
content/v101-portal-demo/img/introjs_close.png | Bin 0 -> 274 bytes
content/v101-portal-demo/img/logo.png | Bin 0 -> 7758 bytes
content/v101-portal-demo/img/nav-device.gif | Bin 0 -> 2184 bytes
content/v101-portal-demo/img/nav-sprites.png | Bin 0 -> 7953 bytes
content/v101-portal-demo/img/no-data1.png | Bin 0 -> 45300 bytes
content/v101-portal-demo/img/phone-small.gif | Bin 0 -> 1300 bytes
.../img/push/APNS_cert_upload.png | Bin 0 -> 33956 bytes
.../img/push/APNS_certification.png | Bin 0 -> 16855 bytes
.../img/push/android-notification.png | Bin 0 -> 41629 bytes
.../img/push/google_api_key.png | Bin 0 -> 98118 bytes
.../img/push/iphone_message.png | Bin 0 -> 90307 bytes
content/v101-portal-demo/img/push/step_1.png | Bin 0 -> 1953 bytes
content/v101-portal-demo/img/push/step_2.png | Bin 0 -> 2117 bytes
content/v101-portal-demo/img/push/step_3.png | Bin 0 -> 2162 bytes
content/v101-portal-demo/img/red_dot.png | Bin 0 -> 3482 bytes
.../v101-portal-demo/img/sdk-sprites-large.png | Bin 0 -> 15115 bytes
content/v101-portal-demo/img/sdk-sprites.png | Bin 0 -> 4401 bytes
content/v101-portal-demo/img/tablet-small.gif | Bin 0 -> 1390 bytes
content/v101-portal-demo/img/user-photo.png | Bin 0 -> 3849 bytes
content/v101-portal-demo/img/user_profile.png | Bin 0 -> 3775 bytes
content/v101-portal-demo/img/verify.png | Bin 0 -> 22934 bytes
content/v101-portal-demo/img/yellow_dot.png | Bin 0 -> 3475 bytes
content/v101-portal-demo/index-debug.html | 151 +
content/v101-portal-demo/index-template.html | 156 +
content/v101-portal-demo/index.html | 151 +
.../js/generated/usergrid-dev.js | 4886 +
.../js/generated/usergrid-libs.min.js | 38 +
.../js/generated/usergrid.min.js | 25 +
content/v101-portal-demo/js/libs/MD5.min.js | 1 +
.../js/libs/angular-1.2.5/angular-animate.js | 1323 +
.../libs/angular-1.2.5/angular-animate.min.js | 23 +
.../angular-1.2.5/angular-animate.min.js.map | 8 +
.../js/libs/angular-1.2.5/angular-cookies.js | 202 +
.../libs/angular-1.2.5/angular-cookies.min.js | 8 +
.../angular-1.2.5/angular-cookies.min.js.map | 8 +
.../js/libs/angular-1.2.5/angular-csp.css | 24 +
.../js/libs/angular-1.2.5/angular-loader.js | 410 +
.../js/libs/angular-1.2.5/angular-loader.min.js | 9 +
.../angular-1.2.5/angular-loader.min.js.map | 8 +
.../js/libs/angular-1.2.5/angular-mocks.js | 2116 +
.../js/libs/angular-1.2.5/angular-resource.js | 565 +
.../libs/angular-1.2.5/angular-resource.min.js | 13 +
.../angular-1.2.5/angular-resource.min.js.map | 8 +
.../js/libs/angular-1.2.5/angular-route.js | 911 +
.../js/libs/angular-1.2.5/angular-route.min.js | 14 +
.../libs/angular-1.2.5/angular-route.min.js.map | 8 +
.../js/libs/angular-1.2.5/angular-sanitize.js | 622 +
.../libs/angular-1.2.5/angular-sanitize.min.js | 14 +
.../angular-1.2.5/angular-sanitize.min.js.map | 8 +
.../js/libs/angular-1.2.5/angular-scenario.js | 32374 ++++++
.../js/libs/angular-1.2.5/angular-touch.js | 563 +
.../js/libs/angular-1.2.5/angular-touch.min.js | 13 +
.../libs/angular-1.2.5/angular-touch.min.js.map | 8 +
.../js/libs/angular-1.2.5/angular.js | 20369 ++++
.../js/libs/angular-1.2.5/angular.min.js | 201 +
.../js/libs/angular-1.2.5/angular.min.js.map | 8 +
.../js/libs/angular-1.2.5/errors.json | 1 +
.../js/libs/angular-1.2.5/version.json | 1 +
.../js/libs/angular-1.2.5/version.txt | 1 +
.../libs/bootstrap/css/bootstrap-responsive.css | 1345 +
.../bootstrap/css/bootstrap-responsive.min.css | 1245 +
.../js/libs/bootstrap/css/bootstrap.css | 6169 ++
.../js/libs/bootstrap/css/bootstrap.min.css | 5469 ++
.../js/libs/bootstrap/custom/css/bootstrap.css | 6316 ++
.../libs/bootstrap/custom/css/bootstrap.min.css | 9 +
.../custom/img/glyphicons-halflings-white.png | Bin 0 -> 8777 bytes
.../custom/img/glyphicons-halflings.png | Bin 0 -> 12799 bytes
.../js/libs/bootstrap/custom/js/bootstrap.js | 2291 +
.../libs/bootstrap/custom/js/bootstrap.min.js | 7 +
.../img/glyphicons-halflings-white.png | Bin 0 -> 8777 bytes
.../libs/bootstrap/img/glyphicons-halflings.png | Bin 0 -> 12799 bytes
.../js/libs/bootstrap/js/bootstrap.js | 2117 +
.../js/libs/bootstrap/js/bootstrap.min.js | 644 +
.../v101-portal-demo/js/libs/google-viz-api.js | 49 +
.../js/libs/jquery/jquery-1.9.1.min.js | 5 +
.../js/libs/jquery/jquery-migrate-1.1.1.min.js | 3 +
.../js/libs/jquery/jquery.sparkline.min.js | 5 +
.../js/libs/jqueryui/date.min.js | 2 +
.../ui-bg_diagonals-thick_90_eeeeee_40x40.png | Bin 0 -> 251 bytes
.../images/ui-bg_flat_100_deedf7_40x100.png | Bin 0 -> 182 bytes
.../images/ui-bg_flat_100_e4f1fb_40x100.png | Bin 0 -> 213 bytes
.../images/ui-bg_flat_100_f2f5f7_40x100.png | Bin 0 -> 212 bytes
.../images/ui-bg_flat_15_cd0a0a_40x100.png | Bin 0 -> 181 bytes
.../images/ui-bg_flat_50_3baae3_40x100.png | Bin 0 -> 182 bytes
.../images/ui-bg_flat_80_d7ebf9_40x100.png | Bin 0 -> 183 bytes
.../ui-bg_highlight-hard_70_000000_1x100.png | Bin 0 -> 118 bytes
.../ui-bg_highlight-soft_25_ffef8f_1x100.png | Bin 0 -> 153 bytes
.../jqueryui/images/ui-icons_000000_256x240.png | Bin 0 -> 4369 bytes
.../jqueryui/images/ui-icons_2694e8_256x240.png | Bin 0 -> 4369 bytes
.../jqueryui/images/ui-icons_2e83ff_256x240.png | Bin 0 -> 4369 bytes
.../jqueryui/images/ui-icons_3d80b3_256x240.png | Bin 0 -> 4369 bytes
.../jqueryui/images/ui-icons_72a7cf_256x240.png | Bin 0 -> 4369 bytes
.../jqueryui/images/ui-icons_ffffff_256x240.png | Bin 0 -> 4369 bytes
.../js/libs/jqueryui/jquery-ui-1.8.18.min.js | 15 +
.../js/libs/jqueryui/jquery-ui-1.8.9.custom.css | 1 +
.../js/libs/jqueryui/jquery-ui-timepicker.css | 1 +
.../libs/jqueryui/jquery.ui.timepicker.min.js | 1 +
.../ui-bootstrap-custom-0.3.0.min.js | 1 +
.../ui-bootstrap-custom-tpls-0.3.0.min.js | 1 +
.../v101-portal-demo/js/libs/usergrid.sdk.js | 2568 +
docs/rest-endpoints/api-docs.html | 750 +-
docs/rest-endpoints/api-docs.md | 142 +-
website/README.md | 3 +-
website/Rules | 52 +
website/build.sh | 1 +
website/content/bootstrap/bootstrap.min.css | 9 +
website/content/community/index.html | 22 +-
website/content/css/bootflat-extensions.css | 356 +
website/content/css/bootflat-square.css | 69 +
website/content/css/bootflat.css | 1560 +
website/content/css/font-awesome.min.css | 405 +
website/content/css/usergrid-site.css | 1553 +
website/content/favicon.ico | Bin 0 -> 3989 bytes
website/content/font/FontAwesome.otf | Bin 0 -> 61896 bytes
.../content/font/fontawesome-webfont-eot.eot | Bin 0 -> 37405 bytes
.../content/font/fontawesome-webfont-svg.svg | 399 +
.../content/font/fontawesome-webfont-ttf.ttf | Bin 0 -> 79076 bytes
.../content/font/fontawesome-webfont-woff.woff | Bin 0 -> 43572 bytes
website/content/img/alberto.jpg | Bin 0 -> 16137 bytes
website/content/img/alex.png | Bin 0 -> 40842 bytes
website/content/img/apache_usergrid_favicon.png | Bin 0 -> 10735 bytes
.../content/img/apache_usergrid_logo_white.png | Bin 0 -> 26418 bytes
.../img/apache_usergrid_logo_white_small.png | Bin 0 -> 11905 bytes
website/content/img/check_flat/default.png | Bin 0 -> 25851 bytes
website/content/img/dave.jpg | Bin 0 -> 14005 bytes
website/content/img/ed.jpg | Bin 0 -> 20460 bytes
website/content/img/egg-logo.png | Bin 0 -> 9938 bytes
website/content/img/github.png | Bin 0 -> 8936 bytes
website/content/img/grey.png | Bin 0 -> 37896 bytes
website/content/img/intellij.png | Bin 0 -> 9199 bytes
website/content/img/jeff.jpg | Bin 0 -> 13857 bytes
website/content/img/michael_r.jpg | Bin 0 -> 10244 bytes
website/content/img/mike_d.JPG | Bin 0 -> 36443 bytes
website/content/img/nate.jpg | Bin 0 -> 4291 bytes
website/content/img/rod.jpg | Bin 0 -> 40313 bytes
website/content/img/scott.jpg | Bin 0 -> 8555 bytes
website/content/img/shawn.jpg | Bin 0 -> 69304 bytes
website/content/img/stliu.jpg | Bin 0 -> 51303 bytes
website/content/img/strong.jpg | Bin 0 -> 7434 bytes
website/content/img/structure101.png | Bin 0 -> 6475 bytes
website/content/img/sungju.jpg | Bin 0 -> 11440 bytes
website/content/img/tim.jpg | Bin 0 -> 7611 bytes
website/content/img/todd.jpg | Bin 0 -> 18142 bytes
website/content/img/usergrid-logo.pdf | 398 +
website/content/img/usergrid.png | Bin 0 -> 21994 bytes
website/content/img/usergrid_160.png | Bin 0 -> 2126 bytes
website/content/img/usergrid_200.png | Bin 0 -> 6397 bytes
website/content/img/usergrid_300.png | Bin 0 -> 16330 bytes
.../content/img/usergrid_300_transparent.png | Bin 0 -> 16308 bytes
website/content/img/usergrid_400.png | Bin 0 -> 8746 bytes
website/content/img/usergrid_800.png | Bin 0 -> 14452 bytes
website/content/img/usergrid_card.png | Bin 0 -> 23295 bytes
website/content/img/usergrid_logo.png | Bin 0 -> 118086 bytes
website/content/img/usergrid_logo_205_50.png | Bin 0 -> 7058 bytes
website/content/img/usergrid_logo_260_50.png | Bin 0 -> 8682 bytes
website/content/img/usergrid_logo_720.png | Bin 0 -> 27610 bytes
website/content/img/usergrid_logo_720p.png | Bin 0 -> 27608 bytes
website/content/img/usergrid_logo_900_200.png | Bin 0 -> 12273 bytes
website/content/img/usergrid_logo_white.png | Bin 0 -> 16900 bytes
website/content/img/usergrid_profile_128.png | Bin 0 -> 6689 bytes
website/content/img/usergrid_profile_256.png | Bin 0 -> 10470 bytes
.../content/img/usergrid_profile_256_white.png | Bin 0 -> 10724 bytes
.../img/usergrid_profile_512_margins.png | Bin 0 -> 19112 bytes
.../content/img/usergrid_profile_64_white.png | Bin 0 -> 4839 bytes
.../content/img/usergrid_profile_background.png | Bin 0 -> 6710 bytes
website/content/img/usergrid_screencast_bg.png | Bin 0 -> 20478 bytes
website/content/img/usergrid_small_100.png | Bin 0 -> 6681 bytes
website/content/img/usergrid_wiki.png | Bin 0 -> 7135 bytes
website/content/img/yourkit.jpeg | Bin 0 -> 7763 bytes
website/content/js/bootstrap.min.js | 8 +
website/content/js/head.js | 708 +
website/content/js/html5shiv.js | 8 +
website/content/js/jquery-1.10.1.min.js | 6 +
website/content/js/jquery.icheck.js | 397 +
website/content/js/respond.min.js | 6 +
website/content/js/usergrid-site.js | 50 +
website/content/static/github-btn.html | 2 +
website/crash.log | 143 +
website/layouts/community.html | 1 +
website/layouts/docs.html | 1 +
website/lib/default.rb | 43 +
website/lib/helpers_.rb | 0
website/lib/pandoc.template | 4 +
website/nanoc.yaml | 77 +
website/run.sh | 1 +
website/tmp/checksums | 4 +-
website/tmp/compiled_content | 87099 +----------------
website/tmp/dependencies | Bin 2372 -> 2397 bytes
website/tmp/rule_memory | Bin 0 -> 5163 bytes
website/utilities/map-markers.rb | 62 +
website/utilities/markers.txt | 440 +
website/utilities/snapshot-apigee.rb | 71 +
website/utilities/usergrid.csv | 290 +
299 files changed, 111298 insertions(+), 87017 deletions(-)
----------------------------------------------------------------------
[05/36] usergrid git commit: remove batch consumer
Posted by sf...@apache.org.
remove batch consumer
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/805113c7
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/805113c7
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/805113c7
Branch: refs/heads/master
Commit: 805113c76695fe415b99ebd6f8aba47a6e0aa781
Parents: 7681600
Author: Shawn Feldman <sf...@apache.org>
Authored: Tue Sep 15 13:14:08 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu Sep 24 15:00:33 2015 -0600
----------------------------------------------------------------------
.../java/org/apache/usergrid/persistence/index/IndexFig.java | 4 ----
1 file changed, 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/805113c7/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
index db1ef3d..4f35730 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
@@ -48,10 +48,6 @@ public interface IndexFig extends GuicyFig {
String ELASTICSEARCH_FORCE_REFRESH = "elasticsearch.force_refresh";
- String INDEX_BUFFER_SIZE = "elasticsearch.buffer_size";
-
- String INDEX_BUFFER_TIMEOUT = "elasticsearch.buffer_timeout";
-
String INDEX_BATCH_SIZE = "elasticsearch.batch_size";
String INDEX_WRITE_CONSISTENCY_LEVEL = "elasticsearch.write_consistency_level";
[28/36] usergrid git commit: remove job scheduler log
Posted by sf...@apache.org.
remove job scheduler log
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/456f80d4
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/456f80d4
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/456f80d4
Branch: refs/heads/master
Commit: 456f80d4a7a94ac8b67261cbdfc1f3af828fde61
Parents: d73f98d
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Sep 28 16:13:40 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Sep 28 16:13:40 2015 -0600
----------------------------------------------------------------------
.../org/apache/usergrid/batch/service/JobSchedulerService.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/456f80d4/stack/core/src/main/java/org/apache/usergrid/batch/service/JobSchedulerService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/batch/service/JobSchedulerService.java b/stack/core/src/main/java/org/apache/usergrid/batch/service/JobSchedulerService.java
index d76be6f..284e1db 100644
--- a/stack/core/src/main/java/org/apache/usergrid/batch/service/JobSchedulerService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/batch/service/JobSchedulerService.java
@@ -131,7 +131,7 @@ public class JobSchedulerService extends AbstractScheduledService {
}
}
catch ( Throwable t ) {
- LOG.error( "Scheduler run failed, error is", t );
+ LOG.debug( "Scheduler run failed, error is", t );
}
}
[33/36] usergrid git commit: add consistency check for create
Posted by sf...@apache.org.
add consistency check for create
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/9ceae774
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/9ceae774
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/9ceae774
Branch: refs/heads/master
Commit: 9ceae7748ce522778fbd0d9a9bbbd6b240cf9b9b
Parents: 2e51d06
Author: Shawn Feldman <sf...@apache.org>
Authored: Thu Oct 1 09:40:14 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu Oct 1 09:40:14 2015 -0600
----------------------------------------------------------------------
stack/rest_integration_tests/lib/entities.js | 41 ++++++++++++++++++++
.../test/entities/create.js | 24 +++++++++++-
2 files changed, 64 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9ceae774/stack/rest_integration_tests/lib/entities.js
----------------------------------------------------------------------
diff --git a/stack/rest_integration_tests/lib/entities.js b/stack/rest_integration_tests/lib/entities.js
index c62ca0d..3693dcb 100644
--- a/stack/rest_integration_tests/lib/entities.js
+++ b/stack/rest_integration_tests/lib/entities.js
@@ -46,6 +46,37 @@ module.exports = {
cb(error, error ? error : body);
});
},
+ createEach: function(collection, numberOfEntities, cb) {
+ var url = urls.appendOrgCredentials(urls.appUrl() + collection);
+ var requestArray = []
+ geos = random.geo(config.location, 2000, numberOfEntities);
+ // console.log(geos);
+ for (var i = 0; i < numberOfEntities; i++) {
+ requestArray.push({
+ consistentProperty: "somethingConsistent",
+ randomProperty: "somethingRandom - " + random.randomString(10),
+ intProperty: random.randomNumber(5),
+ optionsProperty: random.abc(),
+ location: geos[i],
+ title: "A Tale of Two Cities"
+ });
+ }
+ var returnBody = [];
+ async.each(requestArray, function(options, cb) {
+ request.post({
+ url: url,
+ json: true,
+ body: options
+ }, function(e, r, body) {
+ var error = responseLib.getError(e, r);
+ returnBody.push(body.entities[0]);
+ cb(error, error ? error : body.entities[0]);
+ });
+ }, function(err,bodies) {
+ cb(err,returnBody);
+ });
+
+ },
deleteAll: function(collection, cb) {
var url = urls.appendOrgCredentials(urls.appUrl() + collection);
deleteAllEntities(collection, function(e) {
@@ -79,6 +110,16 @@ module.exports = {
cb(error, error ? error : body);
})
},
+ getByUuid: function(collection, uuid, cb) {
+ var url = urls.appendOrgCredentials(urls.appUrl() + collection + "/"+uuid);
+ request.get({
+ url: url,
+ json: true
+ }, function(e, r, body) {
+ var error = responseLib.getError(e, r);
+ cb(error, error ? error : body);
+ })
+ },
getWithQuery: function(collection, query, numberOfEntities, cb) {
var url = urls.appendOrgCredentials(urls.appUrl() + collection + "?ql=" + encodeURIComponent(query) + "&limit=" + numberOfEntities.toString());
request.get({
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9ceae774/stack/rest_integration_tests/test/entities/create.js
----------------------------------------------------------------------
diff --git a/stack/rest_integration_tests/test/entities/create.js b/stack/rest_integration_tests/test/entities/create.js
index 97e820c..c7e6f42 100644
--- a/stack/rest_integration_tests/test/entities/create.js
+++ b/stack/rest_integration_tests/test/entities/create.js
@@ -21,6 +21,9 @@ var config = require('../../config');
module.exports = {
test: function() {
var numberOfRecords = 30;
+ var uuid = require("uuid");
+ var id = "resttest_"+ uuid.v1().toString().replace("-", "");
+
describe("create entities", function() {
it("should create " + numberOfRecords.toString() + " entities in the " + config.entitiesTestCollection + " collection", function(done) {
this.slow(numberOfRecords * 500);
@@ -29,10 +32,29 @@ module.exports = {
body.entities.should.be.an.instanceOf(Array).and.have.lengthOf(numberOfRecords);
body.entities.forEach(function(entity) {
entity.should.have.property("uuid").and.match(/(\w{8}(-\w{4}){3}-\w{12}?)/);
- })
+ });
done();
})
});
+ it("should create " + numberOfRecords.toString() + " entities in the " + id + " collection and check for consistency", function(done) {
+ this.slow(numberOfRecords * 500);
+ entities.createEach(id, numberOfRecords, function(err, bodies) {
+ should(err).be.null;
+ bodies.should.be.an.instanceOf(Array).and.have.lengthOf(numberOfRecords);
+ bodyMap = {};
+ bodies.forEach(function(body){
+ bodyMap[body.uuid] = body;
+ });
+ entities.get(id, numberOfRecords, function (err,entityArray) {
+ should(err).be.null;
+ entityArray.entities.forEach(function(entity){
+ delete(bodyMap[entity.uuid]);
+ });
+ should(Object.keys(bodyMap)).have.lengthOf(0);
+ done();
+ });
+ });
+ });
});
}
};
[32/36] usergrid git commit: change branch in cf config
Posted by sf...@apache.org.
change branch in cf config
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/2e51d06d
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/2e51d06d
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/2e51d06d
Branch: refs/heads/master
Commit: 2e51d06da9a8234a28354ba8769457696ed8f159
Parents: 6626b83
Author: Shawn Feldman <sf...@apache.org>
Authored: Tue Sep 29 16:40:29 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Tue Sep 29 16:40:29 2015 -0600
----------------------------------------------------------------------
stack/awscluster/gatling-cluster-cf.json | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/2e51d06d/stack/awscluster/gatling-cluster-cf.json
----------------------------------------------------------------------
diff --git a/stack/awscluster/gatling-cluster-cf.json b/stack/awscluster/gatling-cluster-cf.json
index 4cc4ab0..5f2f61b 100644
--- a/stack/awscluster/gatling-cluster-cf.json
+++ b/stack/awscluster/gatling-cluster-cf.json
@@ -77,7 +77,7 @@
"Branch": {
"Description": "The branch of usergrid to check out",
"Type": "String",
- "Default": "two-dot-o-dev"
+ "Default": "master"
}
},
[19/36] usergrid git commit: Merge branch '2.1-release' into
remove-buffer
Posted by sf...@apache.org.
Merge branch '2.1-release' into remove-buffer
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/e6c6c36f
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/e6c6c36f
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/e6c6c36f
Branch: refs/heads/master
Commit: e6c6c36fa3e00514a76d0e690c961f3d73ad2e60
Parents: 28919ae 175d526
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Sep 25 14:18:48 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Sep 25 14:18:48 2015 -0600
----------------------------------------------------------------------
.../services/assets/data/AwsSdkS3BinaryStore.java | 11 ++++-------
1 file changed, 4 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
[14/36] usergrid git commit: add delete queue for cleanup and poll
for messages
Posted by sf...@apache.org.
add delete queue for cleanup and poll for messages
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/a5b51c9a
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/a5b51c9a
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/a5b51c9a
Branch: refs/heads/master
Commit: a5b51c9a8bdc5249fdae1ac618da4c6cd3e0a70d
Parents: 109ce2d
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Sep 25 10:36:40 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Sep 25 11:30:12 2015 -0600
----------------------------------------------------------------------
.../persistence/queue/DefaultQueueManager.java | 5 +++
.../persistence/queue/QueueManager.java | 5 +++
.../queue/impl/SNSQueueManagerImpl.java | 9 +++++
.../queue/impl/SQSQueueManagerImpl.java | 7 ++++
.../persistence/queue/QueueManagerTest.java | 37 ++++++++++++++++----
.../services/queues/ImportQueueManager.java | 5 +++
6 files changed, 61 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/a5b51c9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
index c72e109..d974529 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
@@ -72,4 +72,9 @@ public class DefaultQueueManager implements QueueManager {
String uuid = UUID.randomUUID().toString();
queue.add(new QueueMessage(uuid,"handle_"+uuid,body,"put type here"));
}
+
+ @Override
+ public void deleteQueue() {
+
+ }
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/a5b51c9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
index 0ec2337..027abb2 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
@@ -69,4 +69,9 @@ public interface QueueManager {
* @throws IOException
*/
void sendMessage(Object body)throws IOException;
+
+ /**
+ * purge messages
+ */
+ void deleteQueue();
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/a5b51c9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index 8fb0f52..bc63f53 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -449,6 +449,15 @@ public class SNSQueueManagerImpl implements QueueManager {
}
+ @Override
+ public void deleteQueue() {
+ logger.warn("Deleting queue: "+getReadQueue().getUrl());
+ sqs.deleteQueue(new DeleteQueueRequest().withQueueUrl(getReadQueue().getUrl()));
+ logger.warn("Deleting queue: "+getReadQueue().getUrl()+"_dead");
+ sqs.deleteQueue(new DeleteQueueRequest().withQueueUrl(getReadQueue().getUrl()+"_dead"));
+
+ }
+
@Override
public void commitMessage(final QueueMessage queueMessage) {
http://git-wip-us.apache.org/repos/asf/usergrid/blob/a5b51c9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
index 075e90c..daa1cb5 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
@@ -327,6 +327,13 @@ public class SQSQueueManagerImpl implements QueueManager {
return region;
}
+ @Override
+ public void deleteQueue() {
+ logger.warn("Deleting queue: "+getQueue().getUrl());
+ sqs.deleteQueue(new DeleteQueueRequest().withQueueUrl(getQueue().getUrl()));
+ }
+
+
/**
* Create the SQS client for the specified settings
http://git-wip-us.apache.org/repos/asf/usergrid/blob/a5b51c9a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
index 29e88ce..ac70af6 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
@@ -20,7 +20,6 @@
package org.apache.usergrid.persistence.queue;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -63,16 +62,24 @@ public class QueueManagerTest {
protected QueueScope scope;
private QueueManager qm;
+ public static long queueSeed = System.currentTimeMillis();
+
@Before
public void mockApp() {
- this.scope = new QueueScopeImpl( "testQueue", QueueScope.RegionImplementation.LOCAL);
+
+ this.scope = new QueueScopeImpl( "testQueue"+queueSeed++, QueueScope.RegionImplementation.LOCAL);
qm = qmf.getQueueManager(scope);
}
+ @org.junit.After
+ public void cleanup(){
+ qm.deleteQueue();
+ }
+
@Test
- public void send() throws IOException,ClassNotFoundException{
+ public void send() throws Exception{
String value = "bodytest";
qm.sendMessage(value);
List<QueueMessage> messageList = qm.getMessages(1,5000,5000,String.class).toList().toBlocking().last();
@@ -81,13 +88,14 @@ public class QueueManagerTest {
assertTrue(message.getBody().equals(value));
qm.commitMessage(message);
}
+
messageList = qm.getMessages(1,5000,5000,String.class).toList().toBlocking().last();
assertTrue(messageList.size() <= 0);
}
@Test
- public void sendMore() throws IOException,ClassNotFoundException{
+ public void sendMore() throws Exception{
HashMap<String,String> values = new HashMap<>();
values.put("test","Test");
@@ -107,7 +115,7 @@ public class QueueManagerTest {
}
@Test
- public void queueSize() throws IOException,ClassNotFoundException{
+ public void queueSize() throws Exception{
HashMap<String,String> values = new HashMap<>();
values.put("test", "Test");
@@ -115,8 +123,16 @@ public class QueueManagerTest {
bodies.add(values);
long initialDepth = qm.getQueueDepth();
qm.sendMessages(bodies);
- long depth = qm.getQueueDepth();
+ long depth = 0;
+ for(int i=0; i<10;i++){
+ depth = qm.getQueueDepth();
+ if(depth>0){
+ break;
+ }
+ Thread.sleep(1000);
+ }
assertTrue(depth>0);
+
List<QueueMessage> messageList = qm.getMessages(10,5000,5000,values.getClass()).toList().toBlocking().last();
assertTrue(messageList.size() <= 500);
for(QueueMessage message : messageList){
@@ -125,9 +141,16 @@ public class QueueManagerTest {
if(messageList.size()>0) {
qm.commitMessages(messageList);
}
- depth = qm.getQueueDepth();
+ for(int i=0; i<10;i++){
+ depth = qm.getQueueDepth();
+ if(depth==initialDepth){
+ break;
+ }
+ Thread.sleep(1000);
+ }
assertEquals(initialDepth, depth);
}
+
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/a5b51c9a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
index d74f688..bca9a49 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
@@ -68,4 +68,9 @@ public class ImportQueueManager implements QueueManager {
public void sendMessage( final Object body ) throws IOException {
}
+
+ @Override
+ public void deleteQueue() {
+
+ }
}
[15/36] usergrid git commit: remove observable
Posted by sf...@apache.org.
remove observable
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/407ebe03
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/407ebe03
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/407ebe03
Branch: refs/heads/master
Commit: 407ebe037beb275c92183b450b0885d2b421a27b
Parents: a5b51c9
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Sep 25 13:35:59 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Sep 25 13:35:59 2015 -0600
----------------------------------------------------------------------
.../usergrid/persistence/index/impl/EsIndexProducerImpl.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/407ebe03/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
index ef59abb..9223293 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
@@ -141,7 +141,7 @@ public class EsIndexProducerImpl implements IndexProducer {
//now that we've processed them all, ack the futures after our last batch comes through
final Observable<IndexOperationMessage> processedIndexOperations =
- requests.lastOrDefault(null).flatMap(lastRequest -> {
+ requests.flatMap(lastRequest -> {
if (lastRequest != null) {
return Observable.just(batch);
} else {
[08/36] usergrid git commit: add subscribe to entity verifier
Posted by sf...@apache.org.
add subscribe to entity verifier
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/a5ece510
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/a5ece510
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/a5ece510
Branch: refs/heads/master
Commit: a5ece510eca65dfd3606825d72975b90c17ea826
Parents: 3ece30f
Author: Shawn Feldman <sf...@apache.org>
Authored: Tue Sep 15 15:39:20 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu Sep 24 15:01:10 2015 -0600
----------------------------------------------------------------------
.../pipeline/read/search/CandidateEntityFilter.java | 5 +++--
.../persistence/index/impl/EsIndexBufferConsumerImpl.java | 2 +-
.../usergrid/persistence/index/impl/IndexBufferConsumer.java | 2 ++
3 files changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/a5ece510/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
index c056b33..14c880f 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
@@ -61,7 +61,8 @@ public class CandidateEntityFilter extends AbstractFilter<FilterResult<Candidate
@Inject
public CandidateEntityFilter( final EntityCollectionManagerFactory entityCollectionManagerFactory,
final EntityIndexFactory entityIndexFactory,
- final IndexLocationStrategyFactory indexLocationStrategyFactory) {
+ final IndexLocationStrategyFactory indexLocationStrategyFactory
+ ) {
this.entityCollectionManagerFactory = entityCollectionManagerFactory;
this.entityIndexFactory = entityIndexFactory;
this.indexLocationStrategyFactory = indexLocationStrategyFactory;
@@ -170,7 +171,7 @@ public class CandidateEntityFilter extends AbstractFilter<FilterResult<Candidate
validate( candidateResult );
}
- batch.execute();
+ batch.execute().subscribe();
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/a5ece510/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index 514e6eb..d126b5d 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicLong;
import com.codahale.metrics.Histogram;
import org.apache.usergrid.persistence.core.metrics.ObservableTimer;
+import org.apache.usergrid.persistence.index.EntityIndexBatch;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
@@ -91,7 +92,6 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
}
-
public Observable<IndexOperationMessage> put( IndexOperationMessage message ) {
Preconditions.checkNotNull(message, "Message cannot be null");
indexSizeCounter.inc(message.getDeIndexRequests().size());
http://git-wip-us.apache.org/repos/asf/usergrid/blob/a5ece510/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java
index df2119c..cfeb505 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java
@@ -18,6 +18,7 @@
package org.apache.usergrid.persistence.index.impl;
+import org.apache.usergrid.persistence.index.EntityIndexBatch;
import rx.Observable;
import java.util.List;
@@ -35,4 +36,5 @@ public interface IndexBufferConsumer {
* @return
*/
Observable<IndexOperationMessage> put(IndexOperationMessage message);
+
}
[10/36] usergrid git commit: Fixes NPE from missing org during testing
Posted by sf...@apache.org.
Fixes NPE from missing org during testing
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/52335673
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/52335673
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/52335673
Branch: refs/heads/master
Commit: 52335673b3f9b1dd63300654785a28c9278d22ea
Parents: 0b243c4
Author: Todd Nine <tn...@apigee.com>
Authored: Thu Sep 24 18:32:04 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu Sep 24 18:34:14 2015 -0600
----------------------------------------------------------------------
.../usergrid/management/AppInfoMigrationPlugin.java | 11 +++++++++++
1 file changed, 11 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/52335673/stack/services/src/main/java/org/apache/usergrid/management/AppInfoMigrationPlugin.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/AppInfoMigrationPlugin.java b/stack/services/src/main/java/org/apache/usergrid/management/AppInfoMigrationPlugin.java
index ff6bbe3..b78b646 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/AppInfoMigrationPlugin.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/AppInfoMigrationPlugin.java
@@ -178,6 +178,8 @@ public class AppInfoMigrationPlugin implements MigrationPlugin {
final String name = ( String ) oldAppInfoMap.get( PROPERTY_NAME );
+ logger.info( "Attempting to migrate app {}", name );
+
try {
final String orgName = name.split( "/" )[0];
final String appName = name.split( "/" )[1];
@@ -190,6 +192,15 @@ public class AppInfoMigrationPlugin implements MigrationPlugin {
//avoid management org
EntityRef orgRef = managementEm.getAlias( Group.ENTITY_TYPE, orgName );
+
+ /**
+ * No op, we couldn't find the org, so we can't roll the app forward
+ */
+ if(orgRef == null){
+ logger.error( "Unable to retrieve ref for org {}. Not migrating app {}", orgName, appName );
+ return;
+ }
+
// create and connect new APPLICATION_INFO oldAppInfo to Organization
managementService.createApplication( orgRef.getUuid(), name, applicationId, null );
[27/36] usergrid git commit: rewrite observable
Posted by sf...@apache.org.
rewrite observable
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/d73f98db
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/d73f98db
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/d73f98db
Branch: refs/heads/master
Commit: d73f98dbfaec511c74c7801ff046755f29da3a69
Parents: 0c4c1ab
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Sep 28 15:59:48 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Sep 28 15:59:48 2015 -0600
----------------------------------------------------------------------
.../asyncevents/AmazonAsyncEventService.java | 81 ++++++++++++--------
1 file changed, 50 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d73f98db/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index 5f681e7..50b210e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -232,56 +232,75 @@ public class AmazonAsyncEventService implements AsyncEventService {
logger.debug("handleMessages with {} message", messages.size());
}
- Observable<IndexEventResult> merged = Observable.empty();
- for (QueueMessage message : messages) {
+ Observable<IndexEventResult> masterObservable = Observable.from(messages).flatMap(message -> {
final AsyncEvent event = (AsyncEvent) message.getBody();
logger.debug("Processing {} event", event);
if (event == null) {
logger.error("AsyncEvent type or event is null!");
- continue;
+ return Observable.empty();
}
+ try {
+ //merge each operation to a master observable;
+ if (event instanceof EdgeDeleteEvent) {
+ return handleIndexOperation(message, queueMessage -> handleEdgeDelete(queueMessage));
+ } else if (event instanceof EdgeIndexEvent) {
+ return handleIndexOperation(message, queueMessage -> handleEdgeIndex(queueMessage));
+ } else if (event instanceof EntityDeleteEvent) {
+ return handleIndexOperation(message, queueMessage -> handleEntityDelete(queueMessage));
+ } else if (event instanceof EntityIndexEvent) {
+ return handleIndexOperation(message, queueMessage -> handleEntityIndexUpdate(queueMessage));
+ } else if (event instanceof InitializeApplicationIndexEvent) {
+ //does not return observable
+ handleInitializeApplicationIndex(message);
+ return Observable.just(new IndexEventResult(message, Optional.<IndexOperationMessage>absent(), true));
+ } else {
+ logger.error("Unknown EventType: {}", event);
+ return Observable.just(new IndexEventResult(message, Optional.<IndexOperationMessage>absent(), false));
+ }
+ }catch (Exception e){
+ logger.error("Failed to index entity", e,message);
+ return Observable.just(new IndexEventResult(message, Optional.<IndexOperationMessage>absent(), false));
+ }finally {
+ messageCycle.update(System.currentTimeMillis() - event.getCreationTime());
- if (event instanceof EdgeDeleteEvent) {
- merged = merged.mergeWith(callHandleIndex(message, queueMessage -> handleEdgeDelete(queueMessage)));
- } else if (event instanceof EdgeIndexEvent) {
- merged = merged.mergeWith(callHandleIndex(message, queueMessage -> handleEdgeIndex(queueMessage)));
- } else if (event instanceof EntityDeleteEvent) {
- merged = merged.mergeWith( callHandleIndex(message, queueMessage -> handleEntityDelete(queueMessage)));
- } else if (event instanceof EntityIndexEvent) {
- merged = merged.mergeWith(callHandleIndex(message, queueMessage -> handleEntityIndexUpdate(queueMessage)));
- } else if (event instanceof InitializeApplicationIndexEvent) {
- //does not return observable
- handleInitializeApplicationIndex(message);
- } else {
- logger.error("Unknown EventType: {}", event);
}
+ });
- messageCycle.update(System.currentTimeMillis() - event.getCreationTime());
- }
-
- merged
+ masterObservable
+ //remove unsuccessful
.filter(indexEventResult -> indexEventResult.success() && indexEventResult.getIndexOperationMessage().isPresent())
+ //take the max
.buffer(MAX_TAKE)
- .flatMap(indexEventResults -> {
+ //map them to index results and return them
+ .map(indexEventResults -> {
IndexOperationMessage combined = new IndexOperationMessage();
- Observable.from(indexEventResults)
- .doOnNext(indexEventResult -> combined.ingest(indexEventResult.getIndexOperationMessage().get())).subscribe();
- indexProducer.put(combined).subscribe();
- return Observable.from(indexEventResults);
+ indexEventResults.stream()
+ .forEach(indexEventResult -> combined.ingest(indexEventResult.getIndexOperationMessage().get()));
+ indexProducer.put(combined).subscribe();//execute the index operation
+ return indexEventResults;
+ })
+ //flat map the ops so they are back to individual
+ .flatMap(indexEventResults -> Observable.from(indexEventResults))
+ //ack each message
+ .map(indexEventResult -> {
+ ack(indexEventResult.queueMessage);
+ return indexEventResult;
})
- .doOnNext(indexEventResult ->ack(indexEventResult.queueMessage))
.subscribe();
}
- private Observable<IndexEventResult> callHandleIndex(QueueMessage message, Func1<QueueMessage, Observable<IndexOperationMessage>> toCall){
+ //transform index operation to
+ private Observable<IndexEventResult> handleIndexOperation(QueueMessage queueMessage,
+ Func1<QueueMessage, Observable<IndexOperationMessage>> operation
+ ){
try{
- IndexOperationMessage indexOperationMessage = toCall.call(message).toBlocking().lastOrDefault(null);
- return Observable.just(new IndexEventResult(message,Optional.fromNullable(indexOperationMessage),true));
+ return operation.call(queueMessage)
+ .map(indexOperationMessage -> new IndexEventResult(queueMessage, Optional.fromNullable(indexOperationMessage), true));
}catch (Exception e){
logger.error("failed to run index",e);
- return Observable.just( new IndexEventResult(message, Optional.<IndexOperationMessage>absent(),false));
+ return Observable.just( new IndexEventResult(queueMessage, Optional.<IndexOperationMessage>absent(),false));
}
}
@@ -548,7 +567,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
private void subscribeAndAck( final Observable<?> observable, final QueueMessage message ){
observable.doOnCompleted( ()-> ack(message) ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ).subscribe();
}
- public static class IndexEventResult{
+ public class IndexEventResult{
private final QueueMessage queueMessage;
private final Optional<IndexOperationMessage> indexOperationMessage;
private final boolean success;
[07/36] usergrid git commit: removing async processing
Posted by sf...@apache.org.
removing async processing
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/3ece30f0
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/3ece30f0
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/3ece30f0
Branch: refs/heads/master
Commit: 3ece30f018bbd28feea53537b62724b247904bfa
Parents: 2a16827
Author: Shawn Feldman <sf...@apache.org>
Authored: Tue Sep 15 15:09:28 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu Sep 24 15:00:59 2015 -0600
----------------------------------------------------------------------
.../apache/usergrid/corepersistence/CoreModule.java | 4 ----
.../usergrid/corepersistence/CpEntityManager.java | 9 ---------
.../corepersistence/index/IndexServiceImpl.java | 4 ----
.../corepersistence/StaleIndexCleanupTest.java | 14 +++++---------
.../index/impl/EsEntityIndexBatchImpl.java | 5 +----
.../index/impl/EsIndexBufferConsumerImpl.java | 4 +---
6 files changed, 7 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3ece30f0/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
index 486aa6f..959edec 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
@@ -77,14 +77,10 @@ public class CoreModule extends AbstractModule {
- public static final String EVENTS_DISABLED = "corepersistence.events.disabled";
-
-
@Override
protected void configure() {
-
install( new CommonModule());
install( new CollectionModule() {
/**
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3ece30f0/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index dc3d6a2..d46c112 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -2627,15 +2627,6 @@ public class CpEntityManager implements EntityManager {
handleWriteUniqueVerifyException( entity, wuve );
}
-
- // Index CP entity into default collection scope
- // IndexScope defaultIndexScope = new IndexScopeImpl(
- // applicationScope.getApplication(),
- // applicationScope.getApplication(),
- // CpEntityManager.getCollectionScopeNameFromEntityType( entity.getType() ) );
- // EntityIndex ei = managerCache.getEntityIndex( applicationScope );
- // ei.createBatch().index( defaultIndexScope, cpEntity ).execute();
-
// reflect changes in the legacy Entity
entity.setUuid( cpEntity.getId().getUuid() );
entity.setProperties( cpEntity );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3ece30f0/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
index 3ca33a3..8a8dba1 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
@@ -104,10 +104,6 @@ public class IndexServiceImpl implements IndexService {
//do our observable for batching
//try to send a whole batch if we can
-
-
- //do our observable for batching
- //try to send a whole batch if we can
final Observable<IndexOperationMessage> batches = sourceEdgesToIndex.buffer( indexFig.getIndexBatchSize() )
//map into batches based on our buffer size
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3ece30f0/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
index 832409b..366d41c 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
@@ -60,7 +60,6 @@ import com.google.inject.Injector;
import net.jcip.annotations.NotThreadSafe;
-import static org.apache.usergrid.corepersistence.CoreModule.EVENTS_DISABLED;
import static org.apache.usergrid.persistence.Schema.TYPE_APPLICATION;
import static org.apache.usergrid.persistence.core.util.IdGenerator.createId;
import static org.junit.Assert.assertEquals;
@@ -73,6 +72,7 @@ import static org.junit.Assert.assertTrue;
@NotThreadSafe
public class StaleIndexCleanupTest extends AbstractCoreIT {
private static final Logger logger = LoggerFactory.getLogger( StaleIndexCleanupTest.class );
+ public static final String EVENTS_DISABLED = "corepersistence.events.disabled";
// take it easy on Cassandra
private static final long writeDelayMs = 0;
@@ -379,7 +379,7 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
app.refreshIndex();
Thread.sleep(250); // refresh happens asynchronously, wait for some time
-
+
//we can't use our candidate result sets here. The repair won't happen since we now have orphaned documents in our index
//us the EM so the repair process happens
@@ -401,7 +401,7 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
* Test that the EntityDeleteImpl cleans up stale indexes on update. Ensures that when an
* entity is updated its old indexes are cleared from ElasticSearch.
*/
- @Test(timeout=30000)
+ @Test()
public void testCleanupOnUpdate() throws Exception {
logger.info( "Started testCleanupOnUpdate()" );
@@ -425,20 +425,17 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
app.refreshIndex();
CandidateResults crs = queryCollectionCp( "dogs", "dog", "select *");
- Assert.assertEquals( "Expect no stale candidates yet", numEntities, crs.size() );
+ Assert.assertEquals("Expect no stale candidates yet", numEntities, crs.size());
// turn off post processing stuff that cleans up stale entities
- System.setProperty( EVENTS_DISABLED, "false" );
// update each entity a bunch of times
- List<Entity> maxVersions = new ArrayList<>(numEntities);
int count = 0;
for ( Entity dog : dogs ) {
- Entity toUpdate = null;
for ( int j=0; j<numUpdates; j++) {
- toUpdate = em.get( dog.getUuid() );
+ Entity toUpdate = em.get( dog.getUuid() );
toUpdate.setProperty( "property", RandomStringUtils.randomAlphanumeric(10));
em.update(toUpdate);
count++;
@@ -447,7 +444,6 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
}
}
- maxVersions.add( toUpdate );
}
app.refreshIndex();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3ece30f0/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index 87b2368..c11feed 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@ -123,10 +123,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
@Override
public Observable execute() {
- IndexOperationMessage tempContainer = container;
- container = new IndexOperationMessage();
-
- return indexBatchBufferProducer.put( tempContainer );
+ return indexBatchBufferProducer.put( container );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3ece30f0/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index 93c0d85..514e6eb 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -96,7 +96,6 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
Preconditions.checkNotNull(message, "Message cannot be null");
indexSizeCounter.inc(message.getDeIndexRequests().size());
indexSizeCounter.inc(message.getIndexRequests().size());
-
return processBatch(message);
}
@@ -185,8 +184,7 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
try {
responses = bulkRequest.execute().actionGet( );
- }
- catch ( Throwable t ) {
+ } catch ( Throwable t ) {
log.error( "Unable to communicate with elasticsearch" );
failureMonitor.fail( "Unable to execute batch", t );
throw t;
[12/36] usergrid git commit: add delete queue for cleanup and poll for messages
Posted by sf...@apache.org.
add delete queue for cleanup and poll for messages
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/c7a6ebf0
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/c7a6ebf0
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/c7a6ebf0
Branch: refs/heads/master
Commit: c7a6ebf0329a95e6813667d81fa387ecf0240d14
Parents: da6afb1
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Sep 25 10:36:40 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Sep 25 10:36:40 2015 -0600
----------------------------------------------------------------------
.../persistence/queue/DefaultQueueManager.java | 5 +++
.../persistence/queue/QueueManager.java | 5 +++
.../queue/impl/SNSQueueManagerImpl.java | 9 +++++
.../queue/impl/SQSQueueManagerImpl.java | 7 ++++
.../persistence/queue/QueueManagerTest.java | 37 ++++++++++++++++----
.../services/queues/ImportQueueManager.java | 5 +++
6 files changed, 61 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/c7a6ebf0/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
index c72e109..d974529 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
@@ -72,4 +72,9 @@ public class DefaultQueueManager implements QueueManager {
String uuid = UUID.randomUUID().toString();
queue.add(new QueueMessage(uuid,"handle_"+uuid,body,"put type here"));
}
+
+ @Override
+ public void deleteQueue() {
+
+ }
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/c7a6ebf0/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
index 0ec2337..027abb2 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
@@ -69,4 +69,9 @@ public interface QueueManager {
* @throws IOException
*/
void sendMessage(Object body)throws IOException;
+
+ /**
+ * purge messages
+ */
+ void deleteQueue();
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/c7a6ebf0/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index 8fb0f52..bc63f53 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -449,6 +449,15 @@ public class SNSQueueManagerImpl implements QueueManager {
}
+ @Override
+ public void deleteQueue() {
+ logger.warn("Deleting queue: "+getReadQueue().getUrl());
+ sqs.deleteQueue(new DeleteQueueRequest().withQueueUrl(getReadQueue().getUrl()));
+ logger.warn("Deleting queue: "+getReadQueue().getUrl()+"_dead");
+ sqs.deleteQueue(new DeleteQueueRequest().withQueueUrl(getReadQueue().getUrl()+"_dead"));
+
+ }
+
@Override
public void commitMessage(final QueueMessage queueMessage) {
http://git-wip-us.apache.org/repos/asf/usergrid/blob/c7a6ebf0/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
index 075e90c..daa1cb5 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
@@ -327,6 +327,13 @@ public class SQSQueueManagerImpl implements QueueManager {
return region;
}
+ @Override
+ public void deleteQueue() {
+ logger.warn("Deleting queue: "+getQueue().getUrl());
+ sqs.deleteQueue(new DeleteQueueRequest().withQueueUrl(getQueue().getUrl()));
+ }
+
+
/**
* Create the SQS client for the specified settings
http://git-wip-us.apache.org/repos/asf/usergrid/blob/c7a6ebf0/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
index 29e88ce..ac70af6 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
@@ -20,7 +20,6 @@
package org.apache.usergrid.persistence.queue;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -63,16 +62,24 @@ public class QueueManagerTest {
protected QueueScope scope;
private QueueManager qm;
+ public static long queueSeed = System.currentTimeMillis();
+
@Before
public void mockApp() {
- this.scope = new QueueScopeImpl( "testQueue", QueueScope.RegionImplementation.LOCAL);
+
+ this.scope = new QueueScopeImpl( "testQueue"+queueSeed++, QueueScope.RegionImplementation.LOCAL);
qm = qmf.getQueueManager(scope);
}
+ @org.junit.After
+ public void cleanup(){
+ qm.deleteQueue();
+ }
+
@Test
- public void send() throws IOException,ClassNotFoundException{
+ public void send() throws Exception{
String value = "bodytest";
qm.sendMessage(value);
List<QueueMessage> messageList = qm.getMessages(1,5000,5000,String.class).toList().toBlocking().last();
@@ -81,13 +88,14 @@ public class QueueManagerTest {
assertTrue(message.getBody().equals(value));
qm.commitMessage(message);
}
+
messageList = qm.getMessages(1,5000,5000,String.class).toList().toBlocking().last();
assertTrue(messageList.size() <= 0);
}
@Test
- public void sendMore() throws IOException,ClassNotFoundException{
+ public void sendMore() throws Exception{
HashMap<String,String> values = new HashMap<>();
values.put("test","Test");
@@ -107,7 +115,7 @@ public class QueueManagerTest {
}
@Test
- public void queueSize() throws IOException,ClassNotFoundException{
+ public void queueSize() throws Exception{
HashMap<String,String> values = new HashMap<>();
values.put("test", "Test");
@@ -115,8 +123,16 @@ public class QueueManagerTest {
bodies.add(values);
long initialDepth = qm.getQueueDepth();
qm.sendMessages(bodies);
- long depth = qm.getQueueDepth();
+ long depth = 0;
+ for(int i=0; i<10;i++){
+ depth = qm.getQueueDepth();
+ if(depth>0){
+ break;
+ }
+ Thread.sleep(1000);
+ }
assertTrue(depth>0);
+
List<QueueMessage> messageList = qm.getMessages(10,5000,5000,values.getClass()).toList().toBlocking().last();
assertTrue(messageList.size() <= 500);
for(QueueMessage message : messageList){
@@ -125,9 +141,16 @@ public class QueueManagerTest {
if(messageList.size()>0) {
qm.commitMessages(messageList);
}
- depth = qm.getQueueDepth();
+ for(int i=0; i<10;i++){
+ depth = qm.getQueueDepth();
+ if(depth==initialDepth){
+ break;
+ }
+ Thread.sleep(1000);
+ }
assertEquals(initialDepth, depth);
}
+
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/c7a6ebf0/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
index d74f688..bca9a49 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
@@ -68,4 +68,9 @@ public class ImportQueueManager implements QueueManager {
public void sendMessage( final Object body ) throws IOException {
}
+
+ @Override
+ public void deleteQueue() {
+
+ }
}
[21/36] usergrid git commit: remove index batch execute
Posted by sf...@apache.org.
remove index batch execute
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/4c263b87
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/4c263b87
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/4c263b87
Branch: refs/heads/master
Commit: 4c263b870d0b45f53e198cad876373485c188669
Parents: a770024
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Sep 28 10:12:54 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Sep 28 10:12:54 2015 -0600
----------------------------------------------------------------------
.../corepersistence/CpManagerCache.java | 12 ++++-
.../corepersistence/CpRelationManager.java | 4 +-
.../usergrid/corepersistence/ManagerCache.java | 7 +++
.../corepersistence/index/IndexServiceImpl.java | 8 ++--
.../read/search/CandidateEntityFilter.java | 16 +++++--
.../pipeline/read/search/CandidateIdFilter.java | 16 +++++--
.../persistence/index/EntityIndexBatch.java | 7 ++-
.../index/impl/EsEntityIndexBatchImpl.java | 8 ++--
.../index/impl/EsEntityIndexFactoryImpl.java | 4 --
.../index/impl/EsEntityIndexImpl.java | 5 +-
.../persistence/index/impl/EntityIndexTest.java | 48 +++++++++++---------
.../persistence/index/impl/GeoPagingTest.java | 4 +-
.../index/impl/IndexLoadTestsIT.java | 5 +-
13 files changed, 88 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/4c263b87/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java
index 89f2ab2..0408bbd 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java
@@ -26,6 +26,7 @@ import org.apache.usergrid.persistence.index.EntityIndex;
import org.apache.usergrid.persistence.index.EntityIndex;
import org.apache.usergrid.persistence.index.EntityIndexFactory;
import org.apache.usergrid.persistence.index.IndexLocationStrategy;
+import org.apache.usergrid.persistence.index.impl.IndexProducer;
import org.apache.usergrid.persistence.map.MapManager;
import org.apache.usergrid.persistence.map.MapManagerFactory;
import org.apache.usergrid.persistence.map.MapScope;
@@ -43,6 +44,7 @@ public class CpManagerCache implements ManagerCache {
private final GraphManagerFactory gmf;
private final MapManagerFactory mmf;
private final IndexLocationStrategyFactory indexLocationStrategyFactory;
+ private final IndexProducer indexProducer;
// TODO: consider making these cache sizes and timeouts configurable
@@ -52,13 +54,16 @@ public class CpManagerCache implements ManagerCache {
final EntityIndexFactory eif,
final GraphManagerFactory gmf,
final MapManagerFactory mmf,
- final IndexLocationStrategyFactory indexLocationStrategyFactory) {
+ final IndexLocationStrategyFactory indexLocationStrategyFactory,
+ final IndexProducer indexProducer
+ ) {
this.ecmf = ecmf;
this.eif = eif;
this.gmf = gmf;
this.mmf = mmf;
this.indexLocationStrategyFactory = indexLocationStrategyFactory;
+ this.indexProducer = indexProducer;
}
@@ -88,6 +93,11 @@ public class CpManagerCache implements ManagerCache {
return mmf.createMapManager( mapScope );
}
+ @Override
+ public IndexProducer getIndexProducer() {
+ return indexProducer;
+ }
+
@Override
public void invalidate() {
http://git-wip-us.apache.org/repos/asf/usergrid/blob/4c263b87/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index de687b3..b4b39a0 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -535,9 +535,7 @@ public class CpRelationManager implements RelationManager {
batch.deindex( indexScope, memberEntity );
-
- batch.execute();
-
+ managerCache.getIndexProducer().put( batch.build()).subscribe();
// special handling for roles collection of a group
if ( headEntity.getType().equals( Group.ENTITY_TYPE ) ) {
http://git-wip-us.apache.org/repos/asf/usergrid/blob/4c263b87/stack/core/src/main/java/org/apache/usergrid/corepersistence/ManagerCache.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/ManagerCache.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/ManagerCache.java
index 6425b61..1dee80a 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/ManagerCache.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/ManagerCache.java
@@ -25,6 +25,7 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.GraphManager;
import org.apache.usergrid.persistence.index.EntityIndex;
import org.apache.usergrid.persistence.index.IndexLocationStrategy;
+import org.apache.usergrid.persistence.index.impl.IndexProducer;
import org.apache.usergrid.persistence.map.MapManager;
import org.apache.usergrid.persistence.map.MapScope;
@@ -66,6 +67,12 @@ public interface ManagerCache {
MapManager getMapManager(MapScope mapScope);
/**
+ * gets index producer
+ * @return
+ */
+ IndexProducer getIndexProducer();
+
+ /**
* invalidate the cache
*/
void invalidate();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/4c263b87/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
index 8a8dba1..d160aac 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
@@ -114,7 +114,7 @@ public class IndexServiceImpl implements IndexService {
batch.index( indexEdge, entity );
} )
//return the future from the batch execution
- .flatMap( batch -> batch.execute() ) );
+ .flatMap( batch -> Observable.just(batch.build()) ) );
return ObservableTimer.time( batches, indexTimer );
}
@@ -142,7 +142,7 @@ public class IndexServiceImpl implements IndexService {
batch.index( indexEdge, entity );
- return batch.execute();
+ return Observable.just(batch.build());
} );
return ObservableTimer.time( batches, addTimer );
@@ -185,7 +185,7 @@ public class IndexServiceImpl implements IndexService {
batch = deindexBatchIteratorResolver( fromTarget, sourceEdgesToBeDeindexed, batch );
- return batch.execute();
+ return Observable.just(batch.build());
} );
return ObservableTimer.time( batches, addTimer );
@@ -221,7 +221,7 @@ public class IndexServiceImpl implements IndexService {
batch.deindex( searchEdge, candidateResult );
} )
//return the future from the batch execution
- .flatMap( batch -> batch.execute() );
+ .flatMap( batch ->Observable.just(batch.build()) );
return ObservableTimer.time(batches, indexTimer);
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/4c263b87/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
index 14c880f..ceb18ae 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
@@ -24,6 +24,7 @@ import java.util.*;
import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
import org.apache.usergrid.persistence.index.*;
+import org.apache.usergrid.persistence.index.impl.IndexProducer;
import org.apache.usergrid.persistence.model.field.Field;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,16 +57,19 @@ public class CandidateEntityFilter extends AbstractFilter<FilterResult<Candidate
private final EntityCollectionManagerFactory entityCollectionManagerFactory;
private final EntityIndexFactory entityIndexFactory;
private final IndexLocationStrategyFactory indexLocationStrategyFactory;
+ private final IndexProducer indexProducer;
@Inject
public CandidateEntityFilter( final EntityCollectionManagerFactory entityCollectionManagerFactory,
final EntityIndexFactory entityIndexFactory,
- final IndexLocationStrategyFactory indexLocationStrategyFactory
+ final IndexLocationStrategyFactory indexLocationStrategyFactory,
+ final IndexProducer indexProducer
) {
this.entityCollectionManagerFactory = entityCollectionManagerFactory;
this.entityIndexFactory = entityIndexFactory;
this.indexLocationStrategyFactory = indexLocationStrategyFactory;
+ this.indexProducer = indexProducer;
}
@@ -108,7 +112,7 @@ public class CandidateEntityFilter extends AbstractFilter<FilterResult<Candidate
//now we have a collection, validate our canidate set is correct.
return entitySets.map(
entitySet -> new EntityVerifier(
- applicationIndex.createBatch(), entitySet, candidateResults)
+ applicationIndex.createBatch(), entitySet, candidateResults,indexProducer)
)
.doOnNext(entityCollector -> entityCollector.merge())
.flatMap(entityCollector -> Observable.from(entityCollector.getResults()))
@@ -150,14 +154,17 @@ public class CandidateEntityFilter extends AbstractFilter<FilterResult<Candidate
private final EntityIndexBatch batch;
private final List<FilterResult<Candidate>> candidateResults;
+ private final IndexProducer indexProducer;
private final EntitySet entitySet;
public EntityVerifier( final EntityIndexBatch batch, final EntitySet entitySet,
- final List<FilterResult<Candidate>> candidateResults ) {
+ final List<FilterResult<Candidate>> candidateResults,
+ final IndexProducer indexProducer) {
this.batch = batch;
this.entitySet = entitySet;
this.candidateResults = candidateResults;
+ this.indexProducer = indexProducer;
this.results = new ArrayList<>( entitySet.size() );
}
@@ -171,7 +178,8 @@ public class CandidateEntityFilter extends AbstractFilter<FilterResult<Candidate
validate( candidateResult );
}
- batch.execute().subscribe();
+ indexProducer.put(batch.build()).subscribe();
+
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/4c263b87/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateIdFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateIdFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateIdFilter.java
index 3b1c102..b2fd675 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateIdFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateIdFilter.java
@@ -26,6 +26,7 @@ import java.util.UUID;
import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
import org.apache.usergrid.persistence.index.*;
+import org.apache.usergrid.persistence.index.impl.IndexProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,15 +54,19 @@ public class CandidateIdFilter extends AbstractFilter<FilterResult<Candidate>, F
private final EntityCollectionManagerFactory entityCollectionManagerFactory;
private final EntityIndexFactory entityIndexFactory;
private final IndexLocationStrategyFactory indexLocationStrategyFactory;
+ private final IndexProducer indexProducer;
@Inject
public CandidateIdFilter( final EntityCollectionManagerFactory entityCollectionManagerFactory,
final EntityIndexFactory entityIndexFactory,
- final IndexLocationStrategyFactory indexLocationStrategyFactory) {
+ final IndexLocationStrategyFactory indexLocationStrategyFactory,
+ final IndexProducer indexProducer
+ ) {
this.entityCollectionManagerFactory = entityCollectionManagerFactory;
this.entityIndexFactory = entityIndexFactory;
this.indexLocationStrategyFactory = indexLocationStrategyFactory;
+ this.indexProducer = indexProducer;
}
@@ -97,7 +102,7 @@ public class CandidateIdFilter extends AbstractFilter<FilterResult<Candidate>, F
return versionSetObservable.map(
entitySet -> new EntityCollector( applicationIndex.createBatch(), entitySet,
- candidateResults ) ).doOnNext( entityCollector -> entityCollector.merge() ).flatMap(
+ candidateResults, indexProducer ) ).doOnNext( entityCollector -> entityCollector.merge() ).flatMap(
entityCollector -> Observable.from( entityCollector.collectResults() ) );
} );
@@ -115,14 +120,16 @@ public class CandidateIdFilter extends AbstractFilter<FilterResult<Candidate>, F
private final EntityIndexBatch batch;
private final List<FilterResult<Candidate>> candidateResults;
+ private final IndexProducer indexProducer;
private final VersionSet versionSet;
public EntityCollector( final EntityIndexBatch batch, final VersionSet versionSet,
- final List<FilterResult<Candidate>> candidateResults ) {
+ final List<FilterResult<Candidate>> candidateResults, final IndexProducer indexProducer ) {
this.batch = batch;
this.versionSet = versionSet;
this.candidateResults = candidateResults;
+ this.indexProducer = indexProducer;
this.results = new ArrayList<>( versionSet.size() );
}
@@ -136,7 +143,8 @@ public class CandidateIdFilter extends AbstractFilter<FilterResult<Candidate>, F
validate( candidateResult );
}
- batch.execute();
+ indexProducer.put( batch.build()).subscribe();
+
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/4c263b87/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
index d1b076d..98652c1 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
@@ -18,6 +18,7 @@ package org.apache.usergrid.persistence.index;/*
*/
+import java.util.List;
import java.util.UUID;
import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
@@ -62,11 +63,13 @@ public interface EntityIndexBatch {
EntityIndexBatch deindex( final SearchEdge searchEdge, final Id id, final UUID version );
+
+
/**
- * Execute the batch
+ * get the batches
* @return future to guarantee execution
*/
- Observable<IndexOperationMessage> execute();
+ IndexOperationMessage build();
/**
* Get the number of operations in the batch
http://git-wip-us.apache.org/repos/asf/usergrid/blob/4c263b87/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index 64a1c6a..68830ca 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@ -19,6 +19,7 @@
package org.apache.usergrid.persistence.index.impl;
+import java.util.List;
import java.util.UUID;
import org.apache.usergrid.persistence.index.*;
@@ -41,7 +42,6 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
private final IndexAlias alias;
private final IndexLocationStrategy indexLocationStrategy;
- private final IndexProducer indexBatchBufferProducer;
private final EntityIndex entityIndex;
private final ApplicationScope applicationScope;
@@ -49,12 +49,10 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
public EsEntityIndexBatchImpl( final IndexLocationStrategy locationStrategy,
- final IndexProducer indexBatchBufferProducer,
final EntityIndex entityIndex
) {
this.indexLocationStrategy = locationStrategy;
- this.indexBatchBufferProducer = indexBatchBufferProducer;
this.entityIndex = entityIndex;
this.applicationScope = indexLocationStrategy.getApplicationScope();
@@ -122,8 +120,8 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
@Override
- public Observable execute() {
- return indexBatchBufferProducer.put( container );
+ public IndexOperationMessage build() {
+ return container;
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/4c263b87/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexFactoryImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexFactoryImpl.java
index b66fd40..c91057b 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexFactoryImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexFactoryImpl.java
@@ -37,7 +37,6 @@ public class EsEntityIndexFactoryImpl implements EntityIndexFactory{
private final IndexFig config;
private final IndexCache indexCache;
private final EsProvider provider;
- private final IndexProducer indexProducer;
private final MetricsFactory metricsFactory;
private final IndexRefreshCommand refreshCommand;
@@ -50,7 +49,6 @@ public class EsEntityIndexFactoryImpl implements EntityIndexFactory{
config,
refreshCommand,
metricsFactory,
- indexProducer,
locationStrategy
);
index.initialize();
@@ -62,7 +60,6 @@ public class EsEntityIndexFactoryImpl implements EntityIndexFactory{
public EsEntityIndexFactoryImpl( final IndexFig indexFig,
final IndexCache indexCache,
final EsProvider provider,
- final IndexProducer indexProducer,
final MetricsFactory metricsFactory,
final IndexRefreshCommand refreshCommand
@@ -70,7 +67,6 @@ public class EsEntityIndexFactoryImpl implements EntityIndexFactory{
this.config = indexFig;
this.indexCache = indexCache;
this.provider = provider;
- this.indexProducer = indexProducer;
this.metricsFactory = metricsFactory;
this.refreshCommand = refreshCommand;
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/4c263b87/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index 6317a69..f6ebce2 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -116,7 +116,6 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData {
private final SearchRequestBuilderStrategyV2 searchRequestBuilderStrategyV2;
private final int cursorTimeout;
private final long queryTimeout;
- private final IndexProducer indexBatchBufferProducer;
private final FailureMonitorImpl failureMonitor;
private final Timer aggregationTimer;
@@ -131,13 +130,11 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData {
final IndexFig indexFig,
final IndexRefreshCommand indexRefreshCommand,
final MetricsFactory metricsFactory,
- final IndexProducer indexBatchBufferProducer,
final IndexLocationStrategy indexLocationStrategy
) {
this.indexFig = indexFig;
this.indexLocationStrategy = indexLocationStrategy;
- this.indexBatchBufferProducer = indexBatchBufferProducer;
this.failureMonitor = new FailureMonitorImpl( indexFig, provider );
this.esProvider = provider;
this.indexRefreshCommand = indexRefreshCommand;
@@ -374,7 +371,7 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData {
@Override
public EntityIndexBatch createBatch() {
EntityIndexBatch batch =
- new EsEntityIndexBatchImpl(indexLocationStrategy , indexBatchBufferProducer, this );
+ new EsEntityIndexBatchImpl(indexLocationStrategy, this );
return batch;
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/4c263b87/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
index 6348a44..5243d5a 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
@@ -82,6 +82,9 @@ public class EntityIndexTest extends BaseIT {
public IndexFig fig;
@Inject
+ public IndexProducer indexProducer;
+
+ @Inject
public CassandraFig cassandraFig;
@Inject
@@ -137,7 +140,7 @@ public class EntityIndexTest extends BaseIT {
batch.index( indexEdge, entity1 );
- batch.execute().toBlocking().last();
+ indexProducer.put(batch.build()).subscribe();
Entity entity2 = new Entity( entityType );
@@ -151,7 +154,7 @@ public class EntityIndexTest extends BaseIT {
batch.index( indexEdge, entity2 );
- batch.execute().toBlocking().last();
+ indexProducer.put(batch.build()).subscribe();;
entityIndex.refreshAsync().toBlocking().first();
@@ -215,7 +218,7 @@ public class EntityIndexTest extends BaseIT {
EntityIndexBatch batch = entityIndex.createBatch();
insertJsonBlob( sampleJson, batch, entityType, indexEdge, size, 0 );
- batch.execute().toBlocking().last();
+ indexProducer.put(batch.build()).subscribe();;
}
catch ( Exception e ) {
synchronized ( failTime ) {
@@ -290,7 +293,7 @@ public class EntityIndexTest extends BaseIT {
EntityIndexBatch entityIndexBatch = entityIndex.createBatch();
entityIndexBatch.deindex(searchEdge, crs.get(0));
- entityIndexBatch.execute().toBlocking().last();
+ indexProducer.put(entityIndexBatch.build()).subscribe();
entityIndex.refreshAsync().toBlocking().first();
//Hilda Youn
@@ -306,7 +309,7 @@ public class EntityIndexTest extends BaseIT {
});
EntityIndexBatch batch = entityIndex.createBatch();
insertJsonBlob(sampleJson, batch, entityType, indexEdge, max, startIndex);
- batch.execute().toBlocking().last();
+ indexProducer.put(batch.build()).subscribe();;
IndexRefreshCommandImpl.IndexRefreshCommandInfo info = entityIndex.refreshAsync().toBlocking().first();
long time = info.getExecutionTime();
log.info("refresh took ms:" + time);
@@ -364,7 +367,7 @@ public class EntityIndexTest extends BaseIT {
EntityUtils.setVersion( entity, UUIDGenerator.newTimeUUID() );
entity.setField( new UUIDField( IndexingUtils.ENTITY_ID_FIELDNAME, UUID.randomUUID() ) );
- entityIndex.createBatch().index( searchEdge, entity ).execute().toBlocking().last();
+ indexProducer.put(entityIndex.createBatch().index( searchEdge, entity ).build()).subscribe();
entityIndex.refreshAsync().toBlocking().first();
CandidateResults candidateResults = entityIndex
@@ -373,7 +376,7 @@ public class EntityIndexTest extends BaseIT {
EntityIndexBatch batch = entityIndex.createBatch();
batch.deindex( searchEdge, entity );
- batch.execute().toBlocking().last();
+ indexProducer.put(batch.build()).subscribe();;
entityIndex.refreshAsync().toBlocking().first();
candidateResults = entityIndex
@@ -410,7 +413,8 @@ public class EntityIndexTest extends BaseIT {
entity[i].setField( new UUIDField( IndexingUtils.ENTITY_ID_FIELDNAME, entityUUID ) );
//index the new entity. This is where the loop will be set to create like 100 entities.
- entityIndex.createBatch().index( searchEdge, entity[i] ).execute().toBlocking().last();
+ indexProducer.put(entityIndex.createBatch().index( searchEdge, entity[i] ).build()).subscribe();
+
}
entityIndex.refreshAsync().toBlocking().first();
@@ -541,16 +545,16 @@ public class EntityIndexTest extends BaseIT {
EntityIndexBatch batch = entityIndex.createBatch();
batch.index( indexSCope, user );
- batch.execute().toBlocking().last();
+ indexProducer.put(batch.build()).subscribe();;
entityIndex.refreshAsync().toBlocking().first();
final String query = "where username = 'edanuff'";
CandidateResults r = entityIndex.search( indexSCope, SearchTypes.fromTypes( "edanuff" ), query, 10, 0);
- assertEquals( user.getId(), r.get( 0 ).getId() );
+ assertEquals( user.getId(), r.get( 0 ).getId());
batch.deindex( indexSCope, user.getId(), user.getVersion() );
- batch.execute().toBlocking().last();
+ indexProducer.put(batch.build()).subscribe();;
entityIndex.refreshAsync().toBlocking().first();
// EntityRef
@@ -611,7 +615,7 @@ public class EntityIndexTest extends BaseIT {
EntityUtils.setVersion( fred, UUIDGenerator.newTimeUUID() );
batch.index( indexScope, fred);
- batch.execute().toBlocking().last();
+ indexProducer.put(batch.build()).subscribe();;
entityIndex.refreshAsync().toBlocking().first();
final SearchTypes searchTypes = SearchTypes.fromTypes( "user" );
@@ -681,14 +685,14 @@ public class EntityIndexTest extends BaseIT {
EntityUtils.setId( user, userId );
EntityUtils.setVersion( user, UUIDGenerator.newTimeUUID() );
- entityIds.add( userId );
+ entityIds.add(userId );
batch.index( indexEdge, user );
}
- batch.execute().toBlocking().last();
+ indexProducer.put(batch.build()).subscribe();;
entityIndex.refreshAsync().toBlocking().first();
@@ -755,7 +759,7 @@ public class EntityIndexTest extends BaseIT {
EntityIndexBatch batch = entityIndex.createBatch();
batch.index( indexSCope, user );
- batch.execute().toBlocking().last();
+ indexProducer.put(batch.build()).subscribe();;
entityIndex.refreshAsync().toBlocking().first();
final String query = "where searchUUID = " + searchUUID;
@@ -794,7 +798,7 @@ public class EntityIndexTest extends BaseIT {
EntityIndexBatch batch = entityIndex.createBatch();
batch.index(indexSCope, user);
- batch.execute().toBlocking().last();
+ indexProducer.put(batch.build()).subscribe();;
entityIndex.refreshAsync().toBlocking().first();
final String query = "where string = 'I am*'";
@@ -849,9 +853,9 @@ public class EntityIndexTest extends BaseIT {
EntityIndexBatch batch = entityIndex.createBatch();
- batch.index( indexSCope, first );
+ batch.index(indexSCope, first );
batch.index( indexSCope, second );
- batch.execute().toBlocking().last();
+ indexProducer.put(batch.build()).subscribe();;
entityIndex.refreshAsync().toBlocking().first();
@@ -916,7 +920,7 @@ public class EntityIndexTest extends BaseIT {
batch.index(indexScope2, second);
- batch.execute().toBlocking().last();
+ indexProducer.put(batch.build()).subscribe();;
entityIndex.refreshAsync().toBlocking().first();
@@ -997,7 +1001,7 @@ public class EntityIndexTest extends BaseIT {
batch.index( indexScope2, second);
- batch.execute().toBlocking().last();
+ indexProducer.put(batch.build()).subscribe();;
entityIndex.refreshAsync().toBlocking().first();
@@ -1107,7 +1111,7 @@ public class EntityIndexTest extends BaseIT {
batch.index( indexScope2, second);
- batch.execute().toBlocking().last();
+ indexProducer.put(batch.build()).subscribe();;
entityIndex.refreshAsync().toBlocking().first();
@@ -1262,7 +1266,7 @@ public class EntityIndexTest extends BaseIT {
batch.index( indexScope2, second);
- batch.execute().toBlocking().last();
+ indexProducer.put(batch.build()).subscribe();;
entityIndex.refreshAsync().toBlocking().first();
long size = entityIndex.getEntitySize(new SearchEdgeImpl(ownerId,type, SearchEdge.NodeType.SOURCE));
assertTrue( size == 100 );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/4c263b87/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/GeoPagingTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/GeoPagingTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/GeoPagingTest.java
index dce69f9..98b85f1 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/GeoPagingTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/GeoPagingTest.java
@@ -70,6 +70,8 @@ public class GeoPagingTest extends BaseIT {
@Inject
public EntityIndexFactory eif;
+ @Inject
+ IndexProducer indexProducer;
@Inject
@Rule
@@ -128,7 +130,7 @@ public class GeoPagingTest extends BaseIT {
}
- batch.execute().toBlocking().last();
+ indexProducer.put( batch.build()).subscribe();
entityIndex.refreshAsync().toBlocking().last();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/4c263b87/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexLoadTestsIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexLoadTestsIT.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexLoadTestsIT.java
index cca3f0f..1be1195 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexLoadTestsIT.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexLoadTestsIT.java
@@ -92,7 +92,7 @@ public class IndexLoadTestsIT extends BaseIT {
public IndexTestFig indexTestFig;
@Inject
- public EntityIndexFactory entityIndexFactory;
+ public IndexProducer indexProducer;
@Inject
public MetricsFactory metricsFactory;
@@ -347,7 +347,8 @@ public class IndexLoadTestsIT extends BaseIT {
//execute
- entityIndexBatch.execute();
+ IndexOperationMessage message = entityIndexBatch.build();
+ indexProducer.put(message);
//stop
time.close();
} ).toBlocking().last();
[26/36] usergrid git commit: add missing subscribe
Posted by sf...@apache.org.
add missing subscribe
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/0c4c1abf
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/0c4c1abf
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/0c4c1abf
Branch: refs/heads/master
Commit: 0c4c1abf3bcbdb1a62d1ec74d892698e7e211d40
Parents: a9dc6ba
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Sep 28 15:13:00 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Sep 28 15:13:00 2015 -0600
----------------------------------------------------------------------
.../asyncevents/AmazonAsyncEventService.java | 14 +++++++-------
1 file changed, 7 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0c4c1abf/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index 82e6d19..5f681e7 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -243,15 +243,14 @@ public class AmazonAsyncEventService implements AsyncEventService {
continue;
}
-
if (event instanceof EdgeDeleteEvent) {
- merged = merged.mergeWith(callHandleIndex(queueMessage -> handleEdgeDelete(queueMessage), message));
+ merged = merged.mergeWith(callHandleIndex(message, queueMessage -> handleEdgeDelete(queueMessage)));
} else if (event instanceof EdgeIndexEvent) {
- merged = merged.mergeWith(callHandleIndex(queueMessage -> handleEdgeIndex(queueMessage),message));
+ merged = merged.mergeWith(callHandleIndex(message, queueMessage -> handleEdgeIndex(queueMessage)));
} else if (event instanceof EntityDeleteEvent) {
- merged = merged.mergeWith( callHandleIndex(queueMessage -> handleEntityDelete(queueMessage),message));
+ merged = merged.mergeWith( callHandleIndex(message, queueMessage -> handleEntityDelete(queueMessage)));
} else if (event instanceof EntityIndexEvent) {
- merged = merged.mergeWith(callHandleIndex(queueMessage -> handleEntityIndexUpdate(queueMessage),message));
+ merged = merged.mergeWith(callHandleIndex(message, queueMessage -> handleEntityIndexUpdate(queueMessage)));
} else if (event instanceof InitializeApplicationIndexEvent) {
//does not return observable
handleInitializeApplicationIndex(message);
@@ -272,10 +271,11 @@ public class AmazonAsyncEventService implements AsyncEventService {
indexProducer.put(combined).subscribe();
return Observable.from(indexEventResults);
})
- .doOnNext(indexEventResult ->ack(indexEventResult.queueMessage));
+ .doOnNext(indexEventResult ->ack(indexEventResult.queueMessage))
+ .subscribe();
}
- private Observable<IndexEventResult> callHandleIndex(Func1<QueueMessage,Observable<IndexOperationMessage>> toCall, QueueMessage message){
+ private Observable<IndexEventResult> callHandleIndex(QueueMessage message, Func1<QueueMessage, Observable<IndexOperationMessage>> toCall){
try{
IndexOperationMessage indexOperationMessage = toCall.call(message).toBlocking().lastOrDefault(null);
return Observable.just(new IndexEventResult(message,Optional.fromNullable(indexOperationMessage),true));