You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/10/26 23:24:39 UTC
usergrid git commit: First pass. Need to test.
Repository: usergrid
Updated Branches:
refs/heads/USERGRID-1064 [created] e41b5f02e
First pass. Need to test.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/e41b5f02
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/e41b5f02
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/e41b5f02
Branch: refs/heads/USERGRID-1064
Commit: e41b5f02eb12f72b9c19e5ae079b32d528951b47
Parents: 07d2ad3
Author: Todd Nine <tn...@apigee.com>
Authored: Mon Oct 26 16:24:37 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Mon Oct 26 16:24:37 2015 -0600
----------------------------------------------------------------------
.../service/StatusServiceImpl.java | 3 +-
.../usergrid/rest/ConnectionResource.java | 198 +++++++++++++++++++
.../apache/usergrid/rest/SystemResource.java | 5 +
3 files changed, 205 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/e41b5f02/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/StatusServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/StatusServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/StatusServiceImpl.java
index 93fe653..282929e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/StatusServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/StatusServiceImpl.java
@@ -93,8 +93,9 @@ public class StatusServiceImpl implements StatusService {
final MapManager mapManager = mapManagerFactory.createMapManager(new MapScopeImpl(appId, "status"));
try {
String statusVal = mapManager.getString(jobString + statusKey);
+ //nothing to emit
if(statusVal==null){
- subscriber.onNext(null);
+ subscriber.onCompleted();
}else {
final Map<String, Object> data = MAPPER.readValue(mapManager.getString(jobString + dataKey), Map.class);
final Status status = Status.valueOf(statusVal);
http://git-wip-us.apache.org/repos/asf/usergrid/blob/e41b5f02/stack/rest/src/main/java/org/apache/usergrid/rest/ConnectionResource.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/ConnectionResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/ConnectionResource.java
new file mode 100644
index 0000000..b6a38e1
--- /dev/null
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/ConnectionResource.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.rest;
+
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.MediaType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.context.annotation.Scope;
+import org.springframework.stereotype.Component;
+
+import org.apache.usergrid.corepersistence.service.ConnectionService;
+import org.apache.usergrid.corepersistence.service.ConnectionServiceImpl;
+import org.apache.usergrid.corepersistence.service.StatusService;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.index.utils.UUIDUtils;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+import org.apache.usergrid.rest.security.annotations.RequireSystemAccess;
+
+import com.google.common.base.Preconditions;
+import com.sun.jersey.api.json.JSONWithPadding;
+
+import rx.Observable;
+import rx.schedulers.Schedulers;
+
+
+/**
+ * system/index/otherstuff
+ */
+@Component
+@Scope( "singleton" )
+@Produces( {
+ MediaType.APPLICATION_JSON, "application/javascript", "application/x-javascript", "text/ecmascript",
+ "application/ecmascript", "text/jscript"
+} )
+public class ConnectionResource extends AbstractContextResource {
+
+ private static final Logger logger = LoggerFactory.getLogger( ConnectionResource.class );
+
+ public ConnectionResource() {
+ super();
+ }
+
+
+ @RequireSystemAccess
+ @POST
+ @Path( "dedup/" + RootResource.APPLICATION_ID_PATH )
+ public JSONWithPadding rebuildIndexesPost( @PathParam( "applicationId" ) String applicationIdStr,
+ @QueryParam( "callback" ) @DefaultValue( "callback" ) String callback )
+ throws Exception {
+
+
+ logger.info( "Rebuilding all applications" );
+
+ final UUID applicationId = UUIDUtils.tryGetUUID( applicationIdStr );
+
+ Preconditions.checkNotNull( applicationId, "applicationId must be specified" );
+
+ return executeAndCreateResponse( applicationId, callback );
+ }
+
+
+ @RequireSystemAccess
+ @GET
+ @Path( "dedup/{jobId}" )
+ public JSONWithPadding rebuildIndexesGet( @PathParam( "jobId" ) String jobId,
+ @QueryParam( "callback" ) @DefaultValue( "callback" ) String callback )
+ throws Exception {
+ logger.info( "Getting status for index jobs" );
+
+ Preconditions.checkNotNull( jobId, "query param jobId must not be null" );
+
+
+ final UUID jobUUID = UUIDUtils.tryGetUUID( jobId );
+
+ final StatusService.JobStatus
+ job = getStatusService().getStatus( CpNamingUtils.MANAGEMENT_APPLICATION_ID, jobUUID ).toBlocking().lastOrDefault(
+ null );
+
+ Preconditions.checkNotNull( job, "job with id '" + jobId + "' does not exist" );
+
+
+ return createResult( job, callback );
+ }
+
+
+ private ConnectionService getConnectionService() {
+ return injector.getInstance( ConnectionServiceImpl.class );
+ }
+
+
+ private StatusService getStatusService() {
+ return injector.getInstance( StatusService.class );
+ }
+
+
+
+ /**
+ * Execute the request and return the response.
+ */
+ private JSONWithPadding executeAndCreateResponse( final UUID applicationId, final String callback ) {
+
+ final Observable<ApplicationScope> applicationScopeObservable =
+ Observable.just( CpNamingUtils.getApplicationScope( applicationId ) );
+
+ final UUID jobId = UUIDGenerator.newTimeUUID();
+
+ final StatusService statusService = getStatusService();
+ final ConnectionService connectionService = getConnectionService();
+
+ final AtomicLong count = new AtomicLong( 0 );
+
+ //start de duping and run in the background
+ connectionService.deDupeConnections( applicationScopeObservable ).buffer( 10, TimeUnit.SECONDS, 1000 )
+ .doOnNext( buffer -> {
+
+
+ final long runningTotal = count.addAndGet( buffer.size() );
+
+ final Map<String, Object> status = new HashMap<String, Object>() {{
+ put( "countProcessed", runningTotal );
+ put( "updatedTimestamp", System.currentTimeMillis() );
+ }};
+
+ statusService.setStatus( CpNamingUtils.MANAGEMENT_APPLICATION_ID, jobId,
+ StatusService.Status.INPROGRESS, status );
+ } ).doOnSubscribe( () -> {
+ statusService.setStatus( CpNamingUtils.MANAGEMENT_APPLICATION_ID, jobId, StatusService.Status.STARTED,
+ new HashMap<>() );
+ } ).doOnCompleted( () -> {
+
+ final long runningTotal = count.get();
+
+ final Map<String, Object> status = new HashMap<String, Object>() {{
+ put( "countProcessed", runningTotal );
+ put( "updatedTimestamp", System.currentTimeMillis() );
+ }};
+
+ statusService
+ .setStatus( CpNamingUtils.MANAGEMENT_APPLICATION_ID, jobId, StatusService.Status.COMPLETE, status );
+ } ).subscribeOn( Schedulers.newThread() ).subscribe();
+
+
+ final StatusService.JobStatus status = new StatusService.JobStatus( jobId, StatusService.Status.STARTED, new HashMap<>( ) );
+ return createResult( status, callback );
+ }
+
+
+ /**
+ * Create a response with the specified data.
+ * @param jobStatus
+ * @param callback
+ * @return
+ */
+ private JSONWithPadding createResult(final StatusService.JobStatus jobStatus, final String callback){
+
+ final ApiResponse response = createApiResponse();
+
+ response.setAction( "de-dup connections" );
+ response.setProperty( "status", jobStatus );
+ response.setSuccess();
+
+ return new JSONWithPadding( response, callback );
+ }
+}
+
+
+
http://git-wip-us.apache.org/repos/asf/usergrid/blob/e41b5f02/stack/rest/src/main/java/org/apache/usergrid/rest/SystemResource.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/SystemResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/SystemResource.java
index f266441..aaee596 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/SystemResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/SystemResource.java
@@ -100,4 +100,9 @@ public class SystemResource extends AbstractContextResource {
public ApplicationsResource applications() {
return getSubResource( ApplicationsResource.class );
}
+
+
+ @Path( "connection" )
+ public ConnectionResource connection() { return getSubResource( ConnectionResource.class ); }
+
}