You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2017/03/17 07:34:23 UTC

[2/3] incubator-atlas git commit: ATLAS-1503: optimization of export implementation

ATLAS-1503: optimization of export implementation

Signed-off-by: Madhan Neethiraj <ma...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/7154e12d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/7154e12d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/7154e12d

Branch: refs/heads/master
Commit: 7154e12d13fe4ccba4207bbf6f411353411f06b2
Parents: 89a3872
Author: ashutoshm <am...@hortonworks.com>
Authored: Fri Mar 10 10:50:51 2017 -0800
Committer: Madhan Neethiraj <ma...@apache.org>
Committed: Fri Mar 17 00:13:26 2017 -0700

----------------------------------------------------------------------
 .../atlas/model/impexp/AtlasExportResult.java   | 20 +++++++
 .../atlas/web/resources/AdminResource.java      | 10 ++--
 .../atlas/web/resources/ExportService.java      | 62 +++++++++++++++-----
 .../org/apache/atlas/web/resources/ZipSink.java | 18 +-----
 4 files changed, 74 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/7154e12d/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportResult.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportResult.java b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportResult.java
index e6a967e..8f3075e 100644
--- a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportResult.java
+++ b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportResult.java
@@ -180,6 +180,16 @@ public class AtlasExportResult implements Serializable {
         return toString(new StringBuilder()).toString();
     }
 
+    public void clear() { // clears the data and metrics members in place; a null member is skipped
+        if(this.data != null) {
+            this.data.clear();
+        }
+
+        if(this.metrics != null) {
+            this.metrics.clear();
+        }
+    }
+
     @JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE)
     @JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
     @JsonIgnoreProperties(ignoreUnknown=true)
