You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2017/03/31 07:44:48 UTC
[2/4] usergrid git commit: Batch write to the export stream and use a cleaner filename.
Batch write to the export stream and use a cleaner filename.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/2d56d813
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/2d56d813
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/2d56d813
Branch: refs/heads/master
Commit: 2d56d813a9d99295ef72ab055e8600f62df364e9
Parents: 82e7ec5
Author: Michael Russo <ru...@google.com>
Authored: Mon Mar 27 18:48:49 2017 -0700
Committer: Michael Russo <ru...@google.com>
Committed: Mon Mar 27 18:48:49 2017 -0700
----------------------------------------------------------------------
.../export/ExportServiceImpl.java | 153 ++++++++++++-------
.../rest/applications/ApplicationResource.java | 4 +-
2 files changed, 104 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/2d56d813/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportServiceImpl.java
index ebbcc58..47d6982 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportServiceImpl.java
@@ -48,7 +48,9 @@ import rx.schedulers.Schedulers;
import java.io.*;
+import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
@@ -98,91 +100,138 @@ public class ExportServiceImpl implements ExportService {
final ZipOutputStream zipOutputStream = new ZipOutputStream(stream);
- //final AtomicInteger count = new AtomicInteger();
final ApplicationScope appScope = exportRequestBuilder.getApplicationScope().get();
final Observable<ApplicationScope> applicationScopes = Observable.just(appScope);
-
- //final String jobId = StringUtils.sanitizeUUID( UUIDGenerator.newTimeUUID() );
final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager(appScope);
- final String rootPath = appScope.getApplication().getUuid().toString();
-
GraphManager gm = managerCache.getGraphManager( appScope );
+ final AtomicInteger entityFileCount = new AtomicInteger();
+ final AtomicInteger connectionFileCount = new AtomicInteger();
+
allEntityIdsObservable.getEdgesToEntities( applicationScopes, Optional.absent(), Optional.absent() )
- .doOnNext( edgeScope -> {
+ .buffer( 1000 )
+ .doOnNext( edgeScopes -> {
+
try {
- // load the entity and convert to a normal map
- Entity entity = ecm.load(edgeScope.getEdge().getTargetNode()).toBlocking().lastOrDefault(null);
- Map entityMap = CpEntityMapUtils.toMap(entity);
+ final String filenameWithPath = "entities/" +
+ "entities."+ entityFileCount.get() + ".json";
+
+ logger.debug("adding zip entry: {}", filenameWithPath);
+ zipOutputStream.putNextEntry(new ZipEntry(filenameWithPath));
+
+ edgeScopes.forEach( edgeScope -> {
+
+
+ try {
+ // load the entity and convert to a normal map
+ Entity entity = ecm.load(edgeScope.getEdge().getTargetNode()).toBlocking().lastOrDefault(null);
+ Map entityMap = CpEntityMapUtils.toMap(entity);
+
+ if (entity != null) {
+
- if (entity != null) {
- final String filenameWithPath = rootPath + "/" +
- edgeScope.getEdge().getSourceNode().getUuid().toString() + "_" +
- edgeScope.getEdge().getType() + "_" +
- edgeScope.getEdge().getTargetNode().getUuid().toString() + ".json";
- logger.debug("adding zip entry: {}", filenameWithPath);
- zipOutputStream.putNextEntry(new ZipEntry(filenameWithPath));
+ logger.debug("writing and flushing entity to zip stream: {}", jsonSerializer.toString(entityMap));
+ zipOutputStream.write(jsonSerializer.toString(entityMap).getBytes());
+ zipOutputStream.write("\n".getBytes());
+
+ } else {
+ logger.warn("{} did not have corresponding entity, not writing", edgeScope.toString());
+ }
+
+ } catch (IOException e) {
+ logger.warn("Unable to create entry in zip export for edge {}", edgeScope);
+ }
- logger.debug("writing and flushing entity to zip stream: {}", jsonSerializer.toString(entityMap));
- zipOutputStream.write(jsonSerializer.toString(entityMap).getBytes());
- zipOutputStream.closeEntry();
- zipOutputStream.flush();
- } else {
- logger.warn("{} did not have corresponding entity, not writing", edgeScope.toString());
- }
+ entityFileCount.addAndGet(1);
+
+ });
+
+ zipOutputStream.closeEntry();
+ zipOutputStream.flush();
} catch (IOException e) {
- logger.warn("Unable to create entry in zip export for edge {}", edgeScope);
+ logger.warn("Unable to create entry in zip export for batch entities");
}
//writeStateMeta( jobId, Status.INPROGRESS, count.addAndGet(1), System.currentTimeMillis() );
})
- .flatMap( edgeScope -> {
+ .doOnNext( edgeScopes -> {
- // find all connection types for the each entity emitted from the app
- return gm.getEdgeTypesFromSource(CpNamingUtils.createConnectionTypeSearch(edgeScope.getEdge().getTargetNode()))
- .flatMap(emittedEdgeType -> {
+ try{
- logger.debug("loading edges of type {} from node {}", emittedEdgeType, edgeScope.getEdge().getTargetNode());
- return gm.loadEdgesFromSource(new SimpleSearchByEdgeType( edgeScope.getEdge().getTargetNode(),
- emittedEdgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, Optional.absent() ));
+ final String filenameWithPath = "connections/" +
+ "connections." + connectionFileCount.get() + ".json";
- }).doOnNext( markedEdge -> {
+ logger.debug("adding zip entry: {}", filenameWithPath);
+ zipOutputStream.putNextEntry(new ZipEntry(filenameWithPath));
- if (!markedEdge.isDeleted()){
+ edgeScopes.forEach(edgeScope -> gm.getEdgeTypesFromSource(CpNamingUtils.createConnectionTypeSearch(edgeScope.getEdge().getTargetNode()))
+ .flatMap(emittedEdgeType -> {
- // todo, probably don't need to load the target node itself since it would be loaded in normal collection walking
- Entity entity = ecm.load(markedEdge.getTargetNode()).toBlocking().lastOrDefault(null);
+ logger.debug("loading edges of type {} from node {}", emittedEdgeType, edgeScope.getEdge().getTargetNode());
+ return gm.loadEdgesFromSource(new SimpleSearchByEdgeType(edgeScope.getEdge().getTargetNode(),
+ emittedEdgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, Optional.absent()));
- Map entityMap = CpEntityMapUtils.toMap(entity);
+ })
- try {
- final String filenameWithPath = rootPath + "/" +
- markedEdge.getSourceNode().getUuid().toString() + "_" +
- markedEdge.getType() + "_" +
- markedEdge.getTargetNode().getUuid().toString() + ".json";
+ .buffer( 1000 )
+ .doOnNext(markedEdges -> {
- logger.debug("adding zip entry: {}", filenameWithPath);
- zipOutputStream.putNextEntry(new ZipEntry(filenameWithPath));
- logger.debug("writing and flushing entity to zip stream: {}", jsonSerializer.toString(entityMap));
- zipOutputStream.write(jsonSerializer.toString(entityMap).getBytes());
- zipOutputStream.closeEntry();
- zipOutputStream.flush();
- } catch (IOException e) {
- logger.warn("Unable to create entry in zip export for edge {}", markedEdge.toString());
- }
- }
+ markedEdges.forEach( markedEdge -> {
- });
+ if (!markedEdge.isDeleted()) {
+
+ // doing the load to just again make sure bad connections are not exported
+ Entity entity = ecm.load(markedEdge.getTargetNode()).toBlocking().lastOrDefault(null);
+
+ if (entity != null) {
+
+ try {
+
+ final Map<String,String> connectionMap = new HashMap<String,String>(1){{
+ put("sourceNodeUUID", markedEdge.getSourceNode().getUuid().toString() );
+ put("relationship", CpNamingUtils.getConnectionNameFromEdgeName(markedEdge.getType()) );
+ put("targetNodeUUID", markedEdge.getTargetNode().getUuid().toString());
+ }};
+
+ logger.debug("writing and flushing connection to zip stream: {}", jsonSerializer.toString(connectionMap).getBytes());
+ zipOutputStream.write(jsonSerializer.toString(connectionMap).getBytes());
+ zipOutputStream.write("\n".getBytes());
+
+
+ } catch (IOException e) {
+ logger.warn("Unable to create entry in zip export for edge {}", markedEdge.toString());
+ }
+ } else {
+ logger.warn("Exported connection has a missing target node, not creating connection in export. Edge: {}", markedEdge);
+ }
+ }
+
+ });
+
+
+
+
+ }).toBlocking().lastOrDefault(null));
+
+ connectionFileCount.addAndGet(1);
+
+ zipOutputStream.closeEntry();
+ zipOutputStream.flush();
+
+
+ } catch (IOException e) {
+ logger.warn("Unable to create entry in zip export for batch connections");
+ }
})
.doOnCompleted(() -> {
http://git-wip-us.apache.org/repos/asf/usergrid/blob/2d56d813/stack/rest/src/main/java/org/apache/usergrid/rest/applications/ApplicationResource.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/ApplicationResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/ApplicationResource.java
index 920287b..7479a90 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/ApplicationResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/ApplicationResource.java
@@ -31,6 +31,7 @@ import org.apache.usergrid.corepersistence.export.ExportService;
import org.apache.usergrid.corepersistence.index.ReIndexRequestBuilder;
import org.apache.usergrid.corepersistence.index.ReIndexService;
import org.apache.usergrid.management.ApplicationInfo;
+import org.apache.usergrid.management.OrganizationInfo;
import org.apache.usergrid.management.exceptions.DisabledAdminUserException;
import org.apache.usergrid.management.exceptions.DisabledAppUserException;
import org.apache.usergrid.management.exceptions.UnactivatedAdminUserException;
@@ -705,6 +706,7 @@ public class ApplicationResource extends CollectionResource {
throw new UnauthorizedException();
}
+ ApplicationInfo appInfo = management.getApplicationInfo(applicationId);
final ExportRequestBuilder request = new ExportRequestBuilderImpl().withApplicationId( applicationId );
StreamingOutput stream = new StreamingOutput() {
@@ -715,7 +717,7 @@ public class ApplicationResource extends CollectionResource {
};
return Response
.ok(stream)
- .header("Content-Disposition", "attachment; filename=\"usergrid_export-"+System.currentTimeMillis()+".zip\"")
+ .header("Content-Disposition", "attachment; filename=\""+appInfo.getName().replace("/","_")+"_"+System.currentTimeMillis()+".zip\"")
.build();
}