You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2017/03/31 07:44:47 UTC

[1/4] usergrid git commit: Initial commit of export application API.

Repository: usergrid
Updated Branches:
  refs/heads/master ea1ba360d -> 8b63aae7d


Initial commit of export application API.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/82e7ec57
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/82e7ec57
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/82e7ec57

Branch: refs/heads/master
Commit: 82e7ec57bef24c8b75d3f3479952522fdd916bfa
Parents: ea1ba36
Author: Michael Russo <ru...@google.com>
Authored: Mon Mar 27 00:01:38 2017 -0700
Committer: Michael Russo <ru...@google.com>
Committed: Mon Mar 27 00:01:38 2017 -0700

----------------------------------------------------------------------
 .../usergrid/corepersistence/CoreModule.java    |   4 +
 .../export/ExportRequestBuilder.java            |  47 ++++
 .../export/ExportRequestBuilderImpl.java        |  65 +++++
 .../corepersistence/export/ExportService.java   |  49 ++++
 .../export/ExportServiceImpl.java               | 235 +++++++++++++++++++
 .../util/ObjectJsonSerializer.java              |  17 ++
 .../rest/applications/ApplicationResource.java  |  48 ++++
 7 files changed, 465 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/82e7ec57/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
index ef4bb04..af297f2 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
@@ -20,6 +20,8 @@ import com.google.inject.*;
 import com.google.inject.assistedinject.FactoryModuleBuilder;
 import com.google.inject.multibindings.Multibinder;
 import org.apache.usergrid.corepersistence.asyncevents.*;
+import org.apache.usergrid.corepersistence.export.ExportService;
+import org.apache.usergrid.corepersistence.export.ExportServiceImpl;
 import org.apache.usergrid.corepersistence.index.*;
 import org.apache.usergrid.corepersistence.migration.CoreMigration;
 import org.apache.usergrid.corepersistence.migration.CoreMigrationPlugin;
