You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by am...@apache.org on 2020/03/10 16:38:44 UTC

[atlas] branch master updated: ATLAS-3320: Migration Import implementation.

This is an automated email from the ASF dual-hosted git repository.

amestry pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/master by this push:
     new 765ea58  ATLAS-3320: Migration Import implementation.
765ea58 is described below

commit 765ea583b23d25ecc919d520092747c4158466a5
Author: Ashutosh Mestry <am...@cloudera.com>
AuthorDate: Thu Mar 5 16:06:56 2020 -0800

    ATLAS-3320: Migration Import implementation.
---
 .../repository/graphdb/janus/AtlasJanusGraph.java  |   2 +-
 .../atlas/model/impexp/AtlasImportRequest.java     |  21 ++
 .../apache/atlas/GraphTransactionInterceptor.java  |   4 +
 .../atlas/repository/graph/FullTextMapperV2.java   |   9 +-
 .../atlas/repository/graph/IFullTextMapper.java    |  45 +++++
 .../atlas/repository/impexp/ImportService.java     |   5 +-
 .../atlas/repository/impexp/ZipSourceDirect.java   |   8 +
 .../migration/ZipFileMigrationImporter.java        |  61 +++++-
 .../repository/store/graph/AtlasEntityStore.java   |   8 +
 .../store/graph/v2/AtlasEntityChangeNotifier.java  |  13 +-
 .../store/graph/v2/AtlasEntityStoreV2.java         |   9 +-
 .../store/graph/v2/AtlasRelationshipStoreV2.java   |   4 +-
 .../store/graph/v2/BulkImporterImpl.java           | 224 +++++----------------
 .../store/graph/v2/EntityGraphMapper.java          |  10 +-
 .../store/graph/v2/IAtlasEntityChangeNotifier.java |  54 +++++
 .../v2/bulkimport/EntityChangeNotifierNop.java     |  88 ++++++++
 .../graph/v2/bulkimport/FullTextMapperV2Nop.java   |  57 ++++++
 .../store/graph/v2/bulkimport/ImportStrategy.java  |  28 +++
 .../store/graph/v2/bulkimport/MigrationImport.java | 124 ++++++++++++
 .../RegularImport.java}                            |  77 +++----
 .../graph/v2/bulkimport/pc/EntityConsumer.java     | 209 +++++++++++++++++++
 .../v2/bulkimport/pc/EntityConsumerBuilder.java    |  50 +++++
 .../v2/bulkimport/pc/EntityCreationManager.java    | 126 ++++++++++++
 .../test/java/org/apache/atlas/TestModules.java    |   5 +
 24 files changed, 987 insertions(+), 254 deletions(-)

diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java
index 4acb371..0176ba7 100644
--- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java
+++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java
@@ -116,7 +116,7 @@ public class AtlasJanusGraph implements AtlasGraph<AtlasJanusVertex, AtlasJanusE
             }
         }
 
-        janusGraph = (StandardJanusGraph) AtlasJanusGraphDatabase.getGraphInstance();
+        janusGraph = (StandardJanusGraph) graphInstance;
     }
 
     @Override
