You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by sa...@apache.org on 2019/10/02 19:09:28 UTC

[atlas] branch branch-2.0 updated: ATLAS-3346: Support to re-activated deleted entity during import.

This is an automated email from the ASF dual-hosted git repository.

sarath pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 7d44670  ATLAS-3346: Support to re-activated deleted entity during import.
7d44670 is described below

commit 7d44670678581de117afa9b0191a4035b44e6648
Author: Nikhil Bonte <ni...@freestoneinfotech.com>
AuthorDate: Tue Oct 1 10:32:42 2019 -0700

    ATLAS-3346: Support to re-activated deleted entity during import.
    
    Signed-off-by: Ashutosh Mestry <am...@hortonworks.com>
    (cherry picked from commit 74bf4cdaae598bebd3f7956495b93d4e0b1242f6)
---
 .../store/graph/v2/AtlasEntityStoreV2.java         |   6 +-
 .../store/graph/v2/EntityGraphMapper.java          |  52 ++++++
 .../impexp/ImportReactivateTableTest.java          | 187 +++++++++++++++++++++
 3 files changed, 242 insertions(+), 3 deletions(-)

diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
index 0d3d82a..607adc0 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
@@ -959,9 +959,9 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
 
                             context.addEntityToDelete(vertex);
                         } else if (currStatus == Status.DELETED && newStatus == Status.ACTIVE) {
-                            LOG.warn("attempt to activate deleted entity (guid={}). Ignored", guid);
-
-                            entity.setStatus(currStatus);
+                            LOG.warn("Import is attempting to activate deleted entity (guid={}).", guid);
+                            entityGraphMapper.importActivateEntity(vertex, entity);
+                            context.addCreated(guid, entity, entityType, vertex);
                         }
                     }
 
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
index db65863..495d788 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
@@ -42,6 +42,7 @@ import org.apache.atlas.repository.converters.AtlasInstanceConverter;
 import org.apache.atlas.repository.graph.FullTextMapperV2;
 import org.apache.atlas.repository.graph.GraphHelper;
 import org.apache.atlas.repository.graphdb.AtlasEdge;
+import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
 import org.apache.atlas.repository.graphdb.AtlasVertex;
 import org.apache.atlas.repository.store.graph.AtlasRelationshipStore;
