You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/10/26 23:24:39 UTC

usergrid git commit: First pass. Need to test.

Repository: usergrid
Updated Branches:
  refs/heads/USERGRID-1064 [created] e41b5f02e


First pass.  Need to test.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/e41b5f02
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/e41b5f02
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/e41b5f02

Branch: refs/heads/USERGRID-1064
Commit: e41b5f02eb12f72b9c19e5ae079b32d528951b47
Parents: 07d2ad3
Author: Todd Nine <tn...@apigee.com>
Authored: Mon Oct 26 16:24:37 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Mon Oct 26 16:24:37 2015 -0600

----------------------------------------------------------------------
 .../service/StatusServiceImpl.java              |   3 +-
 .../usergrid/rest/ConnectionResource.java       | 198 +++++++++++++++++++
 .../apache/usergrid/rest/SystemResource.java    |   5 +
 3 files changed, 205 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/e41b5f02/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/StatusServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/StatusServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/StatusServiceImpl.java
index 93fe653..282929e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/StatusServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/StatusServiceImpl.java
@@ -93,8 +93,9 @@ public class StatusServiceImpl implements StatusService {
             final MapManager mapManager = mapManagerFactory.createMapManager(new MapScopeImpl(appId, "status"));
             try {
                 String statusVal = mapManager.getString(jobString + statusKey);
+                //nothing to emit
                 if(statusVal==null){
-                    subscriber.onNext(null);
+                    subscriber.onCompleted();
                 }else {
                     final Map<String, Object> data = MAPPER.readValue(mapManager.getString(jobString + dataKey), Map.class);
                     final Status status = Status.valueOf(statusVal);

http://git-wip-us.apache.org/repos/asf/usergrid/blob/e41b5f02/stack/rest/src/main/java/org/apache/usergrid/rest/ConnectionResource.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/ConnectionResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/ConnectionResource.java
new file mode 100644
index 0000000..b6a38e1
--- /dev/null
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/ConnectionResource.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.rest;
+
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.MediaType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.context.annotation.Scope;
+import org.springframework.stereotype.Component;
+
+import org.apache.usergrid.corepersistence.service.ConnectionService;
+import org.apache.usergrid.corepersistence.service.ConnectionServiceImpl;
+import org.apache.usergrid.corepersistence.service.StatusService;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.index.utils.UUIDUtils;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+import org.apache.usergrid.rest.security.annotations.RequireSystemAccess;
+
+import com.google.common.base.Preconditions;
+import com.sun.jersey.api.json.JSONWithPadding;
+
+import rx.Observable;
+import rx.schedulers.Schedulers;
+
+
+/**
+ * REST resource that starts connection de-duplication jobs and reports their status.
+ *
+ * <p>A POST to {@code dedup/<applicationId>} starts a background job and returns its id; a GET to
+ * {@code dedup/<jobId>} returns the status stored for that job. Both endpoints require system access.
+ * {@code SystemResource} exposes this resource under its {@code connection} path segment.</p>
+ */
+@Component
+@Scope( "singleton" )
+@Produces( {
+    MediaType.APPLICATION_JSON, "application/javascript", "application/x-javascript", "text/ecmascript",
+    "application/ecmascript", "text/jscript"
+} )
+public class ConnectionResource extends AbstractContextResource {
+
+    private static final Logger logger = LoggerFactory.getLogger( ConnectionResource.class );
+
+    public ConnectionResource() {
+        super();
+    }
+
+
+    /**
+     * Start a connection de-duplication job for a single application.
+     *
+     * @param applicationIdStr the application id taken from the request path
+     * @param callback the JSONP callback name, defaults to "callback"
+     * @return a response carrying the id and initial status of the job that was started
+     * @throws NullPointerException if the path value cannot be turned into a UUID
+     */
+    @RequireSystemAccess
+    @POST
+    @Path( "dedup/" + RootResource.APPLICATION_ID_PATH )
+    public JSONWithPadding rebuildIndexesPost( @PathParam( "applicationId" ) String applicationIdStr,
+                                               @QueryParam( "callback" ) @DefaultValue( "callback" ) String callback )
+        throws Exception {
+
+        logger.info( "De-duping connections for application {}", applicationIdStr );
+
+        final UUID applicationId = UUIDUtils.tryGetUUID( applicationIdStr );
+
+        Preconditions.checkNotNull( applicationId, "applicationId must be specified" );
+
+        return executeAndCreateResponse( applicationId, callback );
+    }
+
+
+    /**
+     * Return the stored status of a previously started de-duplication job.
+     *
+     * @param jobId the job id taken from the request path
+     * @param callback the JSONP callback name, defaults to "callback"
+     * @return a response carrying the last status recorded for the job
+     * @throws NullPointerException if the job id is missing, is not a UUID, or no status exists for it
+     */
+    @RequireSystemAccess
+    @GET
+    @Path( "dedup/{jobId}" )
+    public JSONWithPadding rebuildIndexesGet( @PathParam( "jobId" ) String jobId,
+                                              @QueryParam( "callback" ) @DefaultValue( "callback" ) String callback )
+        throws Exception {
+        logger.info( "Getting status for connection de-dup job {}", jobId );
+
+        Preconditions.checkNotNull( jobId, "path param jobId must not be null" );
+
+        final UUID jobUUID = UUIDUtils.tryGetUUID( jobId );
+
+        // Mirror the applicationId handling in the POST endpoint: reject ids that could not be parsed
+        // instead of handing a null id to the status service.
+        Preconditions.checkNotNull( jobUUID, "jobId '" + jobId + "' is not a valid UUID" );
+
+        // lastOrDefault( null ) yields null when the status observable completes without emitting.
+        final StatusService.JobStatus job =
+            getStatusService().getStatus( CpNamingUtils.MANAGEMENT_APPLICATION_ID, jobUUID ).toBlocking()
+                              .lastOrDefault( null );
+
+        Preconditions.checkNotNull( job, "job with id '" + jobId + "' does not exist" );
+
+        return createResult( job, callback );
+    }
+
+
+    private ConnectionService getConnectionService() {
+        return injector.getInstance( ConnectionServiceImpl.class );
+    }
+
+
+    private StatusService getStatusService() {
+        return injector.getInstance( StatusService.class );
+    }
+
+
+    /**
+     * Kick off de-duplication for the application on a new thread and immediately return a response
+     * describing the job. Progress is recorded against a freshly generated job id under the management
+     * application id.
+     *
+     * @param applicationId the application whose connections are de-duplicated
+     * @param callback the JSONP callback name
+     * @return a response containing the new job id with status STARTED
+     */
+    private JSONWithPadding executeAndCreateResponse( final UUID applicationId, final String callback ) {
+
+        final Observable<ApplicationScope> applicationScopeObservable =
+            Observable.just( CpNamingUtils.getApplicationScope( applicationId ) );
+
+        final UUID jobId = UUIDGenerator.newTimeUUID();
+
+        final StatusService statusService = getStatusService();
+        final ConnectionService connectionService = getConnectionService();
+
+        // Running total of items emitted by the de-dup stream, shared by the progress callbacks.
+        final AtomicLong count = new AtomicLong( 0 );
+
+        // NOTE(review): the value returned by setStatus is discarded in the callbacks below. If it is a
+        // lazy rx.Observable (getStatus returns an Observable), nothing is written until it is
+        // subscribed -- confirm against StatusService before relying on these status updates.
+
+        //start de duping and run in the background
+        connectionService.deDupeConnections( applicationScopeObservable )
+                         // report progress every 10 seconds or every 1000 items, whichever comes first
+                         .buffer( 10, TimeUnit.SECONDS, 1000 )
+                         .doOnNext( buffer -> {
+                             final long runningTotal = count.addAndGet( buffer.size() );
+
+                             statusService.setStatus( CpNamingUtils.MANAGEMENT_APPLICATION_ID, jobId,
+                                 StatusService.Status.INPROGRESS, createProgressData( runningTotal ) );
+                         } )
+                         .doOnSubscribe( () -> {
+                             statusService.setStatus( CpNamingUtils.MANAGEMENT_APPLICATION_ID, jobId,
+                                 StatusService.Status.STARTED, new HashMap<>() );
+                         } )
+                         .doOnCompleted( () -> {
+                             statusService.setStatus( CpNamingUtils.MANAGEMENT_APPLICATION_ID, jobId,
+                                 StatusService.Status.COMPLETE, createProgressData( count.get() ) );
+                         } )
+                         .subscribeOn( Schedulers.newThread() )
+                         // Supply an error handler: a bare subscribe() turns any failure into an
+                         // OnErrorNotImplementedException on the background thread.
+                         .subscribe( buffer -> { },
+                             error -> logger.error( "Connection de-dup job {} failed", jobId, error ) );
+
+
+        final StatusService.JobStatus status =
+            new StatusService.JobStatus( jobId, StatusService.Status.STARTED, new HashMap<>() );
+        return createResult( status, callback );
+    }
+
+
+    /**
+     * Build the progress payload stored with a job status.
+     *
+     * @param countProcessed the running total of processed items
+     * @return a new mutable map holding "countProcessed" and the current time as "updatedTimestamp"
+     */
+    private static Map<String, Object> createProgressData( final long countProcessed ) {
+        final Map<String, Object> data = new HashMap<>();
+        data.put( "countProcessed", countProcessed );
+        data.put( "updatedTimestamp", System.currentTimeMillis() );
+        return data;
+    }
+
+
+    /**
+     * Create a successful API response that carries the given job status.
+     *
+     * @param jobStatus the status to expose under the "status" property
+     * @param callback the JSONP callback name
+     * @return the response wrapped for JSONP output
+     */
+    private JSONWithPadding createResult( final StatusService.JobStatus jobStatus, final String callback ) {
+
+        final ApiResponse response = createApiResponse();
+
+        response.setAction( "de-dup connections" );
+        response.setProperty( "status", jobStatus );
+        response.setSuccess();
+
+        return new JSONWithPadding( response, callback );
+    }
+}
+
+
+

http://git-wip-us.apache.org/repos/asf/usergrid/blob/e41b5f02/stack/rest/src/main/java/org/apache/usergrid/rest/SystemResource.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/SystemResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/SystemResource.java
index f266441..aaee596 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/SystemResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/SystemResource.java
@@ -100,4 +100,9 @@ public class SystemResource extends AbstractContextResource {
     public ApplicationsResource applications() {
         return getSubResource( ApplicationsResource.class );
     }
+
+
+    @Path( "connection" )
+    public ConnectionResource connection() { return getSubResource( ConnectionResource.class ); }
+
 }