You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/11/02 23:56:23 UTC
[32/50] [abbrv] usergrid git commit: Ensure that status is updated
properly.
Ensure that status is updated properly.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/f8c703c0
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/f8c703c0
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/f8c703c0
Branch: refs/heads/USERGRID-909
Commit: f8c703c02c1182ad63ad86587749eb1ae09c202a
Parents: 471dc35
Author: Dave Johnson <sn...@apache.org>
Authored: Thu Oct 29 17:57:32 2015 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Thu Oct 29 17:57:32 2015 -0400
----------------------------------------------------------------------
.../rest/system/ConnectionResource.java | 48 +++++++++++++-------
1 file changed, 32 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8c703c0/stack/rest/src/main/java/org/apache/usergrid/rest/system/ConnectionResource.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/system/ConnectionResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/system/ConnectionResource.java
index 6e683ed..14b79f3 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/system/ConnectionResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/system/ConnectionResource.java
@@ -55,6 +55,7 @@ import com.google.common.base.Preconditions;
import com.sun.jersey.api.json.JSONWithPadding;
import rx.Observable;
+import rx.functions.Action1;
import rx.schedulers.Schedulers;
@@ -146,36 +147,51 @@ public class ConnectionResource extends AbstractContextResource {
//start de duping and run in the background
connectionService.deDupeConnections( applicationScopeObservable ).buffer( 10, TimeUnit.SECONDS, 1000 )
- .doOnNext( buffer -> {
+ .doOnNext(buffer -> {
- final long runningTotal = count.addAndGet( buffer.size() );
+ final long runningTotal = count.addAndGet(buffer.size());
final Map<String, Object> status = new HashMap<String, Object>() {{
- put( "countProcessed", runningTotal );
- put( "updatedTimestamp", System.currentTimeMillis() );
+ put("countProcessed", runningTotal);
+ put("updatedTimestamp", System.currentTimeMillis());
}};
- statusService.setStatus( CpNamingUtils.MANAGEMENT_APPLICATION_ID, jobId,
- StatusService.Status.INPROGRESS, status ).toBlocking().lastOrDefault( null );
- } ).doOnSubscribe( () -> {
- statusService.setStatus( CpNamingUtils.MANAGEMENT_APPLICATION_ID, jobId, StatusService.Status.STARTED,
- new HashMap<>() ).toBlocking().lastOrDefault( null );
- } ).doOnCompleted( () -> {
+ statusService.setStatus(CpNamingUtils.MANAGEMENT_APPLICATION_ID, jobId,
+ StatusService.Status.INPROGRESS, status).toBlocking().lastOrDefault(null);
+ }).doOnSubscribe(() -> {
+
+ statusService.setStatus(CpNamingUtils.MANAGEMENT_APPLICATION_ID,
+ jobId, StatusService.Status.STARTED, new HashMap<>()).toBlocking().lastOrDefault(null);
+
+ }).doOnCompleted(() -> {
final long runningTotal = count.get();
final Map<String, Object> status = new HashMap<String, Object>() {{
- put( "countProcessed", runningTotal );
- put( "updatedTimestamp", System.currentTimeMillis() );
+ put("countProcessed", runningTotal);
+ put("updatedTimestamp", System.currentTimeMillis());
}};
- statusService
- .setStatus( CpNamingUtils.MANAGEMENT_APPLICATION_ID, jobId, StatusService.Status.COMPLETE, status );
- } ).subscribeOn( Schedulers.newThread() ).subscribe();
+ statusService.setStatus(CpNamingUtils.MANAGEMENT_APPLICATION_ID,
+ jobId, StatusService.Status.COMPLETE, status).toBlocking().lastOrDefault(null);
+
+ }).doOnError( (throwable) -> {
+ logger.error("Error deduping connections", throwable);
+
+ final Map<String, Object> status = new HashMap<String, Object>() {{
+ put("error", throwable.getMessage() );
+ }};
+
+ statusService.setStatus(CpNamingUtils.MANAGEMENT_APPLICATION_ID,
+ jobId, StatusService.Status.FAILED, status).toBlocking().lastOrDefault(null);;
+
+ } ).subscribeOn(Schedulers.newThread()).subscribe();
+
+ final StatusService.JobStatus status =
+ new StatusService.JobStatus( jobId, StatusService.Status.STARTED, new HashMap<>( ) );
- final StatusService.JobStatus status = new StatusService.JobStatus( jobId, StatusService.Status.STARTED, new HashMap<>( ) );
return createResult( status, callback );
}