You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by am...@apache.org on 2020/03/10 16:38:44 UTC
[atlas] branch master updated: ATLAS-3320: Migration Import
implementation.
This is an automated email from the ASF dual-hosted git repository.
amestry pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new 765ea58 ATLAS-3320: Migration Import implementation.
765ea58 is described below
commit 765ea583b23d25ecc919d520092747c4158466a5
Author: Ashutosh Mestry <am...@cloudera.com>
AuthorDate: Thu Mar 5 16:06:56 2020 -0800
ATLAS-3320: Migration Import implementation.
---
.../repository/graphdb/janus/AtlasJanusGraph.java | 2 +-
.../atlas/model/impexp/AtlasImportRequest.java | 21 ++
.../apache/atlas/GraphTransactionInterceptor.java | 4 +
.../atlas/repository/graph/FullTextMapperV2.java | 9 +-
.../atlas/repository/graph/IFullTextMapper.java | 45 +++++
.../atlas/repository/impexp/ImportService.java | 5 +-
.../atlas/repository/impexp/ZipSourceDirect.java | 8 +
.../migration/ZipFileMigrationImporter.java | 61 +++++-
.../repository/store/graph/AtlasEntityStore.java | 8 +
.../store/graph/v2/AtlasEntityChangeNotifier.java | 13 +-
.../store/graph/v2/AtlasEntityStoreV2.java | 9 +-
.../store/graph/v2/AtlasRelationshipStoreV2.java | 4 +-
.../store/graph/v2/BulkImporterImpl.java | 224 +++++----------------
.../store/graph/v2/EntityGraphMapper.java | 10 +-
.../store/graph/v2/IAtlasEntityChangeNotifier.java | 54 +++++
.../v2/bulkimport/EntityChangeNotifierNop.java | 88 ++++++++
.../graph/v2/bulkimport/FullTextMapperV2Nop.java | 57 ++++++
.../store/graph/v2/bulkimport/ImportStrategy.java | 28 +++
.../store/graph/v2/bulkimport/MigrationImport.java | 124 ++++++++++++
.../RegularImport.java} | 77 +++----
.../graph/v2/bulkimport/pc/EntityConsumer.java | 209 +++++++++++++++++++
.../v2/bulkimport/pc/EntityConsumerBuilder.java | 50 +++++
.../v2/bulkimport/pc/EntityCreationManager.java | 126 ++++++++++++
.../test/java/org/apache/atlas/TestModules.java | 5 +
24 files changed, 987 insertions(+), 254 deletions(-)
diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java
index 4acb371..0176ba7 100644
--- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java
+++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java
@@ -116,7 +116,7 @@ public class AtlasJanusGraph implements AtlasGraph<AtlasJanusVertex, AtlasJanusE
}
}
- janusGraph = (StandardJanusGraph) AtlasJanusGraphDatabase.getGraphInstance();
+ janusGraph = (StandardJanusGraph) graphInstance;
}
@Override
diff --git a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java
index 3362bf1..09dafdf 100644
--- a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java
+++ b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java
@@ -44,6 +44,9 @@ public class AtlasImportRequest implements Serializable {
public static final String TRANSFORMS_KEY = "transforms";
public static final String TRANSFORMERS_KEY = "transformers";
public static final String OPTION_KEY_REPLICATED_FROM = "replicatedFrom";
+ public static final String OPTION_KEY_MIGRATION = "migration";
+ public static final String OPTION_KEY_NUM_WORKERS = "numWorkers";
+ public static final String OPTION_KEY_BATCH_SIZE = "batchSize";
public static final String OPTION_KEY_FORMAT = "format";
public static final String OPTION_KEY_FORMAT_ZIP_DIRECT = "zipDirect";
private static final String START_POSITION_KEY = "startPosition";
@@ -124,6 +127,24 @@ public class AtlasImportRequest implements Serializable {
return isReplicationOptionSet() ? options.get(OPTION_KEY_REPLICATED_FROM) : StringUtils.EMPTY;
}
+ @JsonIgnore // Jackson annotation: this getter is not exposed as a JSON property
+ public int getOptionKeyNumWorkers() { // integer value of the "numWorkers" option
+ return getOptionsValue(OPTION_KEY_NUM_WORKERS, 1); // 1 is returned when the option value is empty
+ }
+
+ @JsonIgnore // Jackson annotation: this getter is not exposed as a JSON property
+ public int getOptionKeyBatchSize() { // integer value of the "batchSize" option
+ return getOptionsValue(OPTION_KEY_BATCH_SIZE, 1); // 1 is returned when the option value is empty
+ }
+
+ private int getOptionsValue(String optionKeyBatchSize, int defaultValue) {
+ String optionsValue = getOptionForKey(optionKeyBatchSize);
+
+ return StringUtils.isEmpty(optionsValue) ?
+ defaultValue :
+ Integer.valueOf(optionsValue);
+ }
+
@JsonAnySetter
public void setOption(String key, String value) {
if (null == options) {
diff --git a/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java b/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java
index bbe0dc5..57e454a 100644
--- a/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java
+++ b/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java
@@ -199,6 +199,10 @@ public class GraphTransactionInterceptor implements MethodInterceptor {
return cache.get(guid);
}
+ public static void clearCache() { // static entry point for resetting the guid-to-vertex cache from outside the interceptor
+ guidVertexCache.get().clear(); // empties whatever guidVertexCache.get() returns; NOTE(review): presumably a per-thread map, confirm guidVertexCache is a ThreadLocal
+ }
+
boolean logException(Throwable t) {
if (t instanceof AtlasBaseException) {
Response.Status httpCode = ((AtlasBaseException) t).getAtlasErrorCode().getHttpCode();
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/FullTextMapperV2.java b/repository/src/main/java/org/apache/atlas/repository/graph/FullTextMapperV2.java
index 0f2b4bf..417c96c 100644
--- a/repository/src/main/java/org/apache/atlas/repository/graph/FullTextMapperV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/FullTextMapperV2.java
@@ -53,7 +53,7 @@ import java.util.Set;
@Component
-public class FullTextMapperV2 {
+public class FullTextMapperV2 implements IFullTextMapper {
private static final Logger LOG = LoggerFactory.getLogger(FullTextMapperV2.class);
private static final String FULL_TEXT_DELIMITER = " ";
@@ -84,6 +84,8 @@ public class FullTextMapperV2 {
* @return Full text string ONLY for the added classifications
* @throws AtlasBaseException
*/
+
+ @Override
public String getIndexTextForClassifications(String guid, List<AtlasClassification> classifications) throws AtlasBaseException {
String ret = null;
final AtlasEntityWithExtInfo entityWithExtInfo;
@@ -120,6 +122,7 @@ public class FullTextMapperV2 {
return ret;
}
+ @Override
public String getIndexTextForEntity(String guid) throws AtlasBaseException {
String ret = null;
final AtlasEntity entity;
@@ -150,6 +153,7 @@ public class FullTextMapperV2 {
return ret;
}
+ @Override
public String getClassificationTextForEntity(AtlasEntity entity) throws AtlasBaseException {
String ret = null;
@@ -271,10 +275,12 @@ public class FullTextMapperV2 {
}
}
+ @Override
public AtlasEntity getAndCacheEntity(String guid) throws AtlasBaseException {
return getAndCacheEntity(guid, true);
}
+ @Override
public AtlasEntity getAndCacheEntity(String guid, boolean includeReferences) throws AtlasBaseException {
RequestContext context = RequestContext.get();
AtlasEntity entity = context.getEntity(guid);
@@ -294,6 +300,7 @@ public class FullTextMapperV2 {
return entity;
}
+ @Override
public AtlasEntityWithExtInfo getAndCacheEntityWithExtInfo(String guid) throws AtlasBaseException {
RequestContext context = RequestContext.get();
AtlasEntityWithExtInfo entityWithExtInfo = context.getEntityWithExtInfo(guid);
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/IFullTextMapper.java b/repository/src/main/java/org/apache/atlas/repository/graph/IFullTextMapper.java
new file mode 100644
index 0000000..2bbf4d8
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/IFullTextMapper.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.graph;
+
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.instance.AtlasClassification;
+import org.apache.atlas.model.instance.AtlasEntity;
+
+import java.util.List;
+
+public interface IFullTextMapper { // contract extracted from FullTextMapperV2 so alternative implementations can be substituted
+ /**
+ * Map newly associated/defined classifications for the entity with given GUID
+ * @param guid Entity guid
+ * @param classifications new classifications added to the entity
+ * @return Full text string ONLY for the added classifications
+ * @throws AtlasBaseException
+ */
+ String getIndexTextForClassifications(String guid, List<AtlasClassification> classifications) throws AtlasBaseException;
+
+ String getIndexTextForEntity(String guid) throws AtlasBaseException; // NOTE(review): presumably the full-text index string for the whole entity; confirm against FullTextMapperV2
+
+ String getClassificationTextForEntity(AtlasEntity entity) throws AtlasBaseException; // NOTE(review): presumably only the classification portion of the index text; confirm
+
+ AtlasEntity getAndCacheEntity(String guid) throws AtlasBaseException; // FullTextMapperV2 implements this as getAndCacheEntity(guid, true)
+
+ AtlasEntity getAndCacheEntity(String guid, boolean includeReferences) throws AtlasBaseException; // FullTextMapperV2 consults RequestContext.get().getEntity(guid) first
+
+ AtlasEntity.AtlasEntityWithExtInfo getAndCacheEntityWithExtInfo(String guid) throws AtlasBaseException; // FullTextMapperV2 consults RequestContext.get().getEntityWithExtInfo(guid) first
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
index 1964ade..c18c4ab 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
@@ -250,8 +250,9 @@ public class ImportService {
private EntityImportStream createZipSource(AtlasImportRequest request, InputStream inputStream, String configuredTemporaryDirectory) throws AtlasBaseException {
try {
- if (request.getOptions().containsKey(AtlasImportRequest.OPTION_KEY_FORMAT) &&
- request.getOptions().get(AtlasImportRequest.OPTION_KEY_FORMAT).equals(AtlasImportRequest.OPTION_KEY_FORMAT_ZIP_DIRECT) ) {
+ if (request.getOptions().containsKey(AtlasImportRequest.OPTION_KEY_MIGRATION) || (request.getOptions().containsKey(AtlasImportRequest.OPTION_KEY_FORMAT) &&
+ request.getOptions().get(AtlasImportRequest.OPTION_KEY_FORMAT).equals(AtlasImportRequest.OPTION_KEY_FORMAT_ZIP_DIRECT))) {
+ LOG.info("ZipSource Format: ZipDirect: Size: {}", request.getOptions().get("size"));
return getZipDirectEntityImportStream(request, inputStream);
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceDirect.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceDirect.java
index cb5a7ac..75b8e9e 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceDirect.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceDirect.java
@@ -64,6 +64,10 @@ public class ZipSourceDirect implements EntityImportStream {
this.zipInputStream = new ZipInputStream(inputStream);
this.streamSize = streamSize;
prepareStreamForFetch();
+
+ if (this.streamSize == 1) {
+ LOG.info("ZipSourceDirect: Stream Size set to: {}. This will cause inaccurate percentage reporting.", this.streamSize);
+ }
}
@Override
@@ -226,6 +230,10 @@ public class ZipSourceDirect implements EntityImportStream {
}
public int size() {
+ if (this.streamSize == 1) {
+ return currentPosition;
+ }
+
return this.streamSize;
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java b/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java
index f552525..35a76ea 100644
--- a/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java
+++ b/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java
@@ -24,6 +24,8 @@ import org.apache.atlas.RequestContext;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasImportRequest;
import org.apache.atlas.repository.impexp.ImportService;
+import org.apache.atlas.type.AtlasType;
+import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,11 +34,20 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
+import java.util.Map;
+import java.util.zip.ZipFile;
public class ZipFileMigrationImporter implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(ZipFileMigrationImporter.class);
- private static String ENV_USER_NAME = "user.name";
+ private static final String APPLICATION_PROPERTY_MIGRATION_NUMER_OF_WORKERS = "atlas.migration.mode.workers";
+ private static final String APPLICATION_PROPERTY_MIGRATION_BATCH_SIZE = "atlas.migration.mode.batch.size";
+ private static final String DEFAULT_NUMBER_OF_WORKERS = "4";
+ private static final String DEFAULT_BATCH_SIZE = "100";
+ private static final String ZIP_FILE_COMMENT_ENTITIES_COUNT = "entitiesCount";
+ private static final String ZIP_FILE_COMMENT_TOTAL_COUNT = "total";
+
+ private final static String ENV_USER_NAME = "user.name";
private final ImportService importService;
private final String fileToImport;
@@ -52,7 +63,8 @@ public class ZipFileMigrationImporter implements Runnable {
FileWatcher fileWatcher = new FileWatcher(fileToImport);
fileWatcher.start();
- performImport(new FileInputStream(new File(fileToImport)));
+ int streamSize = getStreamSizeFromComment(fileToImport);
+ performImport(new FileInputStream(new File(fileToImport)), streamSize);
} catch (IOException e) {
LOG.error("Migration Import: IO Error!", e);
} catch (AtlasBaseException e) {
@@ -60,19 +72,46 @@ public class ZipFileMigrationImporter implements Runnable {
}
}
- private void performImport(InputStream fs) throws AtlasBaseException {
+    /**
+     * Reads the entity count recorded in the comment of the ZIP archive.
+     *
+     * @param fileToImport path of the ZIP file to inspect
+     * @return the value computed by processZipFileStreamSizeComment from the archive comment,
+     *         or 1 when the archive cannot be opened or read
+     */
+    private int getStreamSizeFromComment(String fileToImport) {
+        int ret = 1;
+
+        // try-with-resources guarantees the archive handle is released even when
+        // parsing the comment throws; an explicit close() after the parse call
+        // would be skipped in that case and leak the file handle.
+        try (ZipFile zipFile = new ZipFile(fileToImport)) {
+            ret = processZipFileStreamSizeComment(zipFile.getComment());
+        } catch (IOException e) {
+            LOG.error("Error opening ZIP file: {}", fileToImport, e);
+        }
+
+        return ret;
+    }
+
+    /**
+     * Extracts the entity count from the JSON stored in the ZIP file comment.
+     *
+     * @param comment ZIP file comment; expected to be a JSON object holding the
+     *                "entitiesCount" and "total" keys
+     * @return the "entitiesCount" value, or 1 when the comment is empty, cannot be
+     *         turned into a map, or does not carry a numeric "entitiesCount"
+     */
+    private int processZipFileStreamSizeComment(String comment) {
+        if (StringUtils.isEmpty(comment)) {
+            return 1;
+        }
+
+        Map<?, ?> map = AtlasType.fromJson(comment, Map.class);
+        if (map == null) {
+            // Defensive: nothing usable was produced from the comment.
+            LOG.warn("ZipFileMigrationImporter: Zip file: Comment could not be parsed: {}", comment);
+            return 1;
+        }
+
+        Object entitiesValue = map.get(ZIP_FILE_COMMENT_ENTITIES_COUNT);
+        Object totalValue    = map.get(ZIP_FILE_COMMENT_TOTAL_COUNT);
+
+        // A blind (int) cast throws NullPointerException for a missing key and
+        // ClassCastException for a non-Integer value; fall back to the same
+        // "unknown size" value used for an empty comment instead.
+        if (!(entitiesValue instanceof Number)) {
+            LOG.warn("ZipFileMigrationImporter: Zip file: Comment has no numeric '{}': {}", ZIP_FILE_COMMENT_ENTITIES_COUNT, comment);
+            return 1;
+        }
+
+        int entitiesCount = ((Number) entitiesValue).intValue();
+        // "total" is informational only, so it is logged as found without a cast.
+        LOG.info("ZipFileMigrationImporter: Zip file: Comment: streamSize: {}: total: {}", entitiesCount, totalValue);
+
+        return entitiesCount;
+    }
+
+ private void performImport(InputStream fs, int streamSize) throws AtlasBaseException {
try {
LOG.info("Migration Import: {}: Starting...", fileToImport);
RequestContext.get().setUser(getUserNameFromEnvironment(), null);
- importService.run(fs, getImportRequest(),
+ importService.run(fs, getImportRequest(streamSize),
getUserNameFromEnvironment(),
InetAddress.getLocalHost().getHostName(),
InetAddress.getLocalHost().getHostAddress());
} catch (Exception ex) {
- LOG.error("Error loading zip for migration", ex);
+ LOG.error("Migration Import: Error loading zip for migration!", ex);
throw new AtlasBaseException(ex);
} finally {
LOG.info("Migration Import: {}: Done!", fileToImport);
@@ -83,9 +122,17 @@ public class ZipFileMigrationImporter implements Runnable {
return System.getProperty(ENV_USER_NAME);
}
- private AtlasImportRequest getImportRequest() throws AtlasException {
+ private AtlasImportRequest getImportRequest(int streamSize) throws AtlasException {
AtlasImportRequest request = new AtlasImportRequest();
- request.setOption(AtlasImportRequest.OPTION_KEY_FORMAT, AtlasImportRequest.OPTION_KEY_FORMAT_ZIP_DIRECT);
+
+ request.setSizeOption(streamSize);
+ request.setOption(AtlasImportRequest.OPTION_KEY_MIGRATION, "true");
+ request.setOption(AtlasImportRequest.OPTION_KEY_NUM_WORKERS, getPropertyValue(APPLICATION_PROPERTY_MIGRATION_NUMER_OF_WORKERS, DEFAULT_NUMBER_OF_WORKERS));
+ request.setOption(AtlasImportRequest.OPTION_KEY_BATCH_SIZE, getPropertyValue(APPLICATION_PROPERTY_MIGRATION_BATCH_SIZE, DEFAULT_BATCH_SIZE));
+
return request;
}
+ private String getPropertyValue(String property, String defaultValue) throws AtlasException { // looks up one application property by name
+ return ApplicationProperties.get().getString(property, defaultValue); // defaultValue is handed to getString as the fallback
+ }
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java
index d33f404..834c9d1 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java
@@ -150,6 +150,14 @@ public interface AtlasEntityStore {
EntityMutationResponse createOrUpdateForImport(EntityStream entityStream) throws AtlasBaseException;
/**
+ * Create or update entities with parameters necessary for import process without commit. Caller will have to take care of the commit.
+ * @param entityStream AtlasEntityStream
+ * @return EntityMutationResponse Entity mutations operations with the corresponding set of entities on which these operations were performed
+ * @throws AtlasBaseException
+ */
+ EntityMutationResponse createOrUpdateForImportNoCommit(EntityStream entityStream) throws AtlasBaseException;
+
+ /**
* Update a single entity
* @param objectId ID of the entity
* @param updatedEntityInfo updated entity information
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java
index d7020a7..00c0114 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java
@@ -66,7 +66,7 @@ import static org.apache.atlas.repository.Constants.ENTITY_TEXT_PROPERTY_KEY;
@Component
-public class AtlasEntityChangeNotifier {
+public class AtlasEntityChangeNotifier implements IAtlasEntityChangeNotifier {
private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityChangeNotifier.class);
private final Set<EntityChangeListener> entityChangeListeners;
@@ -91,6 +91,7 @@ public class AtlasEntityChangeNotifier {
this.isV2EntityNotificationEnabled = AtlasRepositoryConfiguration.isV2EntityNotificationEnabled();
}
+ @Override
public void onEntitiesMutated(EntityMutationResponse entityMutationResponse, boolean isImport) throws AtlasBaseException {
if (CollectionUtils.isEmpty(entityChangeListeners)) {
return;
@@ -119,6 +120,7 @@ public class AtlasEntityChangeNotifier {
notifyPropagatedEntities();
}
+ @Override
public void notifyRelationshipMutation(AtlasRelationship relationship, EntityNotification.EntityNotificationV2.OperationType operationType) throws AtlasBaseException {
if (CollectionUtils.isEmpty(entityChangeListeners)) {
return;
@@ -137,6 +139,7 @@ public class AtlasEntityChangeNotifier {
}
}
+ @Override
public void onClassificationAddedToEntity(AtlasEntity entity, List<AtlasClassification> addedClassifications) throws AtlasBaseException {
if (isV2EntityNotificationEnabled) {
doFullTextMapping(entity.getGuid());
@@ -166,6 +169,7 @@ public class AtlasEntityChangeNotifier {
}
}
+ @Override
public void onClassificationsAddedToEntities(List<AtlasEntity> entities, List<AtlasClassification> addedClassifications) throws AtlasBaseException {
if (isV2EntityNotificationEnabled) {
doFullTextMappingHelper(entities);
@@ -201,6 +205,7 @@ public class AtlasEntityChangeNotifier {
}
}
+ @Override
public void onClassificationUpdatedToEntity(AtlasEntity entity, List<AtlasClassification> updatedClassifications) throws AtlasBaseException {
doFullTextMapping(entity.getGuid());
@@ -228,6 +233,7 @@ public class AtlasEntityChangeNotifier {
}
}
+ @Override
public void onClassificationDeletedFromEntity(AtlasEntity entity, List<AtlasClassification> deletedClassifications) throws AtlasBaseException {
doFullTextMapping(entity.getGuid());
@@ -255,6 +261,7 @@ public class AtlasEntityChangeNotifier {
}
}
+ @Override
public void onClassificationsDeletedFromEntities(List<AtlasEntity> entities, List<AtlasClassification> deletedClassifications) throws AtlasBaseException {
doFullTextMappingHelper(entities);
@@ -288,6 +295,7 @@ public class AtlasEntityChangeNotifier {
}
}
+ @Override
public void onTermAddedToEntities(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> entityIds) throws AtlasBaseException {
// listeners notified on term-entity association only if v2 notifications are enabled
if (isV2EntityNotificationEnabled) {
@@ -307,6 +315,7 @@ public class AtlasEntityChangeNotifier {
}
}
+ @Override
public void onTermDeletedFromEntities(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> entityIds) throws AtlasBaseException {
// listeners notified on term-entity disassociation only if v2 notifications are enabled
if (isV2EntityNotificationEnabled) {
@@ -326,6 +335,7 @@ public class AtlasEntityChangeNotifier {
}
}
+ @Override
public void onLabelsUpdatedFromEntity(String entityGuid, Set<String> addedLabels, Set<String> deletedLabels) throws AtlasBaseException {
doFullTextMapping(entityGuid);
@@ -339,6 +349,7 @@ public class AtlasEntityChangeNotifier {
}
}
+ @Override
public void notifyPropagatedEntities() throws AtlasBaseException {
RequestContext context = RequestContext.get();
Map<String, List<AtlasClassification>> addedPropagations = context.getAddedPropagations();
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
index 6de57e3..1e40f48 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
@@ -90,13 +90,13 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
private final DeleteHandlerDelegate deleteDelegate;
private final AtlasTypeRegistry typeRegistry;
- private final AtlasEntityChangeNotifier entityChangeNotifier;
+ private final IAtlasEntityChangeNotifier entityChangeNotifier;
private final EntityGraphMapper entityGraphMapper;
private final EntityGraphRetriever entityRetriever;
@Inject
public AtlasEntityStoreV2(DeleteHandlerDelegate deleteDelegate, AtlasTypeRegistry typeRegistry,
- AtlasEntityChangeNotifier entityChangeNotifier, EntityGraphMapper entityGraphMapper) {
+ IAtlasEntityChangeNotifier entityChangeNotifier, EntityGraphMapper entityGraphMapper) {
this.deleteDelegate = deleteDelegate;
this.typeRegistry = typeRegistry;
this.entityChangeNotifier = entityChangeNotifier;
@@ -332,6 +332,11 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
}
@Override
+ public EntityMutationResponse createOrUpdateForImportNoCommit(EntityStream entityStream) throws AtlasBaseException { // carries no @GraphTransaction annotation, unlike updateEntity below
+ return createOrUpdate(entityStream, false, true, true); // NOTE(review): the meaning of the three flags is defined by createOrUpdate, which is not visible here
+ }
+
+ @Override
@GraphTransaction
public EntityMutationResponse updateEntity(AtlasObjectId objectId, AtlasEntityWithExtInfo updatedEntityInfo, boolean isPartialUpdate) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java
index fdf117a..6a75ba9 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java
@@ -99,10 +99,10 @@ public class AtlasRelationshipStoreV2 implements AtlasRelationshipStore {
private final EntityGraphRetriever entityRetriever;
private final DeleteHandlerDelegate deleteDelegate;
private final GraphHelper graphHelper = GraphHelper.getInstance();
- private final AtlasEntityChangeNotifier entityChangeNotifier;
+ private final IAtlasEntityChangeNotifier entityChangeNotifier;
@Inject
- public AtlasRelationshipStoreV2(AtlasTypeRegistry typeRegistry, DeleteHandlerDelegate deleteDelegate, AtlasEntityChangeNotifier entityChangeNotifier) {
+ public AtlasRelationshipStoreV2(AtlasTypeRegistry typeRegistry, DeleteHandlerDelegate deleteDelegate, IAtlasEntityChangeNotifier entityChangeNotifier) {
this.typeRegistry = typeRegistry;
this.entityRetriever = new EntityGraphRetriever(typeRegistry);
this.deleteDelegate = deleteDelegate;
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java
index 54c32c5..a4d732a 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java
@@ -18,33 +18,29 @@
package org.apache.atlas.repository.store.graph.v2;
import com.google.common.annotations.VisibleForTesting;
-import org.apache.atlas.AtlasConfiguration;
-import org.apache.atlas.AtlasErrorCode;
-import org.apache.atlas.RequestContext;
-import org.apache.atlas.annotation.GraphTransaction;
import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasImportRequest;
import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.instance.AtlasEntity;
-import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.EntityMutationResponse;
-import org.apache.atlas.repository.Constants;
-import org.apache.atlas.repository.graphdb.AtlasSchemaViolationException;
+import org.apache.atlas.repository.graph.AtlasGraphProvider;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.BulkImporter;
+import org.apache.atlas.repository.store.graph.v2.bulkimport.ImportStrategy;
+import org.apache.atlas.repository.store.graph.v2.bulkimport.MigrationImport;
+import org.apache.atlas.repository.store.graph.v2.bulkimport.RegularImport;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.type.Constants;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.inject.Inject;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -55,131 +51,27 @@ public class BulkImporterImpl implements BulkImporter {
private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityStoreV2.class);
private final AtlasEntityStore entityStore;
- private final EntityGraphRetriever entityGraphRetriever;
private AtlasTypeRegistry typeRegistry;
- private final int MAX_ATTEMPTS = 2;
- private boolean directoryBasedImportConfigured;
@Inject
public BulkImporterImpl(AtlasEntityStore entityStore, AtlasTypeRegistry typeRegistry) {
this.entityStore = entityStore;
- this.entityGraphRetriever = new EntityGraphRetriever(typeRegistry);
this.typeRegistry = typeRegistry;
- this.directoryBasedImportConfigured = StringUtils.isNotEmpty(AtlasConfiguration.IMPORT_TEMP_DIRECTORY.getString());
}
@Override
public EntityMutationResponse bulkImport(EntityImportStream entityStream, AtlasImportResult importResult) throws AtlasBaseException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("==> bulkImport()");
- }
-
- if (entityStream == null || !entityStream.hasNext()) {
- throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "no entities to create/update.");
- }
-
- EntityMutationResponse ret = new EntityMutationResponse();
- ret.setGuidAssignments(new HashMap<>());
-
- Set<String> processedGuids = new HashSet<>();
- float currentPercent = 0f;
- List<String> residualList = new ArrayList<>();
-
- EntityImportStreamWithResidualList entityImportStreamWithResidualList = new EntityImportStreamWithResidualList(entityStream, residualList);
-
- while (entityImportStreamWithResidualList.hasNext()) {
- AtlasEntityWithExtInfo entityWithExtInfo = entityImportStreamWithResidualList.getNextEntityWithExtInfo();
- AtlasEntity entity = entityWithExtInfo != null ? entityWithExtInfo.getEntity() : null;
-
- if (entity == null) {
- continue;
- }
-
- for (int attempt = 0; attempt < MAX_ATTEMPTS; attempt++) {
- try {
- AtlasEntityStreamForImport oneEntityStream = new AtlasEntityStreamForImport(entityWithExtInfo, null);
- EntityMutationResponse resp = entityStore.createOrUpdateForImport(oneEntityStream);
-
- if (resp.getGuidAssignments() != null) {
- ret.getGuidAssignments().putAll(resp.getGuidAssignments());
- }
+ ImportStrategy importStrategy = null;
- currentPercent = updateImportMetrics(entityWithExtInfo, resp, importResult, processedGuids,
- entityStream.getPosition(),
- entityImportStreamWithResidualList.getStreamSize(),
- currentPercent);
-
- entityStream.onImportComplete(entity.getGuid());
- break;
- } catch (AtlasBaseException e) {
- if (!updateResidualList(e, residualList, entityWithExtInfo.getEntity().getGuid())) {
- throw e;
- }
- break;
- } catch (AtlasSchemaViolationException e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Entity: {}", entity.getGuid(), e);
- }
-
- if (attempt == 0) {
- updateVertexGuid(entity);
- } else {
- LOG.error("Guid update failed: {}", entityWithExtInfo.getEntity().getGuid());
- throw e;
- }
- } catch (Throwable e) {
- AtlasBaseException abe = new AtlasBaseException(e);
- if (!updateResidualList(abe, residualList, entityWithExtInfo.getEntity().getGuid())) {
- throw abe;
- }
-
- LOG.warn("Exception: {}", entity.getGuid(), e);
- break;
- } finally {
- RequestContext.get().clearCache();
- }
- }
- }
-
- importResult.getProcessedEntities().addAll(processedGuids);
- LOG.info("bulkImport(): done. Total number of entities (including referred entities) imported: {}", processedGuids.size());
-
- return ret;
- }
-
- @GraphTransaction
- public void updateVertexGuid(AtlasEntity entity) {
- String entityGuid = entity.getGuid();
- AtlasObjectId objectId = entityGraphRetriever.toAtlasObjectIdWithoutGuid(entity);
-
- AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());
- String vertexGuid = null;
- try {
- vertexGuid = AtlasGraphUtilsV2.getGuidByUniqueAttributes(entityType, objectId.getUniqueAttributes());
- } catch (AtlasBaseException e) {
- LOG.warn("Entity: {}: Does not exist!", objectId);
- return;
- }
-
- if (StringUtils.isEmpty(vertexGuid) || vertexGuid.equals(entityGuid)) {
- return;
- }
-
- AtlasVertex v = AtlasGraphUtilsV2.findByGuid(vertexGuid);
- if (v == null) {
- return;
+ if (importResult.getRequest().getOptions() != null &&
+ importResult.getRequest().getOptions().containsKey(AtlasImportRequest.OPTION_KEY_MIGRATION)) {
+ importStrategy = new MigrationImport(new AtlasGraphProvider(), this.typeRegistry);
+ } else {
+ importStrategy = new RegularImport(this.entityStore, this.typeRegistry);
}
- addHistoricalGuid(v, vertexGuid);
- AtlasGraphUtilsV2.setProperty(v, Constants.GUID_PROPERTY_KEY, entityGuid);
-
- LOG.warn("GUID Updated: Entity: {}: from: {}: to: {}", objectId, vertexGuid, entity.getGuid());
- }
-
- private void addHistoricalGuid(AtlasVertex v, String vertexGuid) {
- String existingJson = AtlasGraphUtilsV2.getProperty(v, HISTORICAL_GUID_PROPERTY_KEY, String.class);
-
- AtlasGraphUtilsV2.setProperty(v, HISTORICAL_GUID_PROPERTY_KEY, getJsonArray(existingJson, vertexGuid));
+ LOG.info("BulkImportImpl: {}", importStrategy.getClass().getSimpleName());
+ return importStrategy.run(entityStream, importResult);
}
@VisibleForTesting
@@ -193,38 +85,16 @@ public class BulkImporterImpl implements BulkImporter {
return json;
}
- private boolean updateResidualList(AtlasBaseException e, List<String> lineageList, String guid) {
- if (!e.getAtlasErrorCode().getErrorCode().equals(AtlasErrorCode.INVALID_OBJECT_ID.getErrorCode())) {
- return false;
- }
-
- lineageList.add(guid);
-
- return true;
- }
-
- private float updateImportMetrics(AtlasEntity.AtlasEntityWithExtInfo currentEntity,
- EntityMutationResponse resp,
- AtlasImportResult importResult,
- Set<String> processedGuids,
- int currentIndex, int streamSize, float currentPercent) {
- if (!directoryBasedImportConfigured) {
- updateImportMetrics("entity:%s:created", resp.getCreatedEntities(), processedGuids, importResult);
- updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids, importResult);
- updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids, importResult);
- }
-
- String lastEntityImported = String.format("entity:last-imported:%s:[%s]:(%s)", currentEntity.getEntity().getTypeName(), currentIndex, currentEntity.getEntity().getGuid());
-
- return updateImportProgress(LOG, currentIndex, streamSize, currentPercent, lastEntityImported);
- }
-
@VisibleForTesting
- static float updateImportProgress(Logger log, int currentIndex, int streamSize, float currentPercent, String additionalInfo) {
+ public static float updateImportProgress(Logger log, int currentIndex, int streamSize, float currentPercent, String additionalInfo) {
final double tolerance = 0.000001;
final int MAX_PERCENT = 100;
int maxSize = (currentIndex <= streamSize) ? streamSize : currentIndex;
+ if (maxSize <= 0) {
+ return currentPercent;
+ }
+
float percent = (float) ((currentIndex * MAX_PERCENT) / maxSize);
boolean updateLog = Double.compare(percent, currentPercent) > tolerance;
float updatedPercent = (MAX_PERCENT < maxSize) ? percent : ((updateLog) ? ++currentPercent : currentPercent);
@@ -236,7 +106,7 @@ public class BulkImporterImpl implements BulkImporter {
return updatedPercent;
}
- private static void updateImportMetrics(String prefix, List<AtlasEntityHeader> list, Set<String> processedGuids, AtlasImportResult importResult) {
+ public static void updateImportMetrics(String prefix, List<AtlasEntityHeader> list, Set<String> processedGuids, AtlasImportResult importResult) {
if (list == null) {
return;
}
@@ -251,41 +121,37 @@ public class BulkImporterImpl implements BulkImporter {
}
}
- private static class EntityImportStreamWithResidualList {
- private final EntityImportStream stream;
- private final List<String> residualList;
- private boolean navigateResidualList;
- private int currentResidualListIndex;
-
+ /**
+  * Re-keys an already stored entity vertex so that its GUID matches the GUID of the incoming entity.
+  *
+  * The stored GUID is looked up through the entity's unique attributes. Nothing is modified when the
+  * lookup throws {@link AtlasBaseException} (a warning is logged), when the stored GUID is empty or
+  * already equal to the incoming GUID, or when no vertex is found for the stored GUID. Otherwise the
+  * stored GUID is recorded through {@link #addHistoricalGuid} and the vertex's GUID property is
+  * overwritten with the incoming GUID.
+  *
+  * NOTE(review): the instance method this replaces carried {@code @GraphTransaction}; this static
+  * version does not, so callers presumably have to supply the transaction themselves - confirm.
+  *
+  * @param typeRegistry         registry used to resolve the entity type by its type name
+  * @param entityGraphRetriever used to build the entity's object id without its GUID
+  * @param entity               incoming entity whose GUID is written to the vertex
+  */
+ public static void updateVertexGuid(AtlasTypeRegistry typeRegistry, EntityGraphRetriever entityGraphRetriever, AtlasEntity entity) {
+ String entityGuid = entity.getGuid();
+ AtlasObjectId objectId = entityGraphRetriever.toAtlasObjectIdWithoutGuid(entity);
- public EntityImportStreamWithResidualList(EntityImportStream stream, List<String> residualList) {
- this.stream = stream;
- this.residualList = residualList;
- this.navigateResidualList = false;
- this.currentResidualListIndex = 0;
+ AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());
+ String vertexGuid = null;
+ try {
+ vertexGuid = AtlasGraphUtilsV2.getGuidByUniqueAttributes(entityType, objectId.getUniqueAttributes());
+ } catch (AtlasBaseException e) {
+ LOG.warn("Entity: {}: Does not exist!", objectId);
+ return;
}
- public AtlasEntity.AtlasEntityWithExtInfo getNextEntityWithExtInfo() {
- if (navigateResidualList == false) {
- return stream.getNextEntityWithExtInfo();
- } else {
- stream.setPositionUsingEntityGuid(residualList.get(currentResidualListIndex++));
- return stream.getNextEntityWithExtInfo();
- }
+ if (StringUtils.isEmpty(vertexGuid) || vertexGuid.equals(entityGuid)) {
+ return;
}
- public boolean hasNext() {
- if (!navigateResidualList) {
- boolean streamHasNext = stream.hasNext();
- navigateResidualList = (streamHasNext == false);
- return streamHasNext ? streamHasNext : (currentResidualListIndex < residualList.size());
- } else {
- return (currentResidualListIndex < residualList.size());
- }
+ AtlasVertex v = AtlasGraphUtilsV2.findByGuid(vertexGuid);
+ if (v == null) {
+ return;
}
- public int getStreamSize() {
- return stream.size() + residualList.size();
- }
+ addHistoricalGuid(v, vertexGuid);
+ AtlasGraphUtilsV2.setProperty(v, Constants.GUID_PROPERTY_KEY, entityGuid);
+
+ LOG.warn("GUID Updated: Entity: {}: from: {}: to: {}", objectId, vertexGuid, entity.getGuid());
+ }
+
+ public static void addHistoricalGuid(AtlasVertex v, String vertexGuid) {
+ String existingJson = AtlasGraphUtilsV2.getProperty(v, HISTORICAL_GUID_PROPERTY_KEY, String.class);
+
+ AtlasGraphUtilsV2.setProperty(v, HISTORICAL_GUID_PROPERTY_KEY, getJsonArray(existingJson, vertexGuid));
}
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
index 291d8b5..d3ba18f 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
@@ -39,7 +39,7 @@ import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef.Cardinali
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.RepositoryException;
import org.apache.atlas.repository.converters.AtlasInstanceConverter;
-import org.apache.atlas.repository.graph.FullTextMapperV2;
+import org.apache.atlas.repository.graph.IFullTextMapper;
import org.apache.atlas.repository.graph.GraphHelper;
import org.apache.atlas.repository.graphdb.AtlasEdge;
import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
@@ -132,15 +132,15 @@ public class EntityGraphMapper {
private final DeleteHandlerDelegate deleteDelegate;
private final AtlasTypeRegistry typeRegistry;
private final AtlasRelationshipStore relationshipStore;
- private final AtlasEntityChangeNotifier entityChangeNotifier;
+ private final IAtlasEntityChangeNotifier entityChangeNotifier;
private final AtlasInstanceConverter instanceConverter;
private final EntityGraphRetriever entityRetriever;
- private final FullTextMapperV2 fullTextMapperV2;
+ private final IFullTextMapper fullTextMapperV2;
@Inject
public EntityGraphMapper(DeleteHandlerDelegate deleteDelegate, AtlasTypeRegistry typeRegistry, AtlasGraph atlasGraph,
- AtlasRelationshipStore relationshipStore, AtlasEntityChangeNotifier entityChangeNotifier,
- AtlasInstanceConverter instanceConverter, FullTextMapperV2 fullTextMapperV2) {
+ AtlasRelationshipStore relationshipStore, IAtlasEntityChangeNotifier entityChangeNotifier,
+ AtlasInstanceConverter instanceConverter, IFullTextMapper fullTextMapperV2) {
this.deleteDelegate = deleteDelegate;
this.typeRegistry = typeRegistry;
this.graph = atlasGraph;
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/IAtlasEntityChangeNotifier.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/IAtlasEntityChangeNotifier.java
new file mode 100644
index 0000000..c4dc5a1
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/IAtlasEntityChangeNotifier.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.store.graph.v2;
+
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.glossary.AtlasGlossaryTerm;
+import org.apache.atlas.model.instance.AtlasClassification;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasRelatedObjectId;
+import org.apache.atlas.model.instance.AtlasRelationship;
+import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.model.notification.EntityNotification;
+
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Callback contract for entity, relationship, classification, term and label change events.
+ *
+ * Extracted as an interface so that the notifier handed to {@code EntityGraphMapper} and the
+ * stores can be swapped, e.g. for the no-op {@code EntityChangeNotifierNop} used during
+ * migration import.
+ *
+ * NOTE(review): the per-method descriptions below are derived from the method names only; the
+ * actual semantics are defined by the implementations - confirm against them.
+ */
+public interface IAtlasEntityChangeNotifier {
+ /** Hook for a completed entity mutation; {@code isImport} presumably marks calls made on behalf of an import. */
+ void onEntitiesMutated(EntityMutationResponse entityMutationResponse, boolean isImport) throws AtlasBaseException;
+
+ /** Hook for a relationship change of the given operation type. */
+ void notifyRelationshipMutation(AtlasRelationship relationship, EntityNotification.EntityNotificationV2.OperationType operationType) throws AtlasBaseException;
+
+ /** Hook for classifications added to a single entity. */
+ void onClassificationAddedToEntity(AtlasEntity entity, List<AtlasClassification> addedClassifications) throws AtlasBaseException;
+
+ /** Hook for classifications added to several entities. */
+ void onClassificationsAddedToEntities(List<AtlasEntity> entities, List<AtlasClassification> addedClassifications) throws AtlasBaseException;
+
+ /** Hook for classifications removed from a single entity. */
+ void onClassificationDeletedFromEntity(AtlasEntity entity, List<AtlasClassification> deletedClassifications) throws AtlasBaseException;
+
+ /** Hook for classifications removed from several entities. */
+ void onClassificationsDeletedFromEntities(List<AtlasEntity> entities, List<AtlasClassification> deletedClassifications) throws AtlasBaseException;
+
+ /** Hook for a glossary term assigned to the given entities. */
+ void onTermAddedToEntities(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> entityIds) throws AtlasBaseException;
+
+ /** Hook for a glossary term removed from the given entities. */
+ void onTermDeletedFromEntities(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> entityIds) throws AtlasBaseException;
+
+ /** Hook for label changes on the entity identified by {@code entityGuid}. */
+ void onLabelsUpdatedFromEntity(String entityGuid, Set<String> addedLabels, Set<String> deletedLabels) throws AtlasBaseException;
+
+ /** Hook for notifying about propagated entities; takes no arguments, so the affected set must come from implementation state. */
+ void notifyPropagatedEntities() throws AtlasBaseException;
+
+ /** Hook for classifications updated on a single entity. */
+ void onClassificationUpdatedToEntity(AtlasEntity entity, List<AtlasClassification> updatedClassifications) throws AtlasBaseException;
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/EntityChangeNotifierNop.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/EntityChangeNotifierNop.java
new file mode 100644
index 0000000..2943ea9
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/EntityChangeNotifierNop.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.store.graph.v2.bulkimport;
+
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.glossary.AtlasGlossaryTerm;
+import org.apache.atlas.model.instance.AtlasClassification;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasRelatedObjectId;
+import org.apache.atlas.model.instance.AtlasRelationship;
+import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.model.notification.EntityNotification;
+import org.apache.atlas.repository.store.graph.v2.IAtlasEntityChangeNotifier;
+
+import java.util.List;
+import java.util.Set;
+
+/**
+ * {@link IAtlasEntityChangeNotifier} whose callbacks all have empty bodies.
+ *
+ * {@code MigrationImport} wires this into the relationship store, graph mapper and entity store
+ * it builds, so none of the change callbacks do any work during a migration import.
+ * No method here throws, although each keeps the {@code throws AtlasBaseException} clause of
+ * the interface.
+ */
+public class EntityChangeNotifierNop implements IAtlasEntityChangeNotifier {
+ @Override
+ public void onEntitiesMutated(EntityMutationResponse entityMutationResponse, boolean isImport) throws AtlasBaseException {
+
+ }
+
+ @Override
+ public void notifyRelationshipMutation(AtlasRelationship relationship, EntityNotification.EntityNotificationV2.OperationType operationType) throws AtlasBaseException {
+
+ }
+
+ @Override
+ public void onClassificationAddedToEntity(AtlasEntity entity, List<AtlasClassification> addedClassifications) throws AtlasBaseException {
+
+ }
+
+ @Override
+ public void onClassificationsAddedToEntities(List<AtlasEntity> entities, List<AtlasClassification> addedClassifications) throws AtlasBaseException {
+
+ }
+
+ @Override
+ public void onClassificationDeletedFromEntity(AtlasEntity entity, List<AtlasClassification> deletedClassifications) throws AtlasBaseException {
+
+ }
+
+ @Override
+ public void onClassificationsDeletedFromEntities(List<AtlasEntity> entities, List<AtlasClassification> deletedClassifications) throws AtlasBaseException {
+
+ }
+
+ @Override
+ public void onTermAddedToEntities(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> entityIds) throws AtlasBaseException {
+
+ }
+
+ @Override
+ public void onTermDeletedFromEntities(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> entityIds) throws AtlasBaseException {
+
+ }
+
+ @Override
+ public void onLabelsUpdatedFromEntity(String entityGuid, Set<String> addedLabels, Set<String> deletedLabels) throws AtlasBaseException {
+
+ }
+
+ @Override
+ public void notifyPropagatedEntities() throws AtlasBaseException {
+
+ }
+
+ @Override
+ public void onClassificationUpdatedToEntity(AtlasEntity entity, List<AtlasClassification> updatedClassifications) throws AtlasBaseException {
+
+ }
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/FullTextMapperV2Nop.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/FullTextMapperV2Nop.java
new file mode 100644
index 0000000..7b7e543
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/FullTextMapperV2Nop.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.store.graph.v2.bulkimport;
+
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.instance.AtlasClassification;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.repository.graph.IFullTextMapper;
+
+import java.util.List;
+
+/**
+ * {@link IFullTextMapper} that performs no work: every method ignores its arguments and
+ * returns {@code null}.
+ *
+ * {@code MigrationImport} passes an instance of this class to the {@code EntityGraphMapper} it
+ * builds.
+ *
+ * NOTE(review): because all methods return {@code null} (not empty strings or empty entities),
+ * any caller that receives this mapper must tolerate {@code null} results - confirm.
+ */
+public class FullTextMapperV2Nop implements IFullTextMapper {
+ /** Always returns {@code null}. */
+ @Override
+ public String getIndexTextForClassifications(String guid, List<AtlasClassification> classifications) throws AtlasBaseException {
+ return null;
+ }
+
+ /** Always returns {@code null}. */
+ @Override
+ public String getIndexTextForEntity(String guid) throws AtlasBaseException {
+ return null;
+ }
+
+ /** Always returns {@code null}. */
+ @Override
+ public String getClassificationTextForEntity(AtlasEntity entity) throws AtlasBaseException {
+ return null;
+ }
+
+ /** Always returns {@code null}; nothing is fetched or cached. */
+ @Override
+ public AtlasEntity getAndCacheEntity(String guid) throws AtlasBaseException {
+ return null;
+ }
+
+ /** Always returns {@code null}; nothing is fetched or cached. */
+ @Override
+ public AtlasEntity getAndCacheEntity(String guid, boolean includeReferences) throws AtlasBaseException {
+ return null;
+ }
+
+ /** Always returns {@code null}; nothing is fetched or cached. */
+ @Override
+ public AtlasEntity.AtlasEntityWithExtInfo getAndCacheEntityWithExtInfo(String guid) throws AtlasBaseException {
+ return null;
+ }
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/ImportStrategy.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/ImportStrategy.java
new file mode 100644
index 0000000..6b70eab
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/ImportStrategy.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.store.graph.v2.bulkimport;
+
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.repository.store.graph.v2.EntityImportStream;
+
+/**
+ * Base type for the interchangeable bulk-import implementations.
+ *
+ * {@code BulkImporterImpl} instantiates {@code MigrationImport} when the request options contain
+ * {@code AtlasImportRequest.OPTION_KEY_MIGRATION} and {@code RegularImport} otherwise, then
+ * delegates to {@link #run}.
+ */
+public abstract class ImportStrategy {
+ /**
+  * Imports the entities supplied by {@code entityStream}.
+  *
+  * @param entityStream source of the entities to import
+  * @param importResult import result object that carries the originating request
+  * @return the mutation response produced by the import
+  * @throws AtlasBaseException if the strategy rejects its input or the import fails
+  */
+ public abstract EntityMutationResponse run(EntityImportStream entityStream, AtlasImportResult importResult) throws AtlasBaseException;
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java
new file mode 100644
index 0000000..4c912fd
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.store.graph.v2.bulkimport;
+
+import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.repository.converters.AtlasFormatConverters;
+import org.apache.atlas.repository.converters.AtlasInstanceConverter;
+import org.apache.atlas.repository.graph.AtlasGraphProvider;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.store.graph.AtlasEntityStore;
+import org.apache.atlas.repository.store.graph.AtlasRelationshipStore;
+import org.apache.atlas.repository.store.graph.v1.DeleteHandlerDelegate;
+import org.apache.atlas.repository.store.graph.v2.AtlasEntityStoreV2;
+import org.apache.atlas.repository.store.graph.v2.AtlasRelationshipStoreV2;
+import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper;
+import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
+import org.apache.atlas.repository.store.graph.v2.EntityImportStream;
+import org.apache.atlas.repository.store.graph.v2.IAtlasEntityChangeNotifier;
+import org.apache.atlas.repository.store.graph.v2.bulkimport.pc.EntityConsumerBuilder;
+import org.apache.atlas.repository.store.graph.v2.bulkimport.pc.EntityCreationManager;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MigrationImport extends ImportStrategy {
+ private static final Logger LOG = LoggerFactory.getLogger(MigrationImport.class);
+
+ private final AtlasTypeRegistry typeRegistry;
+ private AtlasGraph atlasGraph;
+ private EntityGraphRetriever entityGraphRetriever;
+ private EntityGraphMapper entityGraphMapper;
+ private AtlasEntityStore entityStore;
+
+ public MigrationImport(AtlasGraphProvider atlasGraphProvider, AtlasTypeRegistry typeRegistry) {
+ this.typeRegistry = typeRegistry;
+ setupEntityStore(atlasGraphProvider, typeRegistry);
+ LOG.info("MigrationImport: Using bulkLoading...");
+ }
+
+ public EntityMutationResponse run(EntityImportStream entityStream, AtlasImportResult importResult) throws AtlasBaseException {
+ if (entityStream == null || !entityStream.hasNext()) {
+ throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "no entities to create/update.");
+ }
+
+ if (importResult.getRequest() == null) {
+ throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "importResult should contain request");
+ }
+
+ int index = 0;
+ int streamSize = entityStream.size();
+ EntityMutationResponse ret = new EntityMutationResponse();
+ EntityCreationManager creationManager = createEntityCreationManager(atlasGraph, importResult);
+
+ try {
+ LOG.info("Migration Import: Size: {}: Starting...", streamSize);
+ index = creationManager.read(entityStream);
+ creationManager.drain();
+ creationManager.extractResults();
+ } catch (Exception ex) {
+ LOG.error("Migration Import: Error: Current position: {}", index, ex);
+ } finally {
+ shutdownEntityCreationManager(creationManager);
+ }
+
+ LOG.info("Migration Import: Size: {}: Done!", streamSize);
+ return ret;
+ }
+
+ private EntityCreationManager createEntityCreationManager(AtlasGraph threadedAtlasGraph, AtlasImportResult importResult) {
+ int batchSize = importResult.getRequest().getOptionKeyBatchSize();
+ int numWorkers = getNumWorkers(importResult.getRequest().getOptionKeyNumWorkers());
+
+ EntityConsumerBuilder consumerBuilder =
+ new EntityConsumerBuilder(threadedAtlasGraph, entityStore, entityGraphRetriever, typeRegistry, batchSize);
+
+ return new EntityCreationManager(consumerBuilder, batchSize, numWorkers, importResult);
+ }
+
+ private static int getNumWorkers(int numWorkersFromOptions) {
+ int ret = (numWorkersFromOptions > 0) ? numWorkersFromOptions : 1;
+ LOG.info("Migration Import: Setting numWorkers: {}", ret);
+ return ret;
+ }
+
+ private void setupEntityStore(AtlasGraphProvider atlasGraphProvider, AtlasTypeRegistry typeRegistry) {
+ this.entityGraphRetriever = new EntityGraphRetriever(typeRegistry);
+ this.atlasGraph = atlasGraphProvider.getBulkLoading();
+ DeleteHandlerDelegate deleteDelegate = new DeleteHandlerDelegate(typeRegistry);
+
+ IAtlasEntityChangeNotifier entityChangeNotifier = new EntityChangeNotifierNop();
+ AtlasRelationshipStore relationshipStore = new AtlasRelationshipStoreV2(typeRegistry, deleteDelegate, entityChangeNotifier);
+ AtlasFormatConverters formatConverters = new AtlasFormatConverters(typeRegistry);
+ AtlasInstanceConverter instanceConverter = new AtlasInstanceConverter(typeRegistry, formatConverters);
+ this.entityGraphMapper = new EntityGraphMapper(deleteDelegate, typeRegistry, atlasGraph, relationshipStore, entityChangeNotifier, instanceConverter, new FullTextMapperV2Nop());
+ this.entityStore = new AtlasEntityStoreV2(deleteDelegate, typeRegistry, entityChangeNotifier, entityGraphMapper);
+ }
+
+ private void shutdownEntityCreationManager(EntityCreationManager creationManager) {
+ try {
+ creationManager.shutdown();
+ } catch (InterruptedException e) {
+ LOG.error("Migration Import: Shutdown: Interrupted!", e);
+ }
+ }
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/RegularImport.java
similarity index 80%
copy from repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java
copy to repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/RegularImport.java
index 54c32c5..ecce1b0 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/RegularImport.java
@@ -6,16 +6,18 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.atlas.repository.store.graph.v2;
+
+package org.apache.atlas.repository.store.graph.v2.bulkimport;
+
import com.google.common.annotations.VisibleForTesting;
import org.apache.atlas.AtlasConfiguration;
@@ -26,22 +28,23 @@ import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
-import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.graphdb.AtlasSchemaViolationException;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
-import org.apache.atlas.repository.store.graph.BulkImporter;
+import org.apache.atlas.repository.store.graph.v2.AtlasEntityStreamForImport;
+import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
+import org.apache.atlas.repository.store.graph.v2.BulkImporterImpl;
+import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
+import org.apache.atlas.repository.store.graph.v2.EntityImportStream;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.stereotype.Component;
-import javax.inject.Inject;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -49,27 +52,25 @@ import java.util.List;
import java.util.Set;
import static org.apache.atlas.repository.Constants.HISTORICAL_GUID_PROPERTY_KEY;
+import static org.apache.atlas.repository.store.graph.v2.BulkImporterImpl.updateImportProgress;
-@Component
-public class BulkImporterImpl implements BulkImporter {
- private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityStoreV2.class);
-
+public class RegularImport extends ImportStrategy {
+ private static final Logger LOG = LoggerFactory.getLogger(RegularImport.class);
+ private static final int MAX_ATTEMPTS = 3;
private final AtlasEntityStore entityStore;
+ private final AtlasTypeRegistry typeRegistry;
private final EntityGraphRetriever entityGraphRetriever;
- private AtlasTypeRegistry typeRegistry;
- private final int MAX_ATTEMPTS = 2;
private boolean directoryBasedImportConfigured;
- @Inject
- public BulkImporterImpl(AtlasEntityStore entityStore, AtlasTypeRegistry typeRegistry) {
+ public RegularImport(AtlasEntityStore entityStore, AtlasTypeRegistry typeRegistry) {
this.entityStore = entityStore;
- this.entityGraphRetriever = new EntityGraphRetriever(typeRegistry);
this.typeRegistry = typeRegistry;
+ this.entityGraphRetriever = new EntityGraphRetriever(typeRegistry);
this.directoryBasedImportConfigured = StringUtils.isNotEmpty(AtlasConfiguration.IMPORT_TEMP_DIRECTORY.getString());
}
@Override
- public EntityMutationResponse bulkImport(EntityImportStream entityStream, AtlasImportResult importResult) throws AtlasBaseException {
+ public EntityMutationResponse run(EntityImportStream entityStream, AtlasImportResult importResult) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> bulkImport()");
}
@@ -81,7 +82,7 @@ public class BulkImporterImpl implements BulkImporter {
EntityMutationResponse ret = new EntityMutationResponse();
ret.setGuidAssignments(new HashMap<>());
- Set<String> processedGuids = new HashSet<>();
+ Set<String> processedGuids = new HashSet<>();
float currentPercent = 0f;
List<String> residualList = new ArrayList<>();
@@ -209,9 +210,9 @@ public class BulkImporterImpl implements BulkImporter {
Set<String> processedGuids,
int currentIndex, int streamSize, float currentPercent) {
if (!directoryBasedImportConfigured) {
- updateImportMetrics("entity:%s:created", resp.getCreatedEntities(), processedGuids, importResult);
- updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids, importResult);
- updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids, importResult);
+ BulkImporterImpl.updateImportMetrics("entity:%s:created", resp.getCreatedEntities(), processedGuids, importResult);
+ BulkImporterImpl.updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids, importResult);
+ BulkImporterImpl.updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids, importResult);
}
String lastEntityImported = String.format("entity:last-imported:%s:[%s]:(%s)", currentEntity.getEntity().getTypeName(), currentIndex, currentEntity.getEntity().getGuid());
@@ -219,38 +220,6 @@ public class BulkImporterImpl implements BulkImporter {
return updateImportProgress(LOG, currentIndex, streamSize, currentPercent, lastEntityImported);
}
- @VisibleForTesting
- static float updateImportProgress(Logger log, int currentIndex, int streamSize, float currentPercent, String additionalInfo) {
- final double tolerance = 0.000001;
- final int MAX_PERCENT = 100;
-
- int maxSize = (currentIndex <= streamSize) ? streamSize : currentIndex;
- float percent = (float) ((currentIndex * MAX_PERCENT) / maxSize);
- boolean updateLog = Double.compare(percent, currentPercent) > tolerance;
- float updatedPercent = (MAX_PERCENT < maxSize) ? percent : ((updateLog) ? ++currentPercent : currentPercent);
-
- if (updateLog) {
- log.info("bulkImport(): progress: {}% (of {}) - {}", (int) Math.ceil(percent), maxSize, additionalInfo);
- }
-
- return updatedPercent;
- }
-
- private static void updateImportMetrics(String prefix, List<AtlasEntityHeader> list, Set<String> processedGuids, AtlasImportResult importResult) {
- if (list == null) {
- return;
- }
-
- for (AtlasEntityHeader h : list) {
- if (processedGuids.contains(h.getGuid())) {
- continue;
- }
-
- processedGuids.add(h.getGuid());
- importResult.incrementMeticsCounter(String.format(prefix, h.getTypeName()));
- }
- }
-
private static class EntityImportStreamWithResidualList {
private final EntityImportStream stream;
private final List<String> residualList;
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java
new file mode 100644
index 0000000..e8f4b02
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.store.graph.v2.bulkimport.pc;
+
+import org.apache.atlas.GraphTransactionInterceptor;
+import org.apache.atlas.RequestContext;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
+import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.pc.WorkItemConsumer;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.graphdb.AtlasSchemaViolationException;
+import org.apache.atlas.repository.store.graph.AtlasEntityStore;
+import org.apache.atlas.repository.store.graph.v2.AtlasEntityStreamForImport;
+import org.apache.atlas.repository.store.graph.v2.BulkImporterImpl;
+import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class EntityConsumer extends WorkItemConsumer<AtlasEntity.AtlasEntityWithExtInfo> {
+ private static final Logger LOG = LoggerFactory.getLogger(EntityConsumer.class);
+ private static final int MAX_COMMIT_RETRY_COUNT = 3;
+
+ private final int batchSize;
+ private AtomicLong counter = new AtomicLong(1);
+ private AtomicLong currentBatch = new AtomicLong(1);
+
+ private final AtlasGraph atlasGraph;
+ private final AtlasEntityStore entityStoreV2;
+ private final AtlasTypeRegistry typeRegistry;
+ private final EntityGraphRetriever entityGraphRetriever;
+
+ private List<AtlasEntity.AtlasEntityWithExtInfo> entityBuffer = new ArrayList<>();
+ private List<EntityMutationResponse> localResults = new ArrayList<>();
+
+ public EntityConsumer(AtlasGraph atlasGraph, AtlasEntityStore entityStore,
+ EntityGraphRetriever entityGraphRetriever, AtlasTypeRegistry typeRegistry,
+ BlockingQueue queue, int batchSize) {
+ super(queue);
+
+ this.atlasGraph = atlasGraph;
+ this.entityStoreV2 = entityStore;
+ this.entityGraphRetriever = entityGraphRetriever;
+ this.typeRegistry = typeRegistry;
+ this.batchSize = batchSize;
+ }
+
+ @Override
+ protected void processItem(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) {
+ int delta = (MapUtils.isEmpty(entityWithExtInfo.getReferredEntities())
+ ? 1
+ : entityWithExtInfo.getReferredEntities().size()) + 1;
+
+ long currentCount = counter.addAndGet(delta);
+ currentBatch.addAndGet(delta);
+ entityBuffer.add(entityWithExtInfo);
+
+ try {
+ processEntity(entityWithExtInfo, currentCount);
+ attemptCommit();
+ } catch (Exception e) {
+ LOG.info("Data loss: Please re-submit!", e);
+ }
+ }
+
+ private void processEntity(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo, long currentCount) {
+ try {
+ RequestContext.get().setImportInProgress(true);
+ AtlasEntityStreamForImport oneEntityStream = new AtlasEntityStreamForImport(entityWithExtInfo, null);
+
+ LOG.debug("Processing: {}", currentCount);
+ EntityMutationResponse result = entityStoreV2.createOrUpdateForImportNoCommit(oneEntityStream);
+ localResults.add(result);
+ } catch (AtlasBaseException e) {
+ addResult(entityWithExtInfo.getEntity().getGuid());
+ LOG.warn("Exception: {}", entityWithExtInfo.getEntity().getGuid(), e);
+ } catch (AtlasSchemaViolationException e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Entity: {}", entityWithExtInfo.getEntity().getGuid(), e);
+ }
+
+ BulkImporterImpl.updateVertexGuid(typeRegistry, entityGraphRetriever, entityWithExtInfo.getEntity());
+ }
+ }
+
+ private void attemptCommit() {
+ if (currentBatch.get() < batchSize) {
+ return;
+ }
+
+ doCommit();
+ }
+
+ @Override
+ protected void doCommit() {
+ for (int retryCount = 1; retryCount <= MAX_COMMIT_RETRY_COUNT; retryCount++) {
+ if (commitWithRetry(retryCount)) {
+ return;
+ }
+ }
+
+ LOG.error("Retries exceeded! Potential data loss! Please correct data and re-attempt. Buffer: {}: Counter: {}", entityBuffer.size(), counter.get());
+ clear();
+ }
+
+ @Override
+ protected void commitDirty() {
+ super.commitDirty();
+ LOG.info("Total: Commit: {}", counter.get());
+ counter.set(0);
+ }
+
+ private boolean commitWithRetry(int retryCount) {
+ try {
+ atlasGraph.commit();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Commit: Done!: Buffer: {}: Batch: {}: Counter: {}", entityBuffer.size(), currentBatch.get(), counter.get());
+ }
+
+ dispatchResults();
+ return true;
+ } catch (Exception ex) {
+ rollbackPauseRetry(retryCount, ex);
+ return false;
+ }
+ }
+
+ private void rollbackPauseRetry(int retryCount, Exception ex) {
+ atlasGraph.rollback();
+ clearCache();
+
+ LOG.error("Rollback: Done! Buffer: {}: Counter: {}: Retry count: {}", entityBuffer.size(), counter.get(), retryCount);
+ pause(retryCount);
+ LOG.warn("Commit error! Will pause and retry: Buffer: {}: Counter: {}: Retry count: {}", entityBuffer.size(), counter.get(), retryCount, ex);
+ retryProcessEntity(retryCount);
+ }
+
+ private void retryProcessEntity(int retryCount) {
+ LOG.info("Replaying: Starting!: Buffer: {}: Retry count: {}", entityBuffer.size(), retryCount);
+ for (AtlasEntity.AtlasEntityWithExtInfo e : entityBuffer) {
+ processEntity(e, counter.get());
+ }
+ LOG.info("Replaying: Done!: Buffer: {}: Retry count: {}", entityBuffer.size(), retryCount);
+ }
+
+ private void dispatchResults() {
+ localResults.stream().forEach(x -> {
+ addResultsFromResponse(x.getCreatedEntities());
+ addResultsFromResponse(x.getUpdatedEntities());
+ addResultsFromResponse(x.getDeletedEntities());
+ });
+
+ clear();
+ }
+
+ private void pause(int retryCount) {
+ try {
+ Thread.sleep(1000 * retryCount);
+ } catch (InterruptedException e) {
+ LOG.error("pause: Interrupted!", e);
+ }
+ }
+
+ private void addResultsFromResponse(List<AtlasEntityHeader> entities) {
+ if (CollectionUtils.isEmpty(entities)) {
+ return;
+ }
+
+ for (AtlasEntityHeader eh : entities) {
+ addResult(eh.getGuid());
+ }
+ }
+
+ private void clear() {
+ localResults.clear();
+ entityBuffer.clear();
+ clearCache();
+ currentBatch.set(0);
+ }
+
+ private void clearCache() {
+ GraphTransactionInterceptor.clearCache();
+ RequestContext.get().clearCache();
+ }
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumerBuilder.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumerBuilder.java
new file mode 100644
index 0000000..69d33b2
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumerBuilder.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.store.graph.v2.bulkimport.pc;
+
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.pc.WorkItemBuilder;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.store.graph.AtlasEntityStore;
+import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
+import org.apache.atlas.type.AtlasTypeRegistry;
+
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Builds {@link EntityConsumer} instances that all share the same graph, entity store,
+ * retriever, type registry and batch size.
+ */
+public class EntityConsumerBuilder implements WorkItemBuilder<EntityConsumer, AtlasEntity.AtlasEntityWithExtInfo> {
+    // All collaborators are assigned once in the constructor and never change afterwards.
+    private final AtlasGraph atlasGraph;
+    private final AtlasEntityStore entityStore;
+    private final EntityGraphRetriever entityGraphRetriever;
+    private final AtlasTypeRegistry typeRegistry;
+    private final int batchSize;
+
+    /**
+     * @param atlasGraph           graph passed to every consumer
+     * @param entityStore          entity store passed to every consumer
+     * @param entityGraphRetriever retriever passed to every consumer
+     * @param typeRegistry         type registry passed to every consumer
+     * @param batchSize            batch size passed to every consumer
+     */
+    public EntityConsumerBuilder(AtlasGraph atlasGraph, AtlasEntityStore entityStore,
+                                 EntityGraphRetriever entityGraphRetriever, AtlasTypeRegistry typeRegistry, int batchSize) {
+        this.atlasGraph = atlasGraph;
+        this.entityStore = entityStore;
+        this.entityGraphRetriever = entityGraphRetriever;
+        this.typeRegistry = typeRegistry;
+        this.batchSize = batchSize;
+    }
+
+    /**
+     * Creates a new consumer bound to {@code queue}.
+     *
+     * @param queue queue the consumer is constructed with
+     * @return a new {@link EntityConsumer}
+     */
+    @Override
+    public EntityConsumer build(BlockingQueue<AtlasEntity.AtlasEntityWithExtInfo> queue) {
+        return new EntityConsumer(atlasGraph, entityStore, entityGraphRetriever, typeRegistry, queue, batchSize);
+    }
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityCreationManager.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityCreationManager.java
new file mode 100644
index 0000000..16bb49e
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityCreationManager.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.store.graph.v2.bulkimport.pc;
+
+import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.pc.StatusReporter;
+import org.apache.atlas.pc.WorkItemBuilder;
+import org.apache.atlas.pc.WorkItemManager;
+import org.apache.atlas.repository.store.graph.v2.BulkImporterImpl;
+import org.apache.atlas.repository.store.graph.v2.EntityImportStream;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EntityCreationManager<AtlasEntityWithExtInfo> extends WorkItemManager {
+ private static final Logger LOG = LoggerFactory.getLogger(EntityCreationManager.class);
+ private static final String WORKER_PREFIX = "migration-import";
+
+ private final StatusReporter<String, String> statusReporter;
+ private final AtlasImportResult importResult;
+ private String currentTypeName;
+ private float currentPercent;
+ private EntityImportStream entityImportStream;
+
+ public EntityCreationManager(WorkItemBuilder builder, int batchSize, int numWorkers, AtlasImportResult importResult) {
+ super(builder, WORKER_PREFIX, batchSize, numWorkers, true);
+ this.importResult = importResult;
+
+ this.statusReporter = new StatusReporter<>();
+ }
+
+ public int read(EntityImportStream entityStream) {
+ int currentIndex = 0;
+ AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo;
+ this.entityImportStream = entityStream;
+ while ((entityWithExtInfo = entityStream.getNextEntityWithExtInfo()) != null) {
+ AtlasEntity entity = entityWithExtInfo != null ? entityWithExtInfo.getEntity() : null;
+ if (entity == null) {
+ continue;
+ }
+
+ try {
+ produce(currentIndex++, entity.getTypeName(), entityWithExtInfo);
+ } catch (Throwable e) {
+ LOG.warn("Exception: {}", entity.getGuid(), e);
+ break;
+ }
+ }
+ return currentIndex;
+ }
+
+ private void produce(int currentIndex, String typeName, AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) {
+ String previousTypeName = getCurrentTypeName();
+
+ if (StringUtils.isNotEmpty(typeName)
+ && StringUtils.isNotEmpty(previousTypeName)
+ && !StringUtils.equals(previousTypeName, typeName)) {
+ LOG.info("Waiting: '{}' to complete...", previousTypeName);
+ super.drain();
+ LOG.info("Switching entity type processing: From: '{}' To: '{}'...", previousTypeName, typeName);
+ }
+
+ setCurrentTypeName(typeName);
+ statusReporter.produced(entityWithExtInfo.getEntity().getGuid(), String.format("%s:%s", entityWithExtInfo.getEntity().getTypeName(), currentIndex));
+ super.checkProduce(entityWithExtInfo);
+ extractResults();
+ }
+
+ public void extractResults() {
+ Object result;
+ while (((result = getResults().poll())) != null) {
+ statusReporter.processed((String) result);
+ }
+
+ logStatus();
+ }
+
+ private void logStatus() {
+ String ack = statusReporter.ack();
+ if (StringUtils.isEmpty(ack)) {
+ return;
+ }
+
+ String[] split = ack.split(":");
+ if (split.length == 0 || split.length < 2) {
+ return;
+ }
+
+ importResult.incrementMeticsCounter(split[0]);
+ this.currentPercent = updateImportMetrics(split[0], Integer.parseInt(split[1]), this.entityImportStream.size(), getCurrentPercent());
+ }
+
+ private static float updateImportMetrics(String typeNameGuid, int currentIndex, int streamSize, float currentPercent) {
+ String lastEntityImported = String.format("entity:last-imported:%s:(%s)", typeNameGuid, currentIndex);
+ return BulkImporterImpl.updateImportProgress(LOG, currentIndex, streamSize, currentPercent, lastEntityImported);
+ }
+
+ private String getCurrentTypeName() {
+ return this.currentTypeName;
+ }
+
+ private void setCurrentTypeName(String typeName) {
+ this.currentTypeName = typeName;
+ }
+
+ private float getCurrentPercent() {
+ return this.currentPercent;
+ }
+}
diff --git a/repository/src/test/java/org/apache/atlas/TestModules.java b/repository/src/test/java/org/apache/atlas/TestModules.java
index 06e0ebc..a298934 100644
--- a/repository/src/test/java/org/apache/atlas/TestModules.java
+++ b/repository/src/test/java/org/apache/atlas/TestModules.java
@@ -36,7 +36,9 @@ import org.apache.atlas.listener.TypeDefChangeListener;
import org.apache.atlas.repository.audit.EntityAuditListener;
import org.apache.atlas.repository.audit.EntityAuditListenerV2;
import org.apache.atlas.repository.audit.EntityAuditRepository;
+import org.apache.atlas.repository.graph.FullTextMapperV2;
import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
+import org.apache.atlas.repository.graph.IFullTextMapper;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.GraphDBMigrator;
import org.apache.atlas.repository.graphdb.janus.migration.GraphDBGraphSONMigrator;
@@ -61,6 +63,7 @@ import org.apache.atlas.repository.store.graph.v2.AtlasRelationshipStoreV2;
import org.apache.atlas.repository.store.graph.v2.AtlasTypeDefGraphStoreV2;
import org.apache.atlas.repository.store.graph.v2.BulkImporterImpl;
import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper;
+import org.apache.atlas.repository.store.graph.v2.IAtlasEntityChangeNotifier;
import org.apache.atlas.runner.LocalSolrRunner;
import org.apache.atlas.service.Service;
import org.apache.atlas.store.AtlasTypeDefStore;
@@ -144,6 +147,8 @@ public class TestModules {
bind(AtlasEntityStore.class).to(AtlasEntityStoreV2.class);
bind(AtlasRelationshipStore.class).to(AtlasRelationshipStoreV2.class);
+ bind(IAtlasEntityChangeNotifier.class).to(AtlasEntityChangeNotifier.class);
+ bind(IFullTextMapper.class).to(FullTextMapperV2.class);
// bind the DiscoveryService interface to an implementation
bind(AtlasDiscoveryService.class).to(EntityDiscoveryService.class).asEagerSingleton();