You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ni...@apache.org on 2019/11/06 11:57:48 UTC

[atlas] branch branch-0.8 updated: ATLAS-3346:- Import Service Add support to re-activate (branch-0.8).

This is an automated email from the ASF dual-hosted git repository.

nixon pushed a commit to branch branch-0.8
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/branch-0.8 by this push:
     new 65751b6  ATLAS-3346:- Import Service Add support to re-activate (branch-0.8).
65751b6 is described below

commit 65751b651c7d24b27aa157c0b14abe41c8143bdf
Author: nikhilbonte <ni...@freestoneinfotech.com>
AuthorDate: Mon Sep 30 16:18:03 2019 +0530

    ATLAS-3346:- Import Service Add support to re-activate (branch-0.8).
    
    Signed-off-by: nixonrodrigues <ni...@apache.org>
---
 .../store/graph/v1/AtlasEntityStoreV1.java         |  10 +-
 .../store/graph/v1/BulkImporterImpl.java           |   2 +-
 .../store/graph/v1/EntityGraphMapper.java          |  36 ++++
 .../impexp/ImportReactivateEntityTest.java         | 184 +++++++++++++++++++++
 4 files changed, 225 insertions(+), 7 deletions(-)

diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
index 39c763f..75483aa 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
@@ -733,6 +733,7 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
 
             if (entity != null) {
                 AtlasVertex vertex = getResolvedEntityVertex(discoveryContext, entity);
+                AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());
 
                 if (vertex != null) {
                     // entity would be null if guid is not in the stream but referenced by an entity in the stream
@@ -742,7 +743,6 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
                         graphDiscoverer.validateAndNormalizeForUpdate(entity);
                     }
 