diff --git a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java
index 3362bf1..09dafdf 100644
--- a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java
+++ b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java
@@ -44,6 +44,9 @@ public class AtlasImportRequest implements Serializable {
     public  static final String TRANSFORMS_KEY             = "transforms";
     public  static final String TRANSFORMERS_KEY           = "transformers";
     public  static final String OPTION_KEY_REPLICATED_FROM = "replicatedFrom";
+    public  static final String OPTION_KEY_MIGRATION       = "migration";
+    public  static final String OPTION_KEY_NUM_WORKERS     = "numWorkers";
+    public  static final String OPTION_KEY_BATCH_SIZE      = "batchSize";
     public  static final String OPTION_KEY_FORMAT          = "format";
     public  static final String OPTION_KEY_FORMAT_ZIP_DIRECT = "zipDirect";
     private static final String START_POSITION_KEY         = "startPosition";
@@ -124,6 +127,24 @@ public class AtlasImportRequest implements Serializable {
         return isReplicationOptionSet() ? options.get(OPTION_KEY_REPLICATED_FROM) : StringUtils.EMPTY;
     }
 
+    @JsonIgnore
+    public int getOptionKeyNumWorkers() {
+        return getOptionsValue(OPTION_KEY_NUM_WORKERS, 1);
+    }
+
+    @JsonIgnore
+    public int getOptionKeyBatchSize() {
+        return getOptionsValue(OPTION_KEY_BATCH_SIZE, 1);
+    }
+
+    private int getOptionsValue(String optionKeyBatchSize, int defaultValue) {
+        String optionsValue = getOptionForKey(optionKeyBatchSize);
+
+        return StringUtils.isEmpty(optionsValue) ?
+                defaultValue :
+                Integer.valueOf(optionsValue);
+    }
+
     @JsonAnySetter
     public void setOption(String key, String value) {
         if (null == options) {
diff --git a/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java b/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java
index bbe0dc5..57e454a 100644
--- a/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java
+++ b/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java
@@ -199,6 +199,10 @@ public class GraphTransactionInterceptor implements MethodInterceptor {
         return cache.get(guid);
     }
 
+    public static void clearCache() {
+        guidVertexCache.get().clear();
+    }
+
     boolean logException(Throwable t) {
         if (t instanceof AtlasBaseException) {
             Response.Status httpCode = ((AtlasBaseException) t).getAtlasErrorCode().getHttpCode();
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/FullTextMapperV2.java b/repository/src/main/java/org/apache/atlas/repository/graph/FullTextMapperV2.java
index 0f2b4bf..417c96c 100644
--- a/repository/src/main/java/org/apache/atlas/repository/graph/FullTextMapperV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/FullTextMapperV2.java
@@ -53,7 +53,7 @@ import java.util.Set;
 
 
 @Component
-public class FullTextMapperV2 {
+public class FullTextMapperV2 implements IFullTextMapper {
     private static final Logger LOG = LoggerFactory.getLogger(FullTextMapperV2.class);
 
     private static final String FULL_TEXT_DELIMITER                  = " ";
@@ -84,6 +84,8 @@ public class FullTextMapperV2 {
      * @return Full text string ONLY for the added classifications
      * @throws AtlasBaseException
      */
+
+    @Override
     public String getIndexTextForClassifications(String guid, List<AtlasClassification> classifications) throws AtlasBaseException {
         String                       ret     = null;
         final AtlasEntityWithExtInfo entityWithExtInfo;
@@ -120,6 +122,7 @@ public class FullTextMapperV2 {
         return ret;
     }
 
+    @Override
     public String getIndexTextForEntity(String guid) throws AtlasBaseException {
         String                   ret    = null;
         final AtlasEntity        entity;
@@ -150,6 +153,7 @@ public class FullTextMapperV2 {
         return ret;
     }
 
+    @Override
     public String getClassificationTextForEntity(AtlasEntity entity) throws AtlasBaseException {
         String                   ret    = null;
 
@@ -271,10 +275,12 @@ public class FullTextMapperV2 {
         }
     }
 
+    @Override
     public AtlasEntity getAndCacheEntity(String guid) throws AtlasBaseException {
         return getAndCacheEntity(guid, true);
     }
 
+    @Override
     public AtlasEntity  getAndCacheEntity(String guid, boolean includeReferences) throws AtlasBaseException {
         RequestContext context = RequestContext.get();
         AtlasEntity    entity  = context.getEntity(guid);
@@ -294,6 +300,7 @@ public class FullTextMapperV2 {
         return entity;
     }
 
+    @Override
     public AtlasEntityWithExtInfo getAndCacheEntityWithExtInfo(String guid) throws AtlasBaseException {
         RequestContext         context           = RequestContext.get();
         AtlasEntityWithExtInfo entityWithExtInfo = context.getEntityWithExtInfo(guid);
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/IFullTextMapper.java b/repository/src/main/java/org/apache/atlas/repository/graph/IFullTextMapper.java
new file mode 100644
index 0000000..2bbf4d8
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/IFullTextMapper.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.graph;
+
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.instance.AtlasClassification;
+import org.apache.atlas.model.instance.AtlasEntity;
+
+import java.util.List;
+
+public interface IFullTextMapper {
+    /**
+     * Map newly associated/defined classifications for the entity with given GUID
+     * @param guid Entity guid
+     * @param classifications new classifications added to the entity
+     * @return Full text string ONLY for the added classifications
+     * @throws AtlasBaseException
+     */
+    String getIndexTextForClassifications(String guid, List<AtlasClassification> classifications) throws AtlasBaseException;
+
+    String getIndexTextForEntity(String guid) throws AtlasBaseException;
+
+    String getClassificationTextForEntity(AtlasEntity entity) throws AtlasBaseException;
+
+    AtlasEntity getAndCacheEntity(String guid) throws AtlasBaseException;
+
+    AtlasEntity  getAndCacheEntity(String guid, boolean includeReferences) throws AtlasBaseException;
+
+    AtlasEntity.AtlasEntityWithExtInfo getAndCacheEntityWithExtInfo(String guid) throws AtlasBaseException;
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
index 1964ade..c18c4ab 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
@@ -250,8 +250,9 @@ public class ImportService {
 
     private EntityImportStream createZipSource(AtlasImportRequest request, InputStream inputStream, String configuredTemporaryDirectory) throws AtlasBaseException {
         try {
-            if (request.getOptions().containsKey(AtlasImportRequest.OPTION_KEY_FORMAT) &&
-                    request.getOptions().get(AtlasImportRequest.OPTION_KEY_FORMAT).equals(AtlasImportRequest.OPTION_KEY_FORMAT_ZIP_DIRECT) ) {
+            if (request.getOptions().containsKey(AtlasImportRequest.OPTION_KEY_MIGRATION) || (request.getOptions().containsKey(AtlasImportRequest.OPTION_KEY_FORMAT) &&
+                    request.getOptions().get(AtlasImportRequest.OPTION_KEY_FORMAT).equals(AtlasImportRequest.OPTION_KEY_FORMAT_ZIP_DIRECT))) {
+                LOG.info("ZipSource Format: ZipDirect: Size: {}", request.getOptions().get("size"));
                 return getZipDirectEntityImportStream(request, inputStream);
             }
 
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceDirect.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceDirect.java
index cb5a7ac..75b8e9e 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceDirect.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceDirect.java
@@ -64,6 +64,10 @@ public class ZipSourceDirect implements EntityImportStream {
         this.zipInputStream = new ZipInputStream(inputStream);
         this.streamSize = streamSize;
         prepareStreamForFetch();
+
+        if (this.streamSize == 1) {
+            LOG.info("ZipSourceDirect: Stream Size set to: {}. This will cause inaccurate percentage reporting.", this.streamSize);
+        }
     }
 
     @Override
@@ -226,6 +230,10 @@ public class ZipSourceDirect implements EntityImportStream {
     }
 
     public int size() {
+        if (this.streamSize == 1) {
+            return currentPosition;
+        }
+
         return this.streamSize;
     }
 
diff --git a/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java b/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java
index f552525..35a76ea 100644
--- a/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java
+++ b/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java
@@ -24,6 +24,8 @@ import org.apache.atlas.RequestContext;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.impexp.AtlasImportRequest;
 import org.apache.atlas.repository.impexp.ImportService;
+import org.apache.atlas.type.AtlasType;
+import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -32,11 +34,20 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.InetAddress;
+import java.util.Map;
+import java.util.zip.ZipFile;
 
 public class ZipFileMigrationImporter implements Runnable {
     private static final Logger LOG = LoggerFactory.getLogger(ZipFileMigrationImporter.class);
 
-    private static String ENV_USER_NAME = "user.name";
+    private static final String APPLICATION_PROPERTY_MIGRATION_NUMER_OF_WORKERS = "atlas.migration.mode.workers";
+    private static final String APPLICATION_PROPERTY_MIGRATION_BATCH_SIZE       = "atlas.migration.mode.batch.size";
+    private static final String DEFAULT_NUMBER_OF_WORKERS = "4";
+    private static final String DEFAULT_BATCH_SIZE = "100";
+    private static final String ZIP_FILE_COMMENT_ENTITIES_COUNT = "entitiesCount";
+    private static final String ZIP_FILE_COMMENT_TOTAL_COUNT = "total";
+
+    private final static String ENV_USER_NAME = "user.name";
 
     private final ImportService importService;
     private final String fileToImport;
@@ -52,7 +63,8 @@ public class ZipFileMigrationImporter implements Runnable {
             FileWatcher fileWatcher = new FileWatcher(fileToImport);
             fileWatcher.start();
 
-            performImport(new FileInputStream(new File(fileToImport)));
+            int streamSize = getStreamSizeFromComment(fileToImport);
+            performImport(new FileInputStream(new File(fileToImport)), streamSize);
         } catch (IOException e) {
             LOG.error("Migration Import: IO Error!", e);
         } catch (AtlasBaseException e) {
@@ -60,19 +72,46 @@ public class ZipFileMigrationImporter implements Runnable {
         }
     }
 
-    private void performImport(InputStream fs) throws AtlasBaseException {
+    private int getStreamSizeFromComment(String fileToImport) {
+        int ret = 1;
+        try {
+            ZipFile zipFile = new ZipFile(fileToImport);
+            String comment = zipFile.getComment();
+            ret = processZipFileStreamSizeComment(comment);
+            zipFile.close();
+        } catch (IOException e) {
+            LOG.error("Error opening ZIP file: {}", fileToImport, e);
+        }
+
+        return ret;
+    }
+
+    private int processZipFileStreamSizeComment(String comment) {
+        if (StringUtils.isEmpty(comment)) {
+            return 1;
+        }
+
+        Map map = AtlasType.fromJson(comment, Map.class);
+        int entitiesCount = (int) map.get(ZIP_FILE_COMMENT_ENTITIES_COUNT);
+        int totalCount = (int) map.get(ZIP_FILE_COMMENT_TOTAL_COUNT);
+        LOG.info("ZipFileMigrationImporter: Zip file: Comment: streamSize: {}: total: {}", entitiesCount, totalCount);
+
+        return entitiesCount;
+    }
+
+    private void performImport(InputStream fs, int streamSize) throws AtlasBaseException {
         try {
             LOG.info("Migration Import: {}: Starting...", fileToImport);
 
             RequestContext.get().setUser(getUserNameFromEnvironment(), null);
 
-            importService.run(fs, getImportRequest(),
+            importService.run(fs, getImportRequest(streamSize),
                     getUserNameFromEnvironment(),
                     InetAddress.getLocalHost().getHostName(),
                     InetAddress.getLocalHost().getHostAddress());
 
         } catch (Exception ex) {
-            LOG.error("Error loading zip for migration", ex);
+            LOG.error("Migration Import: Error loading zip for migration!", ex);
             throw new AtlasBaseException(ex);
         } finally {
             LOG.info("Migration Import: {}: Done!", fileToImport);
@@ -83,9 +122,17 @@ public class ZipFileMigrationImporter implements Runnable {
         return System.getProperty(ENV_USER_NAME);
     }
 
-    private AtlasImportRequest getImportRequest() throws AtlasException {
+    private AtlasImportRequest getImportRequest(int streamSize) throws AtlasException {
         AtlasImportRequest request = new AtlasImportRequest();
-        request.setOption(AtlasImportRequest.OPTION_KEY_FORMAT, AtlasImportRequest.OPTION_KEY_FORMAT_ZIP_DIRECT);
+
+        request.setSizeOption(streamSize);
+        request.setOption(AtlasImportRequest.OPTION_KEY_MIGRATION, "true");
+        request.setOption(AtlasImportRequest.OPTION_KEY_NUM_WORKERS, getPropertyValue(APPLICATION_PROPERTY_MIGRATION_NUMER_OF_WORKERS, DEFAULT_NUMBER_OF_WORKERS));
+        request.setOption(AtlasImportRequest.OPTION_KEY_BATCH_SIZE, getPropertyValue(APPLICATION_PROPERTY_MIGRATION_BATCH_SIZE, DEFAULT_BATCH_SIZE));
+
         return request;
     }
+    private String getPropertyValue(String property, String defaultValue) throws AtlasException {
+        return ApplicationProperties.get().getString(property, defaultValue);
+    }
 }
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java
index d33f404..834c9d1 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java
@@ -150,6 +150,14 @@ public interface AtlasEntityStore {
     EntityMutationResponse createOrUpdateForImport(EntityStream entityStream) throws AtlasBaseException;
 
     /**
+     * Create or update  entities with parameters necessary for import process without commit. Caller will have to do take care of commit.
+     * @param entityStream AtlasEntityStream
+     * @return EntityMutationResponse Entity mutations operations with the corresponding set of entities on which these operations were performed
+     * @throws AtlasBaseException
+     */
+    EntityMutationResponse createOrUpdateForImportNoCommit(EntityStream entityStream) throws AtlasBaseException;
+
+    /**
      * Update a single entity
      * @param objectId     ID of the entity
      * @param updatedEntityInfo updated entity information
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java
index d7020a7..00c0114 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java
@@ -66,7 +66,7 @@ import static org.apache.atlas.repository.Constants.ENTITY_TEXT_PROPERTY_KEY;
 
 
 @Component
-public class AtlasEntityChangeNotifier {
+public class AtlasEntityChangeNotifier implements IAtlasEntityChangeNotifier {
     private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityChangeNotifier.class);
 
     private final Set<EntityChangeListener>   entityChangeListeners;
@@ -91,6 +91,7 @@ public class AtlasEntityChangeNotifier {
         this.isV2EntityNotificationEnabled = AtlasRepositoryConfiguration.isV2EntityNotificationEnabled();
     }
 
+    @Override
     public void onEntitiesMutated(EntityMutationResponse entityMutationResponse, boolean isImport) throws AtlasBaseException {
         if (CollectionUtils.isEmpty(entityChangeListeners)) {
             return;
@@ -119,6 +120,7 @@ public class AtlasEntityChangeNotifier {
         notifyPropagatedEntities();
     }
 
+    @Override
     public void notifyRelationshipMutation(AtlasRelationship relationship, EntityNotification.EntityNotificationV2.OperationType operationType) throws AtlasBaseException {
         if (CollectionUtils.isEmpty(entityChangeListeners)) {
             return;
@@ -137,6 +139,7 @@ public class AtlasEntityChangeNotifier {
         }
     }
 
+    @Override
     public void onClassificationAddedToEntity(AtlasEntity entity, List<AtlasClassification> addedClassifications) throws AtlasBaseException {
         if (isV2EntityNotificationEnabled) {
             doFullTextMapping(entity.getGuid());
@@ -166,6 +169,7 @@ public class AtlasEntityChangeNotifier {
         }
     }
 
+    @Override
     public void onClassificationsAddedToEntities(List<AtlasEntity> entities, List<AtlasClassification> addedClassifications) throws AtlasBaseException {
         if (isV2EntityNotificationEnabled) {
             doFullTextMappingHelper(entities);
@@ -201,6 +205,7 @@ public class AtlasEntityChangeNotifier {
         }
     }
 
+    @Override
     public void onClassificationUpdatedToEntity(AtlasEntity entity, List<AtlasClassification> updatedClassifications) throws AtlasBaseException {
         doFullTextMapping(entity.getGuid());
 
@@ -228,6 +233,7 @@ public class AtlasEntityChangeNotifier {
         }
     }
 
+    @Override
     public void onClassificationDeletedFromEntity(AtlasEntity entity, List<AtlasClassification> deletedClassifications) throws AtlasBaseException {
         doFullTextMapping(entity.getGuid());
 
@@ -255,6 +261,7 @@ public class AtlasEntityChangeNotifier {
         }
     }
 
+    @Override
     public void onClassificationsDeletedFromEntities(List<AtlasEntity> entities, List<AtlasClassification> deletedClassifications) throws AtlasBaseException {
         doFullTextMappingHelper(entities);
 
@@ -288,6 +295,7 @@ public class AtlasEntityChangeNotifier {
         }
     }
 
+    @Override
     public void onTermAddedToEntities(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> entityIds) throws AtlasBaseException {
         // listeners notified on term-entity association only if v2 notifications are enabled
         if (isV2EntityNotificationEnabled) {
@@ -307,6 +315,7 @@ public class AtlasEntityChangeNotifier {
         }
     }
 
+    @Override
     public void onTermDeletedFromEntities(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> entityIds) throws AtlasBaseException {
         // listeners notified on term-entity disassociation only if v2 notifications are enabled
         if (isV2EntityNotificationEnabled) {
@@ -326,6 +335,7 @@ public class AtlasEntityChangeNotifier {
         }
     }
 
+    @Override
     public void onLabelsUpdatedFromEntity(String entityGuid, Set<String> addedLabels, Set<String> deletedLabels) throws AtlasBaseException {
         doFullTextMapping(entityGuid);
 
@@ -339,6 +349,7 @@ public class AtlasEntityChangeNotifier {
         }
     }
 
+    @Override
     public void notifyPropagatedEntities() throws AtlasBaseException {
         RequestContext                         context             = RequestContext.get();
         Map<String, List<AtlasClassification>> addedPropagations   = context.getAddedPropagations();
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
index 6de57e3..1e40f48 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
@@ -90,13 +90,13 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
 
     private final DeleteHandlerDelegate     deleteDelegate;
     private final AtlasTypeRegistry         typeRegistry;
-    private final AtlasEntityChangeNotifier entityChangeNotifier;
+    private final IAtlasEntityChangeNotifier entityChangeNotifier;
     private final EntityGraphMapper         entityGraphMapper;
     private final EntityGraphRetriever      entityRetriever;
 
     @Inject
     public AtlasEntityStoreV2(DeleteHandlerDelegate deleteDelegate, AtlasTypeRegistry typeRegistry,
-                              AtlasEntityChangeNotifier entityChangeNotifier, EntityGraphMapper entityGraphMapper) {
+                              IAtlasEntityChangeNotifier entityChangeNotifier, EntityGraphMapper entityGraphMapper) {
         this.deleteDelegate       = deleteDelegate;
         this.typeRegistry         = typeRegistry;
         this.entityChangeNotifier = entityChangeNotifier;
@@ -332,6 +332,11 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
     }
 
     @Override
+    public EntityMutationResponse createOrUpdateForImportNoCommit(EntityStream entityStream) throws AtlasBaseException {
+        return createOrUpdate(entityStream, false, true, true);
+    }
+
+    @Override
     @GraphTransaction
     public EntityMutationResponse updateEntity(AtlasObjectId objectId, AtlasEntityWithExtInfo updatedEntityInfo, boolean isPartialUpdate) throws AtlasBaseException {
         if (LOG.isDebugEnabled()) {
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java
index fdf117a..6a75ba9 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java
@@ -99,10 +99,10 @@ public class AtlasRelationshipStoreV2 implements AtlasRelationshipStore {
     private final EntityGraphRetriever      entityRetriever;
     private final DeleteHandlerDelegate     deleteDelegate;
     private final GraphHelper               graphHelper = GraphHelper.getInstance();
-    private final AtlasEntityChangeNotifier entityChangeNotifier;
+    private final IAtlasEntityChangeNotifier entityChangeNotifier;
 
     @Inject
-    public AtlasRelationshipStoreV2(AtlasTypeRegistry typeRegistry, DeleteHandlerDelegate deleteDelegate, AtlasEntityChangeNotifier entityChangeNotifier) {
+    public AtlasRelationshipStoreV2(AtlasTypeRegistry typeRegistry, DeleteHandlerDelegate deleteDelegate, IAtlasEntityChangeNotifier entityChangeNotifier) {
         this.typeRegistry         = typeRegistry;
         this.entityRetriever      = new EntityGraphRetriever(typeRegistry);
         this.deleteDelegate       = deleteDelegate;
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java
index 54c32c5..a4d732a 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java
@@ -18,33 +18,29 @@
 package org.apache.atlas.repository.store.graph.v2;
 
 import com.google.common.annotations.VisibleForTesting;
-import org.apache.atlas.AtlasConfiguration;
-import org.apache.atlas.AtlasErrorCode;
-import org.apache.atlas.RequestContext;
-import org.apache.atlas.annotation.GraphTransaction;
 import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasImportRequest;
 import org.apache.atlas.model.impexp.AtlasImportResult;
 import org.apache.atlas.model.instance.AtlasEntity;
-import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
 import org.apache.atlas.model.instance.AtlasEntityHeader;
 import org.apache.atlas.model.instance.AtlasObjectId;
 import org.apache.atlas.model.instance.EntityMutationResponse;
-import org.apache.atlas.repository.Constants;
-import org.apache.atlas.repository.graphdb.AtlasSchemaViolationException;
+import org.apache.atlas.repository.graph.AtlasGraphProvider;
 import org.apache.atlas.repository.graphdb.AtlasVertex;
 import org.apache.atlas.repository.store.graph.AtlasEntityStore;
 import org.apache.atlas.repository.store.graph.BulkImporter;
+import org.apache.atlas.repository.store.graph.v2.bulkimport.ImportStrategy;
+import org.apache.atlas.repository.store.graph.v2.bulkimport.MigrationImport;
+import org.apache.atlas.repository.store.graph.v2.bulkimport.RegularImport;
 import org.apache.atlas.type.AtlasEntityType;
 import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.type.Constants;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
 
 import javax.inject.Inject;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
@@ -55,131 +51,27 @@ public class BulkImporterImpl implements BulkImporter {
     private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityStoreV2.class);
 
     private final AtlasEntityStore entityStore;
-    private final EntityGraphRetriever entityGraphRetriever;
     private AtlasTypeRegistry typeRegistry;
-    private final int MAX_ATTEMPTS = 2;
-    private boolean directoryBasedImportConfigured;
 
     @Inject
     public BulkImporterImpl(AtlasEntityStore entityStore, AtlasTypeRegistry typeRegistry) {
         this.entityStore = entityStore;
-        this.entityGraphRetriever = new EntityGraphRetriever(typeRegistry);
         this.typeRegistry = typeRegistry;
-        this.directoryBasedImportConfigured = StringUtils.isNotEmpty(AtlasConfiguration.IMPORT_TEMP_DIRECTORY.getString());
     }
 
     @Override
     public EntityMutationResponse bulkImport(EntityImportStream entityStream, AtlasImportResult importResult) throws AtlasBaseException {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("==> bulkImport()");
-        }
-
-        if (entityStream == null || !entityStream.hasNext()) {
-            throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "no entities to create/update.");
-        }
-
-        EntityMutationResponse ret = new EntityMutationResponse();
-        ret.setGuidAssignments(new HashMap<>());
-
-        Set<String>  processedGuids = new HashSet<>();
-        float        currentPercent = 0f;
-        List<String> residualList   = new ArrayList<>();
-
-        EntityImportStreamWithResidualList entityImportStreamWithResidualList = new EntityImportStreamWithResidualList(entityStream, residualList);
-
-        while (entityImportStreamWithResidualList.hasNext()) {
-            AtlasEntityWithExtInfo entityWithExtInfo = entityImportStreamWithResidualList.getNextEntityWithExtInfo();
-            AtlasEntity            entity            = entityWithExtInfo != null ? entityWithExtInfo.getEntity() : null;
-
-            if (entity == null) {
-                continue;
-            }
-
-            for (int attempt = 0; attempt < MAX_ATTEMPTS; attempt++) {
-                try {
-                    AtlasEntityStreamForImport oneEntityStream = new AtlasEntityStreamForImport(entityWithExtInfo, null);
-                    EntityMutationResponse resp = entityStore.createOrUpdateForImport(oneEntityStream);
-
-                    if (resp.getGuidAssignments() != null) {
-                        ret.getGuidAssignments().putAll(resp.getGuidAssignments());
-                    }
+        ImportStrategy importStrategy = null;
 
-                    currentPercent = updateImportMetrics(entityWithExtInfo, resp, importResult, processedGuids,
-                            entityStream.getPosition(),
-                            entityImportStreamWithResidualList.getStreamSize(),
-                            currentPercent);
-
-                    entityStream.onImportComplete(entity.getGuid());
-                    break;
-                } catch (AtlasBaseException e) {
-                    if (!updateResidualList(e, residualList, entityWithExtInfo.getEntity().getGuid())) {
-                        throw e;
-                    }
-                    break;
-                } catch (AtlasSchemaViolationException e) {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Entity: {}", entity.getGuid(), e);
-                    }
-
-                    if (attempt == 0) {
-                        updateVertexGuid(entity);
-                    } else {
-                        LOG.error("Guid update failed: {}", entityWithExtInfo.getEntity().getGuid());
-                        throw e;
-                    }
-                } catch (Throwable e) {
-                    AtlasBaseException abe = new AtlasBaseException(e);
-                    if (!updateResidualList(abe, residualList, entityWithExtInfo.getEntity().getGuid())) {
-                        throw abe;
-                    }
-
-                    LOG.warn("Exception: {}", entity.getGuid(), e);
-                    break;
-                } finally {
-                    RequestContext.get().clearCache();
-                }
-            }
-        }
-
-        importResult.getProcessedEntities().addAll(processedGuids);
-        LOG.info("bulkImport(): done. Total number of entities (including referred entities) imported: {}", processedGuids.size());
-
-        return ret;
-    }
-
-    @GraphTransaction
-    public void updateVertexGuid(AtlasEntity entity) {
-        String entityGuid = entity.getGuid();
-        AtlasObjectId objectId = entityGraphRetriever.toAtlasObjectIdWithoutGuid(entity);
-
-        AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());
-        String vertexGuid = null;
-        try {
-            vertexGuid = AtlasGraphUtilsV2.getGuidByUniqueAttributes(entityType, objectId.getUniqueAttributes());
-        } catch (AtlasBaseException e) {
-            LOG.warn("Entity: {}: Does not exist!", objectId);
-            return;
-        }
-
-        if (StringUtils.isEmpty(vertexGuid) || vertexGuid.equals(entityGuid)) {
-            return;
-        }
-
-        AtlasVertex v = AtlasGraphUtilsV2.findByGuid(vertexGuid);
-        if (v == null) {
-            return;
+        if (importResult.getRequest().getOptions() != null &&
+                importResult.getRequest().getOptions().containsKey(AtlasImportRequest.OPTION_KEY_MIGRATION)) {
+            importStrategy = new MigrationImport(new AtlasGraphProvider(), this.typeRegistry);
+        } else {
+            importStrategy = new RegularImport(this.entityStore, this.typeRegistry);
         }
 
-        addHistoricalGuid(v, vertexGuid);
-        AtlasGraphUtilsV2.setProperty(v, Constants.GUID_PROPERTY_KEY, entityGuid);
-
-        LOG.warn("GUID Updated: Entity: {}: from: {}: to: {}", objectId, vertexGuid, entity.getGuid());
-    }
-
-    private void addHistoricalGuid(AtlasVertex v, String vertexGuid) {
-        String existingJson = AtlasGraphUtilsV2.getProperty(v, HISTORICAL_GUID_PROPERTY_KEY, String.class);
-
-        AtlasGraphUtilsV2.setProperty(v, HISTORICAL_GUID_PROPERTY_KEY, getJsonArray(existingJson, vertexGuid));
+        LOG.info("BulkImportImpl: {}", importStrategy.getClass().getSimpleName());
+        return importStrategy.run(entityStream, importResult);
     }
 
     @VisibleForTesting
@@ -193,38 +85,16 @@ public class BulkImporterImpl implements BulkImporter {
         return json;
     }
 
-    private boolean updateResidualList(AtlasBaseException e, List<String> lineageList, String guid) {
-        if (!e.getAtlasErrorCode().getErrorCode().equals(AtlasErrorCode.INVALID_OBJECT_ID.getErrorCode())) {
-            return false;
-        }
-
-        lineageList.add(guid);
-
-        return true;
-    }
-
-    private float updateImportMetrics(AtlasEntity.AtlasEntityWithExtInfo currentEntity,
-                                      EntityMutationResponse             resp,
-                                      AtlasImportResult                  importResult,
-                                      Set<String>                        processedGuids,
-                                      int currentIndex, int streamSize, float currentPercent) {
-        if (!directoryBasedImportConfigured) {
-            updateImportMetrics("entity:%s:created", resp.getCreatedEntities(), processedGuids, importResult);
-            updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids, importResult);
-            updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids, importResult);
-        }
-
-        String lastEntityImported = String.format("entity:last-imported:%s:[%s]:(%s)", currentEntity.getEntity().getTypeName(), currentIndex, currentEntity.getEntity().getGuid());
-
-        return updateImportProgress(LOG, currentIndex, streamSize, currentPercent, lastEntityImported);
-    }
-
     @VisibleForTesting
-    static float updateImportProgress(Logger log, int currentIndex, int streamSize, float currentPercent, String additionalInfo) {
+    public static float updateImportProgress(Logger log, int currentIndex, int streamSize, float currentPercent, String additionalInfo) {
         final double tolerance   = 0.000001;
         final int    MAX_PERCENT = 100;
 
         int     maxSize        = (currentIndex <= streamSize) ? streamSize : currentIndex;
+        if (maxSize <= 0) {
+            return currentPercent;
+        }
+
         float   percent        = (float) ((currentIndex * MAX_PERCENT) / maxSize);
         boolean updateLog      = Double.compare(percent, currentPercent) > tolerance;
         float   updatedPercent = (MAX_PERCENT < maxSize) ? percent : ((updateLog) ? ++currentPercent : currentPercent);
@@ -236,7 +106,7 @@ public class BulkImporterImpl implements BulkImporter {
         return updatedPercent;
     }
 
-    private static void updateImportMetrics(String prefix, List<AtlasEntityHeader> list, Set<String> processedGuids, AtlasImportResult importResult) {
+    public static void updateImportMetrics(String prefix, List<AtlasEntityHeader> list, Set<String> processedGuids, AtlasImportResult importResult) {
         if (list == null) {
             return;
         }
@@ -251,41 +121,37 @@ public class BulkImporterImpl implements BulkImporter {
         }
     }
 
-    private static class EntityImportStreamWithResidualList {
-        private final EntityImportStream stream;
-        private final List<String>       residualList;
-        private       boolean            navigateResidualList;
-        private       int                currentResidualListIndex;
-
+    public static void updateVertexGuid(AtlasTypeRegistry typeRegistry, EntityGraphRetriever entityGraphRetriever, AtlasEntity entity) {
+        String entityGuid = entity.getGuid();
+        AtlasObjectId objectId = entityGraphRetriever.toAtlasObjectIdWithoutGuid(entity);
 
-        public EntityImportStreamWithResidualList(EntityImportStream stream, List<String> residualList) {
-            this.stream                   = stream;
-            this.residualList             = residualList;
-            this.navigateResidualList     = false;
-            this.currentResidualListIndex = 0;
+        AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());
+        String vertexGuid = null;
+        try {
+            vertexGuid = AtlasGraphUtilsV2.getGuidByUniqueAttributes(entityType, objectId.getUniqueAttributes());
+        } catch (AtlasBaseException e) {
+            LOG.warn("Entity: {}: Does not exist!", objectId);
+            return;
         }
 
-        public AtlasEntity.AtlasEntityWithExtInfo getNextEntityWithExtInfo() {
-            if (navigateResidualList == false) {
-                return stream.getNextEntityWithExtInfo();
-            } else {
-                stream.setPositionUsingEntityGuid(residualList.get(currentResidualListIndex++));
-                return stream.getNextEntityWithExtInfo();
-            }
+        if (StringUtils.isEmpty(vertexGuid) || vertexGuid.equals(entityGuid)) {
+            return;
         }
 
-        public boolean hasNext() {
-            if (!navigateResidualList) {
-                boolean streamHasNext = stream.hasNext();
-                navigateResidualList = (streamHasNext == false);
-                return streamHasNext ? streamHasNext : (currentResidualListIndex < residualList.size());
-            } else {
-                return (currentResidualListIndex < residualList.size());
-            }
+        AtlasVertex v = AtlasGraphUtilsV2.findByGuid(vertexGuid);
+        if (v == null) {
+            return;
         }
 
-        public int getStreamSize() {
-            return stream.size() + residualList.size();
-        }
+        addHistoricalGuid(v, vertexGuid);
+        AtlasGraphUtilsV2.setProperty(v, Constants.GUID_PROPERTY_KEY, entityGuid);
+
+        LOG.warn("GUID Updated: Entity: {}: from: {}: to: {}", objectId, vertexGuid, entity.getGuid());
+    }
+
+    public static void addHistoricalGuid(AtlasVertex v, String vertexGuid) {
+        String existingJson = AtlasGraphUtilsV2.getProperty(v, HISTORICAL_GUID_PROPERTY_KEY, String.class);
+
+        AtlasGraphUtilsV2.setProperty(v, HISTORICAL_GUID_PROPERTY_KEY, getJsonArray(existingJson, vertexGuid));
     }
 }
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
index 291d8b5..d3ba18f 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
@@ -39,7 +39,7 @@ import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef.Cardinali
 import org.apache.atlas.repository.Constants;
 import org.apache.atlas.repository.RepositoryException;
 import org.apache.atlas.repository.converters.AtlasInstanceConverter;
-import org.apache.atlas.repository.graph.FullTextMapperV2;
+import org.apache.atlas.repository.graph.IFullTextMapper;
 import org.apache.atlas.repository.graph.GraphHelper;
 import org.apache.atlas.repository.graphdb.AtlasEdge;
 import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
@@ -132,15 +132,15 @@ public class EntityGraphMapper {
     private final DeleteHandlerDelegate     deleteDelegate;
     private final AtlasTypeRegistry         typeRegistry;
     private final AtlasRelationshipStore    relationshipStore;
-    private final AtlasEntityChangeNotifier entityChangeNotifier;
+    private final IAtlasEntityChangeNotifier entityChangeNotifier;
     private final AtlasInstanceConverter    instanceConverter;
     private final EntityGraphRetriever      entityRetriever;
-    private final FullTextMapperV2          fullTextMapperV2;
+    private final IFullTextMapper fullTextMapperV2;
 
     @Inject
     public EntityGraphMapper(DeleteHandlerDelegate deleteDelegate, AtlasTypeRegistry typeRegistry, AtlasGraph atlasGraph,
-                             AtlasRelationshipStore relationshipStore, AtlasEntityChangeNotifier entityChangeNotifier,
-                             AtlasInstanceConverter instanceConverter, FullTextMapperV2 fullTextMapperV2) {
+                             AtlasRelationshipStore relationshipStore, IAtlasEntityChangeNotifier entityChangeNotifier,
+                             AtlasInstanceConverter instanceConverter, IFullTextMapper fullTextMapperV2) {
         this.deleteDelegate       = deleteDelegate;
         this.typeRegistry         = typeRegistry;
         this.graph                = atlasGraph;
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/IAtlasEntityChangeNotifier.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/IAtlasEntityChangeNotifier.java
new file mode 100644
index 0000000..c4dc5a1
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/IAtlasEntityChangeNotifier.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.store.graph.v2;
+
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.glossary.AtlasGlossaryTerm;
+import org.apache.atlas.model.instance.AtlasClassification;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasRelatedObjectId;
+import org.apache.atlas.model.instance.AtlasRelationship;
+import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.model.notification.EntityNotification;
+
+import java.util.List;
+import java.util.Set;
+
+public interface IAtlasEntityChangeNotifier {
+    void onEntitiesMutated(EntityMutationResponse entityMutationResponse, boolean isImport) throws AtlasBaseException;
+
+    void notifyRelationshipMutation(AtlasRelationship relationship, EntityNotification.EntityNotificationV2.OperationType operationType) throws AtlasBaseException;
+
+    void onClassificationAddedToEntity(AtlasEntity entity, List<AtlasClassification> addedClassifications) throws AtlasBaseException;
+
+    void onClassificationsAddedToEntities(List<AtlasEntity> entities, List<AtlasClassification> addedClassifications) throws AtlasBaseException;
+
+    void onClassificationDeletedFromEntity(AtlasEntity entity, List<AtlasClassification> deletedClassifications) throws AtlasBaseException;
+
+    void onClassificationsDeletedFromEntities(List<AtlasEntity> entities, List<AtlasClassification> deletedClassifications) throws AtlasBaseException;
+
+    void onTermAddedToEntities(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> entityIds) throws AtlasBaseException;
+
+    void onTermDeletedFromEntities(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> entityIds) throws AtlasBaseException;
+
+    void onLabelsUpdatedFromEntity(String entityGuid, Set<String> addedLabels, Set<String> deletedLabels) throws AtlasBaseException;
+
+    void notifyPropagatedEntities() throws AtlasBaseException;
+
+    void onClassificationUpdatedToEntity(AtlasEntity entity, List<AtlasClassification> updatedClassifications) throws AtlasBaseException;
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/EntityChangeNotifierNop.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/EntityChangeNotifierNop.java
new file mode 100644
index 0000000..2943ea9
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/EntityChangeNotifierNop.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.store.graph.v2.bulkimport;
+
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.glossary.AtlasGlossaryTerm;
+import org.apache.atlas.model.instance.AtlasClassification;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasRelatedObjectId;
+import org.apache.atlas.model.instance.AtlasRelationship;
+import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.model.notification.EntityNotification;
+import org.apache.atlas.repository.store.graph.v2.IAtlasEntityChangeNotifier;
+
+import java.util.List;
+import java.util.Set;
+
+public class EntityChangeNotifierNop implements IAtlasEntityChangeNotifier {
+    @Override
+    public void onEntitiesMutated(EntityMutationResponse entityMutationResponse, boolean isImport) throws AtlasBaseException {
+
+    }
+
+    @Override
+    public void notifyRelationshipMutation(AtlasRelationship relationship, EntityNotification.EntityNotificationV2.OperationType operationType) throws AtlasBaseException {
+
+    }
+
+    @Override
+    public void onClassificationAddedToEntity(AtlasEntity entity, List<AtlasClassification> addedClassifications) throws AtlasBaseException {
+
+    }
+
+    @Override
+    public void onClassificationsAddedToEntities(List<AtlasEntity> entities, List<AtlasClassification> addedClassifications) throws AtlasBaseException {
+
+    }
+
+    @Override
+    public void onClassificationDeletedFromEntity(AtlasEntity entity, List<AtlasClassification> deletedClassifications) throws AtlasBaseException {
+
+    }
+
+    @Override
+    public void onClassificationsDeletedFromEntities(List<AtlasEntity> entities, List<AtlasClassification> deletedClassifications) throws AtlasBaseException {
+
+    }
+
+    @Override
+    public void onTermAddedToEntities(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> entityIds) throws AtlasBaseException {
+
+    }
+
+    @Override
+    public void onTermDeletedFromEntities(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> entityIds) throws AtlasBaseException {
+
+    }
+
+    @Override
+    public void onLabelsUpdatedFromEntity(String entityGuid, Set<String> addedLabels, Set<String> deletedLabels) throws AtlasBaseException {
+
+    }
+
+    @Override
+    public void notifyPropagatedEntities() throws AtlasBaseException {
+
+    }
+
+    @Override
+    public void onClassificationUpdatedToEntity(AtlasEntity entity, List<AtlasClassification> updatedClassifications) throws AtlasBaseException {
+
+    }
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/FullTextMapperV2Nop.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/FullTextMapperV2Nop.java
new file mode 100644
index 0000000..7b7e543
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/FullTextMapperV2Nop.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.store.graph.v2.bulkimport;
+
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.instance.AtlasClassification;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.repository.graph.IFullTextMapper;
+
+import java.util.List;
+
+public class FullTextMapperV2Nop implements IFullTextMapper {
+    @Override
+    public String getIndexTextForClassifications(String guid, List<AtlasClassification> classifications) throws AtlasBaseException {
+        return null;
+    }
+
+    @Override
+    public String getIndexTextForEntity(String guid) throws AtlasBaseException {
+        return null;
+    }
+
+    @Override
+    public String getClassificationTextForEntity(AtlasEntity entity) throws AtlasBaseException {
+        return null;
+    }
+
+    @Override
+    public AtlasEntity getAndCacheEntity(String guid) throws AtlasBaseException {
+        return null;
+    }
+
+    @Override
+    public AtlasEntity getAndCacheEntity(String guid, boolean includeReferences) throws AtlasBaseException {
+        return null;
+    }
+
+    @Override
+    public AtlasEntity.AtlasEntityWithExtInfo getAndCacheEntityWithExtInfo(String guid) throws AtlasBaseException {
+        return null;
+    }
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/ImportStrategy.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/ImportStrategy.java
new file mode 100644
index 0000000..6b70eab
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/ImportStrategy.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.store.graph.v2.bulkimport;
+
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.repository.store.graph.v2.EntityImportStream;
+
+public abstract class ImportStrategy {
+    public abstract EntityMutationResponse run(EntityImportStream entityStream, AtlasImportResult importResult) throws AtlasBaseException;
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java
new file mode 100644
index 0000000..4c912fd
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.store.graph.v2.bulkimport;
+
+import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.repository.converters.AtlasFormatConverters;
+import org.apache.atlas.repository.converters.AtlasInstanceConverter;
+import org.apache.atlas.repository.graph.AtlasGraphProvider;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.store.graph.AtlasEntityStore;
+import org.apache.atlas.repository.store.graph.AtlasRelationshipStore;
+import org.apache.atlas.repository.store.graph.v1.DeleteHandlerDelegate;
+import org.apache.atlas.repository.store.graph.v2.AtlasEntityStoreV2;
+import org.apache.atlas.repository.store.graph.v2.AtlasRelationshipStoreV2;
+import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper;
+import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
+import org.apache.atlas.repository.store.graph.v2.EntityImportStream;
+import org.apache.atlas.repository.store.graph.v2.IAtlasEntityChangeNotifier;
+import org.apache.atlas.repository.store.graph.v2.bulkimport.pc.EntityConsumerBuilder;
+import org.apache.atlas.repository.store.graph.v2.bulkimport.pc.EntityCreationManager;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MigrationImport extends ImportStrategy {
+    private static final Logger LOG = LoggerFactory.getLogger(MigrationImport.class);
+
+    private final AtlasTypeRegistry typeRegistry;
+    private AtlasGraph atlasGraph;
+    private EntityGraphRetriever entityGraphRetriever;
+    private EntityGraphMapper entityGraphMapper;
+    private AtlasEntityStore entityStore;
+
+    public MigrationImport(AtlasGraphProvider atlasGraphProvider, AtlasTypeRegistry typeRegistry) {
+        this.typeRegistry = typeRegistry;
+        setupEntityStore(atlasGraphProvider, typeRegistry);
+        LOG.info("MigrationImport: Using bulkLoading...");
+    }
+
+    public EntityMutationResponse run(EntityImportStream entityStream, AtlasImportResult importResult) throws AtlasBaseException {
+        if (entityStream == null || !entityStream.hasNext()) {
+            throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "no entities to create/update.");
+        }
+
+        if (importResult.getRequest() == null) {
+            throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "importResult should contain request");
+        }
+
+        int index = 0;
+        int streamSize = entityStream.size();
+        EntityMutationResponse ret = new EntityMutationResponse();
+        EntityCreationManager creationManager = createEntityCreationManager(atlasGraph, importResult);
+
+        try {
+            LOG.info("Migration Import: Size: {}: Starting...", streamSize);
+            index = creationManager.read(entityStream);
+            creationManager.drain();
+            creationManager.extractResults();
+        } catch (Exception ex) {
+            LOG.error("Migration Import: Error: Current position: {}", index, ex);
+        } finally {
+            shutdownEntityCreationManager(creationManager);
+        }
+
+        LOG.info("Migration Import: Size: {}: Done!", streamSize);
+        return ret;
+    }
+
+    private EntityCreationManager createEntityCreationManager(AtlasGraph threadedAtlasGraph, AtlasImportResult importResult) {
+        int batchSize = importResult.getRequest().getOptionKeyBatchSize();
+        int numWorkers = getNumWorkers(importResult.getRequest().getOptionKeyNumWorkers());
+
+        EntityConsumerBuilder consumerBuilder =
+                new EntityConsumerBuilder(threadedAtlasGraph, entityStore, entityGraphRetriever, typeRegistry, batchSize);
+
+        return new EntityCreationManager(consumerBuilder, batchSize, numWorkers, importResult);
+    }
+
+    private static int getNumWorkers(int numWorkersFromOptions) {
+        int ret = (numWorkersFromOptions > 0) ? numWorkersFromOptions : 1;
+        LOG.info("Migration Import: Setting numWorkers: {}", ret);
+        return ret;
+    }
+
+    private void setupEntityStore(AtlasGraphProvider atlasGraphProvider, AtlasTypeRegistry typeRegistry) {
+        this.entityGraphRetriever = new EntityGraphRetriever(typeRegistry);
+        this.atlasGraph = atlasGraphProvider.getBulkLoading();
+        DeleteHandlerDelegate deleteDelegate = new DeleteHandlerDelegate(typeRegistry);
+
+        IAtlasEntityChangeNotifier entityChangeNotifier = new EntityChangeNotifierNop();
+        AtlasRelationshipStore relationshipStore = new AtlasRelationshipStoreV2(typeRegistry, deleteDelegate, entityChangeNotifier);
+        AtlasFormatConverters formatConverters = new AtlasFormatConverters(typeRegistry);
+        AtlasInstanceConverter instanceConverter = new AtlasInstanceConverter(typeRegistry, formatConverters);
+        this.entityGraphMapper = new EntityGraphMapper(deleteDelegate, typeRegistry, atlasGraph, relationshipStore, entityChangeNotifier, instanceConverter, new FullTextMapperV2Nop());
+        this.entityStore = new AtlasEntityStoreV2(deleteDelegate, typeRegistry, entityChangeNotifier, entityGraphMapper);
+    }
+
+    private void shutdownEntityCreationManager(EntityCreationManager creationManager) {
+        try {
+            creationManager.shutdown();
+        } catch (InterruptedException e) {
+            LOG.error("Migration Import: Shutdown: Interrupted!", e);
+        }
+    }
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/RegularImport.java
similarity index 80%
copy from repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java
copy to repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/RegularImport.java
index 54c32c5..ecce1b0 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/RegularImport.java
@@ -6,16 +6,18 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.atlas.repository.store.graph.v2;
+
+package org.apache.atlas.repository.store.graph.v2.bulkimport;
+
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.atlas.AtlasConfiguration;
@@ -26,22 +28,23 @@ import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.impexp.AtlasImportResult;
 import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
-import org.apache.atlas.model.instance.AtlasEntityHeader;
 import org.apache.atlas.model.instance.AtlasObjectId;
 import org.apache.atlas.model.instance.EntityMutationResponse;
 import org.apache.atlas.repository.Constants;
 import org.apache.atlas.repository.graphdb.AtlasSchemaViolationException;
 import org.apache.atlas.repository.graphdb.AtlasVertex;
 import org.apache.atlas.repository.store.graph.AtlasEntityStore;
-import org.apache.atlas.repository.store.graph.BulkImporter;
+import org.apache.atlas.repository.store.graph.v2.AtlasEntityStreamForImport;
+import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
+import org.apache.atlas.repository.store.graph.v2.BulkImporterImpl;
+import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
+import org.apache.atlas.repository.store.graph.v2.EntityImportStream;
 import org.apache.atlas.type.AtlasEntityType;
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.stereotype.Component;
 
-import javax.inject.Inject;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -49,27 +52,25 @@ import java.util.List;
 import java.util.Set;
 
 import static org.apache.atlas.repository.Constants.HISTORICAL_GUID_PROPERTY_KEY;
+import static org.apache.atlas.repository.store.graph.v2.BulkImporterImpl.updateImportProgress;
 
-@Component
-public class BulkImporterImpl implements BulkImporter {
-    private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityStoreV2.class);
-
+public class RegularImport extends ImportStrategy {
+    private static final Logger LOG = LoggerFactory.getLogger(RegularImport.class);
+    private static final int MAX_ATTEMPTS = 3;
     private final AtlasEntityStore entityStore;
+    private final AtlasTypeRegistry typeRegistry;
     private final EntityGraphRetriever entityGraphRetriever;
-    private AtlasTypeRegistry typeRegistry;
-    private final int MAX_ATTEMPTS = 2;
     private boolean directoryBasedImportConfigured;
 
-    @Inject
-    public BulkImporterImpl(AtlasEntityStore entityStore, AtlasTypeRegistry typeRegistry) {
+    public RegularImport(AtlasEntityStore entityStore, AtlasTypeRegistry typeRegistry) {
         this.entityStore = entityStore;
-        this.entityGraphRetriever = new EntityGraphRetriever(typeRegistry);
         this.typeRegistry = typeRegistry;
+        this.entityGraphRetriever = new EntityGraphRetriever(typeRegistry);
         this.directoryBasedImportConfigured = StringUtils.isNotEmpty(AtlasConfiguration.IMPORT_TEMP_DIRECTORY.getString());
     }
 
     @Override
-    public EntityMutationResponse bulkImport(EntityImportStream entityStream, AtlasImportResult importResult) throws AtlasBaseException {
+    public EntityMutationResponse run(EntityImportStream entityStream, AtlasImportResult importResult) throws AtlasBaseException {
         if (LOG.isDebugEnabled()) {
             LOG.debug("==> bulkImport()");
         }
@@ -81,7 +82,7 @@ public class BulkImporterImpl implements BulkImporter {
         EntityMutationResponse ret = new EntityMutationResponse();
         ret.setGuidAssignments(new HashMap<>());
 
-        Set<String>  processedGuids = new HashSet<>();
+        Set<String> processedGuids = new HashSet<>();
         float        currentPercent = 0f;
         List<String> residualList   = new ArrayList<>();
 
@@ -209,9 +210,9 @@ public class BulkImporterImpl implements BulkImporter {
                                       Set<String>                        processedGuids,
                                       int currentIndex, int streamSize, float currentPercent) {
         if (!directoryBasedImportConfigured) {
-            updateImportMetrics("entity:%s:created", resp.getCreatedEntities(), processedGuids, importResult);
-            updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids, importResult);
-            updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids, importResult);
+            BulkImporterImpl.updateImportMetrics("entity:%s:created", resp.getCreatedEntities(), processedGuids, importResult);
+            BulkImporterImpl.updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids, importResult);
+            BulkImporterImpl.updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids, importResult);
         }
 
         String lastEntityImported = String.format("entity:last-imported:%s:[%s]:(%s)", currentEntity.getEntity().getTypeName(), currentIndex, currentEntity.getEntity().getGuid());
@@ -219,38 +220,6 @@ public class BulkImporterImpl implements BulkImporter {
         return updateImportProgress(LOG, currentIndex, streamSize, currentPercent, lastEntityImported);
     }
 
-    @VisibleForTesting
-    static float updateImportProgress(Logger log, int currentIndex, int streamSize, float currentPercent, String additionalInfo) {
-        final double tolerance   = 0.000001;
-        final int    MAX_PERCENT = 100;
-
-        int     maxSize        = (currentIndex <= streamSize) ? streamSize : currentIndex;
-        float   percent        = (float) ((currentIndex * MAX_PERCENT) / maxSize);
-        boolean updateLog      = Double.compare(percent, currentPercent) > tolerance;
-        float   updatedPercent = (MAX_PERCENT < maxSize) ? percent : ((updateLog) ? ++currentPercent : currentPercent);
-
-        if (updateLog) {
-            log.info("bulkImport(): progress: {}% (of {}) - {}", (int) Math.ceil(percent), maxSize, additionalInfo);
-        }
-
-        return updatedPercent;
-    }
-
-    private static void updateImportMetrics(String prefix, List<AtlasEntityHeader> list, Set<String> processedGuids, AtlasImportResult importResult) {
-        if (list == null) {
-            return;
-        }
-
-        for (AtlasEntityHeader h : list) {
-            if (processedGuids.contains(h.getGuid())) {
-                continue;
-            }
-
-            processedGuids.add(h.getGuid());
-            importResult.incrementMeticsCounter(String.format(prefix, h.getTypeName()));
-        }
-    }
-
     private static class EntityImportStreamWithResidualList {
         private final EntityImportStream stream;
         private final List<String>       residualList;
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java
new file mode 100644
index 0000000..e8f4b02
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.store.graph.v2.bulkimport.pc;
+
+import org.apache.atlas.GraphTransactionInterceptor;
+import org.apache.atlas.RequestContext;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
+import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.pc.WorkItemConsumer;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.graphdb.AtlasSchemaViolationException;
+import org.apache.atlas.repository.store.graph.AtlasEntityStore;
+import org.apache.atlas.repository.store.graph.v2.AtlasEntityStreamForImport;
+import org.apache.atlas.repository.store.graph.v2.BulkImporterImpl;
+import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class EntityConsumer extends WorkItemConsumer<AtlasEntity.AtlasEntityWithExtInfo> {
+    private static final Logger LOG = LoggerFactory.getLogger(EntityConsumer.class);
+    private static final int MAX_COMMIT_RETRY_COUNT = 3;
+
+    private final int batchSize;
+    private AtomicLong counter = new AtomicLong(1);
+    private AtomicLong currentBatch = new AtomicLong(1);
+
+    private final AtlasGraph atlasGraph;
+    private final AtlasEntityStore entityStoreV2;
+    private final AtlasTypeRegistry typeRegistry;
+    private final EntityGraphRetriever entityGraphRetriever;
+
+    private List<AtlasEntity.AtlasEntityWithExtInfo> entityBuffer = new ArrayList<>();
+    private List<EntityMutationResponse> localResults = new ArrayList<>();
+
+    public EntityConsumer(AtlasGraph atlasGraph, AtlasEntityStore entityStore,
+                          EntityGraphRetriever entityGraphRetriever, AtlasTypeRegistry typeRegistry,
+                          BlockingQueue queue, int batchSize) {
+        super(queue);
+
+        this.atlasGraph = atlasGraph;
+        this.entityStoreV2 = entityStore;
+        this.entityGraphRetriever = entityGraphRetriever;
+        this.typeRegistry = typeRegistry;
+        this.batchSize = batchSize;
+    }
+
+    @Override
+    protected void processItem(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) {
+        int delta = (MapUtils.isEmpty(entityWithExtInfo.getReferredEntities())
+                ? 1
+                : entityWithExtInfo.getReferredEntities().size()) + 1;
+
+        long currentCount = counter.addAndGet(delta);
+        currentBatch.addAndGet(delta);
+        entityBuffer.add(entityWithExtInfo);
+
+        try {
+            processEntity(entityWithExtInfo, currentCount);
+            attemptCommit();
+        } catch (Exception e) {
+            LOG.info("Data loss: Please re-submit!", e);
+        }
+    }
+
+    private void processEntity(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo, long currentCount) {
+        try {
+            RequestContext.get().setImportInProgress(true);
+            AtlasEntityStreamForImport oneEntityStream = new AtlasEntityStreamForImport(entityWithExtInfo, null);
+
+            LOG.debug("Processing: {}", currentCount);
+            EntityMutationResponse result = entityStoreV2.createOrUpdateForImportNoCommit(oneEntityStream);
+            localResults.add(result);
+        } catch (AtlasBaseException e) {
+            addResult(entityWithExtInfo.getEntity().getGuid());
+            LOG.warn("Exception: {}", entityWithExtInfo.getEntity().getGuid(), e);
+        } catch (AtlasSchemaViolationException e) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Entity: {}", entityWithExtInfo.getEntity().getGuid(), e);
+            }
+
+            BulkImporterImpl.updateVertexGuid(typeRegistry, entityGraphRetriever, entityWithExtInfo.getEntity());
+        }
+    }
+
+    private void attemptCommit() {
+        if (currentBatch.get() < batchSize) {
+            return;
+        }
+
+        doCommit();
+    }
+
+    @Override
+    protected void doCommit() {
+        for (int retryCount = 1; retryCount <= MAX_COMMIT_RETRY_COUNT; retryCount++) {
+            if (commitWithRetry(retryCount)) {
+                return;
+            }
+        }
+
+        LOG.error("Retries exceeded! Potential data loss! Please correct data and re-attempt. Buffer: {}: Counter: {}", entityBuffer.size(), counter.get());
+        clear();
+    }
+
+    @Override
+    protected void commitDirty() {
+        super.commitDirty();
+        LOG.info("Total: Commit: {}", counter.get());
+        counter.set(0);
+    }
+
+    private boolean commitWithRetry(int retryCount) {
+        try {
+            atlasGraph.commit();
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Commit: Done!: Buffer: {}: Batch: {}: Counter: {}", entityBuffer.size(), currentBatch.get(), counter.get());
+            }
+
+            dispatchResults();
+            return true;
+        } catch (Exception ex) {
+            rollbackPauseRetry(retryCount, ex);
+            return false;
+        }
+    }
+
+    private void rollbackPauseRetry(int retryCount, Exception ex) {
+        atlasGraph.rollback();
+        clearCache();
+
+        LOG.error("Rollback: Done! Buffer: {}: Counter: {}: Retry count: {}", entityBuffer.size(), counter.get(), retryCount);
+        pause(retryCount);
+        LOG.warn("Commit error! Will pause and retry: Buffer: {}: Counter: {}: Retry count: {}", entityBuffer.size(), counter.get(), retryCount, ex);
+        retryProcessEntity(retryCount);
+    }
+
+    private void retryProcessEntity(int retryCount) {
+        LOG.info("Replaying: Starting!: Buffer: {}: Retry count: {}", entityBuffer.size(), retryCount);
+        for (AtlasEntity.AtlasEntityWithExtInfo e : entityBuffer) {
+            processEntity(e, counter.get());
+        }
+        LOG.info("Replaying: Done!: Buffer: {}: Retry count: {}", entityBuffer.size(), retryCount);
+    }
+
+    private void dispatchResults() {
+        localResults.stream().forEach(x -> {
+            addResultsFromResponse(x.getCreatedEntities());
+            addResultsFromResponse(x.getUpdatedEntities());
+            addResultsFromResponse(x.getDeletedEntities());
+        });
+
+        clear();
+    }
+
+    private void pause(int retryCount) {
+        try {
+            Thread.sleep(1000 * retryCount);
+        } catch (InterruptedException e) {
+            LOG.error("pause: Interrupted!", e);
+        }
+    }
+
+    private void addResultsFromResponse(List<AtlasEntityHeader> entities) {
+        if (CollectionUtils.isEmpty(entities)) {
+            return;
+        }
+
+        for (AtlasEntityHeader eh : entities) {
+            addResult(eh.getGuid());
+        }
+    }
+
+    private void clear() {
+        localResults.clear();
+        entityBuffer.clear();
+        clearCache();
+        currentBatch.set(0);
+    }
+
+    private void clearCache() {
+        GraphTransactionInterceptor.clearCache();
+        RequestContext.get().clearCache();
+    }
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumerBuilder.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumerBuilder.java
new file mode 100644
index 0000000..69d33b2
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumerBuilder.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.store.graph.v2.bulkimport.pc;
+
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.pc.WorkItemBuilder;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.store.graph.AtlasEntityStore;
+import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
+import org.apache.atlas.type.AtlasTypeRegistry;
+
+import java.util.concurrent.BlockingQueue;
+
+public class EntityConsumerBuilder implements WorkItemBuilder<EntityConsumer, AtlasEntity.AtlasEntityWithExtInfo> {
+    private AtlasGraph atlasGraph;
+    private AtlasEntityStore entityStore;
+    private final EntityGraphRetriever entityGraphRetriever;
+    private final AtlasTypeRegistry typeRegistry;
+    private int batchSize;
+
+    public EntityConsumerBuilder(AtlasGraph atlasGraph, AtlasEntityStore entityStore,
+                                 EntityGraphRetriever entityGraphRetriever, AtlasTypeRegistry typeRegistry, int batchSize) {
+        this.atlasGraph = atlasGraph;
+        this.entityStore = entityStore;
+        this.entityGraphRetriever = entityGraphRetriever;
+        this.typeRegistry = typeRegistry;
+        this.batchSize = batchSize;
+    }
+
+    @Override
+    public EntityConsumer build(BlockingQueue<AtlasEntity.AtlasEntityWithExtInfo> queue) {
+        return new EntityConsumer(atlasGraph, entityStore, entityGraphRetriever, typeRegistry, queue, this.batchSize);
+    }
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityCreationManager.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityCreationManager.java
new file mode 100644
index 0000000..16bb49e
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityCreationManager.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.store.graph.v2.bulkimport.pc;
+
+import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.pc.StatusReporter;
+import org.apache.atlas.pc.WorkItemBuilder;
+import org.apache.atlas.pc.WorkItemManager;
+import org.apache.atlas.repository.store.graph.v2.BulkImporterImpl;
+import org.apache.atlas.repository.store.graph.v2.EntityImportStream;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EntityCreationManager<AtlasEntityWithExtInfo> extends WorkItemManager {
+    private static final Logger LOG = LoggerFactory.getLogger(EntityCreationManager.class);
+    private static final String WORKER_PREFIX = "migration-import";
+
+    private final StatusReporter<String, String> statusReporter;
+    private final AtlasImportResult importResult;
+    private String currentTypeName;
+    private float currentPercent;
+    private EntityImportStream entityImportStream;
+
+    public EntityCreationManager(WorkItemBuilder builder, int batchSize, int numWorkers, AtlasImportResult importResult) {
+        super(builder, WORKER_PREFIX, batchSize, numWorkers, true);
+        this.importResult = importResult;
+
+        this.statusReporter = new StatusReporter<>();
+    }
+
+    public int read(EntityImportStream entityStream) {
+        int currentIndex = 0;
+        AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo;
+        this.entityImportStream = entityStream;
+        while ((entityWithExtInfo = entityStream.getNextEntityWithExtInfo()) != null) {
+            AtlasEntity entity = entityWithExtInfo != null ? entityWithExtInfo.getEntity() : null;
+            if (entity == null) {
+                continue;
+            }
+
+            try {
+                produce(currentIndex++, entity.getTypeName(), entityWithExtInfo);
+            } catch (Throwable e) {
+                LOG.warn("Exception: {}", entity.getGuid(), e);
+                break;
+            }
+        }
+        return currentIndex;
+    }
+
+    private void produce(int currentIndex, String typeName, AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) {
+        String previousTypeName = getCurrentTypeName();
+
+        if (StringUtils.isNotEmpty(typeName)
+                && StringUtils.isNotEmpty(previousTypeName)
+                && !StringUtils.equals(previousTypeName, typeName)) {
+            LOG.info("Waiting: '{}' to complete...", previousTypeName);
+            super.drain();
+            LOG.info("Switching entity type processing: From: '{}' To: '{}'...", previousTypeName, typeName);
+        }
+
+        setCurrentTypeName(typeName);
+        statusReporter.produced(entityWithExtInfo.getEntity().getGuid(), String.format("%s:%s", entityWithExtInfo.getEntity().getTypeName(), currentIndex));
+        super.checkProduce(entityWithExtInfo);
+        extractResults();
+    }
+
+    public void extractResults() {
+        Object result;
+        while (((result = getResults().poll())) != null) {
+            statusReporter.processed((String) result);
+        }
+
+        logStatus();
+    }
+
+    private void logStatus() {
+        String ack = statusReporter.ack();
+        if (StringUtils.isEmpty(ack)) {
+            return;
+        }
+
+        String[] split = ack.split(":");
+        if (split.length == 0 || split.length < 2) {
+            return;
+        }
+
+        importResult.incrementMeticsCounter(split[0]);
+        this.currentPercent = updateImportMetrics(split[0], Integer.parseInt(split[1]), this.entityImportStream.size(), getCurrentPercent());
+    }
+
+    private static float updateImportMetrics(String typeNameGuid, int currentIndex, int streamSize, float currentPercent) {
+        String lastEntityImported = String.format("entity:last-imported:%s:(%s)", typeNameGuid, currentIndex);
+        return BulkImporterImpl.updateImportProgress(LOG, currentIndex, streamSize, currentPercent, lastEntityImported);
+    }
+
+    private String getCurrentTypeName() {
+        return this.currentTypeName;
+    }
+
+    private void setCurrentTypeName(String typeName) {
+        this.currentTypeName = typeName;
+    }
+
+    private float getCurrentPercent() {
+        return this.currentPercent;
+    }
+}
diff --git a/repository/src/test/java/org/apache/atlas/TestModules.java b/repository/src/test/java/org/apache/atlas/TestModules.java
index 06e0ebc..a298934 100644
--- a/repository/src/test/java/org/apache/atlas/TestModules.java
+++ b/repository/src/test/java/org/apache/atlas/TestModules.java
@@ -36,7 +36,9 @@ import org.apache.atlas.listener.TypeDefChangeListener;
 import org.apache.atlas.repository.audit.EntityAuditListener;
 import org.apache.atlas.repository.audit.EntityAuditListenerV2;
 import org.apache.atlas.repository.audit.EntityAuditRepository;
+import org.apache.atlas.repository.graph.FullTextMapperV2;
 import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
+import org.apache.atlas.repository.graph.IFullTextMapper;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
 import org.apache.atlas.repository.graphdb.GraphDBMigrator;
 import org.apache.atlas.repository.graphdb.janus.migration.GraphDBGraphSONMigrator;
@@ -61,6 +63,7 @@ import org.apache.atlas.repository.store.graph.v2.AtlasRelationshipStoreV2;
 import org.apache.atlas.repository.store.graph.v2.AtlasTypeDefGraphStoreV2;
 import org.apache.atlas.repository.store.graph.v2.BulkImporterImpl;
 import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper;
+import org.apache.atlas.repository.store.graph.v2.IAtlasEntityChangeNotifier;
 import org.apache.atlas.runner.LocalSolrRunner;
 import org.apache.atlas.service.Service;
 import org.apache.atlas.store.AtlasTypeDefStore;
@@ -144,6 +147,8 @@ public class TestModules {
 
             bind(AtlasEntityStore.class).to(AtlasEntityStoreV2.class);
             bind(AtlasRelationshipStore.class).to(AtlasRelationshipStoreV2.class);
+            bind(IAtlasEntityChangeNotifier.class).to(AtlasEntityChangeNotifier.class);
+            bind(IFullTextMapper.class).to(FullTextMapperV2.class);
 
             // bind the DiscoveryService interface to an implementation
             bind(AtlasDiscoveryService.class).to(EntityDiscoveryService.class).asEagerSingleton();