You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/10/14 23:50:22 UTC

[07/32] usergrid git commit: add and comment on subscribe

add and comment on subscribe


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/5f20ece6
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/5f20ece6
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/5f20ece6

Branch: refs/heads/master
Commit: 5f20ece66f64305f522f13fa02d34ab2920f7089
Parents: e0d3cd5
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Oct 5 10:57:47 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Oct 5 10:57:47 2015 -0600

----------------------------------------------------------------------
 .../corepersistence/CpRelationManager.java      | 24 ++++++++---------
 .../asyncevents/InMemoryAsyncEventService.java  |  7 ++++-
 .../index/ReIndexServiceImpl.java               |  2 +-
 .../migration/DeDupConnectionDataMigration.java |  2 +-
 .../read/search/CandidateEntityFilter.java      |  2 +-
 .../pipeline/read/search/CandidateIdFilter.java |  2 +-
 .../EsIndexMappingMigrationPlugin.java          |  2 +-
 .../usergrid/rest/ApplicationsResource.java     | 28 +++++++++++++-------
 .../management/AppInfoMigrationPlugin.java      |  2 +-
 .../services/AbstractConnectionsService.java    |  2 +-
 10 files changed, 44 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/5f20ece6/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index b4b39a0..aad7610 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -518,8 +518,8 @@ public class CpRelationManager implements RelationManager {
         //run our delete
         gm.loadEdgeVersions(
             CpNamingUtils.createEdgeFromCollectionName( cpHeadEntity.getId(), collectionName, memberEntity.getId() ) )
-          .flatMap( edge -> gm.markEdge( edge ) ).flatMap( edge -> gm.deleteEdge( edge ) ).toBlocking()
-          .lastOrDefault( null );
+          .flatMap(edge -> gm.markEdge(edge)).flatMap(edge -> gm.deleteEdge(edge)).toBlocking()
+          .lastOrDefault(null);
 
 
         /**
@@ -535,7 +535,7 @@ public class CpRelationManager implements RelationManager {
 
         batch.deindex( indexScope, memberEntity );
 
-        managerCache.getIndexProducer().put( batch.build()).subscribe();
+        managerCache.getIndexProducer().put( batch.build()).toBlocking().lastOrDefault(null); // this should throw an exception
 
         // special handling for roles collection of a group
         if ( headEntity.getType().equals( Group.ENTITY_TYPE ) ) {
@@ -695,7 +695,7 @@ public class CpRelationManager implements RelationManager {
 
         //write new edge
 
-        gm.writeEdge( edge ).subscribe();
+        gm.writeEdge(edge).toBlocking().lastOrDefault(null); //throw an exception if this fails
 
         indexService.queueNewEdge( applicationScope, targetEntity, edge );
 
@@ -707,21 +707,21 @@ public class CpRelationManager implements RelationManager {
 
 
         //load our versions, only retain the most recent one
-        gm.loadEdgeVersions( searchByEdge ).skip( 1 ).flatMap( edgeToDelete -> {
-            if ( logger.isDebugEnabled() ) {
-                logger.debug( "Marking edge {} for deletion", edgeToDelete );
+        gm.loadEdgeVersions(searchByEdge).skip(1).flatMap(edgeToDelete -> {
+            if (logger.isDebugEnabled()) {
+                logger.debug("Marking edge {} for deletion", edgeToDelete);
             }
-            return gm.markEdge( edgeToDelete );
-        } ).lastOrDefault( null ).doOnNext( lastEdge -> {
+            return gm.markEdge(edgeToDelete );
+        }).lastOrDefault(null).doOnNext(lastEdge -> {
             //no op if we hit our default
-            if(lastEdge == null){
+            if (lastEdge == null) {
                 return;
             }
 
             //don't queue delete b/c that de-indexes, we need to delete the edges only since we have a version still existing to index.
 
-            gm.deleteEdge( lastEdge ).subscribe();
-        }).subscribe();
+            gm.deleteEdge(lastEdge).toBlocking().lastOrDefault(null); // this should throw an exception
+        }).toBlocking().lastOrDefault(null);//this should throw an exception
 
 
         return connection;

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5f20ece6/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
index b29c39e..d5a0398 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
@@ -137,7 +137,12 @@ public class InMemoryAsyncEventService implements AsyncEventService {
                 return Observable.just(message);
             }
         });
-        mapped.subscribe();
+        if(!resolveSynchronously){
+            mapped.subscribe(); //only subscribe for async
+        }else {
+            mapped.toBlocking().lastOrDefault(null);
+        }
+
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5f20ece6/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
index 1353982..f108f37 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
@@ -141,7 +141,7 @@ public class ReIndexServiceImpl implements ReIndexService {
         runningReIndex.collect(() -> new FlushingCollector(jobId),
             ((flushingCollector, edgeScopes) -> flushingCollector.flushBuffer(edgeScopes))).doOnNext( flushingCollector-> flushingCollector.complete() )
                 //subscribe on our I/O scheduler and run the task
-            .subscribeOn( Schedulers.io() ).subscribe();
+            .subscribeOn( Schedulers.io() ).subscribe(); //want reindex to continually run so leave subscribe.
 
 
         return new ReIndexStatus( jobId, Status.STARTED, 0, 0 );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5f20ece6/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/DeDupConnectionDataMigration.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/DeDupConnectionDataMigration.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/DeDupConnectionDataMigration.java
index bec8d6c..77deac8 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/DeDupConnectionDataMigration.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/DeDupConnectionDataMigration.java
@@ -72,7 +72,7 @@ public class DeDupConnectionDataMigration implements DataMigration {
         } ).doOnNext( total -> {
             logger.info( "Completed de-duping {} edges", total );
             observer.complete();
-        } ).subscribe();
+        } ).subscribe(); //want this to run through all records
 
         return migrationVersion;
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5f20ece6/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
index ceb18ae..9a9636f 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
@@ -178,7 +178,7 @@ public class CandidateEntityFilter extends AbstractFilter<FilterResult<Candidate
                 validate( candidateResult );
             }
 
-            indexProducer.put(batch.build()).subscribe();
+            indexProducer.put(batch.build()).toBlocking().lastOrDefault(null); // want to rethrow if batch fails
 
         }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5f20ece6/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateIdFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateIdFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateIdFilter.java
index b2fd675..a627b4f 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateIdFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateIdFilter.java
@@ -143,7 +143,7 @@ public class CandidateIdFilter extends AbstractFilter<FilterResult<Candidate>, F
                 validate( candidateResult );
             }
 
-            indexProducer.put( batch.build()).subscribe();
+            indexProducer.put( batch.build()).toBlocking().lastOrDefault(null);//want to rethrow if batch fails
 
         }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5f20ece6/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/migration/EsIndexMappingMigrationPlugin.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/migration/EsIndexMappingMigrationPlugin.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/migration/EsIndexMappingMigrationPlugin.java
index 5c2e9c1..de32e0e 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/migration/EsIndexMappingMigrationPlugin.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/migration/EsIndexMappingMigrationPlugin.java
@@ -94,7 +94,7 @@ public class EsIndexMappingMigrationPlugin implements MigrationPlugin {
                     migrationInfoSerialization.setVersion(getName(), getMaxVersion());
                     observer.complete();
                 })
-                .subscribe();
+                .subscribe(); //should run through
 
 
         }catch (Exception ee){

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5f20ece6/stack/rest/src/main/java/org/apache/usergrid/rest/ApplicationsResource.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/ApplicationsResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/ApplicationsResource.java
index 6127efc..55cf0f6 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/ApplicationsResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/ApplicationsResource.java
@@ -105,21 +105,27 @@ public class ApplicationsResource extends AbstractContextResource {
                                 Map<String,Object> map = new LinkedHashMap<>();
                                 map.put("count",itemsDeleted.intValue());
                                 final StatusService statusService = injector.getInstance(StatusService.class);
-                                statusService.setStatus(applicationId, jobId, StatusService.Status.INPROGRESS,map).subscribe();
+                                statusService.setStatus(applicationId, jobId, StatusService.Status.INPROGRESS,map)
+                                    .subscribe();//do not want to throw this exception
                             }
                         })
-                        .doOnCompleted(() ->{
-                            Map<String,Object> map = new LinkedHashMap<>();
-                            map.put("count",itemsDeleted.intValue());
+                        .doOnCompleted(() -> {
+                            Map<String, Object> map = new LinkedHashMap<>();
+                            map.put("count", itemsDeleted.intValue());
                             final StatusService statusService = injector.getInstance(StatusService.class);
-                            statusService.setStatus(applicationId,jobId, StatusService.Status.COMPLETE,map).subscribe();
+                            statusService.setStatus(applicationId, jobId, StatusService.Status.COMPLETE, map)
+                                .toBlocking().lastOrDefault(null);//want to rethrow this exception
                         })
-                        .subscribe();
+                        .toBlocking().lastOrDefault(null);//expecting exception to be caught if job fails
 
                 } catch ( Exception e ) {
                     Map<String,Object> map = new LinkedHashMap<>();
                     map.put("exception",e);
-                    statusService.setStatus(applicationId,jobId, StatusService.Status.FAILED,map).subscribe();
+                    try {
+                        statusService.setStatus(applicationId, jobId, StatusService.Status.FAILED, map).toBlocking().lastOrDefault(null);//leave as subscribe if fails retry
+                    }catch (Exception subE){
+                        logger.error("failed to update status "+jobId,subE);
+                    }
                     logger.error( "Failed to delete appid:"+applicationId + " jobid:"+jobId+" count:"+itemsDeleted, e );
                 }
             }
@@ -129,8 +135,12 @@ public class ApplicationsResource extends AbstractContextResource {
         delete.setDaemon(true);
         delete.start();
 
-        statusService.setStatus(applicationId,jobId, StatusService.Status.STARTED,new LinkedHashMap<>()).subscribe();
-
+        try {
+            //should throw exception if can't start
+            statusService.setStatus(applicationId, jobId, StatusService.Status.STARTED, new LinkedHashMap<>()).toBlocking().lastOrDefault(null);
+        }catch (Exception e){
+            logger.error("failed to set status for " + jobId, e);
+        }
         Map<String,Object> data = new HashMap<>();
         data.put("jobId",jobId);
         data.put("status",StatusService.Status.STARTED);

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5f20ece6/stack/services/src/main/java/org/apache/usergrid/management/AppInfoMigrationPlugin.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/AppInfoMigrationPlugin.java b/stack/services/src/main/java/org/apache/usergrid/management/AppInfoMigrationPlugin.java
index b78b646..d9d3d0d 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/AppInfoMigrationPlugin.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/AppInfoMigrationPlugin.java
@@ -165,7 +165,7 @@ public class AppInfoMigrationPlugin implements MigrationPlugin {
             .doOnCompleted( () -> {
                 migrationInfoSerialization.setVersion( getName(), getMaxVersion() );
                 observer.complete();
-            } ).subscribe();
+            } ).subscribe();//let this run through since it handles errors
     }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5f20ece6/stack/services/src/main/java/org/apache/usergrid/services/AbstractConnectionsService.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/AbstractConnectionsService.java b/stack/services/src/main/java/org/apache/usergrid/services/AbstractConnectionsService.java
index ee322d2..cd56d1d 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/AbstractConnectionsService.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/AbstractConnectionsService.java
@@ -459,7 +459,7 @@ public class AbstractConnectionsService extends AbstractService {
                         throw new RuntimeException( "Unable to save connection", e );
                     }
                 }).subscribeOn( Schedulers.io() );
-            }, 10).subscribe();
+            }, 10).toBlocking().lastOrDefault(null); //needs to rethrow
 
 
         }