@@ -234,5 +244,15 @@ public class AtlasExportResult implements Serializable {
         public String toString() {
             return toString(new StringBuilder()).toString();
         }
+
+        public void clear() { // clears typesDef and entityCreationOrder in place; a null member is skipped
+            if(this.typesDef!= null) {
+                this.typesDef.clear();
+            }
+
+            if(this.entityCreationOrder != null) {
+                this.entityCreationOrder.clear();
+            }
+        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/7154e12d/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
index 31a4cf9..0dfdeb2 100755
--- a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
@@ -51,7 +51,6 @@ import org.springframework.security.core.GrantedAuthority;
 import org.springframework.security.core.context.SecurityContextHolder;
 
 import javax.inject.Singleton;
-import javax.servlet.ServletOutputStream;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 import javax.ws.rs.*;
@@ -309,7 +308,7 @@ public class AdminResource {
 
         ZipSink exportSink = null;
         try {
-            exportSink = new ZipSink();
+            exportSink = new ZipSink(httpServletResponse.getOutputStream());
             ExportService exportService = new ExportService(this.typeRegistry);
 
             AtlasExportResult result = exportService.run(exportSink, request, Servlets.getUserName(httpServletRequest),
@@ -318,14 +317,13 @@ public class AdminResource {
 
             exportSink.close();
 
-            ServletOutputStream outStream = httpServletResponse.getOutputStream();
-            exportSink.writeTo(outStream);
-
+            httpServletResponse.addHeader("Content-Encoding","gzip");
             httpServletResponse.setContentType("application/zip");
             httpServletResponse.setHeader("Content-Disposition",
                                           "attachment; filename=" + result.getClass().getSimpleName());
+            httpServletResponse.setHeader("Transfer-Encoding", "chunked");
 
-            outStream.flush();
+            httpServletResponse.getOutputStream().flush();
             return Response.ok().build();
         } catch (IOException excp) {
             LOG.error("export() failed", excp);

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/7154e12d/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java b/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java
index c1891e0..e123ff7 100644
--- a/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java
@@ -105,8 +105,9 @@ public class ExportService {
             LOG.error("Operation failed: ", ex);
         } finally {
             atlasGraph.releaseGremlinScriptEngine(context.scriptEngine);
-
             LOG.info("<== export(user={}, from={}): status {}", userName, requestingIP, context.result.getOperationStatus());
+            context.clear();
+            result.clear();
         }
 
         return context.result;
@@ -124,8 +125,8 @@ public class ExportService {
                 processEntity(entity, context, TraversalDirection.UNKNOWN);
             }
 
-            while (!context.guidsToProcess.isEmpty()) {
-                String             guid      = context.guidsToProcess.remove(0);
+            while (!context.guidsToProcessIsEmpty()) {
+                String             guid      = context.guidsToProcessRemove(0);
                 TraversalDirection direction = context.guidDirection.get(guid);
                 AtlasEntity        entity    = entityGraphRetriever.toAtlasEntity(guid);
 
@@ -245,7 +246,7 @@ public class ExportService {
 
     private void getEntityGuidsForConnectedFetch(AtlasEntity entity, ExportContext context, TraversalDirection direction) throws AtlasBaseException {
         if (direction == TraversalDirection.UNKNOWN) {
-            getConnectedEntityGuids(entity, context, TraversalDirection.OUTWARD, TraversalDirection.OUTWARD);
+            getConnectedEntityGuids(entity, context, TraversalDirection.OUTWARD, TraversalDirection.INWARD);
         } else {
             if (isProcessEntity(entity)) {
                 direction = TraversalDirection.OUTWARD;
@@ -271,7 +272,7 @@ public class ExportService {
             String query = getQueryForTraversalDirection(direction);
 
             if (LOG.isDebugEnabled()) {
-                LOG.debug("==> getConnectedEntityGuids({}): guidsToProcess {} query {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size(), query);
+                LOG.debug("==> getConnectedEntityGuids({}): guidsToProcess {} query {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcessSize(), query);
             }
 
             context.bindings.clear();
@@ -289,8 +290,8 @@ public class ExportService {
                 if (currentDirection == null) {
                     context.guidDirection.put(guid, direction);
 
-                    if (!context.guidsToProcess.contains(guid)) {
-                        context.guidsToProcess.add(guid);
+                    if (!context.guidsToProcessContains(guid)) {
+                        context.guidsToProcessAdd(guid);
                     }
                 } else if (currentDirection == TraversalDirection.OUTWARD && direction == TraversalDirection.INWARD) {
                     context.guidDirection.put(guid, direction);
@@ -298,14 +299,14 @@ public class ExportService {
                     // the entity should be reprocessed to get inward entities
                     context.guidsProcessed.remove(guid);
 
-                    if (!context.guidsToProcess.contains(guid)) {
-                        context.guidsToProcess.add(guid);
+                    if (!context.guidsToProcessContains(guid)) {
+                        context.guidsToProcessAdd(guid);
                     }
                 }
             }
 
             if (LOG.isDebugEnabled()) {
-                LOG.debug("<== getConnectedEntityGuids({}): found {} guids; guidsToProcess {}", entity.getGuid(), guids.size(), context.guidsToProcess.size());
+                LOG.debug("<== getConnectedEntityGuids({}): found {} guids; guidsToProcess {}", entity.getGuid(), guids.size(), context.guidsToProcessSize());
             }
         }
     }
@@ -323,7 +324,7 @@ public class ExportService {
 
     private void getEntityGuidsForFullFetch(AtlasEntity entity, ExportContext context) {
         if (LOG.isDebugEnabled()) {
-            LOG.debug("==> getEntityGuidsForFullFetch({}): guidsToProcess {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size());
+            LOG.debug("==> getEntityGuidsForFullFetch({}): guidsToProcess {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcessSize());
         }
 
         String query = this.gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_BY_GUID_FULL);
@@ -339,8 +340,8 @@ public class ExportService {
 
         for (String guid : result) {
             if (!context.guidsProcessed.contains(guid)) {
-                if (!context.guidsToProcess.contains(guid)) {
-                    context.guidsToProcess.add(guid);
+                if (!context.guidsToProcessContains(guid)) {
+                    context.guidsToProcessAdd(guid);
                 }
 
                 context.guidDirection.put(guid, TraversalDirection.BOTH);
@@ -348,7 +349,7 @@ public class ExportService {
         }
 
         if (LOG.isDebugEnabled()) {
-            LOG.debug("<== getEntityGuidsForFullFetch({}): found {} guids; guidsToProcess {}", entity.getGuid(), result.size(), context.guidsToProcess.size());
+            LOG.debug("<== getEntityGuidsForFullFetch({}): found {} guids; guidsToProcess {}", entity.getGuid(), result.size(), context.guidsToProcessSize());
         }
     }
 
@@ -434,7 +435,8 @@ public class ExportService {
 
     private class ExportContext {
         final Set<String>                     guidsProcessed = new HashSet<>();
-        final List<String>                    guidsToProcess = new ArrayList<>();
+        private final List<String>            guidsToProcessList = new ArrayList<>();
+        private final Set<String>             guidsToProcessSet = new HashSet<>();
         final Map<String, TraversalDirection> guidDirection  = new HashMap<>();
         final AtlasExportResult               result;
         final ZipSink                         sink;
@@ -477,5 +479,35 @@ public class ExportService {
 
             return matchType;
         }
+
+        public void clear() { // empties the four GUID-tracking collections of this context
+            guidsToProcessList.clear();
+            guidsToProcessSet.clear();
+            guidsProcessed.clear();
+            guidDirection.clear();
+        }
+
+        public boolean guidsToProcessIsEmpty() { // true when the pending-GUID list holds no entries
+            return this.guidsToProcessList.isEmpty();
+        }
+
+        public String guidsToProcessRemove(int i) { // removes and returns the pending GUID at list index i
+            String s = this.guidsToProcessList.remove(i);
+            guidsToProcessSet.remove(s); // drop it from the lookup set too, so contains() no longer reports it
+            return s;
+        }
+
+        public int guidsToProcessSize() { // number of entries in the pending-GUID list
+            return this.guidsToProcessList.size();
+        }
+
+        public boolean guidsToProcessContains(String guid) { // membership is answered by the HashSet, not by scanning the list
+            return guidsToProcessSet.contains(guid);
+        }
+
+        public void guidsToProcessAdd(String guid) { // no duplicate check here; the callers in this patch test guidsToProcessContains() first
+            this.guidsToProcessList.add(guid); // the list keeps the order in which GUIDs were queued
+            guidsToProcessSet.add(guid);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/7154e12d/webapp/src/main/java/org/apache/atlas/web/resources/ZipSink.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/ZipSink.java b/webapp/src/main/java/org/apache/atlas/web/resources/ZipSink.java
index 2e4cb01..37d9eb5 100644
--- a/webapp/src/main/java/org/apache/atlas/web/resources/ZipSink.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/ZipSink.java
@@ -18,18 +18,16 @@
 package org.apache.atlas.web.resources;
 
 import org.apache.atlas.exception.AtlasBaseException;
-import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.impexp.AtlasExportResult;
+import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.typedef.AtlasTypesDef;
 import org.apache.atlas.type.AtlasType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.List;
-import java.util.Map;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipOutputStream;
 
@@ -37,15 +35,9 @@ public class ZipSink {
     private static final Logger LOG = LoggerFactory.getLogger(ZipSink.class);
 
     private ZipOutputStream zipOutputStream;
-    private ByteArrayOutputStream byteArrayOutputStream;
-
-    public ZipSink() {
-        init();
-    }
 
-    private void init() {
-        byteArrayOutputStream = new ByteArrayOutputStream();
-        zipOutputStream = new ZipOutputStream(byteArrayOutputStream);
+    public ZipSink(OutputStream outputStream) { // wraps the caller-supplied stream in the ZipOutputStream this sink writes to
+        zipOutputStream = new ZipOutputStream(outputStream);
     }
 
     public void add(AtlasEntity entity) throws AtlasBaseException {
@@ -68,10 +60,6 @@ public class ZipSink {
         saveToZip(ZipExportFileNames.ATLAS_EXPORT_ORDER_NAME, jsonData);
     }
 
-    public void writeTo(OutputStream stream) throws IOException {
-        byteArrayOutputStream.writeTo(stream);
-    }
-
     public void close() {
         try {
             if(zipOutputStream != null) {