You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2017/02/17 23:21:24 UTC
incubator-atlas git commit: ATLAS-1503: optimization of import API implementation
Repository: incubator-atlas
Updated Branches:
refs/heads/master 852a71183 -> 1d85e95fa
ATLAS-1503: optimization of import API implementation
Signed-off-by: Madhan Neethiraj <ma...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/1d85e95f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/1d85e95f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/1d85e95f
Branch: refs/heads/master
Commit: 1d85e95fa06a98d70417c9671cd4d0a0b33a9ed1
Parents: 852a711
Author: ashutoshm <am...@hortonworks.com>
Authored: Fri Feb 17 10:47:22 2017 -0800
Committer: Madhan Neethiraj <ma...@apache.org>
Committed: Fri Feb 17 14:38:53 2017 -0800
----------------------------------------------------------------------
.../java/org/apache/atlas/AtlasErrorCode.java | 1 +
.../atlas/model/impexp/AtlasImportResult.java | 25 +++--
.../atlas/model/instance/EntityMutations.java | 2 +-
.../test/java/org/apache/atlas/TestUtilsV2.java | 1 +
.../store/graph/AtlasEntityStore.java | 9 ++
.../store/graph/v1/AtlasEntityStoreV1.java | 77 +++++++++++--
.../store/graph/v1/AtlasEntityStream.java | 16 ++-
.../graph/v1/AtlasEntityStreamForImport.java | 30 ++++++
.../store/graph/v1/EntityGraphMapper.java | 26 ++---
.../store/graph/v1/AtlasEntityStoreV1Test.java | 65 ++++++-----
.../atlas/web/resources/AdminResource.java | 43 ++++++--
.../atlas/web/resources/ExportService.java | 11 +-
.../atlas/web/resources/ImportService.java | 31 +++---
.../apache/atlas/web/resources/ZipSource.java | 107 ++++++++-----------
14 files changed, 290 insertions(+), 154 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1d85e95f/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
index 584bf25..ce5fea3 100644
--- a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
+++ b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
@@ -88,6 +88,7 @@ public enum AtlasErrorCode {
INDEX_CREATION_FAILED(500, "ATLAS5002E", "Index creation failed for {0}"),
INDEX_ROLLBACK_FAILED(500, "ATLAS5003E", "Index rollback failed for {0}"),
FAILED_TO_OBTAIN_TYPE_UPDATE_LOCK(500, "ATLAS5004E", "Failed to get the lock; another type update might be in progress. Please try again"),
+ FAILED_TO_OBTAIN_IMPORT_EXPORT_LOCK(500, "ATLAS5005E", "Another import or export is in progress. Please try again"),
NOTIFICATION_FAILED(500, "ATLAS5005E", "Failed to notify for change {0}");
private String errorCode;
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1d85e95f/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportResult.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportResult.java b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportResult.java
index a5eeef1..bfb7637 100644
--- a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportResult.java
+++ b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportResult.java
@@ -26,7 +26,9 @@ import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.NONE;
@@ -50,6 +52,7 @@ public class AtlasImportResult {
private String hostName;
private long timeStamp;
private Map<String, Integer> metrics;
+ private List<String> processedEntities;
private OperationStatus operationStatus;
public AtlasImportResult() {
@@ -58,13 +61,14 @@ public class AtlasImportResult {
public AtlasImportResult(AtlasImportRequest request, String userName,
String clientIpAddress, String hostName, long timeStamp) {
- this.request = request;
- this.userName = userName;
- this.clientIpAddress = clientIpAddress;
- this.hostName = hostName;
- this.timeStamp = timeStamp;
- this.metrics = new HashMap<>();
- this.operationStatus = OperationStatus.FAIL;
+ this.request = request;
+ this.userName = userName;
+ this.clientIpAddress = clientIpAddress;
+ this.hostName = hostName;
+ this.timeStamp = timeStamp;
+ this.metrics = new HashMap<>();
+ this.operationStatus = OperationStatus.FAIL;
+ this.processedEntities = new ArrayList<>();
}
public AtlasImportRequest getRequest() {
@@ -133,6 +137,10 @@ public class AtlasImportResult {
metrics.put(key, currentValue + incrementBy);
}
+ public void setProcessedEntities(List<String> processedEntities) { this.processedEntities = processedEntities; }
+
+ public List<String> getProcessedEntities() { return this.processedEntities; }
+
public StringBuilder toString(StringBuilder sb) {
if (sb == null) {
sb = new StringBuilder();
@@ -149,6 +157,9 @@ public class AtlasImportResult {
sb.append("}");
sb.append(", operationStatus='").append(operationStatus).append("'");
+ sb.append(", processedEntities=[");
+ AtlasBaseTypeDef.dumpObjects(processedEntities, sb);
+ sb.append("]");
sb.append("}");
return sb;
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1d85e95f/intg/src/main/java/org/apache/atlas/model/instance/EntityMutations.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/model/instance/EntityMutations.java b/intg/src/main/java/org/apache/atlas/model/instance/EntityMutations.java
index 74e3c57..b509420 100644
--- a/intg/src/main/java/org/apache/atlas/model/instance/EntityMutations.java
+++ b/intg/src/main/java/org/apache/atlas/model/instance/EntityMutations.java
@@ -46,7 +46,7 @@ public class EntityMutations implements Serializable {
CREATE,
UPDATE,
PARTIAL_UPDATE,
- DELETE,
+ DELETE
}
public static final class EntityMutation implements Serializable {
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1d85e95f/intg/src/test/java/org/apache/atlas/TestUtilsV2.java
----------------------------------------------------------------------
diff --git a/intg/src/test/java/org/apache/atlas/TestUtilsV2.java b/intg/src/test/java/org/apache/atlas/TestUtilsV2.java
index ea56dd6..6d85672 100755
--- a/intg/src/test/java/org/apache/atlas/TestUtilsV2.java
+++ b/intg/src/test/java/org/apache/atlas/TestUtilsV2.java
@@ -562,6 +562,7 @@ public final class TestUtilsV2 {
AtlasTypeUtil.createUniqueRequiredAttrDef(NAME, "string"),
AtlasTypeUtil.createOptionalAttrDef("isReplicated", "boolean"),
AtlasTypeUtil.createOptionalAttrDef("created", "string"),
+ AtlasTypeUtil.createOptionalAttrDef("parameters", "map<string,string>"),
AtlasTypeUtil.createRequiredAttrDef("description", "string"));
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1d85e95f/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java
index 1cd4375..3a037cc 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java
@@ -19,6 +19,7 @@ package org.apache.atlas.repository.store.graph;
import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
@@ -69,6 +70,14 @@ public interface AtlasEntityStore {
EntityMutationResponse createOrUpdate(EntityStream entityStream, boolean isPartialUpdate) throws AtlasBaseException;
/**
+ * Create or update entities in the stream using repeated commits of connected entities
+ * @param entityStream AtlasEntityStream
+ * @return EntityMutationResponse Entity mutations operations with the corresponding set of entities on which these operations were performed
+ * @throws AtlasBaseException
+ */
+ EntityMutationResponse bulkImport(EntityStream entityStream, AtlasImportResult importResult) throws AtlasBaseException;
+
+ /**
* Update a single entity
* @param entityType type of the entity
* @param uniqAttributes Attributes that uniquely identify the entity
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1d85e95f/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
index 4312287..4684bfe 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
@@ -24,13 +24,10 @@ import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.GraphTransaction;
import org.apache.atlas.RequestContextV1;
import org.apache.atlas.exception.AtlasBaseException;
-import org.apache.atlas.model.instance.AtlasClassification;
-import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.atlas.model.instance.*;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
-import org.apache.atlas.model.instance.AtlasObjectId;
-import org.apache.atlas.model.instance.EntityMutationResponse;
-import org.apache.atlas.model.instance.EntityMutations;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.EntityGraphDiscovery;
@@ -43,10 +40,9 @@ import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
+
+import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.*;
@Singleton
@@ -130,6 +126,65 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
}
@Override
+ public EntityMutationResponse bulkImport(EntityStream entityStream, AtlasImportResult importResult) throws AtlasBaseException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> bulkImport()");
+ }
+
+ if (entityStream == null || !entityStream.hasNext()) {
+ throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "no entities to create/update.");
+ }
+
+ EntityMutationResponse ret = new EntityMutationResponse();
+ ret.setGuidAssignments(new HashMap<String, String>());
+
+ Set<String> processedGuids = new HashSet<>();
+ int progressReportedAtCount = 0;
+
+ while (entityStream.hasNext()) {
+ AtlasEntity entity = entityStream.next();
+
+ if(processedGuids.contains(entity.getGuid())) {
+ continue;
+ }
+
+ AtlasEntityStreamForImport oneEntityStream = new AtlasEntityStreamForImport(entity, entityStream);
+
+ EntityMutationResponse resp = createOrUpdate(oneEntityStream, false);
+
+ updateImportMetrics("entity:%s:created", resp.getCreatedEntities(), processedGuids, importResult);
+ updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids, importResult);
+ updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids, importResult);
+
+ if ((processedGuids.size() - progressReportedAtCount) > 10) {
+ progressReportedAtCount = processedGuids.size();
+
+ LOG.info("bulkImport(): in progress.. number of entities imported: {}", progressReportedAtCount);
+ }
+
+ if (resp.getGuidAssignments() != null) {
+ ret.getGuidAssignments().putAll(resp.getGuidAssignments());
+ }
+ }
+
+ importResult.getProcessedEntities().addAll(processedGuids);
+ LOG.info("bulkImport(): done. Number of entities imported: {}", processedGuids.size());
+
+ return ret;
+ }
+
+ private void updateImportMetrics(String prefix, List<AtlasEntityHeader> list, Set<String> processedGuids, AtlasImportResult importResult) {
+ if (list == null) {
+ return;
+ }
+
+ for (AtlasEntityHeader h : list) {
+ processedGuids.add(h.getGuid());
+ importResult.incrementMeticsCounter(String.format(prefix, h.getTypeName()));
+ }
+ }
+
+ @Override
@GraphTransaction
public EntityMutationResponse createOrUpdate(EntityStream entityStream, boolean isPartialUpdate) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
@@ -323,11 +378,11 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
deleteHandler.deleteEntities(deletionCandidates);
RequestContextV1 req = RequestContextV1.get();
for (AtlasObjectId id : req.getDeletedEntityIds()) {
- response.addEntity(EntityMutations.EntityOperation.DELETE, EntityGraphMapper.constructHeader(id));
+ response.addEntity(DELETE, EntityGraphMapper.constructHeader(id));
}
for (AtlasObjectId id : req.getUpdatedEntityIds()) {
- response.addEntity(EntityMutations.EntityOperation.UPDATE, EntityGraphMapper.constructHeader(id));
+ response.addEntity(UPDATE, EntityGraphMapper.constructHeader(id));
}
return response;
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1d85e95f/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStream.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStream.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStream.java
index 010b626..5d9a7d4 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStream.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStream.java
@@ -24,11 +24,10 @@ import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import java.util.Iterator;
public class AtlasEntityStream implements EntityStream {
- private AtlasEntitiesWithExtInfo entitiesWithExtInfo = new AtlasEntitiesWithExtInfo();
- private Iterator<AtlasEntity> iterator;
+ private final AtlasEntitiesWithExtInfo entitiesWithExtInfo;
+ private final EntityStream entityStream;
+ private Iterator<AtlasEntity> iterator;
- public AtlasEntityStream() {
- }
public AtlasEntityStream(AtlasEntity entity) {
this(new AtlasEntitiesWithExtInfo(entity));
@@ -41,6 +40,13 @@ public class AtlasEntityStream implements EntityStream {
public AtlasEntityStream(AtlasEntitiesWithExtInfo entitiesWithExtInfo) {
this.entitiesWithExtInfo = entitiesWithExtInfo;
this.iterator = this.entitiesWithExtInfo.getEntities().iterator();
+ this.entityStream = null;
+ }
+
+ public AtlasEntityStream(AtlasEntity entity, EntityStream entityStream) {
+ this.entitiesWithExtInfo = new AtlasEntitiesWithExtInfo(entity);
+ this.iterator = this.entitiesWithExtInfo.getEntities().iterator();
+ this.entityStream = entityStream;
}
@Override
@@ -60,7 +66,7 @@ public class AtlasEntityStream implements EntityStream {
@Override
public AtlasEntity getByGuid(String guid) {
- return entitiesWithExtInfo.getEntity(guid);
+ return entityStream != null ? entityStream.getByGuid(guid) : entitiesWithExtInfo.getEntity(guid);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1d85e95f/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStreamForImport.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStreamForImport.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStreamForImport.java
new file mode 100644
index 0000000..c0b4d8d
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStreamForImport.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.store.graph.v1;
+
+import org.apache.atlas.model.instance.AtlasEntity;
+
+public class AtlasEntityStreamForImport extends AtlasEntityStream implements EntityImportStream {
+ public AtlasEntityStreamForImport(AtlasEntity entity) {
+ super(entity);
+ }
+
+ public AtlasEntityStreamForImport(AtlasEntity entity, EntityStream entityStream) {
+ super(entity, entityStream);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1d85e95f/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java
index 2e71ab8..8c96c7b 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java
@@ -739,28 +739,18 @@ public class EntityGraphMapper {
private AtlasEntityHeader constructHeader(AtlasEntity entity, final AtlasEntityType type, AtlasVertex vertex) {
- //TODO - enhance to return only selective attributes
- AtlasEntityHeader header = new AtlasEntityHeader(entity.getTypeName(), AtlasGraphUtilsV1.getIdFromVertex(vertex), entity.getAttributes());
- final Map<String, AtlasStructType.AtlasAttribute> allAttributes = type.getAllAttributes();
- for (String attribute : allAttributes.keySet()) {
- AtlasType attributeType = allAttributes.get(attribute).getAttributeType();
- AtlasAttributeDef attributeDef = allAttributes.get(attribute).getAttributeDef();
- if ( header.getAttribute(attribute) == null && (TypeCategory.PRIMITIVE == attributeType.getTypeCategory())) {
-
- if ( attributeDef.getIsOptional()) {
- header.setAttribute(attribute, attributeType.createOptionalDefaultValue());
- } else {
- header.setAttribute(attribute, attributeType.createDefaultValue());
- }
- }
+ AtlasEntityHeader header = new AtlasEntityHeader(entity.getTypeName());
+
+ header.setGuid(AtlasGraphUtilsV1.getIdFromVertex(vertex));
+
+ for (AtlasAttribute attribute : type.getUniqAttributes().values()) {
+ header.setAttribute(attribute.getName(), entity.getAttribute(attribute.getName()));
}
+
return header;
}
public static AtlasEntityHeader constructHeader(AtlasObjectId id) {
- AtlasEntityHeader entity = new AtlasEntityHeader(id.getTypeName());
- entity.setGuid(id.getGuid());
-
- return entity;
+ return new AtlasEntityHeader(id.getTypeName(), id.getGuid(), id.getUniqueAttributes());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1d85e95f/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1Test.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1Test.java b/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1Test.java
index 1d10461..dd82cb2 100644
--- a/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1Test.java
+++ b/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1Test.java
@@ -74,6 +74,7 @@ import static org.apache.atlas.TestUtils.NAME;
import static org.apache.atlas.TestUtils.randomString;
import static org.apache.atlas.TestUtilsV2.TABLE_TYPE;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
@Guice(modules = RepositoryMetadataModule.class)
public class AtlasEntityStoreV1Test {
@@ -199,8 +200,8 @@ public class AtlasEntityStoreV1Test {
init();
EntityMutationResponse response = entityStore.createOrUpdate(new AtlasEntityStream(entitiesInfo), false);
- AtlasEntityHeader updatedTable = response.getFirstUpdatedEntityByTypeName(tableEntity.getTypeName());
- validateEntity(entitiesInfo, getEntityFromStore(updatedTable));
+ AtlasEntityHeader updatedTableHeader = response.getFirstUpdatedEntityByTypeName(tableEntity.getTypeName());
+ validateEntity(entitiesInfo, getEntityFromStore(updatedTableHeader));
//Complete update. Add array elements - col3,col4
AtlasEntity col3 = TestUtilsV2.createColumnEntity(tableEntity);
@@ -219,8 +220,8 @@ public class AtlasEntityStoreV1Test {
init();
response = entityStore.createOrUpdate(new AtlasEntityStream(entitiesInfo), false);
- updatedTable = response.getFirstUpdatedEntityByTypeName(tableEntity.getTypeName());
- validateEntity(entitiesInfo, getEntityFromStore(updatedTable));
+ updatedTableHeader = response.getFirstUpdatedEntityByTypeName(tableEntity.getTypeName());
+ validateEntity(entitiesInfo, getEntityFromStore(updatedTableHeader));
//Swap elements
columns.clear();
@@ -231,8 +232,10 @@ public class AtlasEntityStoreV1Test {
init();
response = entityStore.createOrUpdate(new AtlasEntityStream(entitiesInfo), false);
- updatedTable = response.getFirstUpdatedEntityByTypeName(tableEntity.getTypeName());
- Assert.assertEquals(((List<AtlasObjectId>) updatedTable.getAttribute(COLUMNS_ATTR_NAME)).size(), 2);
+ updatedTableHeader = response.getFirstUpdatedEntityByTypeName(tableEntity.getTypeName());
+ AtlasEntity updatedEntity = getEntityFromStore(updatedTableHeader);
+ // deleted columns are also included in "columns" attribute
+ Assert.assertTrue(((List<AtlasObjectId>) updatedEntity.getAttribute(COLUMNS_ATTR_NAME)).size() >= 2);
assertEquals(response.getEntitiesByOperation(EntityMutations.EntityOperation.DELETE).size(), 2); // col1, col2 are deleted
@@ -242,8 +245,8 @@ public class AtlasEntityStoreV1Test {
init();
response = entityStore.createOrUpdate(new AtlasEntityStream(entitiesInfo), false);
- updatedTable = response.getFirstUpdatedEntityByTypeName(tableEntity.getTypeName());
- validateEntity(entitiesInfo, getEntityFromStore(updatedTable));
+ updatedTableHeader = response.getFirstUpdatedEntityByTypeName(tableEntity.getTypeName());
+ validateEntity(entitiesInfo, getEntityFromStore(updatedTableHeader));
assertEquals(response.getEntitiesByOperation(EntityMutations.EntityOperation.DELETE).size(), 2);
}
@@ -261,9 +264,10 @@ public class AtlasEntityStoreV1Test {
EntityMutationResponse response = entityStore.createOrUpdate(new AtlasEntityStream(entitiesInfo), false);
AtlasEntityHeader tableDefinition1 = response.getFirstUpdatedEntityByTypeName(TABLE_TYPE);
- validateEntity(entitiesInfo, getEntityFromStore(tableDefinition1));
+ AtlasEntity updatedTableDef1 = getEntityFromStore(tableDefinition1);
+ validateEntity(entitiesInfo, updatedTableDef1);
- Assert.assertTrue(partsMap.get("part0").equals(((Map<String, AtlasStruct>) tableDefinition1.getAttribute("partitionsMap")).get("part0")));
+ Assert.assertTrue(partsMap.get("part0").equals(((Map<String, AtlasStruct>) updatedTableDef1.getAttribute("partitionsMap")).get("part0")));
//update map - add a map key
partsMap.put("part1", new AtlasStruct(TestUtils.PARTITION_STRUCT_TYPE, TestUtilsV2.NAME, "test1"));
@@ -273,10 +277,11 @@ public class AtlasEntityStoreV1Test {
response = entityStore.createOrUpdate(new AtlasEntityStream(entitiesInfo), false);
AtlasEntityHeader tableDefinition2 = response.getFirstUpdatedEntityByTypeName(TABLE_TYPE);
- validateEntity(entitiesInfo, getEntityFromStore(tableDefinition2));
+ AtlasEntity updatedTableDef2 = getEntityFromStore(tableDefinition2);
+ validateEntity(entitiesInfo, updatedTableDef2);
- assertEquals(((Map<String, AtlasStruct>) tableDefinition2.getAttribute("partitionsMap")).size(), 2);
- Assert.assertTrue(partsMap.get("part1").equals(((Map<String, AtlasStruct>) tableDefinition2.getAttribute("partitionsMap")).get("part1")));
+ assertEquals(((Map<String, AtlasStruct>) updatedTableDef2.getAttribute("partitionsMap")).size(), 2);
+ Assert.assertTrue(partsMap.get("part1").equals(((Map<String, AtlasStruct>) updatedTableDef2.getAttribute("partitionsMap")).get("part1")));
//update map - remove a key and add another key
partsMap.remove("part0");
@@ -287,11 +292,12 @@ public class AtlasEntityStoreV1Test {
response = entityStore.createOrUpdate(new AtlasEntityStream(entitiesInfo), false);
AtlasEntityHeader tableDefinition3 = response.getFirstUpdatedEntityByTypeName(TABLE_TYPE);
- validateEntity(entitiesInfo, getEntityFromStore(tableDefinition3));
+ AtlasEntity updatedTableDef3 = getEntityFromStore(tableDefinition3);
+ validateEntity(entitiesInfo, updatedTableDef3);
- assertEquals(((Map<String, AtlasStruct>) tableDefinition3.getAttribute("partitionsMap")).size(), 2);
- Assert.assertNull(((Map<String, AtlasStruct>) tableDefinition3.getAttribute("partitionsMap")).get("part0"));
- Assert.assertTrue(partsMap.get("part2").equals(((Map<String, AtlasStruct>) tableDefinition3.getAttribute("partitionsMap")).get("part2")));
+ assertEquals(((Map<String, AtlasStruct>) updatedTableDef3.getAttribute("partitionsMap")).size(), 2);
+ Assert.assertNull(((Map<String, AtlasStruct>) updatedTableDef3.getAttribute("partitionsMap")).get("part0"));
+ Assert.assertTrue(partsMap.get("part2").equals(((Map<String, AtlasStruct>) updatedTableDef3.getAttribute("partitionsMap")).get("part2")));
//update struct value for existing map key
AtlasStruct partition2 = partsMap.get("part2");
@@ -301,11 +307,12 @@ public class AtlasEntityStoreV1Test {
response = entityStore.createOrUpdate(new AtlasEntityStream(entitiesInfo), false);
AtlasEntityHeader tableDefinition4 = response.getFirstUpdatedEntityByTypeName(TABLE_TYPE);
- validateEntity(entitiesInfo, getEntityFromStore(tableDefinition4));
+ AtlasEntity updatedTableDef4 = getEntityFromStore(tableDefinition4);
+ validateEntity(entitiesInfo, updatedTableDef4);
- assertEquals(((Map<String, AtlasStruct>) tableDefinition4.getAttribute("partitionsMap")).size(), 2);
- Assert.assertNull(((Map<String, AtlasStruct>) tableDefinition4.getAttribute("partitionsMap")).get("part0"));
- Assert.assertTrue(partsMap.get("part2").equals(((Map<String, AtlasStruct>) tableDefinition4.getAttribute("partitionsMap")).get("part2")));
+ assertEquals(((Map<String, AtlasStruct>) updatedTableDef4.getAttribute("partitionsMap")).size(), 2);
+ Assert.assertNull(((Map<String, AtlasStruct>) updatedTableDef4.getAttribute("partitionsMap")).get("part0"));
+ Assert.assertTrue(partsMap.get("part2").equals(((Map<String, AtlasStruct>) updatedTableDef4.getAttribute("partitionsMap")).get("part2")));
//Test map pointing to a class
@@ -523,8 +530,9 @@ public class AtlasEntityStoreV1Test {
response = entityStore.createOrUpdate(new InMemoryMapEntityStream(tableCloneMap), false);
final AtlasEntityHeader tableDefinition = response.getFirstUpdatedEntityByTypeName(TABLE_TYPE);
- Assert.assertNotNull(tableDefinition.getAttribute("database"));
- Assert.assertEquals(((AtlasObjectId) tableDefinition.getAttribute("database")).getGuid(), dbCreated.getGuid());
+ AtlasEntity updatedTableDefinition = getEntityFromStore(tableDefinition);
+ Assert.assertNotNull(updatedTableDefinition.getAttribute("database"));
+ Assert.assertEquals(((AtlasObjectId) updatedTableDefinition.getAttribute("database")).getGuid(), dbCreated.getGuid());
}
@Test
@@ -534,7 +542,7 @@ public class AtlasEntityStoreV1Test {
init();
EntityMutationResponse response = entityStore.createOrUpdate(new AtlasEntityStream(dbEntity), false);
- AtlasEntityHeader firstEntityCreated = response.getFirstCreatedEntityByTypeName(TestUtilsV2.DATABASE_TYPE);
+ AtlasEntity firstEntityCreated = getEntityFromStore(response.getFirstCreatedEntityByTypeName(TestUtilsV2.DATABASE_TYPE));
//The optional boolean attribute should have a non-null value
final String isReplicatedAttr = "isReplicated";
@@ -552,7 +560,7 @@ public class AtlasEntityStoreV1Test {
init();
response = entityStore.createOrUpdate(new AtlasEntityStream(dbEntity), false);
- AtlasEntityHeader firstEntityUpdated = response.getFirstUpdatedEntityByTypeName(TestUtilsV2.DATABASE_TYPE);
+ AtlasEntity firstEntityUpdated = getEntityFromStore(response.getFirstUpdatedEntityByTypeName(TestUtilsV2.DATABASE_TYPE));
Assert.assertNotNull(firstEntityUpdated);
Assert.assertNotNull(firstEntityUpdated.getAttribute(isReplicatedAttr));
@@ -736,8 +744,9 @@ public class AtlasEntityStoreV1Test {
tblHeader = response.getFirstEntityPartialUpdated();
AtlasEntity updatedTblEntity = getEntityFromStore(tblHeader);
- columns = (List<AtlasObjectId>) tblHeader.getAttribute(TestUtilsV2.COLUMNS_ATTR_NAME);
- assertEquals(columns.size(), 2);
+ columns = (List<AtlasObjectId>) updatedTblEntity.getAttribute(TestUtilsV2.COLUMNS_ATTR_NAME);
+ // deleted columns are included in the attribute; hence use >=
+ assertTrue(columns.size() >= 2);
}
@Test
@@ -867,7 +876,7 @@ public class AtlasEntityStoreV1Test {
if (MapUtils.isNotEmpty(expectedMap)) {
Assert.assertTrue(MapUtils.isNotEmpty(actualMap));
- //actual map could have deleted entities. Hence size may not match.
+ // deleted entries are included in the attribute; hence use >=
Assert.assertTrue(actualMap.size() >= expectedMap.size());
for (Object key : expectedMap.keySet()) {
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1d85e95f/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
index c8c0099..8ff3396 100755
--- a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
@@ -20,7 +20,7 @@ package org.apache.atlas.web.resources;
import com.google.inject.Inject;
import org.apache.atlas.AtlasClient;
-import org.apache.atlas.AtlasException;
+import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasExportRequest;
import org.apache.atlas.model.impexp.AtlasExportResult;
@@ -59,6 +59,7 @@ import javax.ws.rs.core.Response;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.*;
+import java.util.concurrent.locks.ReentrantLock;
import static org.apache.atlas.repository.converters.AtlasInstanceConverter.toAtlasBaseException;
@@ -76,6 +77,8 @@ public class AdminResource {
@Context
private HttpServletResponse httpServletResponse;
+ private final ReentrantLock importExportOperationLock;
+
private static final String isCSRF_ENABLED = "atlas.rest-csrf.enabled";
private static final String BROWSER_USER_AGENT_PARAM = "atlas.rest-csrf.browser-useragents-regex";
private static final String CUSTOM_METHODS_TO_IGNORE_PARAM = "atlas.rest-csrf.methods-to-ignore";
@@ -97,11 +100,12 @@ public class AdminResource {
public AdminResource(ServiceState serviceState, MetricsService metricsService,
AtlasTypeRegistry typeRegistry, AtlasTypeDefStore typeDefStore,
AtlasEntityStore entityStore) {
- this.serviceState = serviceState;
- this.metricsService = metricsService;
- this.typeRegistry = typeRegistry;
- this.typesDefStore = typeDefStore;
- this.entityStore = entityStore;
+ this.serviceState = serviceState;
+ this.metricsService = metricsService;
+ this.typeRegistry = typeRegistry;
+ this.typesDefStore = typeDefStore;
+ this.entityStore = entityStore;
+ this.importExportOperationLock = new ReentrantLock();
}
/**
@@ -275,6 +279,10 @@ public class AdminResource {
return metrics;
}
+ private void releaseExportImportLock() {
+ importExportOperationLock.unlock();
+ }
+
@POST
@Path("/export")
@Consumes(Servlets.JSON_MEDIA_TYPE)
@@ -283,6 +291,8 @@ public class AdminResource {
LOG.debug("==> AdminResource.export()");
}
+ acquireExportImportLock("export");
+
ZipSink exportSink = null;
try {
exportSink = new ZipSink();
@@ -308,6 +318,8 @@ public class AdminResource {
throw new AtlasBaseException(excp);
} finally {
+ releaseExportImportLock();
+
if (exportSink != null) {
exportSink.close();
}
@@ -327,6 +339,8 @@ public class AdminResource {
LOG.debug("==> AdminResource.importData(bytes.length={})", bytes.length);
}
+ acquireExportImportLock("import");
+
AtlasImportResult result;
try {
@@ -344,6 +358,8 @@ public class AdminResource {
throw new AtlasBaseException(excp);
} finally {
+ releaseExportImportLock();
+
if (LOG.isDebugEnabled()) {
LOG.debug("<== AdminResource.importData(binary)");
}
@@ -360,6 +376,8 @@ public class AdminResource {
LOG.debug("==> AdminResource.importFile()");
}
+ acquireExportImportLock("importFile");
+
AtlasImportResult result;
try {
@@ -374,6 +392,8 @@ public class AdminResource {
throw new AtlasBaseException(excp);
} finally {
+ releaseExportImportLock();
+
if (LOG.isDebugEnabled()) {
LOG.debug("<== AdminResource.importFile()");
}
@@ -407,4 +427,15 @@ public class AdminResource {
return ret;
}
+
+ private void acquireExportImportLock(String activity) throws AtlasBaseException {
+ // Fail fast when another export/import holds the lock. tryLock() tests and acquires in one atomic
+ // step; isLocked() followed by lock() let two requests pass the check and the loser block instead.
+ if (!importExportOperationLock.tryLock()) {
+ LOG.warn("Another export or import is currently in progress..aborting this {} (thread={})",
+ activity, Thread.currentThread().getName());
+
+ throw new AtlasBaseException(AtlasErrorCode.FAILED_TO_OBTAIN_IMPORT_EXPORT_LOCK);
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1d85e95f/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java b/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java
index 7d3d442..04bb4d3 100644
--- a/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java
@@ -75,7 +75,7 @@ public class ExportService {
public AtlasExportResult run(ZipSink exportSink, AtlasExportRequest request, String userName, String hostName,
String requestingIP) throws AtlasBaseException {
-
+ long startTimestamp = System.currentTimeMillis();
ExportContext context = new ExportContext(new AtlasExportResult(request, userName, hostName, requestingIP,
System.currentTimeMillis()), exportSink);
@@ -90,6 +90,9 @@ public class ExportService {
context.sink.setTypesDef(context.result.getData().getTypesDef());
context.result.setData(null);
context.result.setOperationStatus(AtlasExportResult.OperationStatus.SUCCESS);
+
+ long endTimestamp = System.currentTimeMillis();
+ context.result.incrementMeticsCounter("duration", (int) (endTimestamp - startTimestamp));
context.sink.setResult(context.result);
} catch(Exception ex) {
LOG.error("Operation failed: ", ex);
@@ -175,7 +178,7 @@ public class ExportService {
context.sink.add(entity);
context.result.incrementMeticsCounter(String.format("entity:%s", entity.getTypeName()));
- context.result.incrementMeticsCounter("Entities");
+ context.result.incrementMeticsCounter("entities");
if (context.guidsProcessed.size() % 10 == 0) {
LOG.info("export(): in progress.. number of entities exported: {}", context.guidsProcessed.size());
@@ -195,7 +198,7 @@ public class ExportService {
AtlasClassificationDef cd = typeRegistry.getClassificationDefByName(c.getTypeName());
typesDef.getClassificationDefs().add(cd);
- result.incrementMeticsCounter("Classification");
+ result.incrementMeticsCounter("typedef:classification");
}
}
}
@@ -208,7 +211,7 @@ public class ExportService {
AtlasEntityDef typeDefinition = typeRegistry.getEntityDefByName(typeName);
typesDef.getEntityDefs().add(typeDefinition);
- result.incrementMeticsCounter("Type(s)");
+ result.incrementMeticsCounter("typedef:" + typeDefinition.getName());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1d85e95f/webapp/src/main/java/org/apache/atlas/web/resources/ImportService.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/ImportService.java b/webapp/src/main/java/org/apache/atlas/web/resources/ImportService.java
index 7554cdb..7b0c887 100644
--- a/webapp/src/main/java/org/apache/atlas/web/resources/ImportService.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/ImportService.java
@@ -20,6 +20,7 @@ package org.apache.atlas.web.resources;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.model.impexp.AtlasImportRequest;
import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.model.typedef.*;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.commons.io.FileUtils;
@@ -31,6 +32,7 @@ import org.slf4j.LoggerFactory;
import java.io.*;
import java.util.List;
+import java.util.concurrent.TimeUnit;
public class ImportService {
@@ -39,6 +41,9 @@ public class ImportService {
private final AtlasTypeDefStore typeDefStore;
private final AtlasEntityStore entityStore;
+ private long startTimestamp;
+ private long endTimestamp;
+
public ImportService(final AtlasTypeDefStore typeDefStore, final AtlasEntityStore entityStore) {
this.typeDefStore = typeDefStore;
@@ -52,6 +57,7 @@ public class ImportService {
try {
LOG.info("==> import(user={}, from={})", userName, requestingIP);
+ startTimestamp = System.currentTimeMillis();
processTypes(source.getTypesDef(), result);
processEntities(source, result);
@@ -65,12 +71,7 @@ public class ImportService {
throw new AtlasBaseException(excp);
} finally {
- try {
- source.close();
- } catch (IOException e) {
- // ignore
- }
-
+ source.close();
LOG.info("<== import(user={}, from={}): status={}", userName, requestingIP, result.getOperationStatus());
}
@@ -118,10 +119,14 @@ public class ImportService {
setGuidToEmpty(typeDefinitionMap.getEntityDefs());
typeDefStore.updateTypesDef(typeDefinitionMap);
- result.incrementMeticsCounter("Enum(s)", typeDefinitionMap.getEnumDefs().size());
- result.incrementMeticsCounter("Struct(s)", typeDefinitionMap.getStructDefs().size());
- result.incrementMeticsCounter("Classification(s)", typeDefinitionMap.getClassificationDefs().size());
- result.incrementMeticsCounter("Entity definition(s)", typeDefinitionMap.getEntityDefs().size());
+ updateMetricsForTypesDef(typeDefinitionMap, result);
+ }
+
+ private void updateMetricsForTypesDef(AtlasTypesDef typeDefinitionMap, AtlasImportResult result) {
+ result.incrementMeticsCounter("typedef:classification", typeDefinitionMap.getClassificationDefs().size());
+ result.incrementMeticsCounter("typedef:enum", typeDefinitionMap.getEnumDefs().size());
+ result.incrementMeticsCounter("typedef:entitydef", typeDefinitionMap.getEntityDefs().size());
+ result.incrementMeticsCounter("typedef:struct", typeDefinitionMap.getStructDefs().size());
}
private void setGuidToEmpty(List<AtlasEntityDef> entityDefList) {
@@ -131,7 +136,9 @@ public class ImportService {
}
private void processEntities(ZipSource importSource, AtlasImportResult result) throws AtlasBaseException {
- this.entityStore.createOrUpdate(importSource, false);
- result.incrementMeticsCounter("Entities", importSource.getCreationOrder().size());
+ this.entityStore.bulkImport(importSource, result);
+ // record elapsed milliseconds under the same lowercase "duration" key that ExportService uses
+ endTimestamp = System.currentTimeMillis();
+ result.incrementMeticsCounter("duration", (int) (this.endTimestamp - this.startTimestamp));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1d85e95f/webapp/src/main/java/org/apache/atlas/web/resources/ZipSource.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/ZipSource.java b/webapp/src/main/java/org/apache/atlas/web/resources/ZipSource.java
index ea62862..e69a139 100644
--- a/webapp/src/main/java/org/apache/atlas/web/resources/ZipSource.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/ZipSource.java
@@ -19,7 +19,6 @@ package org.apache.atlas.web.resources;
import org.codehaus.jackson.type.TypeReference;
import org.apache.atlas.exception.AtlasBaseException;
-import org.apache.atlas.model.impexp.AtlasExportResult;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.repository.store.graph.v1.EntityImportStream;
@@ -28,8 +27,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
@@ -37,103 +38,73 @@ import java.util.zip.ZipInputStream;
public class ZipSource implements EntityImportStream {
private static final Logger LOG = LoggerFactory.getLogger(ZipSource.class);
- private final ByteArrayInputStream inputStream;
- private List<String> creationOrder;
- private Iterator<String> iterator;
+ private final ByteArrayInputStream inputStream;
+ private List<String> creationOrder;
+ private Iterator<String> iterator;
+ private Map<String, String> guidEntityJsonMap;
- public ZipSource(ByteArrayInputStream inputStream) {
+ public ZipSource(ByteArrayInputStream inputStream) throws IOException {
this.inputStream = inputStream;
+ guidEntityJsonMap = new HashMap<>();
+ updateGuidZipEntryMap();
this.setCreationOrder();
}
public AtlasTypesDef getTypesDef() throws AtlasBaseException {
final String fileName = ZipExportFileNames.ATLAS_TYPESDEF_NAME.toString();
- try {
- String s = get(fileName);
- return convertFromJson(AtlasTypesDef.class, s);
- } catch (IOException e) {
- LOG.error(String.format("Error retrieving '%s' from zip.", fileName), e);
- return null;
- }
+ String s = getFromCache(fileName);
+ return convertFromJson(AtlasTypesDef.class, s);
}
- public AtlasExportResult getExportResult() throws AtlasBaseException {
- String fileName = ZipExportFileNames.ATLAS_EXPORT_INFO_NAME.toString();
- try {
- String s = get(fileName);
- return convertFromJson(AtlasExportResult.class, s);
- } catch (IOException e) {
- LOG.error(String.format("Error retrieving '%s' from zip.", fileName), e);
- return null;
- }
- }
-
-
private void setCreationOrder() {
String fileName = ZipExportFileNames.ATLAS_EXPORT_ORDER_NAME.toString();
try {
- String s = get(fileName);
+ String s = getFromCache(fileName);
this.creationOrder = convertFromJson(new TypeReference<List<String>>(){}, s);
this.iterator = this.creationOrder.iterator();
- } catch (IOException e) {
- LOG.error(String.format("Error retrieving '%s' from zip.", fileName), e);
} catch (AtlasBaseException e) {
LOG.error(String.format("Error retrieving '%s' from zip.", fileName), e);
}
}
- public List<String> getCreationOrder() throws AtlasBaseException {
- return this.creationOrder;
- }
-
- public AtlasEntity getEntity(String guid) throws AtlasBaseException {
- try {
- String s = get(guid);
- return convertFromJson(AtlasEntity.class, s);
- } catch (IOException e) {
- LOG.error(String.format("Error retrieving '%s' from zip.", guid), e);
- return null;
- }
- }
-
- private String get(String entryName) throws IOException {
- String ret = "";
+ private void updateGuidZipEntryMap() throws IOException { // caches each zip entry's text, keyed by entry name without ".json"
inputStream.reset();
ZipInputStream zipInputStream = new ZipInputStream(inputStream);
- ZipEntry zipEntry = zipInputStream.getNextEntry();
-
- entryName = entryName + ".json";
-
+ ZipEntry zipEntry = zipInputStream.getNextEntry();
while (zipEntry != null) {
- if (zipEntry.getName().equals(entryName)) {
- break;
- }
+ String entryName = zipEntry.getName().replace(".json", "");
- zipEntry = zipInputStream.getNextEntry();
- }
+ // first occurrence of a name wins; a duplicate is skipped but the loop must still advance (a bare 'continue' spun forever)
+ if (!guidEntityJsonMap.containsKey(entryName)) {
- if (zipEntry != null) {
- ByteArrayOutputStream os = new ByteArrayOutputStream();
- byte[] buf = new byte[1024];
+ byte[] buf = new byte[1024];
int n = 0;
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
while ((n = zipInputStream.read(buf, 0, 1024)) > -1) {
- os.write(buf, 0, n);
+ bos.write(buf, 0, n);
}
- ret = os.toString();
- } else {
- LOG.warn("{}: no such entry in zip file", entryName);
+ guidEntityJsonMap.put(entryName, bos.toString());
+ }
+ zipEntry = zipInputStream.getNextEntry();
}
zipInputStream.close();
+ }
+
+ public List<String> getCreationOrder() throws AtlasBaseException {
+ return this.creationOrder;
+ }
- return ret;
+ public AtlasEntity getEntity(String guid) throws AtlasBaseException {
+ String s = getFromCache(guid);
+ return convertFromJson(AtlasEntity.class, s);
}
private <T> T convertFromJson(TypeReference clazz, String jsonData) throws AtlasBaseException {
@@ -158,8 +129,20 @@ public class ZipSource implements EntityImportStream {
}
}
- public void close() throws IOException {
- inputStream.close();
+ private String getFromCache(String entryName) {
+ // entry names that were not found in the zip resolve to an empty string, never null
+ return guidEntityJsonMap.containsKey(entryName)
+ ? guidEntityJsonMap.get(entryName).toString() : "";
+ }
+
+ public void close() { // best-effort: a close failure is logged, never thrown to the caller
+ try {
+ inputStream.close();
+ } catch(IOException ex) {
+ LOG.warn("Error closing streams.", ex); // log the cause; the old "{}" placeholder had no argument
+ } finally {
+ guidEntityJsonMap.clear(); // always drop the cached entry text, even if close() failed
+ }
+ }
@Override