You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by am...@apache.org on 2019/05/31 17:58:40 UTC
[atlas] branch master updated: ATLAS-3251: Implement Patch to populate classification text for legacy data.
This is an automated email from the ASF dual-hosted git repository.
amestry pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new 64c7a3b ATLAS-3251: Implement Patch to populate classification text for legacy data.
64c7a3b is described below
commit 64c7a3befcc90566f59cb805d51df34bdc7edc74
Author: skoritala <sk...@cloudera.com>
AuthorDate: Fri May 31 10:52:42 2019 -0700
ATLAS-3251: Implement Patch to populate classification text for legacy data.
Signed-off-by: Ashutosh Mestry <am...@hortonworks.com>
---
.../repository/patches/AtlasPatchHandler.java | 3 +-
.../repository/patches/AtlasPatchManager.java | 8 +-
.../patches/ClassificationTextPatch.java | 81 +++++++
.../patches/ConcurrentPatchProcessor.java | 233 +++++++++++++++++++++
.../atlas/repository/patches/PatchContext.java | 6 +-
.../repository/patches/UniqueAttributePatch.java | 200 +++---------------
.../store/graph/v2/EntityGraphMapper.java | 16 +-
7 files changed, 371 insertions(+), 176 deletions(-)
diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchHandler.java b/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchHandler.java
index 3e2bf53..970d4d7 100644
--- a/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchHandler.java
+++ b/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchHandler.java
@@ -17,6 +17,7 @@
*/
package org.apache.atlas.repository.patches;
+import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.patches.AtlasPatch.PatchStatus;
import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.UNKNOWN;
@@ -64,5 +65,5 @@ public abstract class AtlasPatchHandler {
return patchId;
}
- public abstract void apply();
+ public abstract void apply() throws AtlasBaseException;
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchManager.java b/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchManager.java
index 629215d..259f246 100644
--- a/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchManager.java
+++ b/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchManager.java
@@ -22,6 +22,7 @@ import org.apache.atlas.model.patches.AtlasPatch;
import org.apache.atlas.model.patches.AtlasPatch.PatchStatus;
import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,8 +40,8 @@ public class AtlasPatchManager {
private final PatchContext context;
@Inject
- public AtlasPatchManager(AtlasGraph atlasGraph, AtlasTypeRegistry typeRegistry, GraphBackedSearchIndexer indexer) {
- this.context = new PatchContext(atlasGraph, typeRegistry, indexer);
+ public AtlasPatchManager(AtlasGraph atlasGraph, AtlasTypeRegistry typeRegistry, GraphBackedSearchIndexer indexer, EntityGraphMapper entityGraphMapper) {
+ this.context = new PatchContext(atlasGraph, typeRegistry, indexer, entityGraphMapper);
}
public AtlasPatch.AtlasPatches getAllPatches() {
@@ -49,7 +50,8 @@ public class AtlasPatchManager {
public void applyAll() {
final AtlasPatchHandler handlers[] = {
- new UniqueAttributePatch(context)
+ new UniqueAttributePatch(context),
+ new ClassificationTextPatch(context)
};
try {
diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/ClassificationTextPatch.java b/repository/src/main/java/org/apache/atlas/repository/patches/ClassificationTextPatch.java
new file mode 100644
index 0000000..eeaf3d9
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/patches/ClassificationTextPatch.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.patches;
+
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.type.AtlasEntityType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.APPLIED;
+
+public class ClassificationTextPatch extends AtlasPatchHandler { // Java patch JAVA_PATCH_0000_002: populates classification text on existing entity vertices
+    private static final Logger LOG = LoggerFactory.getLogger(ClassificationTextPatch.class);
+
+    private static final String PATCH_ID = "JAVA_PATCH_0000_002";
+    private static final String PATCH_DESCRIPTION = "Populates Classification Text attribute for entities from classifications applied on them.";
+
+    private final PatchContext context;
+
+    public ClassificationTextPatch(PatchContext context) {
+        super(context.getPatchRegistry(), PATCH_ID, PATCH_DESCRIPTION);
+
+        this.context = context;
+    }
+    // Runs ClassificationTextPatchProcessor over every vertex in the graph, then sets this patch's status to APPLIED.
+    @Override
+    public void apply() throws AtlasBaseException {
+        ConcurrentPatchProcessor patchProcessor = new ClassificationTextPatchProcessor(context);
+
+        patchProcessor.apply();
+
+        setStatus(APPLIED); // NOTE(review): set even if some vertices failed - the consumer logs per-vertex errors instead of rethrowing
+
+        LOG.info("ClassificationTextPatch.apply(): patchId={}, status={}", getPatchId(), getStatus());
+    }
+    // Patch processor: rewrites the classification-text property of each ACTIVE entity vertex handed to it by ConcurrentPatchProcessor.
+    public static class ClassificationTextPatchProcessor extends ConcurrentPatchProcessor {
+
+        public ClassificationTextPatchProcessor(PatchContext context) {
+            super(context);
+        }
+
+        @Override
+        protected void processVertexItem(Long vertexId, AtlasVertex vertex, String typeName, AtlasEntityType entityType) throws AtlasBaseException {
+            processItem(vertexId, vertex, typeName, entityType);
+        }
+
+        @Override
+        protected void prepareForExecution() {
+            // no one-time setup needed; all work is done per vertex in processItem()
+        }
+        // NOTE(review): updateClassificationText() goes through instanceConverter.getAndCacheEntity(); confirm that cache stays bounded over a full-graph run.
+        protected void processItem(Long vertexId, AtlasVertex vertex, String typeName, AtlasEntityType entityType) throws AtlasBaseException {
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("processItem(typeName={}, vertexId={})", typeName, vertexId);
+            }
+
+            getEntityGraphMapper().updateClassificationText(vertex);
+
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("processItem(typeName={}, vertexId={}): Done!", typeName, vertexId);
+            }
+        }
+    }
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/ConcurrentPatchProcessor.java b/repository/src/main/java/org/apache/atlas/repository/patches/ConcurrentPatchProcessor.java
new file mode 100644
index 0000000..813a9c1
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/patches/ConcurrentPatchProcessor.java
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.patches;
+
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.pc.WorkItemBuilder;
+import org.apache.atlas.pc.WorkItemConsumer;
+import org.apache.atlas.pc.WorkItemManager;
+import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
+import org.apache.atlas.repository.graphdb.*;
+import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
+import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper;
+import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Base class for Java patches that visit every graph vertex using a pool of worker threads: {@link #prepareForExecution()}
+ * runs once, then {@link #processVertexItem} is called for each ACTIVE, non-type vertex of a registered entity type.
+ */
+public abstract class ConcurrentPatchProcessor {
+    private static final Logger LOG = LoggerFactory.getLogger(ConcurrentPatchProcessor.class);
+
+    private static final String NUM_WORKERS_PROPERTY = "atlas.patch.numWorkers";
+    private static final String BATCH_SIZE_PROPERTY = "atlas.patch.batchSize";
+    private static final String ATLAS_SOLR_SHARDS = "ATLAS_SOLR_SHARDS";
+    private static final String WORKER_NAME_PREFIX = "patchWorkItem";
+    private static final int NUM_WORKERS;
+    private static final int BATCH_SIZE;
+
+    private final AtlasGraph graph;
+    private final GraphBackedSearchIndexer indexer;
+    private final AtlasTypeRegistry typeRegistry;
+    private final EntityGraphMapper entityGraphMapper;
+
+    static {
+        int numWorkers = 3;
+        int batchSize = 300;
+
+        try {
+            numWorkers = ApplicationProperties.get().getInt(NUM_WORKERS_PROPERTY, getDefaultNumWorkers());
+            batchSize = ApplicationProperties.get().getInt(BATCH_SIZE_PROPERTY, 300);
+
+            LOG.info("ConcurrentPatchProcessor: {}={}, {}={}", NUM_WORKERS_PROPERTY, numWorkers, BATCH_SIZE_PROPERTY, batchSize);
+        } catch (Exception e) {
+            LOG.error("Error retrieving configuration.", e);
+        }
+
+        // BATCH_SIZE is used as a modulus in Consumer.doCommit(), and the pool needs at least one worker
+        NUM_WORKERS = Math.max(numWorkers, 1);
+        BATCH_SIZE = Math.max(batchSize, 1);
+    }
+
+    private static int getDefaultNumWorkers() throws AtlasException {
+        return ApplicationProperties.get().getInt(ATLAS_SOLR_SHARDS, 1) * 3;
+    }
+
+    public ConcurrentPatchProcessor(PatchContext context) {
+        this.graph = context.getGraph();
+        this.indexer = context.getIndexer();
+        this.typeRegistry = context.getTypeRegistry();
+        this.entityGraphMapper = context.getEntityGraphMapper();
+    }
+
+    public AtlasGraph getGraph() { return graph; }
+    public GraphBackedSearchIndexer getIndexer() { return indexer; }
+    public AtlasTypeRegistry getTypeRegistry() { return typeRegistry; }
+    public EntityGraphMapper getEntityGraphMapper() { return entityGraphMapper; }
+
+    /** Runs {@link #prepareForExecution()} once, then submits every vertex id in the graph to the worker pool and drains it. */
+    public void apply() throws AtlasBaseException {
+        prepareForExecution();
+        execute();
+    }
+
+    private void execute() {
+        Iterable<Object> iterable = graph.query().vertexIds();
+        WorkItemManager manager = new WorkItemManager(
+                new ConsumerBuilder(graph, typeRegistry, this), WORKER_NAME_PREFIX,
+                BATCH_SIZE, NUM_WORKERS, false);
+        try {
+            for (Object vertexId : iterable) {
+                manager.checkProduce((Long) vertexId);
+            }
+
+            manager.drain();
+        } finally {
+            try {
+                manager.shutdown();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt(); // preserve the interrupt status for the caller
+                LOG.error("ConcurrentPatchProcessor.execute(): interrupted during WorkItemManager shutdown.", e);
+            }
+        }
+    }
+
+    private static class ConsumerBuilder implements WorkItemBuilder<Consumer, Long> {
+        private final AtlasTypeRegistry typeRegistry;
+        private final AtlasGraph graph;
+        private final ConcurrentPatchProcessor patchItemProcessor;
+
+        public ConsumerBuilder(AtlasGraph graph, AtlasTypeRegistry typeRegistry, ConcurrentPatchProcessor patchItemProcessor) {
+            this.graph = graph;
+            this.typeRegistry = typeRegistry;
+            this.patchItemProcessor = patchItemProcessor;
+        }
+
+        @Override
+        public Consumer build(BlockingQueue<Long> queue) {
+            return new Consumer(graph, typeRegistry, queue, patchItemProcessor);
+        }
+    }
+
+    private static class Consumer extends WorkItemConsumer<Long> {
+        private static final int MAX_COMMIT_RETRY_COUNT = 3;
+        private static final long COMMIT_RETRY_PAUSE_MS = 300L;
+
+        private final AtlasGraph graph;
+        private final AtlasTypeRegistry typeRegistry;
+        private final AtomicLong counter;
+        private final ConcurrentPatchProcessor individualItemProcessor;
+
+        public Consumer(AtlasGraph graph, AtlasTypeRegistry typeRegistry, BlockingQueue<Long> queue, ConcurrentPatchProcessor individualItemProcessor) {
+            super(queue);
+
+            this.graph = graph;
+            this.typeRegistry = typeRegistry;
+            this.counter = new AtomicLong(0);
+            this.individualItemProcessor = individualItemProcessor;
+        }
+
+        @Override
+        protected void doCommit() {
+            if (counter.get() % BATCH_SIZE == 0) {
+                LOG.info("Processed: {}", counter.get());
+
+                attemptCommit();
+            }
+        }
+
+        @Override
+        protected void commitDirty() {
+            attemptCommit();
+            LOG.info("Total: Commit: {}", counter.get());
+            super.commitDirty();
+        }
+
+        // Calls graph.commit() up to MAX_COMMIT_RETRY_COUNT times, pausing longer before each retry.
+        private void attemptCommit() {
+            for (int attempt = 1; attempt <= MAX_COMMIT_RETRY_COUNT; attempt++) {
+                try {
+                    graph.commit();
+                    return;
+                } catch (Exception ex) {
+                    LOG.error("Commit exception: attempt {} of {}", attempt, MAX_COMMIT_RETRY_COUNT, ex);
+                }
+
+                if (attempt < MAX_COMMIT_RETRY_COUNT) {
+                    try {
+                        Thread.sleep(COMMIT_RETRY_PAUSE_MS * attempt);
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                        LOG.error("Commit exception: Pause: Interrupted!", e);
+                        break;
+                    }
+                }
+            }
+
+            LOG.error("Commit exception: giving up; the current transaction was not committed.");
+        }
+
+        @Override
+        protected void processItem(Long vertexId) {
+            counter.incrementAndGet();
+
+            try {
+                AtlasVertex vertex = graph.getVertex(Long.toString(vertexId));
+
+                if (vertex == null) {
+                    LOG.warn("processItem(vertexId={}): AtlasVertex not found!", vertexId);
+
+                    return;
+                }
+
+                if (AtlasGraphUtilsV2.isTypeVertex(vertex) || AtlasGraphUtilsV2.getState(vertex) != AtlasEntity.Status.ACTIVE) {
+                    return;
+                }
+
+                String typeName = AtlasGraphUtilsV2.getTypeName(vertex);
+                AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName);
+
+                if (entityType == null) {
+                    return;
+                }
+
+                individualItemProcessor.processVertexItem(vertexId, vertex, typeName, entityType);
+            } catch (Exception e) { // isolate failures per vertex so one bad entity does not stop this worker
+                LOG.error("Error processing: {}", vertexId, e);
+            } finally {
+                commit(); // per-item hook (as in the pre-refactoring consumer) so doCommit() can commit every BATCH_SIZE items
+            }
+        }
+    }
+
+    /** Applies the patch to one ACTIVE entity vertex; may be called concurrently by several worker consumers. */
+    protected abstract void processVertexItem(Long vertexId, AtlasVertex vertex, String typeName, AtlasEntityType entityType) throws AtlasBaseException;
+
+    /** One-time setup run by {@link #apply()} before any vertex is submitted. */
+    protected abstract void prepareForExecution() throws AtlasBaseException;
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/PatchContext.java b/repository/src/main/java/org/apache/atlas/repository/patches/PatchContext.java
index 3508c74..c4b43d0 100644
--- a/repository/src/main/java/org/apache/atlas/repository/patches/PatchContext.java
+++ b/repository/src/main/java/org/apache/atlas/repository/patches/PatchContext.java
@@ -20,6 +20,7 @@ package org.apache.atlas.repository.patches;
import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper;
import org.apache.atlas.type.AtlasTypeRegistry;
public class PatchContext {
@@ -27,12 +28,14 @@ public class PatchContext {
private final AtlasTypeRegistry typeRegistry;
private final GraphBackedSearchIndexer indexer;
private final AtlasPatchRegistry patchRegistry;
+ private final EntityGraphMapper entityGraphMapper;
- public PatchContext(AtlasGraph graph, AtlasTypeRegistry typeRegistry, GraphBackedSearchIndexer indexer) {
+ public PatchContext(AtlasGraph graph, AtlasTypeRegistry typeRegistry, GraphBackedSearchIndexer indexer, EntityGraphMapper entityGraphMapper) {
this.graph = graph;
this.typeRegistry = typeRegistry;
this.indexer = indexer;
this.patchRegistry = new AtlasPatchRegistry(this.graph);
+ this.entityGraphMapper = entityGraphMapper;
}
public AtlasGraph getGraph() {
@@ -50,4 +53,5 @@ public class PatchContext {
public AtlasPatchRegistry getPatchRegistry() {
return patchRegistry;
}
+    public EntityGraphMapper getEntityGraphMapper() { return this.entityGraphMapper; } // mapper handed to patch processors (see ConcurrentPatchProcessor)
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java b/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java
index 9338e74..bd5f525 100644
--- a/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java
+++ b/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java
@@ -19,16 +19,11 @@ package org.apache.atlas.repository.patches;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
-import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
-import org.apache.atlas.pc.WorkItemBuilder;
-import org.apache.atlas.pc.WorkItemConsumer;
-import org.apache.atlas.pc.WorkItemManager;
import org.apache.atlas.repository.IndexException;
-import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
import org.apache.atlas.repository.graph.GraphBackedSearchIndexer.UniqueKind;
import org.apache.atlas.repository.graphdb.AtlasCardinality;
-import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasGraphManagement;
import org.apache.atlas.repository.graphdb.AtlasSchemaViolationException;
import org.apache.atlas.repository.graphdb.AtlasVertex;
@@ -36,15 +31,11 @@ import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
-import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
-import java.util.Iterator;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.atomic.AtomicLong;
import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.APPLIED;
import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getIdFromVertex;
@@ -64,8 +55,8 @@ public class UniqueAttributePatch extends AtlasPatchHandler {
}
@Override
- public void apply() {
- UniqueAttributePatchProcessor patchProcessor = new UniqueAttributePatchProcessor(context);
+ public void apply() throws AtlasBaseException {
+ ConcurrentPatchProcessor patchProcessor = new UniqueAttributePatchProcessor(context);
patchProcessor.apply();
@@ -74,17 +65,13 @@ public class UniqueAttributePatch extends AtlasPatchHandler {
LOG.info("UniqueAttributePatch.apply(): patchId={}, status={}", getPatchId(), getStatus());
}
- public static class UniqueAttributePatchProcessor {
+ public static class UniqueAttributePatchProcessor extends ConcurrentPatchProcessor {
private static final String NUM_WORKERS_PROPERTY = "atlas.patch.unique_attribute_patch.numWorkers";
private static final String BATCH_SIZE_PROPERTY = "atlas.patch.unique_attribute_patch.batchSize";
private static final String ATLAS_SOLR_SHARDS = "ATLAS_SOLR_SHARDS";
private static final int NUM_WORKERS;
private static final int BATCH_SIZE;
- private final AtlasGraph graph;
- private final GraphBackedSearchIndexer indexer;
- private final AtlasTypeRegistry typeRegistry;
-
static {
int numWorkers = 3;
int batchSize = 300;
@@ -103,37 +90,23 @@ public class UniqueAttributePatch extends AtlasPatchHandler {
}
public UniqueAttributePatchProcessor(PatchContext context) {
- this.graph = context.getGraph();
- this.indexer = context.getIndexer();
- this.typeRegistry = context.getTypeRegistry();
+ super(context);
}
- public void apply() {
- createIndexForUniqueAttributes();
- addUniqueAttributeToAllVertices();
+ @Override
+ protected void processVertexItem(Long vertexId, AtlasVertex vertex, String typeName, AtlasEntityType entityType) {
+ //process the vertex
+ processItem(vertexId, vertex, typeName, entityType);
}
- private void addUniqueAttributeToAllVertices() {
- Iterable<Object> iterable = graph.query().vertexIds();
- WorkItemManager manager = new WorkItemManager(new ConsumerBuilder(graph, typeRegistry), BATCH_SIZE, NUM_WORKERS);
- try {
- for (Iterator<Object> iter = iterable.iterator(); iter.hasNext(); ) {
- Object vertexId = iter.next();
- submitForProcessing((Long) vertexId, manager);
- }
-
- manager.drain();
- } finally {
- try {
- manager.shutdown();
- } catch (InterruptedException e) {
- LOG.error("UniqueAttributePatchProcessor.apply(): interrupted during WorkItemManager shutdown", e);
- }
- }
+ @Override
+ protected void prepareForExecution() {
+ //create the new attribute for all unique attributes.
+ createIndexForUniqueAttributes();
}
private void createIndexForUniqueAttributes() {
- for (AtlasEntityType entityType : typeRegistry.getAllEntityTypes()) {
+ for (AtlasEntityType entityType : getTypeRegistry().getAllEntityTypes()) {
String typeName = entityType.getTypeName();
Collection<AtlasAttribute> uniqAttributes = entityType.getUniqAttributes().values();
@@ -150,7 +123,7 @@ public class UniqueAttributePatch extends AtlasPatchHandler {
private void createIndexForUniqueAttributes(String typeName, Collection<AtlasAttribute> attributes) {
try {
- AtlasGraphManagement management = graph.getManagementSystem();
+ AtlasGraphManagement management = getGraph().getManagementSystem();
for (AtlasAttribute attribute : attributes) {
String uniquePropertyName = attribute.getVertexUniquePropertyName();
@@ -162,14 +135,14 @@ public class UniqueAttributePatch extends AtlasPatchHandler {
AtlasAttributeDef attributeDef = attribute.getAttributeDef();
boolean isIndexable = attributeDef.getIsIndexable();
String attribTypeName = attributeDef.getTypeName();
- Class propertyClass = indexer.getPrimitiveClass(attribTypeName);
- AtlasCardinality cardinality = indexer.toAtlasCardinality(attributeDef.getCardinality());
+ Class propertyClass = getIndexer().getPrimitiveClass(attribTypeName);
+ AtlasCardinality cardinality = getIndexer().toAtlasCardinality(attributeDef.getCardinality());
- indexer.createVertexIndex(management, uniquePropertyName, UniqueKind.PER_TYPE_UNIQUE, propertyClass, cardinality, isIndexable, true);
+ getIndexer().createVertexIndex(management, uniquePropertyName, UniqueKind.PER_TYPE_UNIQUE, propertyClass, cardinality, isIndexable, true);
}
- indexer.commit(management);
- graph.commit();
+ getIndexer().commit(management);
+ getGraph().commit();
LOG.info("Unique attributes: type: {}: Registered!", typeName);
} catch (IndexException e) {
@@ -181,134 +154,29 @@ public class UniqueAttributePatch extends AtlasPatchHandler {
return ApplicationProperties.get().getInt(ATLAS_SOLR_SHARDS, 1) * 3;
}
- private void submitForProcessing(Long vertexId, WorkItemManager manager) {
- manager.checkProduce(vertexId);
- }
-
- private static class ConsumerBuilder implements WorkItemBuilder<Consumer, Long> {
- private final AtlasTypeRegistry typeRegistry;
- private final AtlasGraph graph;
-
- public ConsumerBuilder(AtlasGraph graph, AtlasTypeRegistry typeRegistry) {
- this.graph = graph;
- this.typeRegistry = typeRegistry;
- }
-
- @Override
- public Consumer build(BlockingQueue<Long> queue) {
- return new Consumer(graph, typeRegistry, queue);
- }
- }
-
- private static class Consumer extends WorkItemConsumer<Long> {
- private int MAX_COMMIT_RETRY_COUNT = 3;
- private final AtlasGraph graph;
- private final AtlasTypeRegistry typeRegistry;
-
- private final AtomicLong counter;
-
- public Consumer(AtlasGraph graph, AtlasTypeRegistry typeRegistry, BlockingQueue<Long> queue) {
- super(queue);
-
- this.graph = graph;
- this.typeRegistry = typeRegistry;
- this.counter = new AtomicLong(0);
- }
-
- @Override
- protected void doCommit() {
- if (counter.get() % BATCH_SIZE == 0) {
- LOG.info("Processed: {}", counter.get());
-
- attemptCommit();
- }
- }
+ protected void processItem(Long vertexId, AtlasVertex vertex, String typeName, AtlasEntityType entityType) {
+ LOG.debug("processItem(typeName={}, vertexId={})", typeName, vertexId);
- @Override
- protected void commitDirty() {
- attemptCommit();
+ for (AtlasAttribute attribute : entityType.getUniqAttributes().values()) {
+ String uniquePropertyKey = attribute.getVertexUniquePropertyName();
+ Collection<? extends String> propertyKeys = vertex.getPropertyKeys();
+ Object uniqAttrValue = null;
- LOG.info("Total: Commit: {}", counter.get());
-
- super.commitDirty();
- }
-
- private void attemptCommit() {
- for (int retryCount = 1; retryCount <= MAX_COMMIT_RETRY_COUNT; retryCount++) {
+ if (propertyKeys == null || !propertyKeys.contains(uniquePropertyKey)) {
try {
- graph.commit();
+ String propertyKey = attribute.getVertexPropertyName();
- break;
- } catch(Exception ex) {
- LOG.error("Commit exception: ", retryCount, ex);
+ uniqAttrValue = EntityGraphRetriever.mapVertexToPrimitive(vertex, propertyKey, attribute.getAttributeDef());
- try {
- Thread.currentThread().sleep(300 * retryCount);
- } catch (InterruptedException e) {
- LOG.error("Commit exception: Pause: Interrputed!", e);
- }
+ AtlasGraphUtilsV2.setEncodedProperty(vertex, uniquePropertyKey, uniqAttrValue);
+ } catch(AtlasSchemaViolationException ex) {
+ LOG.error("Duplicates detected: {}:{}:{}", typeName, uniqAttrValue, getIdFromVertex(vertex));
+ vertex.removeProperty(uniquePropertyKey);
}
}
}
- @Override
- protected void processItem(Long vertexId) {
- AtlasVertex vertex = graph.getVertex(Long.toString(vertexId));
-
- if (vertex == null) {
- LOG.warn("processItem(vertexId={}): AtlasVertex not found!", vertexId);
-
- return;
- }
-
- if (AtlasGraphUtilsV2.isTypeVertex(vertex)) {
- return;
- }
-
- if (AtlasGraphUtilsV2.getState(vertex) != AtlasEntity.Status.ACTIVE) {
- return;
- }
-
- String typeName = AtlasGraphUtilsV2.getTypeName(vertex);
- AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName);
- if (entityType == null) {
- return;
- }
-
- processItem(vertexId, vertex, typeName, entityType);
- }
-
- private void processItem(Long vertexId, AtlasVertex vertex, String typeName, AtlasEntityType entityType) {
- try {
- counter.incrementAndGet();
- LOG.debug("processItem(typeName={}, vertexId={})", typeName, vertexId);
-
- for (AtlasAttribute attribute : entityType.getUniqAttributes().values()) {
- String uniquePropertyKey = attribute.getVertexUniquePropertyName();
- Collection<? extends String> propertyKeys = vertex.getPropertyKeys();
- Object uniqAttrValue = null;
-
- if (propertyKeys == null || !propertyKeys.contains(uniquePropertyKey)) {
- try {
- String propertyKey = attribute.getVertexPropertyName();
-
- uniqAttrValue = EntityGraphRetriever.mapVertexToPrimitive(vertex, propertyKey, attribute.getAttributeDef());
-
- AtlasGraphUtilsV2.setEncodedProperty(vertex, uniquePropertyKey, uniqAttrValue);
- } catch(AtlasSchemaViolationException ex) {
- LOG.error("Duplicates detected: {}:{}:{}", typeName, uniqAttrValue, getIdFromVertex(vertex));
- vertex.removeProperty(uniquePropertyKey);
- }
- }
- }
-
- LOG.debug("processItem(typeName={}, vertexId={}): Done!", typeName, vertexId);
- } catch (Exception ex) {
- LOG.error("processItem(typeName={}, vertexId={}): failed!", typeName, vertexId, ex);
- } finally {
- commit();
- }
- }
+ LOG.debug("processItem(typeName={}, vertexId={}): Done!", typeName, vertexId);
}
}
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
index 66eeded..64f5503 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
@@ -1675,16 +1675,22 @@ public class EntityGraphMapper {
updateModificationMetadata(entityVertex);
for (Map.Entry<AtlasVertex, List<AtlasClassification>> entry : removedClassifications.entrySet()) {
- AtlasVertex vertex = entry.getKey();
- String guid = GraphHelper.getGuid(vertex);
- List<AtlasClassification> deletedClassificationNames = entry.getValue();
- AtlasEntity entity = instanceConverter.getAndCacheEntity(guid);
+ AtlasEntity entity = updateClassificationText(entry.getKey());
+
- vertex.setProperty(CLASSIFICATION_TEXT_KEY, fullTextMapperV2.getClassificationTextForEntity(entity));
+ List<AtlasClassification> deletedClassificationNames = entry.getValue();
entityChangeNotifier.onClassificationDeletedFromEntity(entity, deletedClassificationNames);
}
}
+    public AtlasEntity updateClassificationText(AtlasVertex vertex) throws AtlasBaseException { // returns the entity so callers can reuse it, e.g. for change notifications
+        String guid = GraphHelper.getGuid(vertex);
+        AtlasEntity entity = instanceConverter.getAndCacheEntity(guid);
+        // Overwrite CLASSIFICATION_TEXT_KEY with text built by fullTextMapperV2. NOTE(review): confirm behavior if getAndCacheEntity() cannot resolve the guid.
+        vertex.setProperty(CLASSIFICATION_TEXT_KEY, fullTextMapperV2.getClassificationTextForEntity(entity));
+        return entity;
+    }
+
public void updateClassifications(EntityMutationContext context, String guid, List<AtlasClassification> classifications) throws AtlasBaseException {
if (CollectionUtils.isEmpty(classifications)) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_CLASSIFICATION_PARAMS, "update", guid);