You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/10/14 23:50:22 UTC
[07/32] usergrid git commit: add and comment on subscribe
add and comment on subscribe
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/5f20ece6
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/5f20ece6
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/5f20ece6
Branch: refs/heads/master
Commit: 5f20ece66f64305f522f13fa02d34ab2920f7089
Parents: e0d3cd5
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Oct 5 10:57:47 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Oct 5 10:57:47 2015 -0600
----------------------------------------------------------------------
.../corepersistence/CpRelationManager.java | 24 ++++++++---------
.../asyncevents/InMemoryAsyncEventService.java | 7 ++++-
.../index/ReIndexServiceImpl.java | 2 +-
.../migration/DeDupConnectionDataMigration.java | 2 +-
.../read/search/CandidateEntityFilter.java | 2 +-
.../pipeline/read/search/CandidateIdFilter.java | 2 +-
.../EsIndexMappingMigrationPlugin.java | 2 +-
.../usergrid/rest/ApplicationsResource.java | 28 +++++++++++++-------
.../management/AppInfoMigrationPlugin.java | 2 +-
.../services/AbstractConnectionsService.java | 2 +-
10 files changed, 44 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5f20ece6/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index b4b39a0..aad7610 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -518,8 +518,8 @@ public class CpRelationManager implements RelationManager {
//run our delete
gm.loadEdgeVersions(
CpNamingUtils.createEdgeFromCollectionName( cpHeadEntity.getId(), collectionName, memberEntity.getId() ) )
- .flatMap( edge -> gm.markEdge( edge ) ).flatMap( edge -> gm.deleteEdge( edge ) ).toBlocking()
- .lastOrDefault( null );
+ .flatMap(edge -> gm.markEdge(edge)).flatMap(edge -> gm.deleteEdge(edge)).toBlocking()
+ .lastOrDefault(null);
/**
@@ -535,7 +535,7 @@ public class CpRelationManager implements RelationManager {
batch.deindex( indexScope, memberEntity );
- managerCache.getIndexProducer().put( batch.build()).subscribe();
+ managerCache.getIndexProducer().put( batch.build()).toBlocking().lastOrDefault(null); // block so that an exception is thrown if the put fails
// special handling for roles collection of a group
if ( headEntity.getType().equals( Group.ENTITY_TYPE ) ) {
@@ -695,7 +695,7 @@ public class CpRelationManager implements RelationManager {
//write new edge
- gm.writeEdge( edge ).subscribe();
+ gm.writeEdge(edge).toBlocking().lastOrDefault(null); //throw an exception if this fails
indexService.queueNewEdge( applicationScope, targetEntity, edge );
@@ -707,21 +707,21 @@ public class CpRelationManager implements RelationManager {
//load our versions, only retain the most recent one
- gm.loadEdgeVersions( searchByEdge ).skip( 1 ).flatMap( edgeToDelete -> {
- if ( logger.isDebugEnabled() ) {
- logger.debug( "Marking edge {} for deletion", edgeToDelete );
+ gm.loadEdgeVersions(searchByEdge).skip(1).flatMap(edgeToDelete -> {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Marking edge {} for deletion", edgeToDelete);
}
- return gm.markEdge( edgeToDelete );
- } ).lastOrDefault( null ).doOnNext( lastEdge -> {
+ return gm.markEdge(edgeToDelete );
+ }).lastOrDefault(null).doOnNext(lastEdge -> {
//no op if we hit our default
- if(lastEdge == null){
+ if (lastEdge == null) {
return;
}
//don't queue delete b/c that de-indexes, we need to delete the edges only since we have a version still existing to index.
- gm.deleteEdge( lastEdge ).subscribe();
- }).subscribe();
+ gm.deleteEdge(lastEdge).toBlocking().lastOrDefault(null); // block so that an exception is thrown if the delete fails
+ }).toBlocking().lastOrDefault(null);// block so that an exception is thrown if marking the old edge versions fails
return connection;
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5f20ece6/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
index b29c39e..d5a0398 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
@@ -137,7 +137,12 @@ public class InMemoryAsyncEventService implements AsyncEventService {
return Observable.just(message);
}
});
- mapped.subscribe();
+ if(!resolveSynchronously){
+ mapped.subscribe(); //only subscribe for async
+ }else {
+ mapped.toBlocking().lastOrDefault(null);
+ }
+
}
@Override
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5f20ece6/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
index 1353982..f108f37 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
@@ -141,7 +141,7 @@ public class ReIndexServiceImpl implements ReIndexService {
runningReIndex.collect(() -> new FlushingCollector(jobId),
((flushingCollector, edgeScopes) -> flushingCollector.flushBuffer(edgeScopes))).doOnNext( flushingCollector-> flushingCollector.complete() )
//subscribe on our I/O scheduler and run the task
- .subscribeOn( Schedulers.io() ).subscribe();
+ .subscribeOn( Schedulers.io() ).subscribe(); //want reindex to continually run so leave subscribe.
return new ReIndexStatus( jobId, Status.STARTED, 0, 0 );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5f20ece6/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/DeDupConnectionDataMigration.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/DeDupConnectionDataMigration.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/DeDupConnectionDataMigration.java
index bec8d6c..77deac8 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/DeDupConnectionDataMigration.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/DeDupConnectionDataMigration.java
@@ -72,7 +72,7 @@ public class DeDupConnectionDataMigration implements DataMigration {
} ).doOnNext( total -> {
logger.info( "Completed de-duping {} edges", total );
observer.complete();
- } ).subscribe();
+ } ).subscribe(); //want this to run through all records
return migrationVersion;
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5f20ece6/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
index ceb18ae..9a9636f 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
@@ -178,7 +178,7 @@ public class CandidateEntityFilter extends AbstractFilter<FilterResult<Candidate
validate( candidateResult );
}
- indexProducer.put(batch.build()).subscribe();
+ indexProducer.put(batch.build()).toBlocking().lastOrDefault(null); // want to rethrow if batch fails
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5f20ece6/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateIdFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateIdFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateIdFilter.java
index b2fd675..a627b4f 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateIdFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateIdFilter.java
@@ -143,7 +143,7 @@ public class CandidateIdFilter extends AbstractFilter<FilterResult<Candidate>, F
validate( candidateResult );
}
- indexProducer.put( batch.build()).subscribe();
+ indexProducer.put( batch.build()).toBlocking().lastOrDefault(null);//want to rethrow if batch fails
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5f20ece6/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/migration/EsIndexMappingMigrationPlugin.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/migration/EsIndexMappingMigrationPlugin.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/migration/EsIndexMappingMigrationPlugin.java
index 5c2e9c1..de32e0e 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/migration/EsIndexMappingMigrationPlugin.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/migration/EsIndexMappingMigrationPlugin.java
@@ -94,7 +94,7 @@ public class EsIndexMappingMigrationPlugin implements MigrationPlugin {
migrationInfoSerialization.setVersion(getName(), getMaxVersion());
observer.complete();
})
- .subscribe();
+ .subscribe(); //should run through
}catch (Exception ee){
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5f20ece6/stack/rest/src/main/java/org/apache/usergrid/rest/ApplicationsResource.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/ApplicationsResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/ApplicationsResource.java
index 6127efc..55cf0f6 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/ApplicationsResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/ApplicationsResource.java
@@ -105,21 +105,27 @@ public class ApplicationsResource extends AbstractContextResource {
Map<String,Object> map = new LinkedHashMap<>();
map.put("count",itemsDeleted.intValue());
final StatusService statusService = injector.getInstance(StatusService.class);
- statusService.setStatus(applicationId, jobId, StatusService.Status.INPROGRESS,map).subscribe();
+ statusService.setStatus(applicationId, jobId, StatusService.Status.INPROGRESS,map)
+ .subscribe();//do not want to throw this exception
}
})
- .doOnCompleted(() ->{
- Map<String,Object> map = new LinkedHashMap<>();
- map.put("count",itemsDeleted.intValue());
+ .doOnCompleted(() -> {
+ Map<String, Object> map = new LinkedHashMap<>();
+ map.put("count", itemsDeleted.intValue());
final StatusService statusService = injector.getInstance(StatusService.class);
- statusService.setStatus(applicationId,jobId, StatusService.Status.COMPLETE,map).subscribe();
+ statusService.setStatus(applicationId, jobId, StatusService.Status.COMPLETE, map)
+ .toBlocking().lastOrDefault(null);//want to rethrow this exception
})
- .subscribe();
+ .toBlocking().lastOrDefault(null);//expecting exception to be caught if job fails
} catch ( Exception e ) {
Map<String,Object> map = new LinkedHashMap<>();
map.put("exception",e);
- statusService.setStatus(applicationId,jobId, StatusService.Status.FAILED,map).subscribe();
+ try {
+ statusService.setStatus(applicationId, jobId, StatusService.Status.FAILED, map).toBlocking().lastOrDefault(null);//block so that a failed status update is caught and logged below
+ }catch (Exception subE){
+ logger.error("failed to update status "+jobId,subE);
+ }
logger.error( "Failed to delete appid:"+applicationId + " jobid:"+jobId+" count:"+itemsDeleted, e );
}
}
@@ -129,8 +135,12 @@ public class ApplicationsResource extends AbstractContextResource {
delete.setDaemon(true);
delete.start();
- statusService.setStatus(applicationId,jobId, StatusService.Status.STARTED,new LinkedHashMap<>()).subscribe();
-
+ try {
+ //block so that a failure to record the STARTED status surfaces here; it is caught and logged below
+ statusService.setStatus(applicationId, jobId, StatusService.Status.STARTED, new LinkedHashMap<>()).toBlocking().lastOrDefault(null);
+ }catch (Exception e){
+ logger.error("failed to set status for " + jobId, e);
+ }
Map<String,Object> data = new HashMap<>();
data.put("jobId",jobId);
data.put("status",StatusService.Status.STARTED);
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5f20ece6/stack/services/src/main/java/org/apache/usergrid/management/AppInfoMigrationPlugin.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/AppInfoMigrationPlugin.java b/stack/services/src/main/java/org/apache/usergrid/management/AppInfoMigrationPlugin.java
index b78b646..d9d3d0d 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/AppInfoMigrationPlugin.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/AppInfoMigrationPlugin.java
@@ -165,7 +165,7 @@ public class AppInfoMigrationPlugin implements MigrationPlugin {
.doOnCompleted( () -> {
migrationInfoSerialization.setVersion( getName(), getMaxVersion() );
observer.complete();
- } ).subscribe();
+ } ).subscribe();//let this run through since it handles errors
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5f20ece6/stack/services/src/main/java/org/apache/usergrid/services/AbstractConnectionsService.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/AbstractConnectionsService.java b/stack/services/src/main/java/org/apache/usergrid/services/AbstractConnectionsService.java
index ee322d2..cd56d1d 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/AbstractConnectionsService.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/AbstractConnectionsService.java
@@ -459,7 +459,7 @@ public class AbstractConnectionsService extends AbstractService {
throw new RuntimeException( "Unable to save connection", e );
}
}).subscribeOn( Schedulers.io() );
- }, 10).subscribe();
+ }, 10).toBlocking().lastOrDefault(null); //needs to rethrow
}