-                    AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());
                     String          guidVertex = AtlasGraphUtilsV1.getIdFromVertex(vertex);
 
                     if (!StringUtils.equals(guidVertex, guid)) { // if entity was found by unique attribute
@@ -755,8 +755,6 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
                 } else {
                     graphDiscoverer.validateAndNormalize(entity);
 
-                    AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());
-
                     //Create vertices which do not exist in the repository
                     if ((entityStream instanceof EntityImportStream) && AtlasTypeUtil.isAssignedGuid(entity.getGuid())) {
                         vertex = entityGraphMapper.createVertexWithGuid(entity, entity.getGuid());
@@ -791,9 +789,9 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
 
                             context.addEntityToDelete(vertex);
                         } else if (currStatus == AtlasEntity.Status.DELETED && newStatus == AtlasEntity.Status.ACTIVE) {
-                            LOG.warn("attempt to activate deleted entity (guid={}). Ignored", guid);
-
-                            entity.setStatus(currStatus);
+                            LOG.warn("Import is attempting to activate deleted entity (guid={}).", guid);
+                            entityGraphMapper.importActivateEntity(vertex, discoveryContext.getReferencedGuids());
+                            context.addCreated(guid, entity, entityType, vertex);
                         }
                     }
 
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/BulkImporterImpl.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/BulkImporterImpl.java
index 17c6ac2..09bd92e 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/BulkImporterImpl.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/BulkImporterImpl.java
@@ -80,7 +80,7 @@ public class BulkImporterImpl implements BulkImporter {
             AtlasEntityWithExtInfo entityWithExtInfo = entityImportStreamWithResidualList.getNextEntityWithExtInfo();
             AtlasEntity            entity            = entityWithExtInfo != null ? entityWithExtInfo.getEntity() : null;
 
-            if (entity == null || processedGuids.contains(entity.getGuid())) {
+            if (entity == null) {
                 continue;
             }
 
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java
index 7e3f027..f34506a 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java
@@ -39,6 +39,7 @@ import org.apache.atlas.repository.Constants;
 import org.apache.atlas.repository.RepositoryException;
 import org.apache.atlas.repository.graph.GraphHelper;
 import org.apache.atlas.repository.graphdb.AtlasEdge;
+import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
 import org.apache.atlas.repository.graphdb.AtlasVertex;
 import org.apache.atlas.type.AtlasArrayType;
@@ -67,6 +68,8 @@ import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.CR
 import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.DELETE;
 import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.PARTIAL_UPDATE;
 import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.UPDATE;
+import static org.apache.atlas.model.instance.AtlasEntity.Status.ACTIVE;
+import static org.apache.atlas.model.instance.AtlasEntity.Status.DELETED;
 import static org.apache.atlas.repository.Constants.STATE_PROPERTY_KEY;
 import static org.apache.atlas.repository.graph.GraphHelper.string;
 
@@ -1163,4 +1166,37 @@ public class EntityGraphMapper {
 
         return ret;
     }
+
+    public void importActivateEntity(AtlasVertex vertex, List<String> relatedEntitiesGuids) {
+        AtlasGraphUtilsV1.setEncodedProperty(vertex, STATE_PROPERTY_KEY, ACTIVE);
+
+        if(CollectionUtils.isEmpty(relatedEntitiesGuids)){
+           return;
+        }
+        activateEntityEdges(vertex, relatedEntitiesGuids);
+    }
+
+    private void activateEntityEdges(AtlasVertex vertex, List<String> relatedEntitiesGuids) {
+        Iterator<AtlasEdge> edgeIterator = vertex.getEdges(AtlasEdgeDirection.BOTH).iterator();
+
+        while (edgeIterator.hasNext()) {
+            AtlasEdge edge = edgeIterator.next();
+
+            if (AtlasGraphUtilsV1.getState(edge) != DELETED) {
+                continue;
+            }
+
+            String relatedEntityGuid = (edge.getInVertex() != null &&
+                                        Objects.equals(edge.getInVertex().getId(), vertex.getId()) &&
+                                        edge.getOutVertex() != null)
+                    ? AtlasGraphUtilsV1.getIdFromVertex(edge.getOutVertex())
+                    : AtlasGraphUtilsV1.getIdFromVertex(edge.getInVertex());
+
+            if (StringUtils.isEmpty(relatedEntityGuid) || !relatedEntitiesGuids.contains(relatedEntityGuid)) {
+                continue;
+            }
+
+            edge.setProperty(STATE_PROPERTY_KEY, ACTIVE);
+        }
+    }
 }
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportReactivateEntityTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportReactivateEntityTest.java
new file mode 100644
index 0000000..c97b87d
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportReactivateEntityTest.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.impexp;
+
+import com.google.inject.Inject;
+import org.apache.atlas.RequestContextV1;
+import org.apache.atlas.TestModules;
+import org.apache.atlas.TestUtilsV2;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasImportRequest;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.repository.graph.AtlasGraphProvider;
+import org.apache.atlas.repository.store.graph.AtlasEntityStore;
+
+import org.apache.atlas.repository.store.graph.v1.AtlasEntityStream;
+import org.apache.atlas.store.AtlasTypeDefStore;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.type.AtlasTypeUtil;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+
+import static org.apache.atlas.AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME;
+import static org.apache.atlas.model.instance.AtlasEntity.Status.ACTIVE;
+import static org.apache.atlas.model.instance.AtlasEntity.Status.DELETED;
+import static org.apache.atlas.repository.Constants.GUID_PROPERTY_KEY;
+import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getInputStreamFrom;
+import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getDefaultImportRequest;
+import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadFsModel;
+import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadHiveModel;
+import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runImportWithParameters;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+@Guice(modules = TestModules.TestOnlyModule.class)
+public class ImportReactivateEntityTest extends ExportImportTestBase {
+    private static final String ENTITY_TYPE_COL        = "hive_column";
+    private static final String COLUMNS_ATTR_NAME      = "columns";
+    private static final String STORAGE_DESC_ATTR_NAME = "sd";
+
+    private static final String ENTITY_GUID_TABLE_WITH_REL_ATTRS    = "e19e5683-d9ae-436a-af1e-0873582d0f1e";
+    private static final String ENTITY_GUID_TABLE_WITHOUT_REL_ATTRS = "027a987e-867a-4c98-ac1e-c5ded41130d3";
+
+    private static final String REPL_FROM = "cl1";
+
+    @Inject
+    private ImportService importService;
+
+    @Inject
+    private AtlasTypeRegistry typeRegistry;
+
+    @Inject
+    private AtlasEntityStore entityStore;
+
+    @Inject
+    private AtlasTypeDefStore typeDefStore;
+
+    @BeforeTest
+    public void setup() throws IOException, AtlasBaseException {
+        RequestContextV1.clear();
+        RequestContextV1.get().setUser(TestUtilsV2.TEST_USER);
+        basicSetup(typeDefStore, typeRegistry);
+    }
+
+    @AfterClass
+    public void clear() {
+        AtlasGraphProvider.cleanup();
+    }
+
+    @Test
+    public void testWithRelationshipAttr() throws AtlasBaseException, IOException {
+        testReactivation(ENTITY_GUID_TABLE_WITH_REL_ATTRS, 2);
+    }
+
+    @Test
+    public void testWithoutRelationshipAttr() throws AtlasBaseException, IOException {
+        testReactivation(ENTITY_GUID_TABLE_WITHOUT_REL_ATTRS, 7);
+    }
+
+    private void testReactivation(String tableEntityGuid, int columnCount) throws AtlasBaseException, IOException {
+        importSeedData();
+
+        String newColumnGuid = addColumnToTable(tableEntityGuid);
+        columnCount++;
+
+        entityStore.deleteById(tableEntityGuid);
+        AtlasEntity.AtlasEntityWithExtInfo entity = entityStore.getById(tableEntityGuid);
+        assertEquals(entity.getEntity().getStatus(), DELETED);
+
+        importSeedData();
+
+        assertActivatedEntities(tableEntityGuid, newColumnGuid, columnCount);
+    }
+
+    private void importSeedData() throws AtlasBaseException, IOException {
+        loadFsModel(typeDefStore, typeRegistry);
+        loadHiveModel(typeDefStore, typeRegistry);
+        AtlasImportRequest atlasImportRequest = getDefaultImportRequest();
+
+        runImportWithParameters(importService, atlasImportRequest, getDataWithoutRelationshipAttrs());
+        runImportWithParameters(importService, atlasImportRequest, getDataWithRelationshipAttrs());
+    }
+
+    private InputStream getDataWithoutRelationshipAttrs() {
+        return getInputStreamFrom("stocks.zip");
+    }
+
+    private InputStream getDataWithRelationshipAttrs() {
+        return getInputStreamFrom("repl_exp_1.zip");
+    }
+
+    private String addColumnToTable(String tableEntityGuid)  throws AtlasBaseException {
+        AtlasEntity.AtlasEntityWithExtInfo entity = entityStore.getById(tableEntityGuid);
+        AtlasEntity newColumn = createColumn(entity.getEntity());
+        String newColumnGuid = newColumn.getGuid();
+        assertNotNull(newColumnGuid);
+        return newColumnGuid;
+    }
+
+    private AtlasEntity createColumn(AtlasEntity tableEntity) throws AtlasBaseException {
+        AtlasEntity ret = new AtlasEntity(ENTITY_TYPE_COL);
+        String name = "new_column";
+
+        ret.setAttribute("name", name);
+        ret.setAttribute(REFERENCEABLE_ATTRIBUTE_NAME, name + REPL_FROM);
+        ret.setAttribute("type", "int");
+        ret.setAttribute("comment", name);
+        ret.setAttribute("table", AtlasTypeUtil.getAtlasObjectId(tableEntity));
+
+        EntityMutationResponse response = entityStore.createOrUpdate(new AtlasEntityStream(ret), false);
+        ret.setAttribute(GUID_PROPERTY_KEY,  response.getCreatedEntities().get(0).getGuid());
+
+        return ret;
+    }
+
+    private void assertActivatedEntities(String tableEntityGuid, String newColumnGuid,int columnCount) throws AtlasBaseException {
+        AtlasEntity atlasEntity = entityStore.getById(tableEntityGuid).getEntity();
+
+        String sdGuid = ((AtlasObjectId) atlasEntity.getAttribute(STORAGE_DESC_ATTR_NAME)).getGuid();
+        AtlasEntity sd = entityStore.getById(sdGuid).getEntity();
+        assertEquals(sd.getStatus(), ACTIVE);
+
+        assertEquals(atlasEntity.getStatus(), ACTIVE);
+        List<AtlasObjectId> columns = (List<AtlasObjectId>) atlasEntity.getAttribute(COLUMNS_ATTR_NAME);
+        assertEquals(columns.size(), columnCount);
+
+        int activeColumnCount = 0;
+        int deletedColumnCount = 0;
+        for (AtlasObjectId column : columns) {
+            final AtlasEntity.AtlasEntityWithExtInfo byId = entityStore.getById(column.getGuid());
+            if (column.getGuid().equals(newColumnGuid)){
+                assertEquals(byId.getEntity().getStatus(), DELETED);
+                deletedColumnCount++;
+            }else{
+                assertEquals(byId.getEntity().getStatus(), ACTIVE);
+                activeColumnCount++;
+            }
+        }
+        assertEquals(activeColumnCount, --columnCount);
+        assertEquals(deletedColumnCount, 1);
+    }
+}
\ No newline at end of file