You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2017/03/31 07:44:48 UTC

[2/4] usergrid git commit: Batch write to the export stream and use a cleaner filename.

Batch write to the export stream and use a cleaner filename.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/2d56d813
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/2d56d813
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/2d56d813

Branch: refs/heads/master
Commit: 2d56d813a9d99295ef72ab055e8600f62df364e9
Parents: 82e7ec5
Author: Michael Russo <ru...@google.com>
Authored: Mon Mar 27 18:48:49 2017 -0700
Committer: Michael Russo <ru...@google.com>
Committed: Mon Mar 27 18:48:49 2017 -0700

----------------------------------------------------------------------
 .../export/ExportServiceImpl.java               | 153 ++++++++++++-------
 .../rest/applications/ApplicationResource.java  |   4 +-
 2 files changed, 104 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/2d56d813/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportServiceImpl.java
index ebbcc58..47d6982 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportServiceImpl.java
@@ -48,7 +48,9 @@ import rx.schedulers.Schedulers;
 
 
 import java.io.*;
+import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipOutputStream;
 
@@ -98,91 +100,138 @@ public class ExportServiceImpl implements ExportService {
 
         final ZipOutputStream zipOutputStream = new ZipOutputStream(stream);
 
-        //final AtomicInteger count = new AtomicInteger();
         final ApplicationScope appScope = exportRequestBuilder.getApplicationScope().get();
         final Observable<ApplicationScope> applicationScopes = Observable.just(appScope);
 
-
-        //final String jobId = StringUtils.sanitizeUUID( UUIDGenerator.newTimeUUID() );
         final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager(appScope);
 
-        final String rootPath = appScope.getApplication().getUuid().toString();
-
         GraphManager gm = managerCache.getGraphManager( appScope );
 
+        final AtomicInteger entityFileCount = new AtomicInteger();
+        final AtomicInteger connectionFileCount = new AtomicInteger();
+
         allEntityIdsObservable.getEdgesToEntities( applicationScopes, Optional.absent(), Optional.absent() )
-            .doOnNext( edgeScope -> {
+            .buffer( 1000 )
+            .doOnNext( edgeScopes -> {
+
 
                 try {
 
-                    // load the entity and convert to a normal map
-                    Entity entity = ecm.load(edgeScope.getEdge().getTargetNode()).toBlocking().lastOrDefault(null);
-                    Map entityMap = CpEntityMapUtils.toMap(entity);
+                    final String filenameWithPath = "entities/" +
+                        "entities."+ entityFileCount.get() + ".json";
+
+                    logger.debug("adding zip entry: {}", filenameWithPath);
+                    zipOutputStream.putNextEntry(new ZipEntry(filenameWithPath));
+
+                    edgeScopes.forEach( edgeScope -> {
+
+
+                        try {
+                            // load the entity and convert to a normal map
+                            Entity entity = ecm.load(edgeScope.getEdge().getTargetNode()).toBlocking().lastOrDefault(null);
+                            Map entityMap = CpEntityMapUtils.toMap(entity);
+
+                            if (entity != null) {
+
 
-                    if (entity != null) {
-                        final String filenameWithPath = rootPath + "/" +
-                            edgeScope.getEdge().getSourceNode().getUuid().toString() + "_" +
-                            edgeScope.getEdge().getType() + "_" +
-                            edgeScope.getEdge().getTargetNode().getUuid().toString() + ".json";
 
-                        logger.debug("adding zip entry: {}", filenameWithPath);
-                        zipOutputStream.putNextEntry(new ZipEntry(filenameWithPath));
+                                logger.debug("writing and flushing entity to zip stream: {}", jsonSerializer.toString(entityMap));
+                                zipOutputStream.write(jsonSerializer.toString(entityMap).getBytes());
+                                zipOutputStream.write("\n".getBytes());
+
 
+                            } else {
+                                logger.warn("{}  did not have corresponding entity, not writing", edgeScope.toString());
+                            }
+
+                        } catch (IOException e) {
+                            logger.warn("Unable to create entry in zip export for edge {}", edgeScope);
+                        }
 
-                        logger.debug("writing and flushing entity to zip stream: {}", jsonSerializer.toString(entityMap));
-                        zipOutputStream.write(jsonSerializer.toString(entityMap).getBytes());
-                        zipOutputStream.closeEntry();
-                        zipOutputStream.flush();
 
-                    } else {
-                        logger.warn("{}  did not have corresponding entity, not writing", edgeScope.toString());
-                    }
+                        entityFileCount.addAndGet(1);
+
+                    });
+
+                    zipOutputStream.closeEntry();
+                    zipOutputStream.flush();
 
                 } catch (IOException e) {
-                    logger.warn("Unable to create entry in zip export for edge {}", edgeScope);
+                    logger.warn("Unable to create entry in zip export for batch entities");
                 }
 
                 //writeStateMeta( jobId, Status.INPROGRESS, count.addAndGet(1), System.currentTimeMillis() );
             })
-            .flatMap( edgeScope -> {
+            .doOnNext( edgeScopes -> {
 
-                // find all connection types for the each entity emitted from the app
-                return gm.getEdgeTypesFromSource(CpNamingUtils.createConnectionTypeSearch(edgeScope.getEdge().getTargetNode()))
-                    .flatMap(emittedEdgeType -> {
+                try{
 
-                        logger.debug("loading edges of type {} from node {}", emittedEdgeType, edgeScope.getEdge().getTargetNode());
-                        return gm.loadEdgesFromSource(new SimpleSearchByEdgeType( edgeScope.getEdge().getTargetNode(),
-                            emittedEdgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, Optional.absent() ));
+                    final String filenameWithPath = "connections/" +
+                        "connections." + connectionFileCount.get() + ".json";
 
-                    }).doOnNext( markedEdge -> {
+                    logger.debug("adding zip entry: {}", filenameWithPath);
+                    zipOutputStream.putNextEntry(new ZipEntry(filenameWithPath));
 
-                        if (!markedEdge.isDeleted()){
+                    edgeScopes.forEach(edgeScope -> gm.getEdgeTypesFromSource(CpNamingUtils.createConnectionTypeSearch(edgeScope.getEdge().getTargetNode()))
+                        .flatMap(emittedEdgeType -> {
 
-                            // todo, probably don't need to load the target node itself since it would be loaded in normal collection walking
-                            Entity entity = ecm.load(markedEdge.getTargetNode()).toBlocking().lastOrDefault(null);
+                            logger.debug("loading edges of type {} from node {}", emittedEdgeType, edgeScope.getEdge().getTargetNode());
+                            return gm.loadEdgesFromSource(new SimpleSearchByEdgeType(edgeScope.getEdge().getTargetNode(),
+                                emittedEdgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, Optional.absent()));
 
-                            Map entityMap = CpEntityMapUtils.toMap(entity);
+                        })
 
-                            try {
-                                final String filenameWithPath = rootPath + "/" +
-                                    markedEdge.getSourceNode().getUuid().toString() + "_" +
-                                    markedEdge.getType() + "_" +
-                                    markedEdge.getTargetNode().getUuid().toString() + ".json";
+                        .buffer( 1000 )
+                        .doOnNext(markedEdges -> {
 
-                                logger.debug("adding zip entry: {}", filenameWithPath);
-                                zipOutputStream.putNextEntry(new ZipEntry(filenameWithPath));
 
-                                logger.debug("writing and flushing entity to zip stream: {}", jsonSerializer.toString(entityMap));
-                                zipOutputStream.write(jsonSerializer.toString(entityMap).getBytes());
-                                zipOutputStream.closeEntry();
-                                zipOutputStream.flush();
 
-                            } catch (IOException e) {
-                                logger.warn("Unable to create entry in zip export for edge {}", markedEdge.toString());
-                            }
-                        }
+                                markedEdges.forEach( markedEdge -> {
 
-                    });
+                                    if (!markedEdge.isDeleted()) {
+
+                                        // doing the load to just again make sure bad connections are not exported
+                                        Entity entity = ecm.load(markedEdge.getTargetNode()).toBlocking().lastOrDefault(null);
+
+                                        if (entity != null) {
+
+                                            try {
+
+                                                final Map<String,String> connectionMap = new HashMap<String,String>(1){{
+                                                    put("sourceNodeUUID", markedEdge.getSourceNode().getUuid().toString() );
+                                                    put("relationship", CpNamingUtils.getConnectionNameFromEdgeName(markedEdge.getType()) );
+                                                    put("targetNodeUUID", markedEdge.getTargetNode().getUuid().toString());
+                                                }};
+
+                                                logger.debug("writing and flushing connection to zip stream: {}", jsonSerializer.toString(connectionMap).getBytes());
+                                                zipOutputStream.write(jsonSerializer.toString(connectionMap).getBytes());
+                                                zipOutputStream.write("\n".getBytes());
+
+
+                                            } catch (IOException e) {
+                                                logger.warn("Unable to create entry in zip export for edge {}", markedEdge.toString());
+                                            }
+                                        } else {
+                                            logger.warn("Exported connection has a missing target node, not creating connection in export. Edge: {}", markedEdge);
+                                        }
+                                    }
+
+                                });
+
+
+
+
+                        }).toBlocking().lastOrDefault(null));
+
+                    connectionFileCount.addAndGet(1);
+
+                    zipOutputStream.closeEntry();
+                    zipOutputStream.flush();
+
+
+                } catch (IOException e) {
+                    logger.warn("Unable to create entry in zip export for batch connections");
+                }
 
             })
             .doOnCompleted(() -> {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/2d56d813/stack/rest/src/main/java/org/apache/usergrid/rest/applications/ApplicationResource.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/ApplicationResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/ApplicationResource.java
index 920287b..7479a90 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/ApplicationResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/ApplicationResource.java
@@ -31,6 +31,7 @@ import org.apache.usergrid.corepersistence.export.ExportService;
 import org.apache.usergrid.corepersistence.index.ReIndexRequestBuilder;
 import org.apache.usergrid.corepersistence.index.ReIndexService;
 import org.apache.usergrid.management.ApplicationInfo;
+import org.apache.usergrid.management.OrganizationInfo;
 import org.apache.usergrid.management.exceptions.DisabledAdminUserException;
 import org.apache.usergrid.management.exceptions.DisabledAppUserException;
 import org.apache.usergrid.management.exceptions.UnactivatedAdminUserException;
@@ -705,6 +706,7 @@ public class ApplicationResource extends CollectionResource {
             throw new UnauthorizedException();
         }
 
+        ApplicationInfo appInfo = management.getApplicationInfo(applicationId);
 
         final ExportRequestBuilder request = new ExportRequestBuilderImpl().withApplicationId( applicationId );
         StreamingOutput stream = new StreamingOutput() {
@@ -715,7 +717,7 @@ public class ApplicationResource extends CollectionResource {
         };
         return Response
             .ok(stream)
-            .header("Content-Disposition", "attachment; filename=\"usergrid_export-"+System.currentTimeMillis()+".zip\"")
+            .header("Content-Disposition", "attachment; filename=\""+appInfo.getName().replace("/","_")+"_"+System.currentTimeMillis()+".zip\"")
             .build();
     }