@@ -137,6 +139,8 @@ public class CoreModule extends AbstractModule {
 
         bind( ReIndexService.class ).to( ReIndexServiceImpl.class );
 
+        bind( ExportService.class ).to( ExportServiceImpl.class );
+
         install( new FactoryModuleBuilder().implement( AggregationService.class, AggregationServiceImpl.class )
                                            .build( AggregationServiceFactory.class ) );
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/82e7ec57/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportRequestBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportRequestBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportRequestBuilder.java
new file mode 100644
index 0000000..71c64f6
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportRequestBuilder.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.export;
+
+
+import com.google.common.base.Optional;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * A builder interface to build our re-index request
+ */
+public interface ExportRequestBuilder {
+
+    /**
+     * Set the application id
+     */
+    ExportRequestBuilder withApplicationId(final UUID applicationId);
+
+
+    /**
+     * Get the application scope
+     * @return
+     */
+    Optional<ApplicationScope> getApplicationScope();
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/82e7ec57/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportRequestBuilderImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportRequestBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportRequestBuilderImpl.java
new file mode 100644
index 0000000..73fcec4
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportRequestBuilderImpl.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.export;
+
+
+import com.google.common.base.Optional;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * Index service request builder
+ */
+public class ExportRequestBuilderImpl implements ExportRequestBuilder {
+
+    private Optional<UUID> withApplicationId = Optional.absent();
+    private Optional<String> withCollectionName = Optional.absent();
+    private Optional<String> cursor = Optional.absent();
+    private Optional<Long> updateTimestamp = Optional.absent();
+    private Optional<Integer> delayTimer = Optional.absent();
+    private Optional<TimeUnit> timeUnitOptional = Optional.absent();
+
+
+    /***
+     *
+     * @param applicationId The application id
+     * @return
+     */
+    @Override
+    public ExportRequestBuilder withApplicationId(final UUID applicationId ) {
+        this.withApplicationId = Optional.fromNullable( applicationId );
+        return this;
+    }
+
+    @Override
+    public Optional<ApplicationScope> getApplicationScope() {
+
+        if ( this.withApplicationId.isPresent() ) {
+            return Optional.of( CpNamingUtils.getApplicationScope( withApplicationId.get() ) );
+        }
+
+        return Optional.absent();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/82e7ec57/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportService.java
new file mode 100644
index 0000000..7615448
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportService.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.export;
+
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * An interface for exporting all entities within an application
+ */
+public interface ExportService {
+
+
+    /**
+     * Perform an application export
+     *
+     * @param exportRequestBuilder The builder to build the request
+     */
+    void export(final ExportRequestBuilder exportRequestBuilder, OutputStream stream) throws IOException;
+
+
+    /**
+     * Generate a build for the index
+     */
+    ExportRequestBuilder getBuilder();
+
+
+    enum Status{
+        STARTED, INPROGRESS, COMPLETE, UNKNOWN;
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/82e7ec57/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportServiceImpl.java
new file mode 100644
index 0000000..ebbcc58
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportServiceImpl.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.export;
+
+
+import com.google.common.base.Optional;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.usergrid.corepersistence.ManagerCache;
+import org.apache.usergrid.corepersistence.index.*;
+import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservable;
+import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.corepersistence.util.ObjectJsonSerializer;
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
+import org.apache.usergrid.persistence.map.MapManager;
+import org.apache.usergrid.persistence.map.MapManagerFactory;
+import org.apache.usergrid.persistence.map.MapScope;
+import org.apache.usergrid.persistence.map.impl.MapScopeImpl;
+import org.apache.usergrid.persistence.model.entity.Entity;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import rx.Observable;
+import rx.schedulers.Schedulers;
+
+
+import java.io.*;
+import java.util.Map;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+
+
+@Singleton
+public class ExportServiceImpl implements ExportService {
+
+    private static final Logger logger = LoggerFactory.getLogger( ReIndexServiceImpl.class );
+
+    private static final MapScope RESUME_MAP_SCOPE =
+        new MapScopeImpl( CpNamingUtils.getManagementApplicationId(), "export-status" );
+
+
+    private static final String MAP_COUNT_KEY = "count";
+    private static final String MAP_STATUS_KEY = "status";
+    private static final String MAP_UPDATED_KEY = "lastUpdated";
+
+
+    private final AllEntityIdsObservable allEntityIdsObservable;
+    private final MapManager mapManager;
+    private final MapManagerFactory mapManagerFactory;
+    private final CollectionSettingsFactory collectionSettingsFactory;
+    private final EntityCollectionManagerFactory entityCollectionManagerFactory;
+    private final ManagerCache managerCache;
+
+    ObjectJsonSerializer jsonSerializer = ObjectJsonSerializer.INSTANCE;
+
+
+
+    @Inject
+    public ExportServiceImpl(final AllEntityIdsObservable allEntityIdsObservable,
+                             final ManagerCache managerCache,
+                               final MapManagerFactory mapManagerFactory,
+                             final CollectionSettingsFactory collectionSettingsFactory,
+                             final EntityCollectionManagerFactory entityCollectionManagerFactory) {
+        this.allEntityIdsObservable = allEntityIdsObservable;
+        this.collectionSettingsFactory = collectionSettingsFactory;
+        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+        this.mapManagerFactory = mapManagerFactory;
+        this.mapManager = mapManagerFactory.createMapManager( RESUME_MAP_SCOPE );
+        this.managerCache = managerCache;
+    }
+
+
+    @Override
+    public void export(final ExportRequestBuilder exportRequestBuilder, OutputStream stream) throws IOException {
+
+        final ZipOutputStream zipOutputStream = new ZipOutputStream(stream);
+
+        //final AtomicInteger count = new AtomicInteger();
+        final ApplicationScope appScope = exportRequestBuilder.getApplicationScope().get();
+        final Observable<ApplicationScope> applicationScopes = Observable.just(appScope);
+
+
+        //final String jobId = StringUtils.sanitizeUUID( UUIDGenerator.newTimeUUID() );
+        final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager(appScope);
+
+        final String rootPath = appScope.getApplication().getUuid().toString();
+
+        GraphManager gm = managerCache.getGraphManager( appScope );
+
+        allEntityIdsObservable.getEdgesToEntities( applicationScopes, Optional.absent(), Optional.absent() )
+            .doOnNext( edgeScope -> {
+
+                try {
+
+                    // load the entity and convert to a normal map
+                    Entity entity = ecm.load(edgeScope.getEdge().getTargetNode()).toBlocking().lastOrDefault(null);
+                    Map entityMap = CpEntityMapUtils.toMap(entity);
+
+                    if (entity != null) {
+                        final String filenameWithPath = rootPath + "/" +
+                            edgeScope.getEdge().getSourceNode().getUuid().toString() + "_" +
+                            edgeScope.getEdge().getType() + "_" +
+                            edgeScope.getEdge().getTargetNode().getUuid().toString() + ".json";
+
+                        logger.debug("adding zip entry: {}", filenameWithPath);
+                        zipOutputStream.putNextEntry(new ZipEntry(filenameWithPath));
+
+
+                        logger.debug("writing and flushing entity to zip stream: {}", jsonSerializer.toString(entityMap));
+                        zipOutputStream.write(jsonSerializer.toString(entityMap).getBytes());
+                        zipOutputStream.closeEntry();
+                        zipOutputStream.flush();
+
+                    } else {
+                        logger.warn("{}  did not have corresponding entity, not writing", edgeScope.toString());
+                    }
+
+                } catch (IOException e) {
+                    logger.warn("Unable to create entry in zip export for edge {}", edgeScope);
+                }
+
+                //writeStateMeta( jobId, Status.INPROGRESS, count.addAndGet(1), System.currentTimeMillis() );
+            })
+            .flatMap( edgeScope -> {
+
+                // find all connection types for the each entity emitted from the app
+                return gm.getEdgeTypesFromSource(CpNamingUtils.createConnectionTypeSearch(edgeScope.getEdge().getTargetNode()))
+                    .flatMap(emittedEdgeType -> {
+
+                        logger.debug("loading edges of type {} from node {}", emittedEdgeType, edgeScope.getEdge().getTargetNode());
+                        return gm.loadEdgesFromSource(new SimpleSearchByEdgeType( edgeScope.getEdge().getTargetNode(),
+                            emittedEdgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, Optional.absent() ));
+
+                    }).doOnNext( markedEdge -> {
+
+                        if (!markedEdge.isDeleted()){
+
+                            // todo, probably don't need to load the target node itself since it would be loaded in normal collection walking
+                            Entity entity = ecm.load(markedEdge.getTargetNode()).toBlocking().lastOrDefault(null);
+
+                            Map entityMap = CpEntityMapUtils.toMap(entity);
+
+                            try {
+                                final String filenameWithPath = rootPath + "/" +
+                                    markedEdge.getSourceNode().getUuid().toString() + "_" +
+                                    markedEdge.getType() + "_" +
+                                    markedEdge.getTargetNode().getUuid().toString() + ".json";
+
+                                logger.debug("adding zip entry: {}", filenameWithPath);
+                                zipOutputStream.putNextEntry(new ZipEntry(filenameWithPath));
+
+                                logger.debug("writing and flushing entity to zip stream: {}", jsonSerializer.toString(entityMap));
+                                zipOutputStream.write(jsonSerializer.toString(entityMap).getBytes());
+                                zipOutputStream.closeEntry();
+                                zipOutputStream.flush();
+
+                            } catch (IOException e) {
+                                logger.warn("Unable to create entry in zip export for edge {}", markedEdge.toString());
+                            }
+                        }
+
+                    });
+
+            })
+            .doOnCompleted(() -> {
+
+                //writeStateMeta( jobId, Status.COMPLETE, count.get(), System.currentTimeMillis() );
+                try {
+                    logger.debug("closing zip stream");
+                    zipOutputStream.close();
+
+                } catch (IOException e) {
+                    logger.error( "unable to close zip stream");
+                }
+
+            })
+            .subscribeOn( Schedulers.io() ).toBlocking().lastOrDefault(null);
+    }
+
+
+    @Override
+    public ExportRequestBuilder getBuilder() {
+        return new ExportRequestBuilderImpl();
+    }
+
+
+
+
+    /**
+     * Write our state meta data into cassandra so everyone can see it
+     * @param jobId
+     * @param status
+     * @param processedCount
+     * @param lastUpdated
+     */
+    private void writeStateMeta( final String jobId, final Status status, final long processedCount,
+                                 final long lastUpdated ) {
+
+        if(logger.isDebugEnabled()) {
+            logger.debug( "Flushing state for jobId {}, status {}, processedCount {}, lastUpdated {}",
+                    jobId, status, processedCount, lastUpdated);
+        }
+
+        mapManager.putString( jobId + MAP_STATUS_KEY, status.name() );
+        mapManager.putLong( jobId + MAP_COUNT_KEY, processedCount );
+        mapManager.putLong( jobId + MAP_UPDATED_KEY, lastUpdated );
+    }
+
+
+}
+
+

http://git-wip-us.apache.org/repos/asf/usergrid/blob/82e7ec57/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/ObjectJsonSerializer.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/ObjectJsonSerializer.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/ObjectJsonSerializer.java
index 4e5873a..b704afa 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/ObjectJsonSerializer.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/ObjectJsonSerializer.java
@@ -77,6 +77,23 @@ public final class ObjectJsonSerializer {
         return stringValue;
     }
 
+    public <T> String toString( final T toSerialize ) {
+
+        Preconditions.checkNotNull( toSerialize, "toSerialize must not be null" );
+        final String stringValue;
+        //mark this version as empty
+
+        //Convert to internal entity map
+        try {
+            stringValue = MAPPER.writeValueAsString( toSerialize );
+        }
+        catch ( JsonProcessingException jpe ) {
+            throw new RuntimeException( "Unable to serialize entity", jpe );
+        }
+
+        return stringValue;
+    }
+
 
     public <T extends Serializable> T fromString( final String value, final Class<T> toSerialize ) {
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/82e7ec57/stack/rest/src/main/java/org/apache/usergrid/rest/applications/ApplicationResource.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/ApplicationResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/ApplicationResource.java
index 9836f1c..920287b 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/ApplicationResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/ApplicationResource.java
@@ -25,6 +25,11 @@ import org.apache.amber.oauth2.common.message.OAuthResponse;
 import org.apache.amber.oauth2.common.message.types.GrantType;
 import org.apache.shiro.authz.UnauthorizedException;
 import org.apache.shiro.codec.Base64;
+import org.apache.usergrid.corepersistence.export.ExportRequestBuilder;
+import org.apache.usergrid.corepersistence.export.ExportRequestBuilderImpl;
+import org.apache.usergrid.corepersistence.export.ExportService;
+import org.apache.usergrid.corepersistence.index.ReIndexRequestBuilder;
+import org.apache.usergrid.corepersistence.index.ReIndexService;
 import org.apache.usergrid.management.ApplicationInfo;
 import org.apache.usergrid.management.exceptions.DisabledAdminUserException;
 import org.apache.usergrid.management.exceptions.DisabledAppUserException;
@@ -37,6 +42,7 @@ import org.apache.usergrid.persistence.entities.Application;
 import org.apache.usergrid.persistence.entities.User;
 import org.apache.usergrid.persistence.exceptions.EntityNotFoundException;
 import org.apache.usergrid.persistence.index.query.Identifier;
+import org.apache.usergrid.persistence.index.utils.UUIDUtils;
 import org.apache.usergrid.rest.AbstractContextResource;
 import org.apache.usergrid.rest.ApiResponse;
 import org.apache.usergrid.rest.applications.assets.AssetsResource;
@@ -58,6 +64,8 @@ import org.springframework.stereotype.Component;
 
 import javax.ws.rs.*;
 import javax.ws.rs.core.*;
+import java.io.IOException;
+import java.io.OutputStream;
 import java.io.UnsupportedEncodingException;
 import java.net.URLEncoder;
 import java.util.Map;
@@ -676,4 +684,44 @@ public class ApplicationResource extends CollectionResource {
     }
 
 
+    private ExportService getExportService() {
+        return injector.getInstance( ExportService.class );
+    }
+
+
+    @GET
+    @Path("export")
+    @RequireApplicationAccess
+    @Produces({"application/zip"})
+    public Response getExport( @Context UriInfo ui,
+                                @QueryParam("callback") @DefaultValue("callback") String callback )
+        throws Exception {
+
+        if (logger.isTraceEnabled()) {
+            logger.trace("ApplicationResource.getExport");
+        }
+
+        if ( !isApplicationAdmin( Identifier.fromUUID( applicationId ) ) ) {
+            throw new UnauthorizedException();
+        }
+
+
+        final ExportRequestBuilder request = new ExportRequestBuilderImpl().withApplicationId( applicationId );
+        StreamingOutput stream = new StreamingOutput() {
+            @Override
+            public void write(OutputStream outputStream) throws IOException, WebApplicationException {
+                getExportService().export(request,outputStream);
+            }
+        };
+        return Response
+            .ok(stream)
+            .header("Content-Disposition", "attachment; filename=\"usergrid_export-"+System.currentTimeMillis()+".zip\"")
+            .build();
+    }
+
+
+
+
+
+
 }


[2/4] usergrid git commit: Batch write to the export stream and use a cleaner filename.

Posted by mr...@apache.org.
Batch write to the export stream and use a cleaner filename.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/2d56d813
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/2d56d813
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/2d56d813

Branch: refs/heads/master
Commit: 2d56d813a9d99295ef72ab055e8600f62df364e9
Parents: 82e7ec5
Author: Michael Russo <ru...@google.com>
Authored: Mon Mar 27 18:48:49 2017 -0700
Committer: Michael Russo <ru...@google.com>
Committed: Mon Mar 27 18:48:49 2017 -0700

----------------------------------------------------------------------
 .../export/ExportServiceImpl.java               | 153 ++++++++++++-------
 .../rest/applications/ApplicationResource.java  |   4 +-
 2 files changed, 104 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/2d56d813/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportServiceImpl.java
index ebbcc58..47d6982 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportServiceImpl.java
@@ -48,7 +48,9 @@ import rx.schedulers.Schedulers;
 
 
 import java.io.*;
+import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipOutputStream;
 
@@ -98,91 +100,138 @@ public class ExportServiceImpl implements ExportService {
 
         final ZipOutputStream zipOutputStream = new ZipOutputStream(stream);
 
-        //final AtomicInteger count = new AtomicInteger();
         final ApplicationScope appScope = exportRequestBuilder.getApplicationScope().get();
         final Observable<ApplicationScope> applicationScopes = Observable.just(appScope);
 
-
-        //final String jobId = StringUtils.sanitizeUUID( UUIDGenerator.newTimeUUID() );
         final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager(appScope);
 
-        final String rootPath = appScope.getApplication().getUuid().toString();
-
         GraphManager gm = managerCache.getGraphManager( appScope );
 
+        final AtomicInteger entityFileCount = new AtomicInteger();
+        final AtomicInteger connectionFileCount = new AtomicInteger();
+
         allEntityIdsObservable.getEdgesToEntities( applicationScopes, Optional.absent(), Optional.absent() )
-            .doOnNext( edgeScope -> {
+            .buffer( 1000 )
+            .doOnNext( edgeScopes -> {
+
 
                 try {
 
-                    // load the entity and convert to a normal map
-                    Entity entity = ecm.load(edgeScope.getEdge().getTargetNode()).toBlocking().lastOrDefault(null);
-                    Map entityMap = CpEntityMapUtils.toMap(entity);
+                    final String filenameWithPath = "entities/" +
+                        "entities."+ entityFileCount.get() + ".json";
+
+                    logger.debug("adding zip entry: {}", filenameWithPath);
+                    zipOutputStream.putNextEntry(new ZipEntry(filenameWithPath));
+
+                    edgeScopes.forEach( edgeScope -> {
+
+
+                        try {
+                            // load the entity and convert to a normal map
+                            Entity entity = ecm.load(edgeScope.getEdge().getTargetNode()).toBlocking().lastOrDefault(null);
+                            Map entityMap = CpEntityMapUtils.toMap(entity);
+
+                            if (entity != null) {
+
 
-                    if (entity != null) {
-                        final String filenameWithPath = rootPath + "/" +
-                            edgeScope.getEdge().getSourceNode().getUuid().toString() + "_" +
-                            edgeScope.getEdge().getType() + "_" +
-                            edgeScope.getEdge().getTargetNode().getUuid().toString() + ".json";
 
-                        logger.debug("adding zip entry: {}", filenameWithPath);
-                        zipOutputStream.putNextEntry(new ZipEntry(filenameWithPath));
+                                logger.debug("writing and flushing entity to zip stream: {}", jsonSerializer.toString(entityMap));
+                                zipOutputStream.write(jsonSerializer.toString(entityMap).getBytes());
+                                zipOutputStream.write("\n".getBytes());
+
 
+                            } else {
+                                logger.warn("{}  did not have corresponding entity, not writing", edgeScope.toString());
+                            }
+
+                        } catch (IOException e) {
+                            logger.warn("Unable to create entry in zip export for edge {}", edgeScope);
+                        }
 
-                        logger.debug("writing and flushing entity to zip stream: {}", jsonSerializer.toString(entityMap));
-                        zipOutputStream.write(jsonSerializer.toString(entityMap).getBytes());
-                        zipOutputStream.closeEntry();
-                        zipOutputStream.flush();
 
-                    } else {
-                        logger.warn("{}  did not have corresponding entity, not writing", edgeScope.toString());
-                    }
+                        entityFileCount.addAndGet(1);
+
+                    });
+
+                    zipOutputStream.closeEntry();
+                    zipOutputStream.flush();
 
                 } catch (IOException e) {
-                    logger.warn("Unable to create entry in zip export for edge {}", edgeScope);
+                    logger.warn("Unable to create entry in zip export for batch entities");
                 }
 
                 //writeStateMeta( jobId, Status.INPROGRESS, count.addAndGet(1), System.currentTimeMillis() );
             })
-            .flatMap( edgeScope -> {
+            .doOnNext( edgeScopes -> {
 
-                // find all connection types for the each entity emitted from the app
-                return gm.getEdgeTypesFromSource(CpNamingUtils.createConnectionTypeSearch(edgeScope.getEdge().getTargetNode()))
-                    .flatMap(emittedEdgeType -> {
+                try{
 
-                        logger.debug("loading edges of type {} from node {}", emittedEdgeType, edgeScope.getEdge().getTargetNode());
-                        return gm.loadEdgesFromSource(new SimpleSearchByEdgeType( edgeScope.getEdge().getTargetNode(),
-                            emittedEdgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, Optional.absent() ));
+                    final String filenameWithPath = "connections/" +
+                        "connections." + connectionFileCount.get() + ".json";
 
-                    }).doOnNext( markedEdge -> {
+                    logger.debug("adding zip entry: {}", filenameWithPath);
+                    zipOutputStream.putNextEntry(new ZipEntry(filenameWithPath));
 
-                        if (!markedEdge.isDeleted()){
+                    edgeScopes.forEach(edgeScope -> gm.getEdgeTypesFromSource(CpNamingUtils.createConnectionTypeSearch(edgeScope.getEdge().getTargetNode()))
+                        .flatMap(emittedEdgeType -> {
 
-                            // todo, probably don't need to load the target node itself since it would be loaded in normal collection walking
-                            Entity entity = ecm.load(markedEdge.getTargetNode()).toBlocking().lastOrDefault(null);
+                            logger.debug("loading edges of type {} from node {}", emittedEdgeType, edgeScope.getEdge().getTargetNode());
+                            return gm.loadEdgesFromSource(new SimpleSearchByEdgeType(edgeScope.getEdge().getTargetNode(),
+                                emittedEdgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, Optional.absent()));
 
-                            Map entityMap = CpEntityMapUtils.toMap(entity);
+                        })
 
-                            try {
-                                final String filenameWithPath = rootPath + "/" +
-                                    markedEdge.getSourceNode().getUuid().toString() + "_" +
-                                    markedEdge.getType() + "_" +
-                                    markedEdge.getTargetNode().getUuid().toString() + ".json";
+                        .buffer( 1000 )
+                        .doOnNext(markedEdges -> {
 
-                                logger.debug("adding zip entry: {}", filenameWithPath);
-                                zipOutputStream.putNextEntry(new ZipEntry(filenameWithPath));
 
-                                logger.debug("writing and flushing entity to zip stream: {}", jsonSerializer.toString(entityMap));
-                                zipOutputStream.write(jsonSerializer.toString(entityMap).getBytes());
-                                zipOutputStream.closeEntry();
-                                zipOutputStream.flush();
 
-                            } catch (IOException e) {
-                                logger.warn("Unable to create entry in zip export for edge {}", markedEdge.toString());
-                            }
-                        }
+                                markedEdges.forEach( markedEdge -> {
 
-                    });
+                                    if (!markedEdge.isDeleted()) {
+
+                                        // doing the load to just again make sure bad connections are not exported
+                                        Entity entity = ecm.load(markedEdge.getTargetNode()).toBlocking().lastOrDefault(null);
+
+                                        if (entity != null) {
+
+                                            try {
+
+                                                final Map<String,String> connectionMap = new HashMap<String,String>(1){{
+                                                    put("sourceNodeUUID", markedEdge.getSourceNode().getUuid().toString() );
+                                                    put("relationship", CpNamingUtils.getConnectionNameFromEdgeName(markedEdge.getType()) );
+                                                    put("targetNodeUUID", markedEdge.getTargetNode().getUuid().toString());
+                                                }};
+
+                                                logger.debug("writing and flushing connection to zip stream: {}", jsonSerializer.toString(connectionMap).getBytes());
+                                                zipOutputStream.write(jsonSerializer.toString(connectionMap).getBytes());
+                                                zipOutputStream.write("\n".getBytes());
+
+
+                                            } catch (IOException e) {
+                                                logger.warn("Unable to create entry in zip export for edge {}", markedEdge.toString());
+                                            }
+                                        } else {
+                                            logger.warn("Exported connection has a missing target node, not creating connection in export. Edge: {}", markedEdge);
+                                        }
+                                    }
+
+                                });
+
+
+
+
+                        }).toBlocking().lastOrDefault(null));
+
+                    connectionFileCount.addAndGet(1);
+
+                    zipOutputStream.closeEntry();
+                    zipOutputStream.flush();
+
+
+                } catch (IOException e) {
+                    logger.warn("Unable to create entry in zip export for batch connections");
+                }
 
             })
             .doOnCompleted(() -> {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/2d56d813/stack/rest/src/main/java/org/apache/usergrid/rest/applications/ApplicationResource.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/ApplicationResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/ApplicationResource.java
index 920287b..7479a90 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/ApplicationResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/ApplicationResource.java
@@ -31,6 +31,7 @@ import org.apache.usergrid.corepersistence.export.ExportService;
 import org.apache.usergrid.corepersistence.index.ReIndexRequestBuilder;
 import org.apache.usergrid.corepersistence.index.ReIndexService;
 import org.apache.usergrid.management.ApplicationInfo;
+import org.apache.usergrid.management.OrganizationInfo;
 import org.apache.usergrid.management.exceptions.DisabledAdminUserException;
 import org.apache.usergrid.management.exceptions.DisabledAppUserException;
 import org.apache.usergrid.management.exceptions.UnactivatedAdminUserException;
@@ -705,6 +706,7 @@ public class ApplicationResource extends CollectionResource {
             throw new UnauthorizedException();
         }
 
+        ApplicationInfo appInfo = management.getApplicationInfo(applicationId);
 
         final ExportRequestBuilder request = new ExportRequestBuilderImpl().withApplicationId( applicationId );
         StreamingOutput stream = new StreamingOutput() {
@@ -715,7 +717,7 @@ public class ApplicationResource extends CollectionResource {
         };
         return Response
             .ok(stream)
-            .header("Content-Disposition", "attachment; filename=\"usergrid_export-"+System.currentTimeMillis()+".zip\"")
+            .header("Content-Disposition", "attachment; filename=\""+appInfo.getName().replace("/","_")+"_"+System.currentTimeMillis()+".zip\"")
             .build();
     }
 


[3/4] usergrid git commit: Update new ExportService to better stream out the results directly from the database. Added tests to validate

Posted by mr...@apache.org.
Update new ExportService to better stream out the results directly from the database.  Added tests to validate


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/b37fc81e
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/b37fc81e
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/b37fc81e

Branch: refs/heads/master
Commit: b37fc81e984c43310bb8590a606ce639312e24b2
Parents: 2d56d81
Author: Michael Russo <ru...@google.com>
Authored: Thu Mar 30 19:53:12 2017 -0700
Committer: Michael Russo <ru...@google.com>
Committed: Thu Mar 30 19:53:12 2017 -0700

----------------------------------------------------------------------
 .../corepersistence/export/ExportService.java   |   6 +-
 .../export/ExportServiceImpl.java               | 245 +++++++--------
 .../corepersistence/export/ExportServiceIT.java | 122 +++++++
 .../MvccEntitySerializationStrategyV3Impl.java  |   8 +-
 .../exceptions/AbstractExceptionMapper.java     |   3 +-
 .../organizations/OrganizationResource.java     | 190 +++++------
 .../applications/ApplicationResource.java       | 314 +++++++++----------
 .../notifications/NotificationsService.java     |  13 +-
 .../apns/NotificationsServiceIT.java            |  49 ++-
 9 files changed, 558 insertions(+), 392 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/b37fc81e/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportService.java
index 7615448..0c7789b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportService.java
@@ -30,15 +30,15 @@ public interface ExportService {
 
 
     /**
-     * Perform an application export
+     * Perform an application export into the provided OutputStream
      *
      * @param exportRequestBuilder The builder to build the request
      */
-    void export(final ExportRequestBuilder exportRequestBuilder, OutputStream stream) throws IOException;
+    void export(final ExportRequestBuilder exportRequestBuilder, OutputStream stream) throws RuntimeException;
 
 
     /**
-     * Generate a build for the index
+     * Generate a builder for the export request
      */
     ExportRequestBuilder getBuilder();
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b37fc81e/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportServiceImpl.java
index 47d6982..e561ac6 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportServiceImpl.java
@@ -31,6 +31,8 @@ import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.corepersistence.util.ObjectJsonSerializer;
 import org.apache.usergrid.persistence.collection.EntityCollectionManager;
 import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import org.apache.usergrid.persistence.collection.EntitySet;
+import org.apache.usergrid.persistence.collection.serialization.impl.EntitySetImpl;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.graph.SearchByEdgeType;
@@ -41,6 +43,8 @@ import org.apache.usergrid.persistence.map.MapScope;
 import org.apache.usergrid.persistence.map.impl.MapScopeImpl;
 import org.apache.usergrid.persistence.model.entity.Entity;
 
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.utils.InflectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import rx.Observable;
@@ -48,7 +52,9 @@ import rx.schedulers.Schedulers;
 
 
 import java.io.*;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.zip.ZipEntry;
@@ -58,47 +64,32 @@ import java.util.zip.ZipOutputStream;
 @Singleton
 public class ExportServiceImpl implements ExportService {
 
-    private static final Logger logger = LoggerFactory.getLogger( ReIndexServiceImpl.class );
-
-    private static final MapScope RESUME_MAP_SCOPE =
-        new MapScopeImpl( CpNamingUtils.getManagementApplicationId(), "export-status" );
-
-
-    private static final String MAP_COUNT_KEY = "count";
-    private static final String MAP_STATUS_KEY = "status";
-    private static final String MAP_UPDATED_KEY = "lastUpdated";
-
+    private static final Logger logger = LoggerFactory.getLogger( ExportServiceImpl.class );
 
     private final AllEntityIdsObservable allEntityIdsObservable;
-    private final MapManager mapManager;
-    private final MapManagerFactory mapManagerFactory;
-    private final CollectionSettingsFactory collectionSettingsFactory;
     private final EntityCollectionManagerFactory entityCollectionManagerFactory;
     private final ManagerCache managerCache;
-
-    ObjectJsonSerializer jsonSerializer = ObjectJsonSerializer.INSTANCE;
+    private final ObjectJsonSerializer jsonSerializer = ObjectJsonSerializer.INSTANCE;
+    private final int exportVersion = 1;
+    private final String keyTotalEntityCount = "__totalEntityCount__";
 
 
 
     @Inject
     public ExportServiceImpl(final AllEntityIdsObservable allEntityIdsObservable,
                              final ManagerCache managerCache,
-                               final MapManagerFactory mapManagerFactory,
-                             final CollectionSettingsFactory collectionSettingsFactory,
                              final EntityCollectionManagerFactory entityCollectionManagerFactory) {
         this.allEntityIdsObservable = allEntityIdsObservable;
-        this.collectionSettingsFactory = collectionSettingsFactory;
         this.entityCollectionManagerFactory = entityCollectionManagerFactory;
-        this.mapManagerFactory = mapManagerFactory;
-        this.mapManager = mapManagerFactory.createMapManager( RESUME_MAP_SCOPE );
         this.managerCache = managerCache;
     }
 
 
     @Override
-    public void export(final ExportRequestBuilder exportRequestBuilder, OutputStream stream) throws IOException {
+    public void export(final ExportRequestBuilder exportRequestBuilder, OutputStream stream) throws RuntimeException {
 
         final ZipOutputStream zipOutputStream = new ZipOutputStream(stream);
+        zipOutputStream.setLevel(9); // best compression to reduce the amount of data to stream over the wire
 
         final ApplicationScope appScope = exportRequestBuilder.getApplicationScope().get();
         final Observable<ApplicationScope> applicationScopes = Observable.just(appScope);
@@ -107,142 +98,172 @@ public class ExportServiceImpl implements ExportService {
 
         GraphManager gm = managerCache.getGraphManager( appScope );
 
-        final AtomicInteger entityFileCount = new AtomicInteger();
-        final AtomicInteger connectionFileCount = new AtomicInteger();
+        final AtomicInteger entityFileCount = new AtomicInteger(); // entities are batched into files
+        final AtomicInteger connectionCount = new AtomicInteger();
+        final Map<String, AtomicInteger> collectionStats = new HashMap<>();
+        collectionStats.put(keyTotalEntityCount, new AtomicInteger());
+        final Map<String, Object> infoMap = new HashMap<>();
+        infoMap.put("application", appScope.getApplication().getUuid().toString());
+        infoMap.put("exportVersion", exportVersion);
+        infoMap.put("exportStarted", System.currentTimeMillis());
+
+        logger.info("Starting export of application: {}", appScope.getApplication().getUuid().toString());
 
         allEntityIdsObservable.getEdgesToEntities( applicationScopes, Optional.absent(), Optional.absent() )
-            .buffer( 1000 )
-            .doOnNext( edgeScopes -> {
+            .buffer(500)
+            .map( edgeScopes -> {
 
+                List<Id> entityIds = new ArrayList<>();
+                edgeScopes.forEach( edgeScope -> {
+                    if (edgeScope.getEdge().getTargetNode() != null) {
+                        logger.debug("adding entity to list: {}", edgeScope.getEdge().getTargetNode());
+                        entityIds.add(edgeScope.getEdge().getTargetNode());
+                    }
+                });
 
-                try {
+                return entityIds;
 
-                    final String filenameWithPath = "entities/" +
-                        "entities."+ entityFileCount.get() + ".json";
+            })
+            .flatMap( entityIds -> {
 
-                    logger.debug("adding zip entry: {}", filenameWithPath);
-                    zipOutputStream.putNextEntry(new ZipEntry(filenameWithPath));
+                logger.debug("entityIds: {}", entityIds);
 
-                    edgeScopes.forEach( edgeScope -> {
+                // batch load the entities
+                EntitySet entitySet = ecm.load(entityIds).toBlocking().lastOrDefault(new EntitySetImpl(0));
+
+                final String filenameWithPath = "entities/entities." + entityFileCount.get() + ".json";
+
+                try {
 
+                    logger.debug("adding zip entry: {}", filenameWithPath);
+                    zipOutputStream.putNextEntry(new ZipEntry(filenameWithPath));
 
-                        try {
-                            // load the entity and convert to a normal map
-                            Entity entity = ecm.load(edgeScope.getEdge().getTargetNode()).toBlocking().lastOrDefault(null);
-                            Map entityMap = CpEntityMapUtils.toMap(entity);
+                    entitySet.getEntities().forEach(mvccEntity -> {
 
-                            if (entity != null) {
+                        if (mvccEntity.getEntity().isPresent()) {
+                            Map entityMap = CpEntityMapUtils.toMap(mvccEntity.getEntity().get());
 
+                            try {
 
+                                collectionStats.putIfAbsent(mvccEntity.getId().getType(), new AtomicInteger());
+                                collectionStats.get(mvccEntity.getId().getType()).incrementAndGet();
+                                collectionStats.get(keyTotalEntityCount).incrementAndGet();
 
-                                logger.debug("writing and flushing entity to zip stream: {}", jsonSerializer.toString(entityMap));
+                                logger.debug("writing and flushing entity {} to zip stream for file: {}", mvccEntity.getId().getUuid().toString(), filenameWithPath);
                                 zipOutputStream.write(jsonSerializer.toString(entityMap).getBytes());
                                 zipOutputStream.write("\n".getBytes());
+                                zipOutputStream.flush(); // entities can be large, flush after each
 
 
-                            } else {
-                                logger.warn("{}  did not have corresponding entity, not writing", edgeScope.toString());
-                            }
+                            } catch (IOException e) {
+                                logger.warn("unable to write entry in zip stream for entityId: {}", mvccEntity.getId());
+                                throw new RuntimeException("Unable to export data. Error writing to stream.");
 
-                        } catch (IOException e) {
-                            logger.warn("Unable to create entry in zip export for edge {}", edgeScope);
+                            }
+                        } else {
+                            logger.warn("entityId {} did not have corresponding entity, not writing", mvccEntity.getId());
                         }
-
-
-                        entityFileCount.addAndGet(1);
-
                     });
 
                     zipOutputStream.closeEntry();
-                    zipOutputStream.flush();
+                    entityFileCount.incrementAndGet();
 
-                } catch (IOException e) {
-                    logger.warn("Unable to create entry in zip export for batch entities");
+                }catch (IOException e){
+                    throw new RuntimeException("Unable to export data. Error writing to stream.");
                 }
 
-                //writeStateMeta( jobId, Status.INPROGRESS, count.addAndGet(1), System.currentTimeMillis() );
-            })
-            .doOnNext( edgeScopes -> {
-
-                try{
+                return Observable.from(entitySet.getEntities());
 
-                    final String filenameWithPath = "connections/" +
-                        "connections." + connectionFileCount.get() + ".json";
-
-                    logger.debug("adding zip entry: {}", filenameWithPath);
-                    zipOutputStream.putNextEntry(new ZipEntry(filenameWithPath));
+            })
+            .doOnNext( mvccEntity -> {
 
-                    edgeScopes.forEach(edgeScope -> gm.getEdgeTypesFromSource(CpNamingUtils.createConnectionTypeSearch(edgeScope.getEdge().getTargetNode()))
+                    gm.getEdgeTypesFromSource(CpNamingUtils.createConnectionTypeSearch(mvccEntity.getId()))
                         .flatMap(emittedEdgeType -> {
 
-                            logger.debug("loading edges of type {} from node {}", emittedEdgeType, edgeScope.getEdge().getTargetNode());
-                            return gm.loadEdgesFromSource(new SimpleSearchByEdgeType(edgeScope.getEdge().getTargetNode(),
+                            logger.debug("loading edges of type {} from node {}", emittedEdgeType, mvccEntity.getId());
+                            return gm.loadEdgesFromSource(new SimpleSearchByEdgeType(mvccEntity.getId(),
                                 emittedEdgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, Optional.absent()));
 
                         })
+                        .doOnNext(markedEdge -> {
 
-                        .buffer( 1000 )
-                        .doOnNext(markedEdges -> {
+                            if (!markedEdge.isDeleted() && !markedEdge.isTargetNodeDeleted() && markedEdge.getTargetNode() != null ) {
 
+                                // doing the load to just again make sure bad connections are not exported
+                                Entity entity = ecm.load(markedEdge.getTargetNode()).toBlocking().lastOrDefault(null);
 
+                                if (entity != null) {
 
-                                markedEdges.forEach( markedEdge -> {
+                                    try {
+                                        // since a single stream is being written, and connecitons are loaded per entity,
+                                        // it cannot easily be batched eventlyinto files so write them separately
+                                        final String filenameWithPath = "connections/" +
+                                            markedEdge.getSourceNode().getUuid().toString()+"_" +
+                                            CpNamingUtils.getConnectionNameFromEdgeName(markedEdge.getType()) + "_" +
+                                            markedEdge.getTargetNode().getUuid().toString() + ".json";
 
-                                    if (!markedEdge.isDeleted()) {
+                                        logger.debug("adding zip entry: {}", filenameWithPath);
+                                        zipOutputStream.putNextEntry(new ZipEntry(filenameWithPath));
 
-                                        // doing the load to just again make sure bad connections are not exported
-                                        Entity entity = ecm.load(markedEdge.getTargetNode()).toBlocking().lastOrDefault(null);
 
-                                        if (entity != null) {
+                                        final Map<String,String> connectionMap = new HashMap<String,String>(1){{
+                                            put("sourceNodeUUID", markedEdge.getSourceNode().getUuid().toString() );
+                                            put("relationship", CpNamingUtils.getConnectionNameFromEdgeName(markedEdge.getType()) );
+                                            put("targetNodeUUID", markedEdge.getTargetNode().getUuid().toString());
+                                        }};
 
-                                            try {
+                                        logger.debug("writing and flushing connection to zip stream: {}", jsonSerializer.toString(connectionMap).getBytes());
+                                        zipOutputStream.write(jsonSerializer.toString(connectionMap).getBytes());
 
-                                                final Map<String,String> connectionMap = new HashMap<String,String>(1){{
-                                                    put("sourceNodeUUID", markedEdge.getSourceNode().getUuid().toString() );
-                                                    put("relationship", CpNamingUtils.getConnectionNameFromEdgeName(markedEdge.getType()) );
-                                                    put("targetNodeUUID", markedEdge.getTargetNode().getUuid().toString());
-                                                }};
+                                        zipOutputStream.closeEntry();
+                                        zipOutputStream.flush();
 
-                                                logger.debug("writing and flushing connection to zip stream: {}", jsonSerializer.toString(connectionMap).getBytes());
-                                                zipOutputStream.write(jsonSerializer.toString(connectionMap).getBytes());
-                                                zipOutputStream.write("\n".getBytes());
+                                        connectionCount.incrementAndGet();
 
 
-                                            } catch (IOException e) {
-                                                logger.warn("Unable to create entry in zip export for edge {}", markedEdge.toString());
-                                            }
-                                        } else {
-                                            logger.warn("Exported connection has a missing target node, not creating connection in export. Edge: {}", markedEdge);
-                                        }
+                                    } catch (IOException e) {
+                                        logger.warn("Unable to create entry in zip export for edge {}", markedEdge.toString());
+                                        throw new RuntimeException("Unable to export data. Error writing to stream.");
                                     }
+                                } else {
+                                    logger.warn("Exported connection has a missing target node, not creating connection in export. Edge: {}", markedEdge);
+                                }
+                            }
 
-                                });
-
-
+                        }).toBlocking().lastOrDefault(null);
+            })
+            .doOnCompleted(() -> {
 
+                infoMap.put("exportFinished", System.currentTimeMillis());
 
-                        }).toBlocking().lastOrDefault(null));
 
-                    connectionFileCount.addAndGet(1);
+                try {
 
+                    zipOutputStream.putNextEntry(new ZipEntry("metadata.json"));
+                    zipOutputStream.write(jsonSerializer.toString(infoMap).getBytes());
                     zipOutputStream.closeEntry();
-                    zipOutputStream.flush();
 
+                    zipOutputStream.putNextEntry(new ZipEntry("stats.json"));
+                    Map<String, Object> stats = new HashMap<>();
+                    stats.put("totalEntities", collectionStats.get(keyTotalEntityCount).get());
+                    stats.put("totalConnections", connectionCount.get());
+                    collectionStats.remove(keyTotalEntityCount);
+                    stats.put("collectionCounts", new HashMap<String, Integer>(collectionStats.size()){{
+                        collectionStats.forEach( (collection,count) -> {
+                            put(InflectionUtils.pluralize(collection),count.get());
+                        });
+                    }});
+                    zipOutputStream.write(jsonSerializer.toString(stats).getBytes());
+                    zipOutputStream.closeEntry();
 
-                } catch (IOException e) {
-                    logger.warn("Unable to create entry in zip export for batch connections");
-                }
-
-            })
-            .doOnCompleted(() -> {
-
-                //writeStateMeta( jobId, Status.COMPLETE, count.get(), System.currentTimeMillis() );
-                try {
                     logger.debug("closing zip stream");
                     zipOutputStream.close();
 
+                    logger.info("Finished export of application: {}", appScope.getApplication().getUuid().toString());
+
+
                 } catch (IOException e) {
-                    logger.error( "unable to close zip stream");
+                    throw new RuntimeException("Unable to export data due to inability to close zip stream.");
                 }
 
             })
@@ -255,30 +276,6 @@ public class ExportServiceImpl implements ExportService {
         return new ExportRequestBuilderImpl();
     }
 
-
-
-
-    /**
-     * Write our state meta data into cassandra so everyone can see it
-     * @param jobId
-     * @param status
-     * @param processedCount
-     * @param lastUpdated
-     */
-    private void writeStateMeta( final String jobId, final Status status, final long processedCount,
-                                 final long lastUpdated ) {
-
-        if(logger.isDebugEnabled()) {
-            logger.debug( "Flushing state for jobId {}, status {}, processedCount {}, lastUpdated {}",
-                    jobId, status, processedCount, lastUpdated);
-        }
-
-        mapManager.putString( jobId + MAP_STATUS_KEY, status.name() );
-        mapManager.putLong( jobId + MAP_COUNT_KEY, processedCount );
-        mapManager.putLong( jobId + MAP_UPDATED_KEY, lastUpdated );
-    }
-
-
 }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b37fc81e/stack/core/src/test/java/org/apache/usergrid/corepersistence/export/ExportServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/export/ExportServiceIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/export/ExportServiceIT.java
new file mode 100644
index 0000000..7412ef0
--- /dev/null
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/export/ExportServiceIT.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.export;
+
+import com.google.inject.Injector;
+import org.apache.usergrid.AbstractCoreIT;
+import org.apache.usergrid.cassandra.SpringResource;
+import org.apache.usergrid.corepersistence.EntityWriteHelper;
+import org.apache.usergrid.persistence.EntityManager;
+import org.apache.usergrid.persistence.SimpleEntityRef;
+
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+
+
+import static org.junit.Assert.assertTrue;
+
+
+public class ExportServiceIT extends AbstractCoreIT {
+
+
+    @Test
+    public void testExport() throws Exception {
+
+        Injector injector =  SpringResource.getInstance().getBean(Injector.class);
+
+        ExportService exportService = injector.getInstance(ExportService.class);
+
+        final EntityManager em = app.getEntityManager();
+
+        // create two types of entities
+
+        final String type1 = "type1thing";
+        final String type2 = "type2thing";
+        final int size = 1;
+
+        final Set<Id> type1Identities = EntityWriteHelper.createTypes( em, type1, size );
+        final Set<Id> type2Identities = EntityWriteHelper.createTypes( em, type2, size );
+
+        // connect the first type1 entity to all type2 entities
+
+        final Id source = type1Identities.iterator().next();
+        final Set<Id> connections = new HashSet<>();
+
+        for ( Id target : type2Identities ) {
+            em.createConnection( SimpleEntityRef.fromId( source ),
+                "likes", SimpleEntityRef.fromId( target ) );
+            connections.add( target );
+        }
+
+        ExportRequestBuilder builder = new ExportRequestBuilderImpl().withApplicationId(app.getId());
+
+        // fill the output stream
+        ByteArrayOutputStream stream = new ByteArrayOutputStream();
+        exportService.export(builder, stream);
+
+        // convert the output stream to an input stream and read it as a zip
+        InputStream inputStream = new ByteArrayInputStream(stream.toByteArray());
+        ZipInputStream zip = new ZipInputStream(inputStream);
+
+
+        boolean entityEntryExists = false;
+        boolean connectionEntryExists = false;
+        boolean statsEntryExists = false;
+        boolean metaEntryExists = false;
+
+        final String entityFile = "entities/entities.0.json";
+        final String connectionFile = "connections/"+source.getUuid().toString()+"_"+"likes"+"_"+connections.iterator().next().getUuid().toString()+".json";
+        final String statsFile = "stats.json";
+        final String metaFile = "metadata.json";
+
+        ZipEntry zipEntry;
+        while ( (zipEntry = zip.getNextEntry()) != null ) {
+
+                final String name = zipEntry.getName();
+
+                if (name.equals(entityFile)) {
+                    entityEntryExists = true;
+                }
+                if(name.equals(connectionFile)){
+                    connectionEntryExists = true;
+                }
+                if(name.equals(statsFile)){
+                    statsEntryExists = true;
+                }
+                if(name.equals(metaFile)){
+                    metaEntryExists = true;
+                }
+
+        }
+
+        assertTrue("Expected zip entries are missing", entityEntryExists && connectionEntryExists && statsEntryExists && metaEntryExists);
+
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b37fc81e/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3Impl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3Impl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3Impl.java
index 54702a8..438f731 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3Impl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3Impl.java
@@ -146,11 +146,13 @@ public class MvccEntitySerializationStrategyV3Impl implements MvccEntitySerializ
 
         for ( final Id entityId : entityIds ) {
 
-            final ScopedRowKey<Id> rowKey =
-                    ScopedRowKey.fromKey( applicationId, entityId );
+            if ( entityId != null ) { // the size of entityIds is checked as preconditions, but the values are not
+                final ScopedRowKey<Id> rowKey =
+                    ScopedRowKey.fromKey(applicationId, entityId);
 
 
-            rowKeys.add( rowKey );
+                rowKeys.add(rowKey);
+            }
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b37fc81e/stack/rest/src/main/java/org/apache/usergrid/rest/exceptions/AbstractExceptionMapper.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/exceptions/AbstractExceptionMapper.java b/stack/rest/src/main/java/org/apache/usergrid/rest/exceptions/AbstractExceptionMapper.java
index 807e0c9..8319ff5 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/exceptions/AbstractExceptionMapper.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/exceptions/AbstractExceptionMapper.java
@@ -71,8 +71,7 @@ public abstract class AbstractExceptionMapper<E extends java.lang.Throwable> imp
 
         if ( status >= 500 ) {
             // only log real errors as errors
-            logger.error( "{} 5XX Uncaught Exception ({}), {}", e.getClass().getCanonicalName(), status, e );
-
+            logger.error( "{} 5XX Uncaught Exception ({})", e.getClass().getCanonicalName(), status, e );
         } else {
             if (logger.isDebugEnabled()) {
                 logger.debug( "{} Following Exception Thrown ({}), {}", e.getClass().getCanonicalName(), status, e );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b37fc81e/stack/rest/src/main/java/org/apache/usergrid/rest/management/organizations/OrganizationResource.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/management/organizations/OrganizationResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/management/organizations/OrganizationResource.java
index a12e484..7411ea4 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/management/organizations/OrganizationResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/management/organizations/OrganizationResource.java
@@ -66,8 +66,8 @@ public class OrganizationResource extends AbstractContextResource {
 
     private static final Logger logger = LoggerFactory.getLogger( OrganizationsResource.class );
 
-    @Autowired
-    protected ExportService exportService;
+    //@Autowired
+    //protected ExportService exportService;
 
     OrganizationInfo organization;
 
@@ -281,99 +281,99 @@ public class OrganizationResource extends AbstractContextResource {
         return response;
     }
 
-    @POST
-    @Path("export")
-    @Consumes(APPLICATION_JSON)
-    @RequireOrganizationAccess
-    public Response exportPostJson( @Context UriInfo ui,Map<String, Object> json,
-                                    @QueryParam("callback") @DefaultValue("") String callback )
-            throws OAuthSystemException {
-
-        if (logger.isTraceEnabled()) {
-            logger.trace("executePostJson");
-        }
-
-        Map<String, String> uuidRet = new HashMap<>();
-
-        try {
-            Object propertiesObj = json.get("properties");
-            if (propertiesObj == null) {
-                throw new NullArgumentException("Could not find 'properties'");
-            }
-            if (!(propertiesObj instanceof Map)) {
-                throw new IllegalArgumentException("'properties' not a map");
-            }
-
-            @SuppressWarnings("unchecked")
-            Map<String,Object> properties = (Map<String,Object>)propertiesObj;
-
-            String storage_provider = ( String ) properties.get( "storage_provider" );
-            if(storage_provider == null) {
-                throw new NullArgumentException( "Could not find field 'storage_provider'" );
-            }
-
-            Object storageInfoObj = properties.get("storage_info");
-            if(storageInfoObj == null) {
-                throw new NullArgumentException( "Could not find field 'storage_info'" );
-            }
-            @SuppressWarnings("unchecked")
-            Map<String,Object> storage_info = (Map<String, Object>)storageInfoObj;
-
-            String bucketName = ( String ) storage_info.get( "bucket_location" );
-            String accessId = ( String ) storage_info.get( "s3_access_id" );
-            String secretKey = ( String ) storage_info.get( "s3_key" );
-
-            if ( bucketName == null ) {
-                throw new NullArgumentException( "Could not find field 'bucketName'" );
-            }
-            if ( accessId == null ) {
-                throw new NullArgumentException( "Could not find field 's3_access_id'" );
-            }
-            if ( secretKey == null ) {
-
-                throw new NullArgumentException( "Could not find field 's3_key'" );
-            }
-
-            json.put( "organizationId",organization.getUuid());
-
-            UUID jobUUID = exportService.schedule( json );
-            uuidRet.put( "Export Entity", jobUUID.toString() );
-        }
-        catch ( NullArgumentException e ) {
-            return Response.status( SC_BAD_REQUEST ).type( JSONPUtils.jsonMediaType( callback ) )
-                           .entity( ServiceResource.wrapWithCallback( e.getMessage(), callback ) ).build();
-        }
-        catch ( Exception e ) {
-            //TODO:throw descriptive error message and or include on in the response
-            //TODO:fix below, it doesn't work if there is an exception. Make it look like the OauthResponse.
-            return Response.status(  SC_INTERNAL_SERVER_ERROR ).type( JSONPUtils.jsonMediaType( callback ) )
-                           .entity( ServiceResource.wrapWithCallback( e.getMessage(), callback ) ).build();
-        }
-        return Response.status( SC_ACCEPTED ).entity( uuidRet ).build();
-    }
-
-    @GET
-    @RequireOrganizationAccess
-    @Path("export/{exportEntity: [A-Fa-f0-9]{8}-[A-Fa-f0-9]{4}-[A-Fa-f0-9]{4}-[A-Fa-f0-9]{4}-[A-Fa-f0-9]{12}}")
-    public Response exportGetJson( @Context UriInfo ui, @PathParam("exportEntity") UUID exportEntityUUIDStr,
-                                   @QueryParam("callback") @DefaultValue("") String callback ) throws Exception {
-
-        Export entity;
-        try {
-            entity = smf.getServiceManager( emf.getManagementAppId() ).getEntityManager()
-                        .get( exportEntityUUIDStr, Export.class );
-        }
-        catch ( Exception e ) { //this might not be a bad request and needs better error checking
-            return Response.status( SC_BAD_REQUEST ).type( JSONPUtils.jsonMediaType( callback ) )
-                           .entity( ServiceResource.wrapWithCallback( e.getMessage(), callback ) ).build();
-        }
-
-        if ( entity == null ) {
-            return Response.status( SC_BAD_REQUEST ).build();
-        }
-
-        return Response.status( SC_OK ).entity( entity).build();
-    }
+//    @POST
+//    @Path("export")
+//    @Consumes(APPLICATION_JSON)
+//    @RequireOrganizationAccess
+//    public Response exportPostJson( @Context UriInfo ui,Map<String, Object> json,
+//                                    @QueryParam("callback") @DefaultValue("") String callback )
+//            throws OAuthSystemException {
+//
+//        if (logger.isTraceEnabled()) {
+//            logger.trace("executePostJson");
+//        }
+//
+//        Map<String, String> uuidRet = new HashMap<>();
+//
+//        try {
+//            Object propertiesObj = json.get("properties");
+//            if (propertiesObj == null) {
+//                throw new NullArgumentException("Could not find 'properties'");
+//            }
+//            if (!(propertiesObj instanceof Map)) {
+//                throw new IllegalArgumentException("'properties' not a map");
+//            }
+//
+//            @SuppressWarnings("unchecked")
+//            Map<String,Object> properties = (Map<String,Object>)propertiesObj;
+//
+//            String storage_provider = ( String ) properties.get( "storage_provider" );
+//            if(storage_provider == null) {
+//                throw new NullArgumentException( "Could not find field 'storage_provider'" );
+//            }
+//
+//            Object storageInfoObj = properties.get("storage_info");
+//            if(storageInfoObj == null) {
+//                throw new NullArgumentException( "Could not find field 'storage_info'" );
+//            }
+//            @SuppressWarnings("unchecked")
+//            Map<String,Object> storage_info = (Map<String, Object>)storageInfoObj;
+//
+//            String bucketName = ( String ) storage_info.get( "bucket_location" );
+//            String accessId = ( String ) storage_info.get( "s3_access_id" );
+//            String secretKey = ( String ) storage_info.get( "s3_key" );
+//
+//            if ( bucketName == null ) {
+//                throw new NullArgumentException( "Could not find field 'bucketName'" );
+//            }
+//            if ( accessId == null ) {
+//                throw new NullArgumentException( "Could not find field 's3_access_id'" );
+//            }
+//            if ( secretKey == null ) {
+//
+//                throw new NullArgumentException( "Could not find field 's3_key'" );
+//            }
+//
+//            json.put( "organizationId",organization.getUuid());
+//
+//            UUID jobUUID = exportService.schedule( json );
+//            uuidRet.put( "Export Entity", jobUUID.toString() );
+//        }
+//        catch ( NullArgumentException e ) {
+//            return Response.status( SC_BAD_REQUEST ).type( JSONPUtils.jsonMediaType( callback ) )
+//                           .entity( ServiceResource.wrapWithCallback( e.getMessage(), callback ) ).build();
+//        }
+//        catch ( Exception e ) {
+//            //TODO:throw descriptive error message and or include on in the response
+//            //TODO:fix below, it doesn't work if there is an exception. Make it look like the OauthResponse.
+//            return Response.status(  SC_INTERNAL_SERVER_ERROR ).type( JSONPUtils.jsonMediaType( callback ) )
+//                           .entity( ServiceResource.wrapWithCallback( e.getMessage(), callback ) ).build();
+//        }
+//        return Response.status( SC_ACCEPTED ).entity( uuidRet ).build();
+//    }
+
+//    @GET
+//    @RequireOrganizationAccess
+//    @Path("export/{exportEntity: [A-Fa-f0-9]{8}-[A-Fa-f0-9]{4}-[A-Fa-f0-9]{4}-[A-Fa-f0-9]{4}-[A-Fa-f0-9]{12}}")
+//    public Response exportGetJson( @Context UriInfo ui, @PathParam("exportEntity") UUID exportEntityUUIDStr,
+//                                   @QueryParam("callback") @DefaultValue("") String callback ) throws Exception {
+//
+//        Export entity;
+//        try {
+//            entity = smf.getServiceManager( emf.getManagementAppId() ).getEntityManager()
+//                        .get( exportEntityUUIDStr, Export.class );
+//        }
+//        catch ( Exception e ) { //this might not be a bad request and needs better error checking
+//            return Response.status( SC_BAD_REQUEST ).type( JSONPUtils.jsonMediaType( callback ) )
+//                           .entity( ServiceResource.wrapWithCallback( e.getMessage(), callback ) ).build();
+//        }
+//
+//        if ( entity == null ) {
+//            return Response.status( SC_BAD_REQUEST ).build();
+//        }
+//
+//        return Response.status( SC_OK ).entity( entity).build();
+//    }
 
 
     protected Set<String> getSetFromCommaSeparatedString(String input) {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b37fc81e/stack/rest/src/main/java/org/apache/usergrid/rest/management/organizations/applications/ApplicationResource.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/management/organizations/applications/ApplicationResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/management/organizations/applications/ApplicationResource.java
index a8ed8dc..79973c3 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/management/organizations/applications/ApplicationResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/management/organizations/applications/ApplicationResource.java
@@ -74,8 +74,8 @@ public class ApplicationResource extends AbstractContextResource {
 
     public static final String CONFIRM_APPLICATION_IDENTIFIER = "confirm_application_identifier";
 
-    @Autowired
-    protected ExportService exportService;
+    //@Autowired
+    //protected ExportService exportService;
 
     OrganizationInfo organization;
     UUID applicationId;
@@ -267,161 +267,161 @@ public class ApplicationResource extends AbstractContextResource {
         return response;
     }
 
-    @POST
-    @Path("export")
-    @Consumes(APPLICATION_JSON)
-    @RequireOrganizationAccess
-    public Response exportPostJson( @Context UriInfo ui,Map<String, Object> json,
-                                    @QueryParam("callback") @DefaultValue("") String callback )
-            throws OAuthSystemException {
-
-        UsergridAwsCredentials uac = new UsergridAwsCredentials();
-
-        UUID jobUUID = null;
-        Map<String, String> uuidRet = new HashMap<String, String>();
-
-        Map<String,Object> properties;
-        Map<String, Object> storage_info;
-
-        try {
-            if((properties = ( Map<String, Object> )  json.get( "properties" )) == null){
-                throw new NullArgumentException("Could not find 'properties'");
-            }
-            storage_info = ( Map<String, Object> ) properties.get( "storage_info" );
-            String storage_provider = ( String ) properties.get( "storage_provider" );
-            if(storage_provider == null) {
-                throw new NullArgumentException( "Could not find field 'storage_provider'" );
-            }
-            if(storage_info == null) {
-                throw new NullArgumentException( "Could not find field 'storage_info'" );
-            }
-
-
-            String bucketName = ( String ) storage_info.get( "bucket_location" );
-            String accessId = ( String ) storage_info.get( "s3_access_id" );
-            String secretKey = ( String ) storage_info.get( "s3_key" );
-
-            if ( bucketName == null ) {
-                throw new NullArgumentException( "Could not find field 'bucketName'" );
-            }
-            if ( accessId == null ) {
-                throw new NullArgumentException( "Could not find field 's3_access_id'" );
-            }
-            if ( secretKey == null ) {
-
-                throw new NullArgumentException( "Could not find field 's3_key'" );
-            }
-
-            json.put("organizationId", organization.getUuid());
-            json.put( "applicationId",applicationId);
-
-            jobUUID = exportService.schedule( json );
-            uuidRet.put( "Export Entity", jobUUID.toString() );
-        }
-        catch ( NullArgumentException e ) {
-            return Response.status( SC_BAD_REQUEST )
-                .type( JSONPUtils.jsonMediaType( callback ) )
-                .entity( ServiceResource.wrapWithCallback( e.getMessage(), callback ) ).build();
-        }
-        catch ( Exception e ) {
-            // TODO: throw descriptive error message and or include on in the response
-            // TODO: fix below, it doesn't work if there is an exception.
-            // Make it look like the OauthResponse.
-            return Response.status( SC_INTERNAL_SERVER_ERROR )
-                .type( JSONPUtils.jsonMediaType( callback ) )
-                .entity( ServiceResource.wrapWithCallback( e.getMessage(), callback ) ).build();
-        }
-
-        return Response.status( SC_ACCEPTED ).entity( uuidRet ).build();
-    }
-
-    @POST
-    @Path("collection/{collection_name}/export")
-    @Consumes(APPLICATION_JSON)
-    @RequireOrganizationAccess
-    public Response exportPostJson( @Context UriInfo ui,
-            @PathParam( "collection_name" ) String collection_name ,Map<String, Object> json,
-            @QueryParam("callback") @DefaultValue("") String callback )
-            throws OAuthSystemException {
-
-        UsergridAwsCredentials uac = new UsergridAwsCredentials();
-        UUID jobUUID = null;
-        String colExport = collection_name;
-        Map<String, String> uuidRet = new HashMap<String, String>();
-
-        Map<String,Object> properties;
-        Map<String, Object> storage_info;
-
-        try {
-            //checkJsonExportProperties(json);
-            if((properties = ( Map<String, Object> )  json.get( "properties" )) == null){
-                throw new NullArgumentException("Could not find 'properties'");
-            }
-            storage_info = ( Map<String, Object> ) properties.get( "storage_info" );
-            String storage_provider = ( String ) properties.get( "storage_provider" );
-            if(storage_provider == null) {
-                throw new NullArgumentException( "Could not find field 'storage_provider'" );
-            }
-            if(storage_info == null) {
-                throw new NullArgumentException( "Could not find field 'storage_info'" );
-            }
-
-            String bucketName = ( String ) storage_info.get( "bucket_location" );
-            String accessId = ( String ) storage_info.get( "s3_access_id" );
-            String secretKey = ( String ) storage_info.get( "s3_key" );
-
-            if ( accessId == null ) {
-                throw new NullArgumentException( "Could not find field 's3_access_id'" );
-            }
-            if ( secretKey == null ) {
-                throw new NullArgumentException( "Could not find field 's3_key'" );
-            }
-
-            if(bucketName == null) {
-                throw new NullArgumentException( "Could not find field 'bucketName'" );
-            }
-
-            json.put( "organizationId",organization.getUuid() );
-            json.put( "applicationId", applicationId);
-            json.put( "collectionName", colExport);
-
-            jobUUID = exportService.schedule( json );
-            uuidRet.put( "Export Entity", jobUUID.toString() );
-        }
-        catch ( NullArgumentException e ) {
-            return Response.status( SC_BAD_REQUEST )
-                .type( JSONPUtils.jsonMediaType( callback ) )
-                .entity( ServiceResource.wrapWithCallback( e.getMessage(), callback ) )
-                .build();
-        }
-        catch ( Exception e ) {
-
-            // TODO: throw descriptive error message and or include on in the response
-            // TODO: fix below, it doesn't work if there is an exception.
-            // Make it look like the OauthResponse.
-
-            OAuthResponse errorMsg = OAuthResponse.errorResponse( SC_INTERNAL_SERVER_ERROR )
-                .setErrorDescription( e.getMessage() )
-                .buildJSONMessage();
-
-            return Response.status( errorMsg.getResponseStatus() )
-                .type( JSONPUtils.jsonMediaType( callback ) )
-                .entity( ServiceResource.wrapWithCallback( errorMsg.getBody(), callback ) )
-                .build();
-        }
-
-        return Response.status( SC_ACCEPTED ).entity( uuidRet ).build();
-    }
-
-
-    @Path( "imports" )
-    public ImportsResource importGetJson( @Context UriInfo ui,
-                                          @QueryParam( "callback" ) @DefaultValue( "" ) String callback )
-        throws Exception {
-
-
-        return getSubResource( ImportsResource.class ).init( organization, application );
-    }
+//    @POST
+//    @Path("export")
+//    @Consumes(APPLICATION_JSON)
+//    @RequireOrganizationAccess
+//    public Response exportPostJson( @Context UriInfo ui,Map<String, Object> json,
+//                                    @QueryParam("callback") @DefaultValue("") String callback )
+//            throws OAuthSystemException {
+//
+//        UsergridAwsCredentials uac = new UsergridAwsCredentials();
+//
+//        UUID jobUUID = null;
+//        Map<String, String> uuidRet = new HashMap<String, String>();
+//
+//        Map<String,Object> properties;
+//        Map<String, Object> storage_info;
+//
+//        try {
+//            if((properties = ( Map<String, Object> )  json.get( "properties" )) == null){
+//                throw new NullArgumentException("Could not find 'properties'");
+//            }
+//            storage_info = ( Map<String, Object> ) properties.get( "storage_info" );
+//            String storage_provider = ( String ) properties.get( "storage_provider" );
+//            if(storage_provider == null) {
+//                throw new NullArgumentException( "Could not find field 'storage_provider'" );
+//            }
+//            if(storage_info == null) {
+//                throw new NullArgumentException( "Could not find field 'storage_info'" );
+//            }
+//
+//
+//            String bucketName = ( String ) storage_info.get( "bucket_location" );
+//            String accessId = ( String ) storage_info.get( "s3_access_id" );
+//            String secretKey = ( String ) storage_info.get( "s3_key" );
+//
+//            if ( bucketName == null ) {
+//                throw new NullArgumentException( "Could not find field 'bucketName'" );
+//            }
+//            if ( accessId == null ) {
+//                throw new NullArgumentException( "Could not find field 's3_access_id'" );
+//            }
+//            if ( secretKey == null ) {
+//
+//                throw new NullArgumentException( "Could not find field 's3_key'" );
+//            }
+//
+//            json.put("organizationId", organization.getUuid());
+//            json.put( "applicationId",applicationId);
+//
+//            jobUUID = exportService.schedule( json );
+//            uuidRet.put( "Export Entity", jobUUID.toString() );
+//        }
+//        catch ( NullArgumentException e ) {
+//            return Response.status( SC_BAD_REQUEST )
+//                .type( JSONPUtils.jsonMediaType( callback ) )
+//                .entity( ServiceResource.wrapWithCallback( e.getMessage(), callback ) ).build();
+//        }
+//        catch ( Exception e ) {
+//            // TODO: throw descriptive error message and or include on in the response
+//            // TODO: fix below, it doesn't work if there is an exception.
+//            // Make it look like the OauthResponse.
+//            return Response.status( SC_INTERNAL_SERVER_ERROR )
+//                .type( JSONPUtils.jsonMediaType( callback ) )
+//                .entity( ServiceResource.wrapWithCallback( e.getMessage(), callback ) ).build();
+//        }
+//
+//        return Response.status( SC_ACCEPTED ).entity( uuidRet ).build();
+//    }
+//
+//    @POST
+//    @Path("collection/{collection_name}/export")
+//    @Consumes(APPLICATION_JSON)
+//    @RequireOrganizationAccess
+//    public Response exportPostJson( @Context UriInfo ui,
+//            @PathParam( "collection_name" ) String collection_name ,Map<String, Object> json,
+//            @QueryParam("callback") @DefaultValue("") String callback )
+//            throws OAuthSystemException {
+//
+//        UsergridAwsCredentials uac = new UsergridAwsCredentials();
+//        UUID jobUUID = null;
+//        String colExport = collection_name;
+//        Map<String, String> uuidRet = new HashMap<String, String>();
+//
+//        Map<String,Object> properties;
+//        Map<String, Object> storage_info;
+//
+//        try {
+//            //checkJsonExportProperties(json);
+//            if((properties = ( Map<String, Object> )  json.get( "properties" )) == null){
+//                throw new NullArgumentException("Could not find 'properties'");
+//            }
+//            storage_info = ( Map<String, Object> ) properties.get( "storage_info" );
+//            String storage_provider = ( String ) properties.get( "storage_provider" );
+//            if(storage_provider == null) {
+//                throw new NullArgumentException( "Could not find field 'storage_provider'" );
+//            }
+//            if(storage_info == null) {
+//                throw new NullArgumentException( "Could not find field 'storage_info'" );
+//            }
+//
+//            String bucketName = ( String ) storage_info.get( "bucket_location" );
+//            String accessId = ( String ) storage_info.get( "s3_access_id" );
+//            String secretKey = ( String ) storage_info.get( "s3_key" );
+//
+//            if ( accessId == null ) {
+//                throw new NullArgumentException( "Could not find field 's3_access_id'" );
+//            }
+//            if ( secretKey == null ) {
+//                throw new NullArgumentException( "Could not find field 's3_key'" );
+//            }
+//
+//            if(bucketName == null) {
+//                throw new NullArgumentException( "Could not find field 'bucketName'" );
+//            }
+//
+//            json.put( "organizationId",organization.getUuid() );
+//            json.put( "applicationId", applicationId);
+//            json.put( "collectionName", colExport);
+//
+//            jobUUID = exportService.schedule( json );
+//            uuidRet.put( "Export Entity", jobUUID.toString() );
+//        }
+//        catch ( NullArgumentException e ) {
+//            return Response.status( SC_BAD_REQUEST )
+//                .type( JSONPUtils.jsonMediaType( callback ) )
+//                .entity( ServiceResource.wrapWithCallback( e.getMessage(), callback ) )
+//                .build();
+//        }
+//        catch ( Exception e ) {
+//
+//            // TODO: throw descriptive error message and or include on in the response
+//            // TODO: fix below, it doesn't work if there is an exception.
+//            // Make it look like the OauthResponse.
+//
+//            OAuthResponse errorMsg = OAuthResponse.errorResponse( SC_INTERNAL_SERVER_ERROR )
+//                .setErrorDescription( e.getMessage() )
+//                .buildJSONMessage();
+//
+//            return Response.status( errorMsg.getResponseStatus() )
+//                .type( JSONPUtils.jsonMediaType( callback ) )
+//                .entity( ServiceResource.wrapWithCallback( errorMsg.getBody(), callback ) )
+//                .build();
+//        }
+//
+//        return Response.status( SC_ACCEPTED ).entity( uuidRet ).build();
+//    }
+//
+//
+//    @Path( "imports" )
+//    public ImportsResource importGetJson( @Context UriInfo ui,
+//                                          @QueryParam( "callback" ) @DefaultValue( "" ) String callback )
+//        throws Exception {
+//
+//
+//        return getSubResource( ImportsResource.class ).init( organization, application );
+//    }
 
     @GET
     @Path("/status")

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b37fc81e/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
index b43594a..d8c314e 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
@@ -134,7 +134,14 @@ public class NotificationsService extends AbstractCollectionService {
         postMeter.mark();
         try {
 
-            validate(null, context.getPayload());
+            validatePayload(null, context.getPayload());
+
+            final Object deliverValue = context.getProperties().get("deliver");
+            if ( deliverValue != null && deliverValue instanceof Number && (long)context.getProperties().get("deliver") < System.currentTimeMillis() ){
+                throw new IllegalArgumentException("Property 'deliver' cannot have a value in the past. It is expected to be a unix timestamp in milliseconds");
+            } else if (  deliverValue != null && !(deliverValue instanceof Number) ){
+                throw new IllegalArgumentException("Property 'deliver' must be a number. It is expected to be a unix timestamp in milliseconds");
+            }
 
             // perform some input validates on useGraph payload property vs. ql= path query
             final List<ServiceParameter> parameters = context.getRequest().getOriginalParameters();
@@ -227,7 +234,7 @@ public class NotificationsService extends AbstractCollectionService {
     public Entity updateEntity(ServiceRequest request, EntityRef ref,
             ServicePayload payload) throws Exception {
 
-        validate(ref, payload);
+        validatePayload(ref, payload);
 
         Notification notification = em.get(ref, Notification.class);
 
@@ -273,7 +280,7 @@ public class NotificationsService extends AbstractCollectionService {
     }
 
     // validate payloads
-    private void validate(EntityRef ref, ServicePayload servicePayload)
+    private void validatePayload(EntityRef ref, ServicePayload servicePayload)
             throws Exception {
         Object obj_payloads = servicePayload.getProperty("payloads");
         if (obj_payloads == null && ref == null) {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b37fc81e/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
index bcfd654..2a757ca 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
@@ -33,12 +33,8 @@ import java.util.*;
 
 import org.apache.usergrid.services.ServiceAction;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.fail;
-
 import static org.apache.usergrid.services.notifications.impl.ApplicationQueueManagerImpl.NOTIFIER_ID_POSTFIX;
+import static org.junit.Assert.*;
 
 
 @NotThreadSafe
@@ -809,6 +805,49 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
         }
     }
 
+    @Test
+    public void pushNotificationBadDeliverProperty() throws Exception {
+
+        // try with timestamp in the past
+        app.clear();
+        String payload = getPayload();
+        Map<String, String> payloads = new HashMap<String, String>(1);
+        payloads.put(notifier.getName(), payload);
+        app.put("payloads", payloads);
+        app.put("deliver", System.currentTimeMillis() - 1000); // some time in the past
+        app.put("debug",true);
+
+        // post notification to
+        try {
+            app.testRequest(ServiceAction.POST, 1,"devices",device1.getUuid(), "notifications").getEntity();
+        } catch (Exception e) {
+            // should be bad request due to time in the past
+            assertEquals(e.getClass(), IllegalArgumentException.class);
+            assertTrue(e.getMessage().contains("Property 'deliver' cannot have a value in the past"));
+        }
+
+
+        // try with invalid type of a string for deliver, should be a number
+        app.clear();
+        payload = getPayload();
+        payloads = new HashMap<String, String>(1);
+        payloads.put(notifier.getName(), payload);
+        app.put("payloads", payloads);
+        app.put("deliver", "notatime"); // some time in the past
+        app.put("debug",true);
+
+        // post notification
+        try {
+            app.testRequest(ServiceAction.POST, 1,"devices",device1.getUuid(), "notifications").getEntity();
+        } catch (Exception e) {
+            // should be bad request due invalid deliver value
+            assertEquals(e.getClass(), IllegalArgumentException.class);
+            assertTrue(e.getMessage().contains("Property 'deliver' must be a number"));
+        }
+
+
+    }
+
     private String getPayload(){
         ApnsPayloadBuilder builder = new ApnsPayloadBuilder();
         builder.setAlertBody("Hello, World!");


[4/4] usergrid git commit: Fix issue where duplicate receipts were created when a notification failed.

Posted by mr...@apache.org.
Fix issue where duplicate receipts were created when a notification failed.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/8b63aae7
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/8b63aae7
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/8b63aae7

Branch: refs/heads/master
Commit: 8b63aae7def01cbf4f83e13bc5ef1b237344174b
Parents: b37fc81
Author: Michael Russo <ru...@google.com>
Authored: Fri Mar 31 00:40:39 2017 -0700
Committer: Michael Russo <ru...@google.com>
Committed: Fri Mar 31 00:40:39 2017 -0700

----------------------------------------------------------------------
 .../org/apache/usergrid/services/notifications/TaskManager.java     | 1 -
 1 file changed, 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/8b63aae7/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
index 870cae9..678b88a 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
@@ -112,7 +112,6 @@ public class TaskManager {
                 this.saveReceipt( notification, new SimpleEntityRef( Device.ENTITY_TYPE, deviceUUID ), receipt, true );
             }
 
-            completed(notifier, deviceUUID);
             finishedBatch();
 
         } catch (Exception e){