You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2017/03/17 07:34:23 UTC
[2/3] incubator-atlas git commit: ATLAS-1503: optimization of export
implementation
ATLAS-1503: optimization of export implementation
Signed-off-by: Madhan Neethiraj <ma...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/7154e12d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/7154e12d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/7154e12d
Branch: refs/heads/master
Commit: 7154e12d13fe4ccba4207bbf6f411353411f06b2
Parents: 89a3872
Author: ashutoshm <am...@hortonworks.com>
Authored: Fri Mar 10 10:50:51 2017 -0800
Committer: Madhan Neethiraj <ma...@apache.org>
Committed: Fri Mar 17 00:13:26 2017 -0700
----------------------------------------------------------------------
.../atlas/model/impexp/AtlasExportResult.java | 20 +++++++
.../atlas/web/resources/AdminResource.java | 10 ++--
.../atlas/web/resources/ExportService.java | 62 +++++++++++++++-----
.../org/apache/atlas/web/resources/ZipSink.java | 18 +-----
4 files changed, 74 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/7154e12d/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportResult.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportResult.java b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportResult.java
index e6a967e..8f3075e 100644
--- a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportResult.java
+++ b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportResult.java
@@ -180,6 +180,16 @@ public class AtlasExportResult implements Serializable {
return toString(new StringBuilder()).toString();
}
+ public void clear() { // Empties the payload held by this result: data and metrics are each cleared when non-null.
+ if(this.data != null) { // null guard: nothing to clear if data was never set
+ this.data.clear();
+ }
+
+ if(this.metrics != null) { // same guard for metrics (its type is not visible in this hunk)
+ this.metrics.clear();
+ }
+ }
+
@JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE)
@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown=true)
@@ -234,5 +244,15 @@ public class AtlasExportResult implements Serializable {
public String toString() { // Renders this object by delegating to the StringBuilder-based toString overload.
return toString(new StringBuilder()).toString();
}
+
+ public void clear() { // Clears typesDef and entityCreationOrder when they are non-null; the references themselves are kept.
+ if(this.typesDef!= null) { // null guard before delegating to typesDef.clear()
+ this.typesDef.clear();
+ }
+
+ if(this.entityCreationOrder != null) { // presumably a list of entity guids; confirm against the field declaration
+ this.entityCreationOrder.clear();
+ }
+ }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/7154e12d/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
index 31a4cf9..0dfdeb2 100755
--- a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
@@ -51,7 +51,6 @@ import org.springframework.security.core.GrantedAuthority;
import org.springframework.security.core.context.SecurityContextHolder;
import javax.inject.Singleton;
-import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.*;
@@ -309,7 +308,7 @@ public class AdminResource {
ZipSink exportSink = null;
try {
- exportSink = new ZipSink();
+ exportSink = new ZipSink(httpServletResponse.getOutputStream());
ExportService exportService = new ExportService(this.typeRegistry);
AtlasExportResult result = exportService.run(exportSink, request, Servlets.getUserName(httpServletRequest),
@@ -318,14 +317,13 @@ public class AdminResource {
exportSink.close();
- ServletOutputStream outStream = httpServletResponse.getOutputStream();
- exportSink.writeTo(outStream);
-
+ httpServletResponse.addHeader("Content-Encoding","gzip");
httpServletResponse.setContentType("application/zip");
httpServletResponse.setHeader("Content-Disposition",
"attachment; filename=" + result.getClass().getSimpleName());
+ httpServletResponse.setHeader("Transfer-Encoding", "chunked");
- outStream.flush();
+ httpServletResponse.getOutputStream().flush();
return Response.ok().build();
} catch (IOException excp) {
LOG.error("export() failed", excp);
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/7154e12d/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java b/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java
index c1891e0..e123ff7 100644
--- a/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java
@@ -105,8 +105,9 @@ public class ExportService {
LOG.error("Operation failed: ", ex);
} finally {
atlasGraph.releaseGremlinScriptEngine(context.scriptEngine);
-
LOG.info("<== export(user={}, from={}): status {}", userName, requestingIP, context.result.getOperationStatus());
+ context.clear();
+ result.clear();
}
return context.result;
@@ -124,8 +125,8 @@ public class ExportService {
processEntity(entity, context, TraversalDirection.UNKNOWN);
}
- while (!context.guidsToProcess.isEmpty()) {
- String guid = context.guidsToProcess.remove(0);
+ while (!context.guidsToProcessIsEmpty()) {
+ String guid = context.guidsToProcessRemove(0);
TraversalDirection direction = context.guidDirection.get(guid);
AtlasEntity entity = entityGraphRetriever.toAtlasEntity(guid);
@@ -245,7 +246,7 @@ public class ExportService {
private void getEntityGuidsForConnectedFetch(AtlasEntity entity, ExportContext context, TraversalDirection direction) throws AtlasBaseException {
if (direction == TraversalDirection.UNKNOWN) {
- getConnectedEntityGuids(entity, context, TraversalDirection.OUTWARD, TraversalDirection.OUTWARD);
+ getConnectedEntityGuids(entity, context, TraversalDirection.OUTWARD, TraversalDirection.INWARD);
} else {
if (isProcessEntity(entity)) {
direction = TraversalDirection.OUTWARD;
@@ -271,7 +272,7 @@ public class ExportService {
String query = getQueryForTraversalDirection(direction);
if (LOG.isDebugEnabled()) {
- LOG.debug("==> getConnectedEntityGuids({}): guidsToProcess {} query {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size(), query);
+ LOG.debug("==> getConnectedEntityGuids({}): guidsToProcess {} query {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcessSize(), query);
}
context.bindings.clear();
@@ -289,8 +290,8 @@ public class ExportService {
if (currentDirection == null) {
context.guidDirection.put(guid, direction);
- if (!context.guidsToProcess.contains(guid)) {
- context.guidsToProcess.add(guid);
+ if (!context.guidsToProcessContains(guid)) {
+ context.guidsToProcessAdd(guid);
}
} else if (currentDirection == TraversalDirection.OUTWARD && direction == TraversalDirection.INWARD) {
context.guidDirection.put(guid, direction);
@@ -298,14 +299,14 @@ public class ExportService {
// the entity should be reprocessed to get inward entities
context.guidsProcessed.remove(guid);
- if (!context.guidsToProcess.contains(guid)) {
- context.guidsToProcess.add(guid);
+ if (!context.guidsToProcessContains(guid)) {
+ context.guidsToProcessAdd(guid);
}
}
}
if (LOG.isDebugEnabled()) {
- LOG.debug("<== getConnectedEntityGuids({}): found {} guids; guidsToProcess {}", entity.getGuid(), guids.size(), context.guidsToProcess.size());
+ LOG.debug("<== getConnectedEntityGuids({}): found {} guids; guidsToProcess {}", entity.getGuid(), guids.size(), context.guidsToProcessSize());
}
}
}
@@ -323,7 +324,7 @@ public class ExportService {
private void getEntityGuidsForFullFetch(AtlasEntity entity, ExportContext context) {
if (LOG.isDebugEnabled()) {
- LOG.debug("==> getEntityGuidsForFullFetch({}): guidsToProcess {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size());
+ LOG.debug("==> getEntityGuidsForFullFetch({}): guidsToProcess {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcessSize());
}
String query = this.gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_BY_GUID_FULL);
@@ -339,8 +340,8 @@ public class ExportService {
for (String guid : result) {
if (!context.guidsProcessed.contains(guid)) {
- if (!context.guidsToProcess.contains(guid)) {
- context.guidsToProcess.add(guid);
+ if (!context.guidsToProcessContains(guid)) {
+ context.guidsToProcessAdd(guid);
}
context.guidDirection.put(guid, TraversalDirection.BOTH);
@@ -348,7 +349,7 @@ public class ExportService {
}
if (LOG.isDebugEnabled()) {
- LOG.debug("<== getEntityGuidsForFullFetch({}): found {} guids; guidsToProcess {}", entity.getGuid(), result.size(), context.guidsToProcess.size());
+ LOG.debug("<== getEntityGuidsForFullFetch({}): found {} guids; guidsToProcess {}", entity.getGuid(), result.size(), context.guidsToProcessSize());
}
}
@@ -434,7 +435,8 @@ public class ExportService {
private class ExportContext {
final Set<String> guidsProcessed = new HashSet<>();
- final List<String> guidsToProcess = new ArrayList<>();
+ private final List<String> guidsToProcessList = new ArrayList<>();
+ private final Set<String> guidsToProcessSet = new HashSet<>();
final Map<String, TraversalDirection> guidDirection = new HashMap<>();
final AtlasExportResult result;
final ZipSink sink;
@@ -477,5 +479,35 @@ public class ExportService {
return matchType;
}
+
+ public void clear() { // Drops all traversal bookkeeping held by this context; result and sink are not touched here.
+ guidsToProcessList.clear(); // pending guids, in processing order
+ guidsToProcessSet.clear(); // lookup mirror of the pending list
+ guidsProcessed.clear(); // guids already handled
+ guidDirection.clear(); // per-guid traversal direction
+ }
+
+ public boolean guidsToProcessIsEmpty() { // True when no guids are pending; answered from the ordered list.
+ return this.guidsToProcessList.isEmpty();
+ }
+
+ public String guidsToProcessRemove(int i) { // Removes and returns the pending guid at index i, keeping the lookup set in step with the list.
+ String s = this.guidsToProcessList.remove(i); // NOTE(review): the list is an ArrayList and the caller in this patch passes 0, so each removal shifts the remaining elements
+ guidsToProcessSet.remove(s); // without this, guidsToProcessContains() would keep reporting the removed guid as pending
+ return s;
+ }
+
+ public int guidsToProcessSize() { // Number of pending guids, taken from the ordered list.
+ return this.guidsToProcessList.size();
+ }
+
+ public boolean guidsToProcessContains(String guid) { // Membership test answered by the HashSet mirror instead of scanning the list.
+ return guidsToProcessSet.contains(guid);
+ }
+
+ public void guidsToProcessAdd(String guid) { // Appends guid to the pending list and records it in the lookup set.
+ this.guidsToProcessList.add(guid); // NOTE(review): no duplicate check here; the callers in this patch test guidsToProcessContains() first
+ guidsToProcessSet.add(guid); // a duplicate add would leave list and set out of step after the first removal of that guid
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/7154e12d/webapp/src/main/java/org/apache/atlas/web/resources/ZipSink.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/ZipSink.java b/webapp/src/main/java/org/apache/atlas/web/resources/ZipSink.java
index 2e4cb01..37d9eb5 100644
--- a/webapp/src/main/java/org/apache/atlas/web/resources/ZipSink.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/ZipSink.java
@@ -18,18 +18,16 @@
package org.apache.atlas.web.resources;
import org.apache.atlas.exception.AtlasBaseException;
-import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.impexp.AtlasExportResult;
+import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.type.AtlasType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
-import java.util.Map;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
@@ -37,15 +35,9 @@ public class ZipSink {
private static final Logger LOG = LoggerFactory.getLogger(ZipSink.class);
private ZipOutputStream zipOutputStream;
- private ByteArrayOutputStream byteArrayOutputStream;
-
- public ZipSink() {
- init();
- }
- private void init() {
- byteArrayOutputStream = new ByteArrayOutputStream();
- zipOutputStream = new ZipOutputStream(byteArrayOutputStream);
+ public ZipSink(OutputStream outputStream) { // Builds a sink whose zip output goes to the caller-supplied stream.
+ zipOutputStream = new ZipOutputStream(outputStream); // wraps the stream directly; the removed lines above used an internal ByteArrayOutputStream instead
}
public void add(AtlasEntity entity) throws AtlasBaseException {
@@ -68,10 +60,6 @@ public class ZipSink {
saveToZip(ZipExportFileNames.ATLAS_EXPORT_ORDER_NAME, jsonData);
}
- public void writeTo(OutputStream stream) throws IOException {
- byteArrayOutputStream.writeTo(stream);
- }
-
public void close() {
try {
if(zipOutputStream != null) {