You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by sa...@apache.org on 2019/10/02 19:09:28 UTC
[atlas] branch branch-2.0 updated: ATLAS-3346: Support to
re-activated deleted entity during import.
This is an automated email from the ASF dual-hosted git repository.
sarath pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 7d44670 ATLAS-3346: Support to re-activated deleted entity during import.
7d44670 is described below
commit 7d44670678581de117afa9b0191a4035b44e6648
Author: Nikhil Bonte <ni...@freestoneinfotech.com>
AuthorDate: Tue Oct 1 10:32:42 2019 -0700
ATLAS-3346: Support to re-activated deleted entity during import.
Signed-off-by: Ashutosh Mestry <am...@hortonworks.com>
(cherry picked from commit 74bf4cdaae598bebd3f7956495b93d4e0b1242f6)
---
.../store/graph/v2/AtlasEntityStoreV2.java | 6 +-
.../store/graph/v2/EntityGraphMapper.java | 52 ++++++
.../impexp/ImportReactivateTableTest.java | 187 +++++++++++++++++++++
3 files changed, 242 insertions(+), 3 deletions(-)
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
index 0d3d82a..607adc0 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
@@ -959,9 +959,9 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
context.addEntityToDelete(vertex);
} else if (currStatus == Status.DELETED && newStatus == Status.ACTIVE) {
- LOG.warn("attempt to activate deleted entity (guid={}). Ignored", guid);
-
- entity.setStatus(currStatus);
+ LOG.warn("Import is attempting to activate deleted entity (guid={}).", guid);
+ entityGraphMapper.importActivateEntity(vertex, entity);
+ context.addCreated(guid, entity, entityType, vertex);
}
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
index db65863..495d788 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
@@ -42,6 +42,7 @@ import org.apache.atlas.repository.converters.AtlasInstanceConverter;
import org.apache.atlas.repository.graph.FullTextMapperV2;
import org.apache.atlas.repository.graph.GraphHelper;
import org.apache.atlas.repository.graphdb.AtlasEdge;
+import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.AtlasRelationshipStore;
@@ -2099,4 +2100,55 @@ public class EntityGraphMapper {
private static String getSoftRefFormattedString(String typeName, String resolvedGuid) {
return String.format(SOFT_REF_FORMAT, typeName, resolvedGuid);
}
+
+    /**
+     * Re-activates an entity vertex while an import is being applied.
+     *
+     * Marks the vertex state property as ACTIVE. When the incoming entity carries
+     * relationship attributes, the DELETED edges of the vertex that lead to the
+     * entities referenced by those attributes are switched back to ACTIVE as well.
+     *
+     * @param vertex graph vertex of the entity being re-activated
+     * @param entity incoming entity whose relationship attributes select the edges to restore
+     */
+    public void importActivateEntity(AtlasVertex vertex, AtlasEntity entity) {
+        AtlasGraphUtilsV2.setEncodedProperty(vertex, STATE_PROPERTY_KEY, ACTIVE);
+
+        // Nothing to restore on the edges when the entity references no related entities.
+        if (MapUtils.isEmpty(entity.getRelationshipAttributes())) {
+            return;
+        }
+
+        activateEntityRelationships(vertex, getRelatedEntitiesGuids(entity));
+    }
+
+    /**
+     * Switches DELETED edges of the given vertex back to ACTIVE, limited to edges
+     * whose opposite end is one of the supplied related entities.
+     *
+     * Edges in both directions are visited. Edges that are not in DELETED state,
+     * and edges whose opposite vertex has an empty id or an id that is not in
+     * {@code relatedEntitiesGuids}, are left untouched.
+     *
+     * @param vertex               vertex whose incident edges are inspected
+     * @param relatedEntitiesGuids guids of the related entities whose edges may be restored
+     */
+    private void activateEntityRelationships(AtlasVertex vertex, Set<String> relatedEntitiesGuids) {
+        for (Iterator<AtlasEdge> edges = vertex.getEdges(AtlasEdgeDirection.BOTH).iterator(); edges.hasNext(); ) {
+            AtlasEdge incidentEdge = edges.next();
+
+            if (AtlasGraphUtilsV2.getState(incidentEdge) != DELETED) {
+                continue;
+            }
+
+            // The related entity sits on whichever end of the edge is not this vertex.
+            boolean vertexIsInEnd = Objects.equals(incidentEdge.getInVertex().getId(), vertex.getId());
+            String  otherEndGuid  = vertexIsInEnd
+                    ? AtlasGraphUtilsV2.getIdFromVertex(incidentEdge.getOutVertex())
+                    : AtlasGraphUtilsV2.getIdFromVertex(incidentEdge.getInVertex());
+
+            if (StringUtils.isNotEmpty(otherEndGuid) && relatedEntitiesGuids.contains(otherEndGuid)) {
+                incidentEdge.setProperty(STATE_PROPERTY_KEY, AtlasRelationship.Status.ACTIVE);
+            }
+        }
+    }
+
+    /**
+     * Collects the guids of every {@code AtlasObjectId} found in the entity's
+     * relationship attributes.
+     *
+     * An attribute value may be a single {@code AtlasObjectId} or a {@code List}
+     * containing them; values and list elements of any other type are ignored.
+     * Guids are added exactly as returned by {@code getGuid()}, without filtering.
+     *
+     * @param entity entity whose relationship attributes are scanned; the attribute map must not be null
+     * @return set of guids referenced by the relationship attributes
+     */
+    private Set<String> getRelatedEntitiesGuids(AtlasEntity entity) {
+        Set<String> guids = new HashSet<>();
+
+        for (Object attrValue : entity.getRelationshipAttributes().values()) {
+            if (attrValue instanceof AtlasObjectId) {
+                guids.add(((AtlasObjectId) attrValue).getGuid());
+                continue;
+            }
+
+            if (!(attrValue instanceof List)) {
+                continue;
+            }
+
+            for (Object element : (List<?>) attrValue) {
+                if (element instanceof AtlasObjectId) {
+                    guids.add(((AtlasObjectId) element).getGuid());
+                }
+            }
+        }
+
+        return guids;
+    }
}
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportReactivateTableTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportReactivateTableTest.java
new file mode 100644
index 0000000..d0c06a1
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportReactivateTableTest.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.impexp;
+
+import com.google.inject.Inject;
+import org.apache.atlas.RequestContext;
+import org.apache.atlas.TestModules;
+import org.apache.atlas.TestUtilsV2;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasImportRequest;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasRelatedObjectId;
+import org.apache.atlas.model.instance.AtlasRelationship;
+import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.repository.graph.AtlasGraphProvider;
+import org.apache.atlas.repository.store.graph.AtlasEntityStore;
+import org.apache.atlas.repository.store.graph.v2.AtlasEntityStream;
+import org.apache.atlas.runner.LocalSolrRunner;
+import org.apache.atlas.store.AtlasTypeDefStore;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+
+import static org.apache.atlas.AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME;
+import static org.apache.atlas.graph.GraphSandboxUtil.useLocalSolr;
+import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getInputStreamFrom;
+import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getDefaultImportRequest;
+import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadFsModel;
+import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadHiveModel;
+import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runImportWithParameters;
+import static org.apache.atlas.type.AtlasTypeUtil.toAtlasRelatedObjectId;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+/**
+ * Verifies that importing data again re-activates a table entity that was
+ * deleted after the first import.
+ *
+ * Each scenario imports the seed archives, adds one extra column to a table,
+ * deletes the table, imports the seed archives a second time and then checks
+ * the status of the table and of every column relationship.
+ */
+@Guice(modules = TestModules.TestOnlyModule.class)
+public class ImportReactivateTableTest extends ExportImportTestBase {
+    private static final Logger LOG = LoggerFactory.getLogger(ImportReactivateTableTest.class);
+
+    private static final String ENTITY_TYPE_COL = "hive_column";
+
+    private static final String ENTITY_GUID_TABLE_WITH_REL_ATTRS = "e19e5683-d9ae-436a-af1e-0873582d0f1e";
+    private static final String ENTITY_GUID_TABLE_WITHOUT_REL_ATTRS = "027a987e-867a-4c98-ac1e-c5ded41130d3";
+
+    private static final String REPL_FROM = "cl1";
+    private static final String REPL_TRANSFORMER = "[{\"conditions\":{\"__entity\":\"topLevel: \"}," +
+            "\"action\":{\"__entity\":\"ADD_CLASSIFICATION: cl1_replicated\"}}," +
+            "{\"action\":{\"__entity.replicatedTo\":\"CLEAR:\",\"__entity.replicatedFrom\":\"CLEAR:\"}}," +
+            "{\"conditions\":{\"hive_db.clusterName\":\"EQUALS: cl1\"},\"action\":{\"hive_db.clusterName\":\"SET: cl2\"}}," +
+            "{\"conditions\":{\"hive_db.location\":\"STARTS_WITH_IGNORE_CASE: file:///\"}," +
+            "\"action\":{\"hive_db.location\":\"REPLACE_PREFIX: = :file:///=file:///\"}}," +
+            "{\"conditions\":{\"hive_storagedesc.location\":\"STARTS_WITH_IGNORE_CASE: file:///\"}," +
+            "\"action\":{\"hive_storagedesc.location\":\"REPLACE_PREFIX: = :file:///=file:///\"}}]";
+
+    @Inject
+    private ImportService importService;
+
+    @Inject
+    private AtlasTypeRegistry typeRegistry;
+
+    @Inject
+    private AtlasEntityStore entityStore;
+
+    @Inject
+    private AtlasTypeDefStore typeDefStore;
+
+    /** Resets the request context, sets the test user and runs the shared basic setup. */
+    @BeforeTest
+    public void setup() throws IOException, AtlasBaseException {
+        RequestContext.clear();
+        RequestContext.get().setUser(TestUtilsV2.TEST_USER, null);
+        basicSetup(typeDefStore, typeRegistry);
+    }
+
+    /** Cleans up the graph provider and stops the local Solr runner when one is in use. */
+    @AfterClass
+    public void clear() throws Exception {
+        AtlasGraphProvider.cleanup();
+
+        if (!useLocalSolr()) {
+            return;
+        }
+
+        LocalSolrRunner.stop();
+    }
+
+    /**
+     * Loads the fs and hive models and imports both seed archives using the
+     * replication options defined by this class.
+     */
+    private void importSeedData() throws AtlasBaseException, IOException {
+        loadFsModel(typeDefStore, typeRegistry);
+        loadHiveModel(typeDefStore, typeRegistry);
+
+        AtlasImportRequest request = getDefaultImportRequest();
+        request.setOption("replicatedFrom", REPL_FROM);
+        request.setOption("transformers", REPL_TRANSFORMER);
+
+        runImportWithParameters(importService, request, getDataWithoutRelationshipAttrs());
+        runImportWithParameters(importService, request, getDataWithRelationshipAttrs());
+    }
+
+    /** @return stream over the {@code repl_exp_1.zip} test resource */
+    public static InputStream getDataWithRelationshipAttrs() {
+        return getInputStreamFrom("repl_exp_1.zip");
+    }
+
+    /** @return stream over the {@code stocks.zip} test resource */
+    public static InputStream getDataWithoutRelationshipAttrs() {
+        return getInputStreamFrom("stocks.zip");
+    }
+
+    @Test
+    public void testWithRelationshipAttr() throws AtlasBaseException, IOException {
+        testReactivation(ENTITY_GUID_TABLE_WITH_REL_ATTRS, 2);
+    }
+
+    @Test
+    public void testWithoutRelationshipAttr() throws AtlasBaseException, IOException {
+        testReactivation(ENTITY_GUID_TABLE_WITHOUT_REL_ATTRS, 7);
+    }
+
+    /**
+     * Runs the delete / re-import scenario for one table.
+     *
+     * After the second import the table must be ACTIVE, its "columns" relationship
+     * must list the seeded columns plus the one added here, the seeded columns
+     * must be ACTIVE with ACTIVE relationships, and the added column must be
+     * DELETED with a DELETED relationship.
+     *
+     * @param tableEntityGuid guid of the table to delete and re-import
+     * @param columnCount     number of columns the table has in the seed data
+     */
+    public void testReactivation(String tableEntityGuid, int columnCount) throws AtlasBaseException, IOException {
+        importSeedData();
+
+        AtlasEntity.AtlasEntityWithExtInfo tableWithExtInfo = entityStore.getById(tableEntityGuid);
+        EntityMutationResponse             createResponse   = createColumn(tableWithExtInfo.getEntity());
+        String                             addedColumnGuid  = createResponse.getCreatedEntities().get(0).getGuid();
+        assertNotNull(addedColumnGuid);
+
+        // The seeded columns stay active; the extra column created above is counted on top of them.
+        final int expectedActiveColumns = columnCount;
+        final int expectedTotalColumns  = columnCount + 1;
+
+        entityStore.deleteById(tableEntityGuid);
+        tableWithExtInfo = entityStore.getById(tableEntityGuid);
+        assertEquals(tableWithExtInfo.getEntity().getStatus(), AtlasEntity.Status.DELETED);
+
+        importSeedData();
+
+        AtlasEntity reimportedTable = entityStore.getById(tableEntityGuid).getEntity();
+
+        assertEquals(reimportedTable.getStatus(), AtlasEntity.Status.ACTIVE);
+        List<AtlasRelatedObjectId> columns = (List<AtlasRelatedObjectId>) reimportedTable.getRelationshipAttribute("columns");
+        assertEquals(columns.size(), expectedTotalColumns);
+
+        int activeColumns  = 0;
+        int deletedColumns = 0;
+        for (AtlasRelatedObjectId column : columns) {
+            boolean isAddedColumn = column.getGuid().equals(addedColumnGuid);
+
+            AtlasEntity.Status       expectedEntityStatus       = isAddedColumn ? AtlasEntity.Status.DELETED : AtlasEntity.Status.ACTIVE;
+            AtlasRelationship.Status expectedRelationshipStatus = isAddedColumn ? AtlasRelationship.Status.DELETED : AtlasRelationship.Status.ACTIVE;
+
+            assertEquals(column.getEntityStatus(), expectedEntityStatus);
+            assertEquals(column.getRelationshipStatus(), expectedRelationshipStatus);
+
+            if (isAddedColumn) {
+                deletedColumns++;
+            } else {
+                activeColumns++;
+            }
+        }
+
+        assertEquals(activeColumns, expectedActiveColumns);
+        assertEquals(deletedColumns, 1);
+    }
+
+    /**
+     * Creates a new {@code hive_column} entity named "new_column" that is related
+     * to the given table through its "table" relationship attribute.
+     *
+     * @param tblEntity table the new column is attached to
+     * @return mutation response returned by the entity store
+     */
+    private EntityMutationResponse createColumn(AtlasEntity tblEntity) throws AtlasBaseException {
+        final String columnName = "new_column";
+        AtlasEntity  column     = new AtlasEntity(ENTITY_TYPE_COL);
+
+        column.setAttribute("name", columnName);
+        column.setAttribute(REFERENCEABLE_ATTRIBUTE_NAME, columnName + REPL_FROM);
+        column.setAttribute("type", "int");
+        column.setAttribute("comment", columnName);
+        column.setRelationshipAttribute("table", toAtlasRelatedObjectId(tblEntity));
+
+        return entityStore.createOrUpdate(new AtlasEntityStream(column), false);
+    }
+}