@@ -2099,4 +2100,55 @@ public class EntityGraphMapper {
     private static String getSoftRefFormattedString(String typeName, String resolvedGuid) {
         return String.format(SOFT_REF_FORMAT, typeName, resolvedGuid);
     }
+
+    public void importActivateEntity(AtlasVertex vertex, AtlasEntity entity) {
+        AtlasGraphUtilsV2.setEncodedProperty(vertex, STATE_PROPERTY_KEY, ACTIVE);
+
+        if (MapUtils.isNotEmpty(entity.getRelationshipAttributes())) {
+            Set<String> relatedEntitiesGuids = getRelatedEntitiesGuids(entity);
+            activateEntityRelationships(vertex, relatedEntitiesGuids);
+        }
+    }
+
+    private void activateEntityRelationships(AtlasVertex vertex, Set<String> relatedEntitiesGuids) {
+        Iterator<AtlasEdge> edgeIterator = vertex.getEdges(AtlasEdgeDirection.BOTH).iterator();
+
+        while (edgeIterator.hasNext()) {
+            AtlasEdge edge = edgeIterator.next();
+
+            if (AtlasGraphUtilsV2.getState(edge) != DELETED) {
+                continue;
+            }
+
+            final String relatedEntityGuid;
+            if (Objects.equals(edge.getInVertex().getId(), vertex.getId())) {
+                relatedEntityGuid = AtlasGraphUtilsV2.getIdFromVertex(edge.getOutVertex());
+            } else {
+                relatedEntityGuid = AtlasGraphUtilsV2.getIdFromVertex(edge.getInVertex());
+            }
+
+            if (StringUtils.isEmpty(relatedEntityGuid) || !relatedEntitiesGuids.contains(relatedEntityGuid)) {
+                continue;
+            }
+
+            edge.setProperty(STATE_PROPERTY_KEY, AtlasRelationship.Status.ACTIVE);
+        }
+    }
+
+    private Set<String> getRelatedEntitiesGuids(AtlasEntity entity) {
+        Set<String> relGuidsSet = new HashSet<>();
+
+        for (Object o : entity.getRelationshipAttributes().values()) {
+            if (o instanceof AtlasObjectId) {
+                relGuidsSet.add(((AtlasObjectId) o).getGuid());
+            } else if (o instanceof List) {
+                for (Object id : (List) o) {
+                    if (id instanceof AtlasObjectId) {
+                        relGuidsSet.add(((AtlasObjectId) id).getGuid());
+                    }
+                }
+            }
+        }
+        return relGuidsSet;
+    }
 }
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportReactivateTableTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportReactivateTableTest.java
new file mode 100644
index 0000000..d0c06a1
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportReactivateTableTest.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.impexp;
+
+import com.google.inject.Inject;
+import org.apache.atlas.RequestContext;
+import org.apache.atlas.TestModules;
+import org.apache.atlas.TestUtilsV2;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasImportRequest;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasRelatedObjectId;
+import org.apache.atlas.model.instance.AtlasRelationship;
+import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.repository.graph.AtlasGraphProvider;
+import org.apache.atlas.repository.store.graph.AtlasEntityStore;
+import org.apache.atlas.repository.store.graph.v2.AtlasEntityStream;
+import org.apache.atlas.runner.LocalSolrRunner;
+import org.apache.atlas.store.AtlasTypeDefStore;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+
+import static org.apache.atlas.AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME;
+import static org.apache.atlas.graph.GraphSandboxUtil.useLocalSolr;
+import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getInputStreamFrom;
+import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getDefaultImportRequest;
+import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadFsModel;
+import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadHiveModel;
+import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runImportWithParameters;
+import static org.apache.atlas.type.AtlasTypeUtil.toAtlasRelatedObjectId;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+@Guice(modules = TestModules.TestOnlyModule.class)
+public class ImportReactivateTableTest extends ExportImportTestBase {
+    private static final Logger LOG = LoggerFactory.getLogger(ImportReactivateTableTest.class);
+
+    private static final String ENTITY_TYPE_COL = "hive_column";
+
+    private static final String ENTITY_GUID_TABLE_WITH_REL_ATTRS = "e19e5683-d9ae-436a-af1e-0873582d0f1e";
+    private static final String ENTITY_GUID_TABLE_WITHOUT_REL_ATTRS = "027a987e-867a-4c98-ac1e-c5ded41130d3";
+
+    private static final String REPL_FROM = "cl1";
+    private static final String REPL_TRANSFORMER = "[{\"conditions\":{\"__entity\":\"topLevel: \"}," +
+            "\"action\":{\"__entity\":\"ADD_CLASSIFICATION: cl1_replicated\"}}," +
+            "{\"action\":{\"__entity.replicatedTo\":\"CLEAR:\",\"__entity.replicatedFrom\":\"CLEAR:\"}}," +
+            "{\"conditions\":{\"hive_db.clusterName\":\"EQUALS: cl1\"},\"action\":{\"hive_db.clusterName\":\"SET: cl2\"}}," +
+            "{\"conditions\":{\"hive_db.location\":\"STARTS_WITH_IGNORE_CASE: file:///\"}," +
+            "\"action\":{\"hive_db.location\":\"REPLACE_PREFIX: = :file:///=file:///\"}}," +
+            "{\"conditions\":{\"hive_storagedesc.location\":\"STARTS_WITH_IGNORE_CASE: file:///\"}," +
+            "\"action\":{\"hive_storagedesc.location\":\"REPLACE_PREFIX: = :file:///=file:///\"}}]";
+
+    @Inject
+    private ImportService importService;
+
+    @Inject
+    private AtlasTypeRegistry typeRegistry;
+
+    @Inject
+    private AtlasEntityStore entityStore;
+
+    @Inject
+    private AtlasTypeDefStore typeDefStore;
+
+    @BeforeTest
+    public void setup() throws IOException, AtlasBaseException {
+        RequestContext.clear();
+        RequestContext.get().setUser(TestUtilsV2.TEST_USER, null);
+        basicSetup(typeDefStore, typeRegistry);
+    }
+
+    @AfterClass
+    public void clear() throws Exception {
+        AtlasGraphProvider.cleanup();
+
+        if (useLocalSolr()) {
+            LocalSolrRunner.stop();
+        }
+    }
+
+    private void importSeedData() throws AtlasBaseException, IOException {
+        loadFsModel(typeDefStore, typeRegistry);
+        loadHiveModel(typeDefStore, typeRegistry);
+        AtlasImportRequest atlasImportRequest = getDefaultImportRequest();
+
+        atlasImportRequest.setOption("replicatedFrom", REPL_FROM);
+        atlasImportRequest.setOption("transformers", REPL_TRANSFORMER);
+
+        runImportWithParameters(importService, atlasImportRequest, getDataWithoutRelationshipAttrs());
+        runImportWithParameters(importService, atlasImportRequest, getDataWithRelationshipAttrs());
+    }
+
+    public static InputStream getDataWithRelationshipAttrs() {
+        return getInputStreamFrom("repl_exp_1.zip");
+    }
+
+    public static InputStream getDataWithoutRelationshipAttrs() {
+        return getInputStreamFrom("stocks.zip");
+    }
+
+    @Test
+    public void testWithRelationshipAttr() throws AtlasBaseException, IOException {
+        testReactivation(ENTITY_GUID_TABLE_WITH_REL_ATTRS, 2);
+    }
+
+    @Test
+    public void testWithoutRelationshipAttr() throws AtlasBaseException, IOException {
+        testReactivation(ENTITY_GUID_TABLE_WITHOUT_REL_ATTRS, 7);
+    }
+
+    public void testReactivation(String tableEntityGuid, int columnCount) throws AtlasBaseException, IOException {
+        importSeedData();
+
+        AtlasEntity.AtlasEntityWithExtInfo entity = entityStore.getById(tableEntityGuid);
+        EntityMutationResponse response = createColumn(entity.getEntity());
+        String columnGuid = response.getCreatedEntities().get(0).getGuid();
+        assertNotNull(columnGuid);
+        columnCount++;
+
+        entityStore.deleteById(tableEntityGuid);
+        entity = entityStore.getById(tableEntityGuid);
+        assertEquals(entity.getEntity().getStatus(), AtlasEntity.Status.DELETED);
+
+        importSeedData();
+
+        AtlasEntity atlasEntity = entityStore.getById(tableEntityGuid).getEntity();
+
+        assertEquals(atlasEntity.getStatus(), AtlasEntity.Status.ACTIVE);
+        List<AtlasRelatedObjectId> columns = (List<AtlasRelatedObjectId>) atlasEntity.getRelationshipAttribute("columns");
+        assertEquals(columns.size(), columnCount);
+
+        int activeColumnCount = 0;
+        int deletedColumnCount = 0;
+        for (AtlasRelatedObjectId column : columns) {
+            if (column.getGuid().equals(columnGuid)){
+                assertEquals(column.getEntityStatus(), AtlasEntity.Status.DELETED);
+                assertEquals(column.getRelationshipStatus(), AtlasRelationship.Status.DELETED);
+                deletedColumnCount++;
+            }else{
+                assertEquals(column.getEntityStatus(), AtlasEntity.Status.ACTIVE);
+                assertEquals(column.getRelationshipStatus(), AtlasRelationship.Status.ACTIVE);
+                activeColumnCount++;
+            }
+        }
+        assertEquals(activeColumnCount, --columnCount);
+        assertEquals(deletedColumnCount, 1);
+    }
+
+    private EntityMutationResponse createColumn(AtlasEntity tblEntity) throws AtlasBaseException {
+        AtlasEntity ret = new AtlasEntity(ENTITY_TYPE_COL);
+        String name = "new_column";
+
+        ret.setAttribute("name", name);
+        ret.setAttribute(REFERENCEABLE_ATTRIBUTE_NAME, name + REPL_FROM);
+        ret.setAttribute("type", "int");
+        ret.setAttribute("comment", name);
+
+        ret.setRelationshipAttribute("table", toAtlasRelatedObjectId(tblEntity));
+        EntityMutationResponse response = entityStore.createOrUpdate(new AtlasEntityStream(ret), false);
+
+        return response;
+    }
+}