You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ni...@apache.org on 2019/11/06 11:57:48 UTC
[atlas] branch branch-0.8 updated: ATLAS-3346:- Import Service Add
support to re-activate (branch-0.8).
This is an automated email from the ASF dual-hosted git repository.
nixon pushed a commit to branch branch-0.8
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/branch-0.8 by this push:
new 65751b6 ATLAS-3346:- Import Service Add support to re-activate (branch-0.8).
65751b6 is described below
commit 65751b651c7d24b27aa157c0b14abe41c8143bdf
Author: nikhilbonte <ni...@freestoneinfotech.com>
AuthorDate: Mon Sep 30 16:18:03 2019 +0530
ATLAS-3346:- Import Service Add support to re-activate (branch-0.8).
Signed-off-by: nixonrodrigues <ni...@apache.org>
---
.../store/graph/v1/AtlasEntityStoreV1.java | 10 +-
.../store/graph/v1/BulkImporterImpl.java | 2 +-
.../store/graph/v1/EntityGraphMapper.java | 36 ++++
.../impexp/ImportReactivateEntityTest.java | 184 +++++++++++++++++++++
4 files changed, 225 insertions(+), 7 deletions(-)
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
index 39c763f..75483aa 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
@@ -733,6 +733,7 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
if (entity != null) {
AtlasVertex vertex = getResolvedEntityVertex(discoveryContext, entity);
+ AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());
if (vertex != null) {
// entity would be null if guid is not in the stream but referenced by an entity in the stream
@@ -742,7 +743,6 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
graphDiscoverer.validateAndNormalizeForUpdate(entity);
}
- AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());
String guidVertex = AtlasGraphUtilsV1.getIdFromVertex(vertex);
if (!StringUtils.equals(guidVertex, guid)) { // if entity was found by unique attribute
@@ -755,8 +755,6 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
} else {
graphDiscoverer.validateAndNormalize(entity);
- AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());
-
//Create vertices which do not exist in the repository
if ((entityStream instanceof EntityImportStream) && AtlasTypeUtil.isAssignedGuid(entity.getGuid())) {
vertex = entityGraphMapper.createVertexWithGuid(entity, entity.getGuid());
@@ -791,9 +789,9 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
context.addEntityToDelete(vertex);
} else if (currStatus == AtlasEntity.Status.DELETED && newStatus == AtlasEntity.Status.ACTIVE) {
- LOG.warn("attempt to activate deleted entity (guid={}). Ignored", guid);
-
- entity.setStatus(currStatus);
+ LOG.warn("Import is attempting to activate deleted entity (guid={}).", guid);
+ entityGraphMapper.importActivateEntity(vertex, discoveryContext.getReferencedGuids());
+ context.addCreated(guid, entity, entityType, vertex);
}
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/BulkImporterImpl.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/BulkImporterImpl.java
index 17c6ac2..09bd92e 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/BulkImporterImpl.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/BulkImporterImpl.java
@@ -80,7 +80,7 @@ public class BulkImporterImpl implements BulkImporter {
AtlasEntityWithExtInfo entityWithExtInfo = entityImportStreamWithResidualList.getNextEntityWithExtInfo();
AtlasEntity entity = entityWithExtInfo != null ? entityWithExtInfo.getEntity() : null;
- if (entity == null || processedGuids.contains(entity.getGuid())) {
+ if (entity == null) {
continue;
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java
index 7e3f027..f34506a 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java
@@ -39,6 +39,7 @@ import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.RepositoryException;
import org.apache.atlas.repository.graph.GraphHelper;
import org.apache.atlas.repository.graphdb.AtlasEdge;
+import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.type.AtlasArrayType;
@@ -67,6 +68,8 @@ import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.CR
import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.DELETE;
import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.PARTIAL_UPDATE;
import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.UPDATE;
+import static org.apache.atlas.model.instance.AtlasEntity.Status.ACTIVE;
+import static org.apache.atlas.model.instance.AtlasEntity.Status.DELETED;
import static org.apache.atlas.repository.Constants.STATE_PROPERTY_KEY;
import static org.apache.atlas.repository.graph.GraphHelper.string;
@@ -1163,4 +1166,37 @@ public class EntityGraphMapper {
return ret;
}
+
+ public void importActivateEntity(AtlasVertex vertex, List<String> relatedEntitiesGuids) {
+ AtlasGraphUtilsV1.setEncodedProperty(vertex, STATE_PROPERTY_KEY, ACTIVE);
+
+ if(CollectionUtils.isEmpty(relatedEntitiesGuids)){
+ return;
+ }
+ activateEntityEdges(vertex, relatedEntitiesGuids);
+ }
+
+ private void activateEntityEdges(AtlasVertex vertex, List<String> relatedEntitiesGuids) {
+ Iterator<AtlasEdge> edgeIterator = vertex.getEdges(AtlasEdgeDirection.BOTH).iterator();
+
+ while (edgeIterator.hasNext()) {
+ AtlasEdge edge = edgeIterator.next();
+
+ if (AtlasGraphUtilsV1.getState(edge) != DELETED) {
+ continue;
+ }
+
+ String relatedEntityGuid = (edge.getInVertex() != null &&
+ Objects.equals(edge.getInVertex().getId(), vertex.getId()) &&
+ edge.getOutVertex() != null)
+ ? AtlasGraphUtilsV1.getIdFromVertex(edge.getOutVertex())
+ : AtlasGraphUtilsV1.getIdFromVertex(edge.getInVertex());
+
+ if (StringUtils.isEmpty(relatedEntityGuid) || !relatedEntitiesGuids.contains(relatedEntityGuid)) {
+ continue;
+ }
+
+ edge.setProperty(STATE_PROPERTY_KEY, ACTIVE);
+ }
+ }
}
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportReactivateEntityTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportReactivateEntityTest.java
new file mode 100644
index 0000000..c97b87d
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportReactivateEntityTest.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.impexp;
+
+import com.google.inject.Inject;
+import org.apache.atlas.RequestContextV1;
+import org.apache.atlas.TestModules;
+import org.apache.atlas.TestUtilsV2;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasImportRequest;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.repository.graph.AtlasGraphProvider;
+import org.apache.atlas.repository.store.graph.AtlasEntityStore;
+
+import org.apache.atlas.repository.store.graph.v1.AtlasEntityStream;
+import org.apache.atlas.store.AtlasTypeDefStore;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.type.AtlasTypeUtil;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+
+import static org.apache.atlas.AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME;
+import static org.apache.atlas.model.instance.AtlasEntity.Status.ACTIVE;
+import static org.apache.atlas.model.instance.AtlasEntity.Status.DELETED;
+import static org.apache.atlas.repository.Constants.GUID_PROPERTY_KEY;
+import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getInputStreamFrom;
+import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getDefaultImportRequest;
+import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadFsModel;
+import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadHiveModel;
+import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runImportWithParameters;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+@Guice(modules = TestModules.TestOnlyModule.class)
+public class ImportReactivateEntityTest extends ExportImportTestBase {
+ private static final String ENTITY_TYPE_COL = "hive_column";
+ private static final String COLUMNS_ATTR_NAME = "columns";
+ private static final String STORAGE_DESC_ATTR_NAME = "sd";
+
+ private static final String ENTITY_GUID_TABLE_WITH_REL_ATTRS = "e19e5683-d9ae-436a-af1e-0873582d0f1e";
+ private static final String ENTITY_GUID_TABLE_WITHOUT_REL_ATTRS = "027a987e-867a-4c98-ac1e-c5ded41130d3";
+
+ private static final String REPL_FROM = "cl1";
+
+ @Inject
+ private ImportService importService;
+
+ @Inject
+ private AtlasTypeRegistry typeRegistry;
+
+ @Inject
+ private AtlasEntityStore entityStore;
+
+ @Inject
+ private AtlasTypeDefStore typeDefStore;
+
+ @BeforeTest
+ public void setup() throws IOException, AtlasBaseException {
+ RequestContextV1.clear();
+ RequestContextV1.get().setUser(TestUtilsV2.TEST_USER);
+ basicSetup(typeDefStore, typeRegistry);
+ }
+
+ @AfterClass
+ public void clear() {
+ AtlasGraphProvider.cleanup();
+ }
+
+ @Test
+ public void testWithRelationshipAttr() throws AtlasBaseException, IOException {
+ testReactivation(ENTITY_GUID_TABLE_WITH_REL_ATTRS, 2);
+ }
+
+ @Test
+ public void testWithoutRelationshipAttr() throws AtlasBaseException, IOException {
+ testReactivation(ENTITY_GUID_TABLE_WITHOUT_REL_ATTRS, 7);
+ }
+
+ private void testReactivation(String tableEntityGuid, int columnCount) throws AtlasBaseException, IOException {
+ importSeedData();
+
+ String newColumnGuid = addColumnToTable(tableEntityGuid);
+ columnCount++;
+
+ entityStore.deleteById(tableEntityGuid);
+ AtlasEntity.AtlasEntityWithExtInfo entity = entityStore.getById(tableEntityGuid);
+ assertEquals(entity.getEntity().getStatus(), DELETED);
+
+ importSeedData();
+
+ assertActivatedEntities(tableEntityGuid, newColumnGuid, columnCount);
+ }
+
+ private void importSeedData() throws AtlasBaseException, IOException {
+ loadFsModel(typeDefStore, typeRegistry);
+ loadHiveModel(typeDefStore, typeRegistry);
+ AtlasImportRequest atlasImportRequest = getDefaultImportRequest();
+
+ runImportWithParameters(importService, atlasImportRequest, getDataWithoutRelationshipAttrs());
+ runImportWithParameters(importService, atlasImportRequest, getDataWithRelationshipAttrs());
+ }
+
+ private InputStream getDataWithoutRelationshipAttrs() {
+ return getInputStreamFrom("stocks.zip");
+ }
+
+ private InputStream getDataWithRelationshipAttrs() {
+ return getInputStreamFrom("repl_exp_1.zip");
+ }
+
+ private String addColumnToTable(String tableEntityGuid) throws AtlasBaseException {
+ AtlasEntity.AtlasEntityWithExtInfo entity = entityStore.getById(tableEntityGuid);
+ AtlasEntity newColumn = createColumn(entity.getEntity());
+ String newColumnGuid = newColumn.getGuid();
+ assertNotNull(newColumnGuid);
+ return newColumnGuid;
+ }
+
+ private AtlasEntity createColumn(AtlasEntity tableEntity) throws AtlasBaseException {
+ AtlasEntity ret = new AtlasEntity(ENTITY_TYPE_COL);
+ String name = "new_column";
+
+ ret.setAttribute("name", name);
+ ret.setAttribute(REFERENCEABLE_ATTRIBUTE_NAME, name + REPL_FROM);
+ ret.setAttribute("type", "int");
+ ret.setAttribute("comment", name);
+ ret.setAttribute("table", AtlasTypeUtil.getAtlasObjectId(tableEntity));
+
+ EntityMutationResponse response = entityStore.createOrUpdate(new AtlasEntityStream(ret), false);
+ ret.setAttribute(GUID_PROPERTY_KEY, response.getCreatedEntities().get(0).getGuid());
+
+ return ret;
+ }
+
+ private void assertActivatedEntities(String tableEntityGuid, String newColumnGuid,int columnCount) throws AtlasBaseException {
+ AtlasEntity atlasEntity = entityStore.getById(tableEntityGuid).getEntity();
+
+ String sdGuid = ((AtlasObjectId) atlasEntity.getAttribute(STORAGE_DESC_ATTR_NAME)).getGuid();
+ AtlasEntity sd = entityStore.getById(sdGuid).getEntity();
+ assertEquals(sd.getStatus(), ACTIVE);
+
+ assertEquals(atlasEntity.getStatus(), ACTIVE);
+ List<AtlasObjectId> columns = (List<AtlasObjectId>) atlasEntity.getAttribute(COLUMNS_ATTR_NAME);
+ assertEquals(columns.size(), columnCount);
+
+ int activeColumnCount = 0;
+ int deletedColumnCount = 0;
+ for (AtlasObjectId column : columns) {
+ final AtlasEntity.AtlasEntityWithExtInfo byId = entityStore.getById(column.getGuid());
+ if (column.getGuid().equals(newColumnGuid)){
+ assertEquals(byId.getEntity().getStatus(), DELETED);
+ deletedColumnCount++;
+ }else{
+ assertEquals(byId.getEntity().getStatus(), ACTIVE);
+ activeColumnCount++;
+ }
+ }
+ assertEquals(activeColumnCount, --columnCount);
+ assertEquals(deletedColumnCount, 1);
+ }
+}
\ No newline at end of file