You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by am...@apache.org on 2020/02/21 05:27:47 UTC

[atlas] branch master updated (e159f76 -> 54042d3)

This is an automated email from the ASF dual-hosted git repository.

amestry pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git.


    from e159f76  ATLAS-3623 : Beta UI: Basic search improvement to search for namespace attribute: Bug fix for column selection
     new a2ccfb9  ATLAS-3320: Import Service. Support concurrent ingest.
     new 54042d3  DataMigration: Automatic resume.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 addons/models/0000-Area0/0010-base_model.json      |  50 ++++
 .../repository/graphdb/janus/AtlasJanusGraph.java  |   2 +-
 .../java/org/apache/atlas/AtlasConfiguration.java  |   1 +
 .../atlas/model/impexp/AtlasImportRequest.java     |  43 +++-
 .../MigrationImportStatus.java}                    |  78 +++---
 .../java/org/apache/atlas/pc/WorkItemConsumer.java |  11 +-
 .../java/org/apache/atlas/pc/WorkItemManager.java  |   9 +-
 .../apache/atlas/GraphTransactionInterceptor.java  |   4 +
 .../atlas/repository/impexp/AuditsWriter.java      |   3 +-
 .../atlas/repository/impexp/ImportService.java     |  22 +-
 .../repository/impexp/ZipExportFileNames.java      |   4 +
 .../atlas/repository/impexp/ZipSourceDirect.java   | 269 +++++++++++++++++++++
 .../repository/migration/DataMigrationService.java |   4 +-
 .../migration/DataMigrationStatusService.java      | 104 ++++++++
 .../migration/ZipFileMigrationImporter.java        |  81 ++++++-
 .../repository/ogm/MigrationImportStatusDTO.java   | 103 ++++++++
 .../repository/patches/UniqueAttributePatch.java   |   4 +-
 .../repository/store/graph/AtlasEntityStore.java   |   8 +
 .../store/graph/v2/AtlasEntityStoreV2.java         |  11 +-
 .../store/graph/v2/AtlasRelationshipStoreV2.java   |   4 +
 .../store/graph/v2/BulkImporterImpl.java           | 232 ++++--------------
 .../store/graph/v2/EntityGraphMapper.java          |  41 +++-
 .../bulkimport/ImportStrategy.java}                |  11 +-
 .../store/graph/v2/bulkimport/MigrationImport.java | 125 ++++++++++
 .../RegularImport.java}                            |  76 ++----
 .../graph/v2/bulkimport/pc/EntityConsumer.java     | 213 ++++++++++++++++
 .../v2/bulkimport/pc/EntityConsumerBuilder.java    |  50 ++++
 .../v2/bulkimport/pc/EntityCreationManager.java    | 135 +++++++++++
 .../graph/v2/bulkimport/pc/StatusReporter.java     | 131 ++++++++++
 .../atlas/repository/impexp/ImportServiceTest.java |  16 ++
 .../repository/impexp/MigrationImportTest.java     |  77 ++++++
 .../repository/impexp/StatusReporterTest.java      |  99 ++++++++
 .../atlas/repository/impexp/ZipDirectTest.java     |  61 +++++
 .../impexp/ZipFileResourceTestUtils.java           |   7 +-
 .../src/test/resources/zip-direct-1.zip            | Bin
 repository/src/test/resources/zip-direct-2.zip     | Bin 0 -> 1720553 bytes
 36 files changed, 1759 insertions(+), 330 deletions(-)
 copy intg/src/main/java/org/apache/atlas/model/{profile/AtlasUserProfile.java => migration/MigrationImportStatus.java} (54%)
 create mode 100644 repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceDirect.java
 create mode 100644 repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationStatusService.java
 create mode 100644 repository/src/main/java/org/apache/atlas/repository/ogm/MigrationImportStatusDTO.java
 copy repository/src/main/java/org/apache/atlas/repository/store/graph/{EntityResolver.java => v2/bulkimport/ImportStrategy.java} (66%)
 create mode 100644 repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java
 copy repository/src/main/java/org/apache/atlas/repository/store/graph/v2/{BulkImporterImpl.java => bulkimport/RegularImport.java} (80%)
 create mode 100644 repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java
 create mode 100644 repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumerBuilder.java
 create mode 100644 repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityCreationManager.java
 create mode 100644 repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/StatusReporter.java
 create mode 100644 repository/src/test/java/org/apache/atlas/repository/impexp/MigrationImportTest.java
 create mode 100644 repository/src/test/java/org/apache/atlas/repository/impexp/StatusReporterTest.java
 create mode 100644 repository/src/test/java/org/apache/atlas/repository/impexp/ZipDirectTest.java
 copy webapp/src/test/resources/empty-1.zip => repository/src/test/resources/zip-direct-1.zip (100%)
 create mode 100644 repository/src/test/resources/zip-direct-2.zip


[atlas] 01/02: ATLAS-3320: Import Service. Support concurrent ingest.

Posted by am...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

amestry pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git

commit a2ccfb9f3577e911103041d8d4b91c169697f6a1
Author: Ashutosh Mestry <am...@cloudera.com>
AuthorDate: Thu Feb 20 11:46:37 2020 -0800

    ATLAS-3320: Import Service. Support concurrent ingest.
---
 .../repository/graphdb/janus/AtlasJanusGraph.java  |   2 +-
 .../java/org/apache/atlas/AtlasConfiguration.java  |   1 +
 .../atlas/model/impexp/AtlasImportRequest.java     |  43 +++-
 .../java/org/apache/atlas/pc/WorkItemConsumer.java |  11 +-
 .../java/org/apache/atlas/pc/WorkItemManager.java  |   9 +-
 .../apache/atlas/GraphTransactionInterceptor.java  |   4 +
 .../atlas/repository/impexp/AuditsWriter.java      |   3 +-
 .../atlas/repository/impexp/ImportService.java     |  22 +-
 .../repository/impexp/ZipExportFileNames.java      |   4 +
 .../atlas/repository/impexp/ZipSourceDirect.java   | 269 +++++++++++++++++++++
 .../migration/ZipFileMigrationImporter.java        |  58 ++++-
 .../repository/patches/UniqueAttributePatch.java   |   4 +-
 .../repository/store/graph/AtlasEntityStore.java   |   8 +
 .../store/graph/v2/AtlasEntityStoreV2.java         |  11 +-
 .../store/graph/v2/AtlasRelationshipStoreV2.java   |   4 +
 .../store/graph/v2/BulkImporterImpl.java           | 228 ++++-------------
 .../store/graph/v2/EntityGraphMapper.java          |  41 +++-
 .../graph/v2/bulkimport/ImportStrategy.java}       |  22 +-
 .../store/graph/v2/bulkimport/MigrationImport.java | 122 ++++++++++
 .../RegularImport.java}                            |  76 ++----
 .../graph/v2/bulkimport/pc/EntityConsumer.java     | 213 ++++++++++++++++
 .../v2/bulkimport/pc/EntityConsumerBuilder.java    |  50 ++++
 .../v2/bulkimport/pc/EntityCreationManager.java    | 130 ++++++++++
 .../graph/v2/bulkimport/pc/StatusReporter.java     | 131 ++++++++++
 .../atlas/repository/impexp/ImportServiceTest.java |  16 ++
 .../repository/impexp/MigrationImportTest.java     |  77 ++++++
 .../repository/impexp/StatusReporterTest.java      |  99 ++++++++
 .../atlas/repository/impexp/ZipDirectTest.java     |  61 +++++
 .../impexp/ZipFileResourceTestUtils.java           |   7 +-
 repository/src/test/resources/zip-direct-1.zip     | Bin 0 -> 22 bytes
 repository/src/test/resources/zip-direct-2.zip     | Bin 0 -> 1720553 bytes
 31 files changed, 1432 insertions(+), 294 deletions(-)

diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java
index 4acb371..0176ba7 100644
--- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java
+++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java
@@ -116,7 +116,7 @@ public class AtlasJanusGraph implements AtlasGraph<AtlasJanusVertex, AtlasJanusE
             }
         }
 
-        janusGraph = (StandardJanusGraph) AtlasJanusGraphDatabase.getGraphInstance();
+        janusGraph = (StandardJanusGraph) graphInstance;
     }
 
     @Override
diff --git a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
index 1a0d0cc..f8d7f8c 100644
--- a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
+++ b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
@@ -64,6 +64,7 @@ public enum AtlasConfiguration {
     CUSTOM_ATTRIBUTE_VALUE_MAX_LENGTH("atlas.custom.attribute.value.max.length", 500),
     LABEL_MAX_LENGTH("atlas.entity.label.max.length", 50),
     IMPORT_TEMP_DIRECTORY("atlas.import.temp.directory", ""),
+    MIGRATION_IMPORT_START_POSITION("atlas.migration.import.start.position", 0),
     LINEAGE_USING_GREMLIN("atlas.lineage.query.use.gremlin", false);
 
     private static final Configuration APPLICATION_PROPERTIES;
diff --git a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java
index 0b3ede9..0ad3673 100644
--- a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java
+++ b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java
@@ -44,10 +44,16 @@ public class AtlasImportRequest implements Serializable {
     public  static final String TRANSFORMS_KEY             = "transforms";
     public  static final String TRANSFORMERS_KEY           = "transformers";
     public  static final String OPTION_KEY_REPLICATED_FROM = "replicatedFrom";
-    private static final String START_POSITION_KEY         = "startPosition";
+    public  static final String OPTION_KEY_MIGRATION       = "migration";
+    public  static final String OPTION_KEY_NUM_WORKERS     = "numWorkers";
+    public  static final String OPTION_KEY_BATCH_SIZE      = "batchSize";
+    public  static final String OPTION_KEY_FORMAT          = "format";
+    public  static final String OPTION_KEY_FORMAT_ZIP_DIRECT = "zipDirect";
+    public  static final String START_POSITION_KEY         = "startPosition";
     private static final String START_GUID_KEY             = "startGuid";
     private static final String FILE_NAME_KEY              = "fileName";
     private static final String UPDATE_TYPE_DEFINITION_KEY = "updateTypeDefinition";
+    private static final String OPTION_KEY_STREAM_SIZE     = "size";
 
     private Map<String, String> options;
 
@@ -108,7 +114,7 @@ public class AtlasImportRequest implements Serializable {
             return null;
         }
 
-        return (String) this.options.get(key);
+        return this.options.get(key);
     }
 
     @JsonIgnore
@@ -121,10 +127,41 @@ public class AtlasImportRequest implements Serializable {
         return isReplicationOptionSet() ? options.get(OPTION_KEY_REPLICATED_FROM) : StringUtils.EMPTY;
     }
 
+    @JsonIgnore
+    public int getOptionKeyNumWorkers() {
+        return getOptionsValue(OPTION_KEY_NUM_WORKERS, 1);
+    }
+
+    @JsonIgnore
+    public int getOptionKeyBatchSize() {
+        return getOptionsValue(OPTION_KEY_BATCH_SIZE, 1);
+    }
+
+    private int getOptionsValue(String optionKeyBatchSize, int defaultValue) {
+        String optionsValue = getOptionForKey(optionKeyBatchSize);
+
+        return StringUtils.isEmpty(optionsValue) ?
+                defaultValue :
+                Integer.valueOf(optionsValue);
+    }
+
     @JsonAnySetter
     public void setOption(String key, String value) {
         if (null == options) {
             options = new HashMap<>();
         }
         options.put(key, value);
-    }}
+    }
+
+    public void setSizeOption(int size) {
+        setOption(OPTION_KEY_STREAM_SIZE, Integer.toString(size));
+    }
+
+    public int getSizeOption() {
+        if (!this.options.containsKey(OPTION_KEY_STREAM_SIZE)) {
+            return 1;
+        }
+
+        return Integer.valueOf(this.options.get(OPTION_KEY_STREAM_SIZE));
+    }
+}
diff --git a/intg/src/main/java/org/apache/atlas/pc/WorkItemConsumer.java b/intg/src/main/java/org/apache/atlas/pc/WorkItemConsumer.java
index 9ba4bf4..dd76697 100644
--- a/intg/src/main/java/org/apache/atlas/pc/WorkItemConsumer.java
+++ b/intg/src/main/java/org/apache/atlas/pc/WorkItemConsumer.java
@@ -21,6 +21,7 @@ package org.apache.atlas.pc;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Queue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -37,7 +38,7 @@ public abstract class WorkItemConsumer<T> implements Runnable {
     private final AtomicBoolean    isDirty           = new AtomicBoolean(false);
     private final AtomicLong       maxCommitTimeInMs = new AtomicLong(DEFAULT_COMMIT_TIME_IN_MS);
     private CountDownLatch         countdownLatch;
-    private BlockingQueue<Object>  results;
+    private Queue<Object>          results;
 
     public WorkItemConsumer(BlockingQueue<T> queue) {
         this.queue          = queue;
@@ -101,11 +102,7 @@ public abstract class WorkItemConsumer<T> implements Runnable {
     protected abstract void processItem(T item);
 
     protected void addResult(Object value) {
-        try {
-            results.put(value);
-        } catch (InterruptedException e) {
-            LOG.error("Interrupted while adding result: {}", value);
-        }
+        results.add(value);
     }
 
     protected void updateCommitTime(long commitTime) {
@@ -118,7 +115,7 @@ public abstract class WorkItemConsumer<T> implements Runnable {
         this.countdownLatch = countdownLatch;
     }
 
-    public <V> void setResults(BlockingQueue<Object> queue) {
+    public <V> void setResults(Queue<Object> queue) {
         this.results = queue;
     }
 }
diff --git a/intg/src/main/java/org/apache/atlas/pc/WorkItemManager.java b/intg/src/main/java/org/apache/atlas/pc/WorkItemManager.java
index a7ba67c..351421e 100644
--- a/intg/src/main/java/org/apache/atlas/pc/WorkItemManager.java
+++ b/intg/src/main/java/org/apache/atlas/pc/WorkItemManager.java
@@ -23,6 +23,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Queue;
 import java.util.concurrent.*;
 
 public class WorkItemManager<T, U extends WorkItemConsumer> {
@@ -33,7 +34,7 @@ public class WorkItemManager<T, U extends WorkItemConsumer> {
     private final ExecutorService  service;
     private final List<U>          consumers = new ArrayList<>();
     private CountDownLatch         countdownLatch;
-    private BlockingQueue<Object>  resultsQueue;
+    private Queue<Object>          resultsQueue;
 
     public WorkItemManager(WorkItemBuilder builder, String namePrefix, int batchSize, int numWorkers, boolean collectResults) {
         this.numWorkers = numWorkers;
@@ -49,13 +50,13 @@ public class WorkItemManager<T, U extends WorkItemConsumer> {
         this(builder, "workItemConsumer", batchSize, numWorkers, false);
     }
 
-    public void setResultsCollection(BlockingQueue<Object> resultsQueue) {
+    public void setResultsCollection(Queue<Object> resultsQueue) {
         this.resultsQueue = resultsQueue;
     }
 
     private void createConsumers(WorkItemBuilder builder, int numWorkers, boolean collectResults) {
         if (collectResults) {
-            setResultsCollection(new LinkedBlockingQueue<>());
+            setResultsCollection(new ConcurrentLinkedQueue<>());
         }
 
         for (int i = 0; i < numWorkers; i++) {
@@ -124,7 +125,7 @@ public class WorkItemManager<T, U extends WorkItemConsumer> {
         LOG.info("WorkItemManager: Shutdown done!");
     }
 
-    public BlockingQueue getResults() {
+    public Queue getResults() {
         return this.resultsQueue;
     }
 
diff --git a/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java b/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java
index bbe0dc5..57e454a 100644
--- a/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java
+++ b/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java
@@ -199,6 +199,10 @@ public class GraphTransactionInterceptor implements MethodInterceptor {
         return cache.get(guid);
     }
 
+    public static void clearCache() {
+        guidVertexCache.get().clear();
+    }
+
     boolean logException(Throwable t) {
         if (t instanceof AtlasBaseException) {
             Response.Status httpCode = ((AtlasBaseException) t).getAtlasErrorCode().getHttpCode();
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java b/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java
index 55990f7..373921d 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java
@@ -247,7 +247,8 @@ public class AuditsWriter {
             }
 
             updateReplicationAttribute(replicationOptionState, sourceServerName, sourceServerFullName, entityGuids,
-                    Constants.ATTR_NAME_REPLICATED_FROM, result.getExportResult().getChangeMarker());
+                    Constants.ATTR_NAME_REPLICATED_FROM,
+                    (result.getExportResult() != null) ? result.getExportResult().getChangeMarker() : 0);
         }
 
         public void add(String userName, String sourceCluster, long startTime,
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
index 27001e3..cd1deab 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
@@ -92,7 +92,7 @@ public class ImportService {
             request = new AtlasImportRequest();
         }
 
-        EntityImportStream source = createZipSource(inputStream, AtlasConfiguration.IMPORT_TEMP_DIRECTORY.getString());
+        EntityImportStream source = createZipSource(request, inputStream, AtlasConfiguration.IMPORT_TEMP_DIRECTORY.getString());
         return run(source, request, userName, hostName, requestingIP);
     }
 
@@ -248,8 +248,18 @@ public class ImportService {
         return (int) (endTime - startTime);
     }
 
-    private EntityImportStream createZipSource(InputStream inputStream, String configuredTemporaryDirectory) throws AtlasBaseException {
+    private EntityImportStream createZipSource(AtlasImportRequest request, InputStream inputStream, String configuredTemporaryDirectory) throws AtlasBaseException {
         try {
+            if (request.getOptions().containsKey(AtlasImportRequest.OPTION_KEY_MIGRATION)) {
+                LOG.info("Migration mode: Detected...", request.getOptions().get("size"));
+                return getZipDirectEntityImportStream(request, inputStream);
+            }
+
+            if (request.getOptions().containsKey(AtlasImportRequest.OPTION_KEY_FORMAT) &&
+                    request.getOptions().get(AtlasImportRequest.OPTION_KEY_FORMAT).equals(AtlasImportRequest.OPTION_KEY_FORMAT_ZIP_DIRECT) ) {
+                return getZipDirectEntityImportStream(request, inputStream);
+            }
+
             if (StringUtils.isEmpty(configuredTemporaryDirectory)) {
                 return new ZipSource(inputStream);
             }
@@ -260,9 +270,15 @@ public class ImportService {
         }
     }
 
+    private EntityImportStream getZipDirectEntityImportStream(AtlasImportRequest request, InputStream inputStream) throws IOException, AtlasBaseException {
+        ZipSourceDirect zipSourceDirect = new ZipSourceDirect(inputStream, request.getSizeOption());
+        LOG.info("Using ZipSourceDirect: Size: {} entities", zipSourceDirect.size());
+        return zipSourceDirect;
+    }
+
     @VisibleForTesting
     boolean checkHiveTableIncrementalSkipLineage(AtlasImportRequest importRequest, AtlasExportRequest exportRequest) {
-        if (CollectionUtils.isEmpty(exportRequest.getItemsToExport())) {
+        if (exportRequest == null || CollectionUtils.isEmpty(exportRequest.getItemsToExport())) {
             return false;
         }
 
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipExportFileNames.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipExportFileNames.java
index 351b475..8347b91 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipExportFileNames.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipExportFileNames.java
@@ -31,4 +31,8 @@ public enum ZipExportFileNames {
     public String toString() {
         return this.name;
     }
+
+    public String toEntryFileName() {
+        return this.name + ".json";
+    }
 }
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceDirect.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceDirect.java
new file mode 100644
index 0000000..260c4af
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceDirect.java
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.impexp;
+
+import org.apache.atlas.entitytransform.BaseEntityHandler;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasExportResult;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.typedef.AtlasTypesDef;
+import org.apache.atlas.repository.store.graph.v2.EntityImportStream;
+import org.apache.atlas.type.AtlasType;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+
+import static org.apache.atlas.AtlasErrorCode.IMPORT_ATTEMPTING_EMPTY_ZIP;
+
+public class ZipSourceDirect implements EntityImportStream {
+    private static final Logger LOG = LoggerFactory.getLogger(ZipSourceDirect.class);
+
+    private final ZipInputStream zipInputStream;
+    private int currentPosition;
+
+    private ImportTransforms importTransform;
+    private List<BaseEntityHandler> entityHandlers;
+    private AtlasTypesDef typesDef;
+    private ZipEntry zipEntryNext;
+    private int streamSize = 1;
+
+    public ZipSourceDirect(InputStream inputStream, int streamSize) throws IOException, AtlasBaseException {
+        this.zipInputStream = new ZipInputStream(inputStream);
+        this.streamSize = streamSize;
+        prepareStreamForFetch();
+    }
+
+    @Override
+    public ImportTransforms getImportTransform() { return this.importTransform; }
+
+    @Override
+    public void setImportTransform(ImportTransforms importTransform) {
+        this.importTransform = importTransform;
+    }
+
+    @Override
+    public List<BaseEntityHandler> getEntityHandlers() {
+        return entityHandlers;
+    }
+
+    @Override
+    public void setEntityHandlers(List<BaseEntityHandler> entityHandlers) {
+        this.entityHandlers = entityHandlers;
+    }
+
+    @Override
+    public AtlasTypesDef getTypesDef() throws AtlasBaseException {
+        return this.typesDef;
+    }
+
+    @Override
+    public
+    AtlasExportResult getExportResult() throws AtlasBaseException {
+        return new AtlasExportResult();
+    }
+
+    @Override
+    public List<String> getCreationOrder() {
+        return new ArrayList<>();
+    }
+
+    @Override
+    public int getPosition() {
+        return currentPosition;
+    }
+
+    @Override
+    public AtlasEntity.AtlasEntityWithExtInfo getEntityWithExtInfo(String json) throws AtlasBaseException {
+        if (StringUtils.isEmpty(json)) {
+            return null;
+        }
+
+        AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = convertFromJson(AtlasEntity.AtlasEntityWithExtInfo.class, json);
+
+        if (importTransform != null) {
+            entityWithExtInfo = importTransform.apply(entityWithExtInfo);
+        }
+
+        if (entityHandlers != null) {
+            applyTransformers(entityWithExtInfo);
+        }
+
+        return entityWithExtInfo;
+    }
+
+    @Override
+    public boolean hasNext() {
+        return (this.zipEntryNext != null
+                && !zipEntryNext.getName().equals(ZipExportFileNames.ATLAS_EXPORT_ORDER_NAME.toEntryFileName())
+                && !zipEntryNext.getName().equals(ZipExportFileNames.ATLAS_EXPORT_INFO_NAME.toEntryFileName()));
+    }
+
+    @Override
+    public AtlasEntity next() {
+        AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = getNextEntityWithExtInfo();
+
+        return entityWithExtInfo != null ? entityWithExtInfo.getEntity() : null;
+    }
+
+    @Override
+    public AtlasEntity.AtlasEntityWithExtInfo getNextEntityWithExtInfo() {
+        try {
+            if (hasNext()) {
+                String json = moveNext();
+                return getEntityWithExtInfo(json);
+            }
+        } catch (AtlasBaseException e) {
+            LOG.error("getNextEntityWithExtInfo", e);
+        }
+        return null;
+    }
+
+    @Override
+    public void reset() {
+        currentPosition = 0;
+    }
+
+    @Override
+    public AtlasEntity getByGuid(String guid) {
+        try {
+            return getEntity(guid);
+        } catch (AtlasBaseException e) {
+            LOG.error("getByGuid: {} failed!", guid, e);
+            return null;
+        }
+    }
+
+    @Override
+    public void onImportComplete(String guid) {
+    }
+
+    @Override
+    public void setPosition(int index) {
+        try {
+            for (int i = 0; i < index; i++) {
+                moveNextEntry();
+            }
+        }
+        catch (IOException e) {
+            LOG.error("Error setting position: {}. Position may be beyond the stream size.", index);
+        }
+    }
+
+    @Override
+    public void setPositionUsingEntityGuid(String guid) {
+    }
+
+    @Override
+    public void close() {
+    }
+
+    private void applyTransformers(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) {
+        if (entityWithExtInfo == null) {
+            return;
+        }
+
+        transform(entityWithExtInfo.getEntity());
+
+        if (MapUtils.isNotEmpty(entityWithExtInfo.getReferredEntities())) {
+            for (AtlasEntity e : entityWithExtInfo.getReferredEntities().values()) {
+                transform(e);
+            }
+        }
+    }
+
+    private void transform(AtlasEntity e) {
+        for (BaseEntityHandler handler : entityHandlers) {
+            handler.transform(e);
+        }
+    }
+
+    private <T> T convertFromJson(Class<T> clazz, String jsonData) throws AtlasBaseException {
+        try {
+            return AtlasType.fromJson(jsonData, clazz);
+
+        } catch (Exception e) {
+            throw new AtlasBaseException("Error converting file to JSON.", e);
+        }
+    }
+
+    private AtlasEntity getEntity(String guid) throws AtlasBaseException {
+        AtlasEntity.AtlasEntityWithExtInfo extInfo = getEntityWithExtInfo(guid);
+        return (extInfo != null) ? extInfo.getEntity() : null;
+    }
+
+    public int size() {
+        return this.streamSize;
+    }
+
+    private String moveNext() {
+        try {
+            moveNextEntry();
+            return getJsonPayloadFromZipEntryStream(this.zipInputStream);
+        } catch (IOException e) {
+            LOG.error("moveNext failed!", e);
+        }
+
+        return null;
+    }
+
+    private void moveNextEntry() throws IOException {
+        this.zipEntryNext = this.zipInputStream.getNextEntry();
+        this.currentPosition++;
+    }
+
+    private void prepareStreamForFetch() throws AtlasBaseException, IOException {
+        moveNextEntry();
+        if (this.zipEntryNext == null) {
+            throw new AtlasBaseException(IMPORT_ATTEMPTING_EMPTY_ZIP, "Attempting to import empty ZIP.");
+        }
+
+        if (this.zipEntryNext.getName().equals(ZipExportFileNames.ATLAS_TYPESDEF_NAME.toEntryFileName())) {
+            String json = getJsonPayloadFromZipEntryStream(this.zipInputStream);
+            this.typesDef = AtlasType.fromJson(json, AtlasTypesDef.class);
+        }
+    }
+
+    private String getJsonPayloadFromZipEntryStream(ZipInputStream zipInputStream) {
+        try {
+            final int BUFFER_LENGTH = 4096;
+            byte[] buf = new byte[BUFFER_LENGTH];
+
+            int n = 0;
+            ByteArrayOutputStream bos = new ByteArrayOutputStream();
+            while ((n = zipInputStream.read(buf, 0, BUFFER_LENGTH)) > -1) {
+                bos.write(buf, 0, n);
+            }
+
+            return bos.toString();
+        } catch (IOException ex) {
+            LOG.error("Error fetching string from entry!", ex);
+        }
+
+        return null;
+    }
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java b/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java
index ca0bc41..69d78cd 100644
--- a/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java
+++ b/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java
@@ -24,6 +24,7 @@ import org.apache.atlas.RequestContext;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.impexp.AtlasImportRequest;
 import org.apache.atlas.repository.impexp.ImportService;
+import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -32,11 +33,20 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.InetAddress;
+import java.util.zip.ZipFile;
+
+import static org.apache.atlas.AtlasConfiguration.MIGRATION_IMPORT_START_POSITION;
 
 public class ZipFileMigrationImporter implements Runnable {
     private static final Logger LOG = LoggerFactory.getLogger(ZipFileMigrationImporter.class);
 
-    private static String ENV_USER_NAME = "user.name";
+    private static final String APPLICATION_PROPERTY_MIGRATION_NUMER_OF_WORKERS = "atlas.migration.mode.workers";
+    private static final String APPLICATION_PROPERTY_MIGRATION_BATCH_SIZE       = "atlas.migration.mode.batch.size";
+    private static final String DEFAULT_NUMBER_OF_WORKDERS = "4";
+    private static final String DEFAULT_BATCH_SIZE = "100";
+    private static final String ZIP_FILE_COMMENT = "streamSize";
+
+    private final static String ENV_USER_NAME = "user.name";
 
     private final ImportService importService;
     private final String fileToImport;
@@ -52,7 +62,8 @@ public class ZipFileMigrationImporter implements Runnable {
             FileWatcher fileWatcher = new FileWatcher(fileToImport);
             fileWatcher.start();
 
-            performImport(new FileInputStream(new File(fileToImport)));
+            int streamSize = getStreamSizeFromComment(fileToImport);
+            performImport(new FileInputStream(new File(fileToImport)), streamSize);
         } catch (IOException e) {
             LOG.error("Migration Import: IO Error!", e);
         } catch (AtlasBaseException e) {
@@ -60,19 +71,44 @@ public class ZipFileMigrationImporter implements Runnable {
         }
     }
 
-    private void performImport(InputStream fs) throws AtlasBaseException {
+    private int getStreamSizeFromComment(String fileToImport) {
+        int ret = 1; // fallback used when the archive cannot be opened
+        // try-with-resources closes the ZipFile even when reading or parsing the comment throws;
+        // a manual close() after the parse would be skipped on any exception and leak the handle.
+        try (ZipFile zipFile = new ZipFile(fileToImport)) {
+            String streamSizeComment = zipFile.getComment();
+            ret = processZipFileStreamSizeComment(streamSizeComment);
+        } catch (IOException e) {
+            LOG.error("Error opening ZIP file: {}", fileToImport, e);
+        }
+
+        return ret;
+    }
+
+    private int processZipFileStreamSizeComment(String streamSizeComment) {
+        LOG.debug("ZipFileMigrationImporter: streamSize: {}", streamSizeComment);
+        try { // expects "streamSize:<n>"; null, empty, unrelated or malformed comments yield 1
+            return StringUtils.startsWith(streamSizeComment, ZIP_FILE_COMMENT)
+                    ? Integer.parseInt(StringUtils.substringAfter(streamSizeComment, ":").trim()) : 1;
+        } catch (NumberFormatException e) { // unchecked: the caller only catches IOException/AtlasBaseException
+            LOG.warn("ZipFileMigrationImporter: invalid streamSize comment: {}", streamSizeComment, e);
+            return 1;
+        }
+    }
+
+    private void performImport(InputStream fs, int streamSize) throws AtlasBaseException {
         try {
             LOG.info("Migration Import: {}: Starting...", fileToImport);
 
             RequestContext.get().setUser(getUserNameFromEnvironment(), null);
 
-            importService.run(fs, getImportRequest(),
+            importService.run(fs, getImportRequest(streamSize),
                     getUserNameFromEnvironment(),
                     InetAddress.getLocalHost().getHostName(),
                     InetAddress.getLocalHost().getHostAddress());
 
         } catch (Exception ex) {
-            LOG.error("Error loading zip for migration", ex);
+            LOG.error("Migration Import: Error loading zip for migration!", ex);
             throw new AtlasBaseException(ex);
         } finally {
             LOG.info("Migration Import: {}: Done!", fileToImport);
@@ -83,8 +119,16 @@ public class ZipFileMigrationImporter implements Runnable {
         return System.getProperty(ENV_USER_NAME);
     }
 
-    private AtlasImportRequest getImportRequest() throws AtlasException {
-        return new AtlasImportRequest();
+    private AtlasImportRequest getImportRequest(int streamSize) throws AtlasException {
+        AtlasImportRequest request = new AtlasImportRequest();
+        // streamSize is the value read from the ZIP file comment (see getStreamSizeFromComment); 1 when absent.
+        request.setSizeOption(streamSize);
+        request.setOption(AtlasImportRequest.OPTION_KEY_MIGRATION, "true"); // presence of this key makes BulkImporterImpl.bulkImport() choose MigrationImport
+        request.setOption(AtlasImportRequest.OPTION_KEY_NUM_WORKERS, getPropertyValue(APPLICATION_PROPERTY_MIGRATION_NUMER_OF_WORKERS, DEFAULT_NUMBER_OF_WORKDERS));
+        request.setOption(AtlasImportRequest.OPTION_KEY_BATCH_SIZE, getPropertyValue(APPLICATION_PROPERTY_MIGRATION_BATCH_SIZE, DEFAULT_BATCH_SIZE));
+        request.setOption(AtlasImportRequest.START_POSITION_KEY, Integer.toString(MIGRATION_IMPORT_START_POSITION.getInt()));
+
+        return request;
     }
 
     private String getPropertyValue(String property, String defaultValue) throws AtlasException {
diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java b/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java
index 2b58119..bee6378 100644
--- a/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java
+++ b/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java
@@ -81,9 +81,9 @@ public class UniqueAttributePatch extends AtlasPatchHandler {
             AtlasGraph        graph        = getGraph();
 
             for (AtlasEntityType entityType : typeRegistry.getAllEntityTypes()) {
-                LOG.info("finding entities of type {}", entityType.getTypeName());
-
+                LOG.info("finding entities of type: {}", entityType.getTypeName());
                 Iterable<Object> iterable = graph.query().has(Constants.ENTITY_TYPE_PROPERTY_KEY, entityType.getTypeName()).vertexIds();
+                LOG.info("found entities of type: {}", entityType.getTypeName());
                 int              count    = 0;
 
                 for (Iterator<Object> iter = iterable.iterator(); iter.hasNext(); ) {
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java
index 39ea3f8..805531c 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java
@@ -150,6 +150,14 @@ public interface AtlasEntityStore {
     EntityMutationResponse createOrUpdateForImport(EntityStream entityStream) throws AtlasBaseException;
 
     /**
+     * Create or update entities with parameters necessary for the import process, without committing. The caller is responsible for the commit.
+     * @param entityStream AtlasEntityStream
+     * @return EntityMutationResponse Entity mutations operations with the corresponding set of entities on which these operations were performed
+     * @throws AtlasBaseException
+     */
+    EntityMutationResponse createOrUpdateForImportNoCommit(EntityStream entityStream) throws AtlasBaseException;
+
+    /**
      * Update a single entity
      * @param objectId     ID of the entity
      * @param updatedEntityInfo updated entity information
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
index 30f5e5a..6f6ee17 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
@@ -332,6 +332,11 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
     }
 
     @Override
+    public EntityMutationResponse createOrUpdateForImportNoCommit(EntityStream entityStream) throws AtlasBaseException {
+        return createOrUpdate(entityStream, false, true, true); // NOTE(review): unlike updateEntity, not annotated @GraphTransaction; presumably so the caller controls the commit; confirm
+    }
+
+    @Override
     @GraphTransaction
     public EntityMutationResponse updateEntity(AtlasObjectId objectId, AtlasEntityWithExtInfo updatedEntityInfo, boolean isPartialUpdate) throws AtlasBaseException {
         if (LOG.isDebugEnabled()) {
@@ -1210,8 +1215,10 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
 
             ret.setGuidAssignments(context.getGuidAssignments());
 
-            // Notify the change listeners
-            entityChangeNotifier.onEntitiesMutated(ret, RequestContext.get().isImportInProgress());
+            if (!RequestContext.get().isImportInProgress()) {
+                // Notify the change listeners
+                entityChangeNotifier.onEntitiesMutated(ret, RequestContext.get().isImportInProgress());
+            }
 
             if (LOG.isDebugEnabled()) {
                 LOG.debug("<== createOrUpdate()");
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java
index fdf117a..857b709 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java
@@ -929,6 +929,10 @@ public class AtlasRelationshipStoreV2 implements AtlasRelationshipStore {
     }
 
     private void sendNotifications(AtlasRelationship ret, OperationType relationshipUpdate) throws AtlasBaseException {
+        if (entityChangeNotifier == null) {
+            return;
+        }
+
         entityChangeNotifier.notifyPropagatedEntities();
         if (notificationsEnabled){
             entityChangeNotifier.notifyRelationshipMutation(ret, relationshipUpdate);
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java
index 54c32c5..4526002 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java
@@ -18,33 +18,30 @@
 package org.apache.atlas.repository.store.graph.v2;
 
 import com.google.common.annotations.VisibleForTesting;
-import org.apache.atlas.AtlasConfiguration;
-import org.apache.atlas.AtlasErrorCode;
-import org.apache.atlas.RequestContext;
-import org.apache.atlas.annotation.GraphTransaction;
 import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasImportRequest;
 import org.apache.atlas.model.impexp.AtlasImportResult;
 import org.apache.atlas.model.instance.AtlasEntity;
-import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
 import org.apache.atlas.model.instance.AtlasEntityHeader;
 import org.apache.atlas.model.instance.AtlasObjectId;
 import org.apache.atlas.model.instance.EntityMutationResponse;
-import org.apache.atlas.repository.Constants;
-import org.apache.atlas.repository.graphdb.AtlasSchemaViolationException;
+import org.apache.atlas.repository.graph.AtlasGraphProvider;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
 import org.apache.atlas.repository.graphdb.AtlasVertex;
 import org.apache.atlas.repository.store.graph.AtlasEntityStore;
 import org.apache.atlas.repository.store.graph.BulkImporter;
+import org.apache.atlas.repository.store.graph.v2.bulkimport.ImportStrategy;
+import org.apache.atlas.repository.store.graph.v2.bulkimport.MigrationImport;
+import org.apache.atlas.repository.store.graph.v2.bulkimport.RegularImport;
 import org.apache.atlas.type.AtlasEntityType;
 import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.type.Constants;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
 
 import javax.inject.Inject;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
@@ -55,131 +52,24 @@ public class BulkImporterImpl implements BulkImporter {
     private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityStoreV2.class);
 
     private final AtlasEntityStore entityStore;
-    private final EntityGraphRetriever entityGraphRetriever;
     private AtlasTypeRegistry typeRegistry;
-    private final int MAX_ATTEMPTS = 2;
-    private boolean directoryBasedImportConfigured;
 
     @Inject
     public BulkImporterImpl(AtlasEntityStore entityStore, AtlasTypeRegistry typeRegistry) {
         this.entityStore = entityStore;
-        this.entityGraphRetriever = new EntityGraphRetriever(typeRegistry);
         this.typeRegistry = typeRegistry;
-        this.directoryBasedImportConfigured = StringUtils.isNotEmpty(AtlasConfiguration.IMPORT_TEMP_DIRECTORY.getString());
     }
 
     @Override
     public EntityMutationResponse bulkImport(EntityImportStream entityStream, AtlasImportResult importResult) throws AtlasBaseException {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("==> bulkImport()");
-        }
-
-        if (entityStream == null || !entityStream.hasNext()) {
-            throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "no entities to create/update.");
-        }
-
-        EntityMutationResponse ret = new EntityMutationResponse();
-        ret.setGuidAssignments(new HashMap<>());
-
-        Set<String>  processedGuids = new HashSet<>();
-        float        currentPercent = 0f;
-        List<String> residualList   = new ArrayList<>();
-
-        EntityImportStreamWithResidualList entityImportStreamWithResidualList = new EntityImportStreamWithResidualList(entityStream, residualList);
-
-        while (entityImportStreamWithResidualList.hasNext()) {
-            AtlasEntityWithExtInfo entityWithExtInfo = entityImportStreamWithResidualList.getNextEntityWithExtInfo();
-            AtlasEntity            entity            = entityWithExtInfo != null ? entityWithExtInfo.getEntity() : null;
-
-            if (entity == null) {
-                continue;
-            }
-
-            for (int attempt = 0; attempt < MAX_ATTEMPTS; attempt++) {
-                try {
-                    AtlasEntityStreamForImport oneEntityStream = new AtlasEntityStreamForImport(entityWithExtInfo, null);
-                    EntityMutationResponse resp = entityStore.createOrUpdateForImport(oneEntityStream);
-
-                    if (resp.getGuidAssignments() != null) {
-                        ret.getGuidAssignments().putAll(resp.getGuidAssignments());
-                    }
-
-                    currentPercent = updateImportMetrics(entityWithExtInfo, resp, importResult, processedGuids,
-                            entityStream.getPosition(),
-                            entityImportStreamWithResidualList.getStreamSize(),
-                            currentPercent);
-
-                    entityStream.onImportComplete(entity.getGuid());
-                    break;
-                } catch (AtlasBaseException e) {
-                    if (!updateResidualList(e, residualList, entityWithExtInfo.getEntity().getGuid())) {
-                        throw e;
-                    }
-                    break;
-                } catch (AtlasSchemaViolationException e) {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Entity: {}", entity.getGuid(), e);
-                    }
-
-                    if (attempt == 0) {
-                        updateVertexGuid(entity);
-                    } else {
-                        LOG.error("Guid update failed: {}", entityWithExtInfo.getEntity().getGuid());
-                        throw e;
-                    }
-                } catch (Throwable e) {
-                    AtlasBaseException abe = new AtlasBaseException(e);
-                    if (!updateResidualList(abe, residualList, entityWithExtInfo.getEntity().getGuid())) {
-                        throw abe;
-                    }
-
-                    LOG.warn("Exception: {}", entity.getGuid(), e);
-                    break;
-                } finally {
-                    RequestContext.get().clearCache();
-                }
-            }
-        }
-
-        importResult.getProcessedEntities().addAll(processedGuids);
-        LOG.info("bulkImport(): done. Total number of entities (including referred entities) imported: {}", processedGuids.size());
-
-        return ret;
-    }
-
-    @GraphTransaction
-    public void updateVertexGuid(AtlasEntity entity) {
-        String entityGuid = entity.getGuid();
-        AtlasObjectId objectId = entityGraphRetriever.toAtlasObjectIdWithoutGuid(entity);
-
-        AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());
-        String vertexGuid = null;
-        try {
-            vertexGuid = AtlasGraphUtilsV2.getGuidByUniqueAttributes(entityType, objectId.getUniqueAttributes());
-        } catch (AtlasBaseException e) {
-            LOG.warn("Entity: {}: Does not exist!", objectId);
-            return;
-        }
-
-        if (StringUtils.isEmpty(vertexGuid) || vertexGuid.equals(entityGuid)) {
-            return;
-        }
-
-        AtlasVertex v = AtlasGraphUtilsV2.findByGuid(vertexGuid);
-        if (v == null) {
-            return;
-        }
-
-        addHistoricalGuid(v, vertexGuid);
-        AtlasGraphUtilsV2.setProperty(v, Constants.GUID_PROPERTY_KEY, entityGuid);
-
-        LOG.warn("GUID Updated: Entity: {}: from: {}: to: {}", objectId, vertexGuid, entity.getGuid());
-    }
-
-    private void addHistoricalGuid(AtlasVertex v, String vertexGuid) {
-        String existingJson = AtlasGraphUtilsV2.getProperty(v, HISTORICAL_GUID_PROPERTY_KEY, String.class);
-
-        AtlasGraphUtilsV2.setProperty(v, HISTORICAL_GUID_PROPERTY_KEY, getJsonArray(existingJson, vertexGuid));
+        ImportStrategy importStrategy =
+                (importResult.getRequest().getOptions() != null &&
+                        importResult.getRequest().getOptions().containsKey(AtlasImportRequest.OPTION_KEY_MIGRATION))
+                ? new MigrationImport(new AtlasGraphProvider(), this.typeRegistry)
+                : new RegularImport(this.entityStore, this.typeRegistry);
+
+        LOG.info("BulkImportImpl: {}", importStrategy.getClass().getSimpleName());
+        return importStrategy.run(entityStream, importResult);
     }
 
     @VisibleForTesting
@@ -193,38 +83,16 @@ public class BulkImporterImpl implements BulkImporter {
         return json;
     }
 
-    private boolean updateResidualList(AtlasBaseException e, List<String> lineageList, String guid) {
-        if (!e.getAtlasErrorCode().getErrorCode().equals(AtlasErrorCode.INVALID_OBJECT_ID.getErrorCode())) {
-            return false;
-        }
-
-        lineageList.add(guid);
-
-        return true;
-    }
-
-    private float updateImportMetrics(AtlasEntity.AtlasEntityWithExtInfo currentEntity,
-                                      EntityMutationResponse             resp,
-                                      AtlasImportResult                  importResult,
-                                      Set<String>                        processedGuids,
-                                      int currentIndex, int streamSize, float currentPercent) {
-        if (!directoryBasedImportConfigured) {
-            updateImportMetrics("entity:%s:created", resp.getCreatedEntities(), processedGuids, importResult);
-            updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids, importResult);
-            updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids, importResult);
-        }
-
-        String lastEntityImported = String.format("entity:last-imported:%s:[%s]:(%s)", currentEntity.getEntity().getTypeName(), currentIndex, currentEntity.getEntity().getGuid());
-
-        return updateImportProgress(LOG, currentIndex, streamSize, currentPercent, lastEntityImported);
-    }
-
     @VisibleForTesting
-    static float updateImportProgress(Logger log, int currentIndex, int streamSize, float currentPercent, String additionalInfo) {
+    public static float updateImportProgress(Logger log, int currentIndex, int streamSize, float currentPercent, String additionalInfo) {
         final double tolerance   = 0.000001;
         final int    MAX_PERCENT = 100;
 
         int     maxSize        = (currentIndex <= streamSize) ? streamSize : currentIndex;
+        if (maxSize <= 0) {
+            return currentPercent;
+        }
+
         float   percent        = (float) ((currentIndex * MAX_PERCENT) / maxSize);
         boolean updateLog      = Double.compare(percent, currentPercent) > tolerance;
         float   updatedPercent = (MAX_PERCENT < maxSize) ? percent : ((updateLog) ? ++currentPercent : currentPercent);
@@ -236,7 +104,7 @@ public class BulkImporterImpl implements BulkImporter {
         return updatedPercent;
     }
 
-    private static void updateImportMetrics(String prefix, List<AtlasEntityHeader> list, Set<String> processedGuids, AtlasImportResult importResult) {
+    public static void updateImportMetrics(String prefix, List<AtlasEntityHeader> list, Set<String> processedGuids, AtlasImportResult importResult) {
         if (list == null) {
             return;
         }
@@ -251,41 +119,37 @@ public class BulkImporterImpl implements BulkImporter {
         }
     }
 
-    private static class EntityImportStreamWithResidualList {
-        private final EntityImportStream stream;
-        private final List<String>       residualList;
-        private       boolean            navigateResidualList;
-        private       int                currentResidualListIndex;
-
+    public static void updateVertexGuid(AtlasTypeRegistry typeRegistry, EntityGraphRetriever entityGraphRetriever, AtlasEntity entity) {
+        String entityGuid = entity.getGuid();
+        AtlasObjectId objectId = entityGraphRetriever.toAtlasObjectIdWithoutGuid(entity);
 
-        public EntityImportStreamWithResidualList(EntityImportStream stream, List<String> residualList) {
-            this.stream                   = stream;
-            this.residualList             = residualList;
-            this.navigateResidualList     = false;
-            this.currentResidualListIndex = 0;
+        AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());
+        String vertexGuid = null;
+        try {
+            vertexGuid = AtlasGraphUtilsV2.getGuidByUniqueAttributes(entityType, objectId.getUniqueAttributes());
+        } catch (AtlasBaseException e) {
+            LOG.warn("Entity: {}: Does not exist!", objectId);
+            return;
         }
 
-        public AtlasEntity.AtlasEntityWithExtInfo getNextEntityWithExtInfo() {
-            if (navigateResidualList == false) {
-                return stream.getNextEntityWithExtInfo();
-            } else {
-                stream.setPositionUsingEntityGuid(residualList.get(currentResidualListIndex++));
-                return stream.getNextEntityWithExtInfo();
-            }
+        if (StringUtils.isEmpty(vertexGuid) || vertexGuid.equals(entityGuid)) {
+            return;
         }
 
-        public boolean hasNext() {
-            if (!navigateResidualList) {
-                boolean streamHasNext = stream.hasNext();
-                navigateResidualList = (streamHasNext == false);
-                return streamHasNext ? streamHasNext : (currentResidualListIndex < residualList.size());
-            } else {
-                return (currentResidualListIndex < residualList.size());
-            }
+        AtlasVertex v = AtlasGraphUtilsV2.findByGuid(vertexGuid);
+        if (v == null) {
+            return;
         }
 
-        public int getStreamSize() {
-            return stream.size() + residualList.size();
-        }
+        addHistoricalGuid(v, vertexGuid);
+        AtlasGraphUtilsV2.setProperty(v, Constants.GUID_PROPERTY_KEY, entityGuid);
+
+        LOG.warn("GUID Updated: Entity: {}: from: {}: to: {}", objectId, vertexGuid, entity.getGuid());
+    }
+
+    public static void addHistoricalGuid(AtlasVertex v, String vertexGuid) {
+        // Read the stored historical-GUID JSON, pass it with vertexGuid through getJsonArray, and write the result back.
+        final String currentJson = AtlasGraphUtilsV2.getProperty(v, HISTORICAL_GUID_PROPERTY_KEY, String.class);
+        AtlasGraphUtilsV2.setProperty(v, HISTORICAL_GUID_PROPERTY_KEY, getJsonArray(currentJson, vertexGuid));
+    }
 }
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
index 2f3aad0..e76b341 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
@@ -361,7 +361,9 @@ public class EntityGraphMapper {
 
         updateLabels(vertex, labels);
 
-        entityChangeNotifier.onLabelsUpdatedFromEntity(GraphHelper.getGuid(vertex), addedLabels, removedLabels);
+        if (entityChangeNotifier != null) {
+            entityChangeNotifier.onLabelsUpdatedFromEntity(GraphHelper.getGuid(vertex), addedLabels, removedLabels);
+        }
     }
 
     public void addLabels(AtlasVertex vertex, Set<String> labels) throws AtlasBaseException {
@@ -378,7 +380,10 @@ public class EntityGraphMapper {
             if (!updatedLabels.equals(existingLabels)) {
                 updateLabels(vertex, updatedLabels);
                 updatedLabels.removeAll(existingLabels);
-                entityChangeNotifier.onLabelsUpdatedFromEntity(GraphHelper.getGuid(vertex), updatedLabels, null);
+
+                if (entityChangeNotifier != null) {
+                    entityChangeNotifier.onLabelsUpdatedFromEntity(GraphHelper.getGuid(vertex), updatedLabels, null);
+                }
             }
         }
     }
@@ -395,7 +400,10 @@ public class EntityGraphMapper {
                 if (!updatedLabels.equals(existingLabels)) {
                     updateLabels(vertex, updatedLabels);
                     existingLabels.removeAll(updatedLabels);
-                    entityChangeNotifier.onLabelsUpdatedFromEntity(GraphHelper.getGuid(vertex), null, existingLabels);
+
+                    if (entityChangeNotifier != null) {
+                        entityChangeNotifier.onLabelsUpdatedFromEntity(GraphHelper.getGuid(vertex), null, existingLabels);
+                    }
                 }
             }
         }
@@ -1948,7 +1956,9 @@ public class EntityGraphMapper {
                 Set<AtlasVertex>  vertices           = addedClassifications.get(classification);
                 List<AtlasEntity> propagatedEntities = updateClassificationText(classification, vertices);
 
-                entityChangeNotifier.onClassificationsAddedToEntities(propagatedEntities, Collections.singletonList(classification));
+                if (entityChangeNotifier != null) {
+                    entityChangeNotifier.onClassificationsAddedToEntities(propagatedEntities, Collections.singletonList(classification));
+                }
             }
 
             RequestContext.get().endMetricRecord(metric);
@@ -2056,7 +2066,10 @@ public class EntityGraphMapper {
             AtlasEntity entity = updateClassificationText(entry.getKey());
 
             List<AtlasClassification> deletedClassificationNames = entry.getValue();
-            entityChangeNotifier.onClassificationDeletedFromEntity(entity, deletedClassificationNames);
+
+            if (entityChangeNotifier != null) {
+                entityChangeNotifier.onClassificationDeletedFromEntity(entity, deletedClassificationNames);
+            }
         }
     }
 
@@ -2283,17 +2296,19 @@ public class EntityGraphMapper {
             notificationVertices.addAll(entitiesToPropagateTo);
         }
 
-        for (AtlasVertex vertex : notificationVertices) {
-            String      entityGuid = GraphHelper.getGuid(vertex);
-            AtlasEntity entity     = instanceConverter.getAndCacheEntity(entityGuid, ENTITY_CHANGE_NOTIFY_IGNORE_RELATIONSHIP_ATTRIBUTES);
+        if (entityChangeNotifier != null) {
+            for (AtlasVertex vertex : notificationVertices) {
+                String entityGuid = GraphHelper.getGuid(vertex);
+                AtlasEntity entity = instanceConverter.getAndCacheEntity(entityGuid, ENTITY_CHANGE_NOTIFY_IGNORE_RELATIONSHIP_ATTRIBUTES);
 
-            if (isActive(entity)) {
-                vertex.setProperty(CLASSIFICATION_TEXT_KEY, fullTextMapperV2.getClassificationTextForEntity(entity));
-                entityChangeNotifier.onClassificationUpdatedToEntity(entity, updatedClassifications);
+                if (isActive(entity)) {
+                    vertex.setProperty(CLASSIFICATION_TEXT_KEY, fullTextMapperV2.getClassificationTextForEntity(entity));
+                    entityChangeNotifier.onClassificationUpdatedToEntity(entity, updatedClassifications);
+                }
             }
         }
 
-        if (MapUtils.isNotEmpty(removedPropagations)) {
+        if (entityChangeNotifier != null && MapUtils.isNotEmpty(removedPropagations)) {
             for (AtlasClassification classification : removedPropagations.keySet()) {
                 List<AtlasVertex> propagatedVertices = removedPropagations.get(classification);
                 List<AtlasEntity> propagatedEntities = updateClassificationText(classification, propagatedVertices);
@@ -2526,7 +2541,7 @@ public class EntityGraphMapper {
     private List<AtlasEntity> updateClassificationText(AtlasClassification classification, Collection<AtlasVertex> propagatedVertices) throws AtlasBaseException {
         List<AtlasEntity> propagatedEntities = new ArrayList<>();
 
-        if(CollectionUtils.isNotEmpty(propagatedVertices)) {
+        if (fullTextMapperV2 != null && CollectionUtils.isNotEmpty(propagatedVertices)) {
             for(AtlasVertex vertex : propagatedVertices) {
                 AtlasEntity entity = instanceConverter.getAndCacheEntity(GraphHelper.getGuid(vertex), ENTITY_CHANGE_NOTIFY_IGNORE_RELATIONSHIP_ATTRIBUTES);
 
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipExportFileNames.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/ImportStrategy.java
similarity index 58%
copy from repository/src/main/java/org/apache/atlas/repository/impexp/ZipExportFileNames.java
copy to repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/ImportStrategy.java
index 351b475..6b70eab 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipExportFileNames.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/ImportStrategy.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,20 +15,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.atlas.repository.impexp;
 
-public enum ZipExportFileNames {
-    ATLAS_EXPORT_INFO_NAME("atlas-export-info"),
-    ATLAS_EXPORT_ORDER_NAME("atlas-export-order"),
-    ATLAS_TYPESDEF_NAME("atlas-typesdef");
+package org.apache.atlas.repository.store.graph.v2.bulkimport;
 
-    public final String name;
-    ZipExportFileNames(String name) {
-        this.name = name;
-    }
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.repository.store.graph.v2.EntityImportStream;
 
-    @Override
-    public String toString() {
-        return this.name;
-    }
+public abstract class ImportStrategy { // BulkImporterImpl.bulkImport() picks MigrationImport or RegularImport per request and calls run()
+    public abstract EntityMutationResponse run(EntityImportStream entityStream, AtlasImportResult importResult) throws AtlasBaseException;
 }
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java
new file mode 100644
index 0000000..8c66656
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.store.graph.v2.bulkimport;
+
+import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.repository.converters.AtlasFormatConverters;
+import org.apache.atlas.repository.converters.AtlasInstanceConverter;
+import org.apache.atlas.repository.graph.AtlasGraphProvider;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.store.graph.AtlasEntityStore;
+import org.apache.atlas.repository.store.graph.AtlasRelationshipStore;
+import org.apache.atlas.repository.store.graph.v1.DeleteHandlerDelegate;
+import org.apache.atlas.repository.store.graph.v2.AtlasEntityStoreV2;
+import org.apache.atlas.repository.store.graph.v2.AtlasRelationshipStoreV2;
+import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper;
+import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
+import org.apache.atlas.repository.store.graph.v2.EntityImportStream;
+import org.apache.atlas.repository.store.graph.v2.bulkimport.pc.EntityConsumerBuilder;
+import org.apache.atlas.repository.store.graph.v2.bulkimport.pc.EntityCreationManager;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link ImportStrategy} that feeds the entity stream to an {@link EntityCreationManager}
+ * whose consumers write through the bulk-loading graph returned by
+ * {@link AtlasGraphProvider#getBulkLoading()}.
+ */
+public class MigrationImport extends ImportStrategy {
+    private static final Logger LOG = LoggerFactory.getLogger(MigrationImport.class);
+
+    private final AtlasTypeRegistry typeRegistry;
+    private AtlasGraph atlasGraph;
+    private EntityGraphRetriever entityGraphRetriever;
+    private EntityGraphMapper entityGraphMapper;
+    private AtlasEntityStore entityStore;
+
+    /**
+     * Creates the strategy and builds its private entity store on top of the bulk-loading graph.
+     *
+     * @param atlasGraphProvider provider queried for the bulk-loading graph
+     * @param typeRegistry       type registry handed to every collaborator created here
+     */
+    public MigrationImport(AtlasGraphProvider atlasGraphProvider, AtlasTypeRegistry typeRegistry) {
+        this.typeRegistry = typeRegistry;
+        setupEntityStore(atlasGraphProvider, typeRegistry);
+        LOG.info("MigrationImport: Using bulkLoading...");
+    }
+
+    /**
+     * Reads the stream into the creation manager, drains it and extracts the results.
+     * <p>
+     * Exceptions raised by those steps are logged and not rethrown; the creation manager
+     * is shut down whether or not they occur.
+     *
+     * @param entityStream entities to import; must be non-null and have a next element
+     * @param importResult result object whose request supplies batch size and worker count
+     * @return a freshly created {@link EntityMutationResponse}; nothing in this method populates it
+     * @throws AtlasBaseException {@code INVALID_PARAMETERS} if the stream is null or exhausted, or
+     *                            if the import result or its request is null
+     */
+    @Override
+    public EntityMutationResponse run(EntityImportStream entityStream, AtlasImportResult importResult) throws AtlasBaseException {
+        if (entityStream == null || !entityStream.hasNext()) {
+            throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "no entities to create/update.");
+        }
+
+        // A null importResult is rejected here too, instead of failing with a NullPointerException.
+        if (importResult == null || importResult.getRequest() == null) {
+            throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "importResult should contain request");
+        }
+
+        int index = 0;
+        int streamSize = entityStream.size();
+        EntityMutationResponse ret = new EntityMutationResponse();
+        EntityCreationManager creationManager = createEntityCreationManager(atlasGraph, importResult, streamSize);
+
+        try {
+            LOG.info("Migration Import: Size: {}: Starting...", streamSize);
+            index = creationManager.read(entityStream);
+            creationManager.drain();
+            creationManager.extractResults();
+        } catch (Exception ex) {
+            // Best effort: the failure is reported in the log and the (empty) response is still returned.
+            LOG.error("Migration Import: Error: Current position: {}", index, ex);
+        } finally {
+            shutdownEntityCreationManager(creationManager);
+        }
+
+        LOG.info("Migration Import: Size: {}: Done!", streamSize);
+        return ret;
+    }
+
+    /** Builds the creation manager from the request's batch-size and worker-count options. */
+    private EntityCreationManager createEntityCreationManager(AtlasGraph threadedAtlasGraph, AtlasImportResult importResult, int streamSize) {
+        int batchSize = importResult.getRequest().getOptionKeyBatchSize();
+        int numWorkers = getNumWorkers(importResult.getRequest().getOptionKeyNumWorkers());
+
+        EntityConsumerBuilder consumerBuilder =
+                new EntityConsumerBuilder(threadedAtlasGraph, entityStore, entityGraphRetriever, typeRegistry, batchSize);
+
+        return new EntityCreationManager(consumerBuilder, batchSize, numWorkers, importResult, streamSize);
+    }
+
+    /** Returns the requested worker count, or 1 when the request supplies a non-positive value. */
+    private static int getNumWorkers(int numWorkersFromOptions) {
+        int ret = (numWorkersFromOptions > 0) ? numWorkersFromOptions : 1;
+        LOG.info("Migration Import: Setting numWorkers: {}", ret);
+        return ret;
+    }
+
+    /**
+     * Instantiates the retriever, mapper and entity store used by this strategy. The stores are
+     * created directly, with {@code null} passed for several of their constructor arguments.
+     */
+    private void setupEntityStore(AtlasGraphProvider atlasGraphProvider, AtlasTypeRegistry typeRegistry) {
+        this.entityGraphRetriever = new EntityGraphRetriever(typeRegistry);
+        this.atlasGraph = atlasGraphProvider.getBulkLoading();
+        DeleteHandlerDelegate deleteDelegate = new DeleteHandlerDelegate(typeRegistry);
+
+        AtlasRelationshipStore relationshipStore = new AtlasRelationshipStoreV2(typeRegistry, deleteDelegate, null);
+        AtlasFormatConverters formatConverters = new AtlasFormatConverters(typeRegistry);
+        AtlasInstanceConverter instanceConverter = new AtlasInstanceConverter(typeRegistry, formatConverters);
+        this.entityGraphMapper = new EntityGraphMapper(deleteDelegate, typeRegistry, atlasGraph, relationshipStore, null, instanceConverter, null);
+        this.entityStore = new AtlasEntityStoreV2(deleteDelegate, typeRegistry, null, entityGraphMapper);
+    }
+
+    /** Shuts the creation manager down, preserving the thread's interrupt status if interrupted. */
+    private void shutdownEntityCreationManager(EntityCreationManager creationManager) {
+        try {
+            creationManager.shutdown();
+        } catch (InterruptedException e) {
+            LOG.error("Migration Import: Shutdown: Interrupted!", e);
+            Thread.currentThread().interrupt();
+        }
+    }
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/RegularImport.java
similarity index 80%
copy from repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java
copy to repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/RegularImport.java
index 54c32c5..4cc8ed4 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/RegularImport.java
@@ -6,16 +6,18 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.atlas.repository.store.graph.v2;
+
+package org.apache.atlas.repository.store.graph.v2.bulkimport;
+
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.atlas.AtlasConfiguration;
@@ -33,15 +35,17 @@ import org.apache.atlas.repository.Constants;
 import org.apache.atlas.repository.graphdb.AtlasSchemaViolationException;
 import org.apache.atlas.repository.graphdb.AtlasVertex;
 import org.apache.atlas.repository.store.graph.AtlasEntityStore;
-import org.apache.atlas.repository.store.graph.BulkImporter;
+import org.apache.atlas.repository.store.graph.v2.AtlasEntityStreamForImport;
+import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
+import org.apache.atlas.repository.store.graph.v2.BulkImporterImpl;
+import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
+import org.apache.atlas.repository.store.graph.v2.EntityImportStream;
 import org.apache.atlas.type.AtlasEntityType;
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.stereotype.Component;
 
-import javax.inject.Inject;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -49,27 +53,25 @@ import java.util.List;
 import java.util.Set;
 
 import static org.apache.atlas.repository.Constants.HISTORICAL_GUID_PROPERTY_KEY;
+import static org.apache.atlas.repository.store.graph.v2.BulkImporterImpl.updateImportProgress;
 
-@Component
-public class BulkImporterImpl implements BulkImporter {
-    private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityStoreV2.class);
-
+public class RegularImport extends ImportStrategy {
+    private static final Logger LOG = LoggerFactory.getLogger(RegularImport.class);
+    private static final int MAX_ATTEMPTS = 3;
     private final AtlasEntityStore entityStore;
+    private final AtlasTypeRegistry typeRegistry;
     private final EntityGraphRetriever entityGraphRetriever;
-    private AtlasTypeRegistry typeRegistry;
-    private final int MAX_ATTEMPTS = 2;
     private boolean directoryBasedImportConfigured;
 
-    @Inject
-    public BulkImporterImpl(AtlasEntityStore entityStore, AtlasTypeRegistry typeRegistry) {
+    public RegularImport(AtlasEntityStore entityStore, AtlasTypeRegistry typeRegistry) {
         this.entityStore = entityStore;
-        this.entityGraphRetriever = new EntityGraphRetriever(typeRegistry);
         this.typeRegistry = typeRegistry;
+        this.entityGraphRetriever = new EntityGraphRetriever(typeRegistry);
         this.directoryBasedImportConfigured = StringUtils.isNotEmpty(AtlasConfiguration.IMPORT_TEMP_DIRECTORY.getString());
     }
 
     @Override
-    public EntityMutationResponse bulkImport(EntityImportStream entityStream, AtlasImportResult importResult) throws AtlasBaseException {
+    public EntityMutationResponse run(EntityImportStream entityStream, AtlasImportResult importResult) throws AtlasBaseException {
         if (LOG.isDebugEnabled()) {
             LOG.debug("==> bulkImport()");
         }
@@ -81,7 +83,7 @@ public class BulkImporterImpl implements BulkImporter {
         EntityMutationResponse ret = new EntityMutationResponse();
         ret.setGuidAssignments(new HashMap<>());
 
-        Set<String>  processedGuids = new HashSet<>();
+        Set<String> processedGuids = new HashSet<>();
         float        currentPercent = 0f;
         List<String> residualList   = new ArrayList<>();
 
@@ -209,9 +211,9 @@ public class BulkImporterImpl implements BulkImporter {
                                       Set<String>                        processedGuids,
                                       int currentIndex, int streamSize, float currentPercent) {
         if (!directoryBasedImportConfigured) {
-            updateImportMetrics("entity:%s:created", resp.getCreatedEntities(), processedGuids, importResult);
-            updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids, importResult);
-            updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids, importResult);
+            BulkImporterImpl.updateImportMetrics("entity:%s:created", resp.getCreatedEntities(), processedGuids, importResult);
+            BulkImporterImpl.updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids, importResult);
+            BulkImporterImpl.updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids, importResult);
         }
 
         String lastEntityImported = String.format("entity:last-imported:%s:[%s]:(%s)", currentEntity.getEntity().getTypeName(), currentIndex, currentEntity.getEntity().getGuid());
@@ -219,38 +221,6 @@ public class BulkImporterImpl implements BulkImporter {
         return updateImportProgress(LOG, currentIndex, streamSize, currentPercent, lastEntityImported);
     }
 
-    @VisibleForTesting
-    static float updateImportProgress(Logger log, int currentIndex, int streamSize, float currentPercent, String additionalInfo) {
-        final double tolerance   = 0.000001;
-        final int    MAX_PERCENT = 100;
-
-        int     maxSize        = (currentIndex <= streamSize) ? streamSize : currentIndex;
-        float   percent        = (float) ((currentIndex * MAX_PERCENT) / maxSize);
-        boolean updateLog      = Double.compare(percent, currentPercent) > tolerance;
-        float   updatedPercent = (MAX_PERCENT < maxSize) ? percent : ((updateLog) ? ++currentPercent : currentPercent);
-
-        if (updateLog) {
-            log.info("bulkImport(): progress: {}% (of {}) - {}", (int) Math.ceil(percent), maxSize, additionalInfo);
-        }
-
-        return updatedPercent;
-    }
-
-    private static void updateImportMetrics(String prefix, List<AtlasEntityHeader> list, Set<String> processedGuids, AtlasImportResult importResult) {
-        if (list == null) {
-            return;
-        }
-
-        for (AtlasEntityHeader h : list) {
-            if (processedGuids.contains(h.getGuid())) {
-                continue;
-            }
-
-            processedGuids.add(h.getGuid());
-            importResult.incrementMeticsCounter(String.format(prefix, h.getTypeName()));
-        }
-    }
-
     private static class EntityImportStreamWithResidualList {
         private final EntityImportStream stream;
         private final List<String>       residualList;
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java
new file mode 100644
index 0000000..bb74205
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.store.graph.v2.bulkimport.pc;
+
+import org.apache.atlas.GraphTransactionInterceptor;
+import org.apache.atlas.RequestContext;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
+import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.pc.WorkItemConsumer;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.graphdb.AtlasSchemaViolationException;
+import org.apache.atlas.repository.store.graph.AtlasEntityStore;
+import org.apache.atlas.repository.store.graph.v2.AtlasEntityStreamForImport;
+import org.apache.atlas.repository.store.graph.v2.BulkImporterImpl;
+import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * {@link WorkItemConsumer} that writes each entity through the entity store without
+ * committing, and commits the graph once the batch counter reaches {@code batchSize}.
+ * Entities written since the last commit stay in a buffer so that they can be replayed
+ * after a failed commit has been rolled back.
+ */
+public class EntityConsumer extends WorkItemConsumer<AtlasEntity.AtlasEntityWithExtInfo> {
+    private static final Logger LOG = LoggerFactory.getLogger(EntityConsumer.class);
+    private static final int MAX_COMMIT_RETRY_COUNT = 3;
+
+    private final int batchSize;
+    private final AtomicLong counter = new AtomicLong(1);
+    private final AtomicLong currentBatch = new AtomicLong(1);
+
+    private final AtlasGraph atlasGraph;
+    private final AtlasEntityStore entityStoreV2;
+    private final AtlasTypeRegistry typeRegistry;
+    private final EntityGraphRetriever entityGraphRetriever;
+
+    private final List<AtlasEntity.AtlasEntityWithExtInfo> entityBuffer = new ArrayList<>();
+    private final List<EntityMutationResponse> localResults = new ArrayList<>();
+
+    /**
+     * @param atlasGraph           graph on which commit and rollback are issued
+     * @param entityStore          store used to create or update entities without committing
+     * @param entityGraphRetriever retriever passed on when a schema violation is handled
+     * @param typeRegistry         type registry passed on when a schema violation is handled
+     * @param queue                queue handed to the {@link WorkItemConsumer} base class
+     * @param batchSize            batch counter value from which a commit is attempted
+     */
+    public EntityConsumer(AtlasGraph atlasGraph, AtlasEntityStore entityStore,
+                          EntityGraphRetriever entityGraphRetriever, AtlasTypeRegistry typeRegistry,
+                          BlockingQueue<AtlasEntity.AtlasEntityWithExtInfo> queue, int batchSize) {
+        super(queue);
+
+        this.atlasGraph = atlasGraph;
+        this.entityStoreV2 = entityStore;
+        this.entityGraphRetriever = entityGraphRetriever;
+        this.typeRegistry = typeRegistry;
+        this.batchSize = batchSize;
+    }
+
+    /**
+     * Buffers the entity, writes it through the store and commits when the batch is full.
+     * Any exception escaping those steps is logged as an error and not rethrown.
+     */
+    @Override
+    protected void processItem(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) {
+        // One for the entity itself plus one per referred entity, if there are any.
+        int delta = MapUtils.isEmpty(entityWithExtInfo.getReferredEntities())
+                ? 1
+                : entityWithExtInfo.getReferredEntities().size() + 1;
+
+        long currentCount = counter.addAndGet(delta);
+        currentBatch.addAndGet(delta);
+        entityBuffer.add(entityWithExtInfo);
+
+        try {
+            processEntity(entityWithExtInfo, currentCount);
+            attemptCommit();
+        } catch (Exception e) {
+            LOG.error("Data loss: Please re-submit!", e);
+        }
+    }
+
+    /**
+     * Writes one entity through the store without committing and records the response.
+     * On {@link AtlasBaseException} the entity's guid is reported as a result and the failure is
+     * logged; on {@link AtlasSchemaViolationException} the entity is passed to
+     * {@code BulkImporterImpl.updateVertexGuid}.
+     */
+    private void processEntity(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo, long currentCount) {
+        try {
+            RequestContext.get().setImportInProgress(true);
+            AtlasEntityStreamForImport oneEntityStream = new AtlasEntityStreamForImport(entityWithExtInfo, null);
+
+            LOG.debug("Processing: {}", currentCount);
+            EntityMutationResponse result = entityStoreV2.createOrUpdateForImportNoCommit(oneEntityStream);
+            localResults.add(result);
+        } catch (AtlasBaseException e) {
+            addResult(entityWithExtInfo.getEntity().getGuid());
+            LOG.warn("Exception: {}", entityWithExtInfo.getEntity().getGuid(), e);
+        } catch (AtlasSchemaViolationException e) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Entity: {}", entityWithExtInfo.getEntity().getGuid(), e);
+            }
+
+            BulkImporterImpl.updateVertexGuid(typeRegistry, entityGraphRetriever, entityWithExtInfo.getEntity());
+        }
+    }
+
+    /** Commits only once the batch counter has reached {@code batchSize}. */
+    private void attemptCommit() {
+        if (currentBatch.get() < batchSize) {
+            return;
+        }
+
+        doCommit();
+    }
+
+    /**
+     * Tries to commit up to {@code MAX_COMMIT_RETRY_COUNT} times. When every attempt fails the
+     * buffered entities and recorded responses are dropped and an error is logged.
+     */
+    @Override
+    protected void doCommit() {
+        for (int retryCount = 1; retryCount <= MAX_COMMIT_RETRY_COUNT; retryCount++) {
+            if (commitWithRetry(retryCount)) {
+                return;
+            }
+        }
+
+        LOG.error("Retries exceeded! Potential data loss! Please correct data and re-attempt. Buffer: {}: Counter: {}", entityBuffer.size(), counter.get());
+        clear();
+    }
+
+    /** Delegates to the base class, logs the running total and resets it to zero. */
+    @Override
+    protected void commitDirty() {
+        super.commitDirty();
+        LOG.info("Total: Commit: {}", counter.get());
+        counter.set(0);
+    }
+
+    /**
+     * Performs a single commit attempt.
+     *
+     * @return {@code true} if the commit succeeded and results were dispatched; {@code false} if
+     *         it failed, in which case the graph was rolled back and the buffer replayed
+     */
+    private boolean commitWithRetry(int retryCount) {
+        try {
+            atlasGraph.commit();
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Commit: Done!: Buffer: {}: Batch: {}: Counter: {}", entityBuffer.size(), currentBatch.get(), counter.get());
+            }
+
+            dispatchResults();
+            return true;
+        } catch (Exception ex) {
+            rollbackPauseRetry(retryCount, ex);
+            return false;
+        }
+    }
+
+    /** Rolls the graph back, sleeps for a period proportional to the retry count, then replays the buffer. */
+    private void rollbackPauseRetry(int retryCount, Exception ex) {
+        atlasGraph.rollback();
+        clearCache();
+
+        LOG.error("Rollback: Done! Buffer: {}: Counter: {}: Retry count: {}", entityBuffer.size(), counter.get(), retryCount);
+        pause(retryCount);
+        if (ex.getClass().getName().endsWith("JanusGraphException") && retryCount >= MAX_COMMIT_RETRY_COUNT) {
+            LOG.warn("Commit error! Will pause and retry: Buffer: {}: Counter: {}: Retry count: {}", entityBuffer.size(), counter.get(), retryCount, ex);
+        } else {
+            LOG.info("Will pause and retry: Buffer: {}: Counter: {}: Retry count: {}", entityBuffer.size(), counter.get(), retryCount);
+        }
+        retryProcessEntity(retryCount);
+    }
+
+    /** Writes every buffered entity to the store again after a rollback. */
+    private void retryProcessEntity(int retryCount) {
+        LOG.info("Replaying: Starting!: Buffer: {}: Retry count: {}", entityBuffer.size(), retryCount);
+        // Responses recorded before the rollback are stale; the replay below records them afresh.
+        localResults.clear();
+        for (AtlasEntity.AtlasEntityWithExtInfo e : entityBuffer) {
+            processEntity(e, counter.get());
+        }
+        LOG.info("Replaying: Done!: Buffer: {}: Retry count: {}", entityBuffer.size(), retryCount);
+    }
+
+    /** Reports the guid of every created, updated and deleted entity, then resets the batch state. */
+    private void dispatchResults() {
+        for (EntityMutationResponse response : localResults) {
+            addResultsFromResponse(response.getCreatedEntities());
+            addResultsFromResponse(response.getUpdatedEntities());
+            addResultsFromResponse(response.getDeletedEntities());
+        }
+
+        clear();
+    }
+
+    /** Sleeps for {@code retryCount} seconds; an interruption is logged and the interrupt flag restored. */
+    private void pause(int retryCount) {
+        try {
+            Thread.sleep(1000 * retryCount);
+        } catch (InterruptedException e) {
+            LOG.error("pause: Interrupted!", e);
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /** Reports the guid of each header in the list; a null or empty list is ignored. */
+    private void addResultsFromResponse(List<AtlasEntityHeader> entities) {
+        if (CollectionUtils.isEmpty(entities)) {
+            return;
+        }
+
+        for (AtlasEntityHeader eh : entities) {
+            addResult(eh.getGuid());
+        }
+    }
+
+    /** Empties the response list and entity buffer, clears the caches and zeroes the batch counter. */
+    private void clear() {
+        localResults.clear();
+        entityBuffer.clear();
+        clearCache();
+        currentBatch.set(0);
+    }
+
+    /** Clears the caches of {@link GraphTransactionInterceptor} and the current {@link RequestContext}. */
+    private void clearCache() {
+        GraphTransactionInterceptor.clearCache();
+        RequestContext.get().clearCache();
+    }
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumerBuilder.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumerBuilder.java
new file mode 100644
index 0000000..69d33b2
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumerBuilder.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.store.graph.v2.bulkimport.pc;
+
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.pc.WorkItemBuilder;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.store.graph.AtlasEntityStore;
+import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
+import org.apache.atlas.type.AtlasTypeRegistry;
+
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * {@link WorkItemBuilder} that creates {@link EntityConsumer} instances, all sharing the
+ * graph, entity store, retriever, type registry and batch size given at construction.
+ */
+public class EntityConsumerBuilder implements WorkItemBuilder<EntityConsumer, AtlasEntity.AtlasEntityWithExtInfo> {
+    private final AtlasGraph atlasGraph;
+    private final AtlasEntityStore entityStore;
+    private final EntityGraphRetriever entityGraphRetriever;
+    private final AtlasTypeRegistry typeRegistry;
+    private final int batchSize;
+
+    /**
+     * @param atlasGraph           graph passed to each consumer
+     * @param entityStore          entity store passed to each consumer
+     * @param entityGraphRetriever retriever passed to each consumer
+     * @param typeRegistry         type registry passed to each consumer
+     * @param batchSize            batch size passed to each consumer
+     */
+    public EntityConsumerBuilder(AtlasGraph atlasGraph, AtlasEntityStore entityStore,
+                                 EntityGraphRetriever entityGraphRetriever, AtlasTypeRegistry typeRegistry, int batchSize) {
+        this.atlasGraph = atlasGraph;
+        this.entityStore = entityStore;
+        this.entityGraphRetriever = entityGraphRetriever;
+        this.typeRegistry = typeRegistry;
+        this.batchSize = batchSize;
+    }
+
+    /** Creates a new consumer constructed with {@code queue} and the shared collaborators. */
+    @Override
+    public EntityConsumer build(BlockingQueue<AtlasEntity.AtlasEntityWithExtInfo> queue) {
+        return new EntityConsumer(atlasGraph, entityStore, entityGraphRetriever, typeRegistry, queue, this.batchSize);
+    }
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityCreationManager.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityCreationManager.java
new file mode 100644
index 0000000..0051941
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityCreationManager.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.store.graph.v2.bulkimport.pc;
+
+import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.pc.WorkItemBuilder;
+import org.apache.atlas.pc.WorkItemManager;
+import org.apache.atlas.repository.store.graph.v2.BulkImporterImpl;
+import org.apache.atlas.repository.store.graph.v2.EntityImportStream;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link WorkItemManager} that reads entities from an import stream, submits them through
+ * {@code checkProduce} and uses a {@link StatusReporter} to track which produced entities
+ * have been reported back as processed. The manager is drained whenever the entity type changes.
+ * NOTE(review): the type parameter is not referenced anywhere in the class body.
+ */
+public class EntityCreationManager<AtlasEntityWithExtInfo> extends WorkItemManager {
+    private static final Logger LOG = LoggerFactory.getLogger(EntityCreationManager.class);
+    private static final String WORKER_PREFIX = "migration-import";
+    private static final long STATUS_REPORT_TIMEOUT_DURATION = 5 * 60 * 1000; // 5 min
+
+    private final StatusReporter<String, String> statusReporter;
+    private final AtlasImportResult importResult;
+    private final int streamSize;
+    private String currentTypeName;
+    private float currentPercent;
+
+    /**
+     * @param builder      builder passed to the {@link WorkItemManager} base class
+     * @param batchSize    batch size passed to the base class
+     * @param numWorkers   number of workers passed to the base class
+     * @param importResult import result whose metrics counters are incremented on acknowledgement
+     * @param streamSize   total stream size used when computing progress
+     */
+    public EntityCreationManager(WorkItemBuilder builder, int batchSize, int numWorkers, AtlasImportResult importResult, int streamSize) {
+        super(builder, WORKER_PREFIX, batchSize, numWorkers, true);
+        this.importResult = importResult;
+        this.streamSize = streamSize;
+
+        this.statusReporter = new StatusReporter<>(STATUS_REPORT_TIMEOUT_DURATION);
+    }
+
+    /**
+     * Produces every entity of the stream until the stream returns {@code null}. Stream elements
+     * without an entity are skipped; the first failure is logged and ends the loop.
+     *
+     * @param entityStream stream to read from
+     * @return the number of entities for which production was attempted
+     */
+    public int read(EntityImportStream entityStream) {
+        int currentIndex = 0;
+        AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo;
+        while ((entityWithExtInfo = entityStream.getNextEntityWithExtInfo()) != null) {
+            AtlasEntity entity = entityWithExtInfo.getEntity();
+            if (entity == null) {
+                continue;
+            }
+
+            try {
+                produce(currentIndex++, entity.getTypeName(), entityWithExtInfo);
+            } catch (Throwable e) {
+                LOG.warn("Exception: {}", entity.getGuid(), e);
+                break;
+            }
+        }
+        return currentIndex;
+    }
+
+    /**
+     * Registers the entity with the status reporter and submits it, draining first when its type
+     * differs from the type of the previously produced entity.
+     */
+    private void produce(int currentIndex, String typeName, AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) {
+        String previousTypeName = getCurrentTypeName();
+
+        if (StringUtils.isNotEmpty(typeName)
+                && StringUtils.isNotEmpty(previousTypeName)
+                && !StringUtils.equals(previousTypeName, typeName)) {
+            LOG.info("Waiting: '{}' to complete...", previousTypeName);
+            super.drain();
+            LOG.info("Switching entity type processing: From: '{}' To: '{}'...", previousTypeName, typeName);
+        }
+
+        setCurrentTypeName(typeName);
+        statusReporter.produced(entityWithExtInfo.getEntity().getGuid(), String.format("%s:%s", entityWithExtInfo.getEntity().getTypeName(), currentIndex));
+        super.checkProduce(entityWithExtInfo);
+        extractResults();
+    }
+
+    /** Moves every currently available result into the status reporter and then logs progress. */
+    public void extractResults() {
+        Object result;
+        while (((result = getResults().poll())) != null) {
+            statusReporter.processed((String) result);
+        }
+
+        logStatus();
+    }
+
+    /**
+     * Takes the reporter's acknowledgement, expected in the {@code typeName:index} form written by
+     * {@code produce}, increments the metrics counter for the type and updates the progress.
+     */
+    private void logStatus() {
+        String ack = statusReporter.ack();
+        if (StringUtils.isEmpty(ack)) {
+            return;
+        }
+
+        String[] split = ack.split(":");
+        if (split.length < 2) {
+            return;
+        }
+
+        importResult.incrementMeticsCounter(split[0]);
+        this.currentPercent = updateImportMetrics(split[0], Integer.parseInt(split[1]), getStreamSize(), getCurrentPercent());
+    }
+
+    /** Formats the last-imported marker and delegates to {@code BulkImporterImpl.updateImportProgress}. */
+    private static float updateImportMetrics(String typeNameGuid, int currentIndex, int streamSize, float currentPercent) {
+        String lastEntityImported = String.format("entity:last-imported:%s:(%s)", typeNameGuid, currentIndex);
+        return BulkImporterImpl.updateImportProgress(LOG, currentIndex, streamSize, currentPercent, lastEntityImported);
+    }
+
+    private String getCurrentTypeName() {
+        return this.currentTypeName;
+    }
+
+    private void setCurrentTypeName(String typeName) {
+        this.currentTypeName = typeName;
+    }
+
+    private float getCurrentPercent() {
+        return this.currentPercent;
+    }
+
+    private int getStreamSize() {
+        return this.streamSize;
+    }
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/StatusReporter.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/StatusReporter.java
new file mode 100644
index 0000000..1cd9860
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/StatusReporter.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.store.graph.v2.bulkimport.pc;
+
+import org.apache.atlas.v1.typesystem.types.utils.TypesUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class StatusReporter<T, U> {
+    private static final Logger LOG = LoggerFactory.getLogger(StatusReporter.class);
+
+    private Map<T,U> producedItems = new LinkedHashMap<>();
+    private Set<T> processedSet = new HashSet<>();
+    private TypesUtil.Pair<T, Long> watchedItem;
+    private final long timeOut;
+
+    public StatusReporter(long timeOut) {
+        this.timeOut = timeOut;
+    }
+
+    public void produced(T item, U index) {
+        this.producedItems.put(item, index);
+    }
+
+    public void processed(T item) {
+        this.processedSet.add(item);
+    }
+
+    public void processed(T[] index) {
+        this.processedSet.addAll(Arrays.asList(index));
+    }
+
+    public U ack() {
+        U ack = null;
+        U ret;
+        Map.Entry<T, U> firstElement;
+        do {
+            firstElement = getFirstElement(this.producedItems);
+            ret = completionIndex(firstElement);
+            if (ret != null) {
+                ack = ret;
+            }
+        } while(ret != null);
+
+        return addToWatchIfNeeded(ack, firstElement);
+    }
+
+    private U addToWatchIfNeeded(U ack, Map.Entry<T, U> firstElement) {
+        if (ack == null && firstElement != null) {
+            ack = addToWatch(firstElement.getKey());
+        } else {
+            resetWatchItem();
+        }
+        return ack;
+    }
+
+    private void resetWatchItem() {
+        this.watchedItem = null;
+    }
+
+    private U addToWatch(T key) {
+        createNewWatchItem(key);
+        if (!hasTimedOut(this.watchedItem)) {
+            return null;
+        }
+
+        T producedItemKey = this.watchedItem.left;
+        resetWatchItem();
+        LOG.warn("Item: {}: Was produced but not successfully processed!", producedItemKey);
+        return this.producedItems.get(producedItemKey);
+
+    }
+
+    private void createNewWatchItem(T key) {
+        if (this.watchedItem != null) {
+            return;
+        }
+
+        this.watchedItem = new TypesUtil.Pair<T, Long>(key, System.currentTimeMillis());
+    }
+
+    private boolean hasTimedOut(TypesUtil.Pair<T, Long> watchedItem) {
+        if (watchedItem == null) {
+            return  false;
+        }
+
+        return (System.currentTimeMillis() - watchedItem.right) >= timeOut;
+    }
+
+    private Map.Entry<T, U> getFirstElement(Map<T, U> map) {
+        if (map.isEmpty()) {
+            return null;
+        }
+
+        return map.entrySet().iterator().next();
+    }
+
+    // Returns the value of the given entry if its key was processed, dropping it from both collections; else null.
+    private U completionIndex(Map.Entry<T, U> lookFor) {
+        if (lookFor == null || !processedSet.contains(lookFor.getKey())) {
+            return null;
+        }
+
+        // Remove by key: processedSet holds T keys, so passing the Map.Entry itself never removed anything.
+        T key = lookFor.getKey();
+        processedSet.remove(key);
+        return producedItems.remove(key);
+    }
+}
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
index c14850f..759be64 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
@@ -136,6 +136,11 @@ public class ImportServiceTest extends ExportImportTestBase {
         return getZipSource("dup_col_deleted.zip");
     }
 
+    @DataProvider(name = "zipDirect1")
+    public static Object[][] getZipDirect(ITestContext context) throws IOException, AtlasBaseException {
+        return getZipSource("dup_col_deleted.zip");
+    }
+
     @Test(dataProvider = "sales")
     public void importDB1(InputStream inputStream) throws AtlasBaseException, IOException {
         loadBaseModel();
@@ -530,6 +535,17 @@ public class ImportServiceTest extends ExportImportTestBase {
         }
     }
 
+    @Test(dataProvider = "zipDirect1")
+    public void zipSourceDirect(InputStream inputStream) throws IOException, AtlasBaseException {
+        loadBaseModel();
+        loadFsModel();
+        loadHiveModel();
+
+        runImportWithNoParameters(importService, inputStream);
+
+    }
+
+
     private AtlasImportRequest getImportRequest(String replicatedFrom){
         AtlasImportRequest importRequest = getDefaultImportRequest();
 
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/MigrationImportTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/MigrationImportTest.java
new file mode 100644
index 0000000..2a22d88
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/MigrationImportTest.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.impexp;
+
+
+import com.google.inject.Inject;
+import org.apache.atlas.TestModules;
+import org.apache.atlas.discovery.EntityDiscoveryService;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasImportRequest;
+import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.store.graph.AtlasEntityStore;
+import org.apache.atlas.repository.store.graph.v2.bulkimport.MigrationImport;
+import org.apache.atlas.store.AtlasTypeDefStore;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import static org.testng.Assert.assertNotNull;
+
+@Guice(modules = TestModules.TestOnlyModule.class)
+public class MigrationImportTest  extends ExportImportTestBase {
+
+    private final ImportService importService;
+
+    @Inject
+    AtlasTypeRegistry typeRegistry;
+
+    @Inject
+    private AtlasTypeDefStore typeDefStore;
+
+    @Inject
+    private EntityDiscoveryService discoveryService;
+
+    @Inject
+    AtlasEntityStore entityStore;
+
+    @Inject
+    AtlasGraph atlasGraph;
+
+    @Inject
+    public MigrationImportTest(ImportService importService) {
+        this.importService = importService;
+    }
+
+    @Test
+    public void simpleImport() throws IOException, AtlasBaseException {
+        InputStream inputStream = ZipFileResourceTestUtils.getFileInputStream("zip-direct-2.zip");
+
+        AtlasImportRequest importRequest = new AtlasImportRequest();
+        importRequest.setOption("migration", "true");
+
+        AtlasImportResult result = importService.run(inputStream, importRequest, null, null, null);
+        assertNotNull(result);
+    }
+}
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/StatusReporterTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/StatusReporterTest.java
new file mode 100644
index 0000000..5e15023
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/StatusReporterTest.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.impexp;
+
+import org.apache.atlas.repository.store.graph.v2.bulkimport.pc.StatusReporter;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+
+public class StatusReporterTest {
+    @Test
+    public void noneProducedNoneReported() {
+        StatusReporter<Integer, Integer> statusReporter = new StatusReporter<>(100);
+        assertNull(statusReporter.ack());
+    }
+
+    @Test
+    public void producedButNotAcknowledged() {
+        StatusReporter<Integer, Integer> statusReporter = createStatusReportWithItems();
+        assertNull(statusReporter.ack());
+    }
+
+    @Test
+    public void producedAcknowledged() {
+        StatusReporter<Integer, Integer> statusReporter = createStatusReportWithItems();
+        statusReporter.processed(1);
+
+        assertEquals(java.util.Optional.of(100).get(), statusReporter.ack());
+    }
+
+    @Test
+    public void producedAcknowledgeMaxAvailableInSequence() {
+        StatusReporter<Integer, Integer> statusReporter = createStatusReportWithItems();
+
+        statusReporter.processed(new Integer[]{1, 3, 5});
+
+        assertEquals(java.util.Optional.of(100).get(), statusReporter.ack());
+    }
+
+    @Test
+    public void producedAcknowledgeMaxAvailableInSequence2() {
+        StatusReporter<Integer, Integer> statusReporter = createStatusReportWithItems();
+        statusReporter.processed(new Integer[]{1, 2, 3, 6, 5});
+
+        assertEquals(java.util.Optional.of(300).get(), statusReporter.ack());
+    }
+
+    @Test
+    public void producedSetDisjointWithAckSet() {
+        StatusReporter<Integer, Integer> statusReporter = new StatusReporter<>(100); // diamond, not a raw type
+        statusReporter.produced(11, 1000);
+        statusReporter.produced(12, 2000);
+        statusReporter.produced(13, 3000);
+
+        // 1 was never produced; it must not stop 11..13 from being acknowledged in order.
+        statusReporter.processed(new Integer[]{1, 11, 12, 13});
+
+        assertEquals(java.util.Optional.of(3000).get(), statusReporter.ack());
+    }
+
+    @Test
+    public void missingAck() throws InterruptedException {
+        StatusReporter<Integer, Integer> statusReporter = createStatusReportWithItems(2, 3, 4);
+
+        assertNull(statusReporter.ack());
+        Thread.sleep(1002);
+        assertEquals(java.util.Optional.of(100).get(), statusReporter.ack());
+    }
+
+    // Builds a reporter (1000 ms timeout) with items 1..6 produced at indexes 100..600, marking the given keys processed.
+    private StatusReporter<Integer, Integer> createStatusReportWithItems(Integer... processed) {
+        StatusReporter<Integer, Integer> statusReporter = new StatusReporter<>(1000); // diamond, not a raw type
+        statusReporter.produced(1, 100);
+        statusReporter.produced(2, 200);
+        statusReporter.produced(3, 300);
+        statusReporter.produced(4, 400);
+        statusReporter.produced(5, 500);
+        statusReporter.produced(6, 600);
+
+        statusReporter.processed(processed);
+        return statusReporter;
+    }
+}
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ZipDirectTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipDirectTest.java
new file mode 100644
index 0000000..d191d8c
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipDirectTest.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.impexp;
+
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasExportResult;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.typedef.AtlasTypesDef;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+public class ZipDirectTest {
+    @Test(expectedExceptions = AtlasBaseException.class)
+    public void loadFileEmpty() throws IOException, AtlasBaseException {
+        InputStream inputStream = ZipFileResourceTestUtils.getFileInputStream("zip-direct-1.zip");
+        new ZipSourceDirect(inputStream, 1);
+    }
+
+    @Test
+    public void loadFile() throws IOException, AtlasBaseException {
+        final int EXPECTED_ENTITY_COUNT = 3434;
+
+        InputStream inputStream = ZipFileResourceTestUtils.getFileInputStream("zip-direct-2.zip");
+        ZipSourceDirect zipSourceDirect = new ZipSourceDirect(inputStream, EXPECTED_ENTITY_COUNT);
+
+        assertNotNull(zipSourceDirect);
+        assertNotNull(zipSourceDirect.getTypesDef());
+        assertTrue(zipSourceDirect.getTypesDef().getEntityDefs().size() > 0);
+        assertNotNull(zipSourceDirect.getExportResult());
+
+        int count = 0;
+        AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo;
+        while((entityWithExtInfo = zipSourceDirect.getNextEntityWithExtInfo()) != null) {
+            assertNotNull(entityWithExtInfo);
+            count++;
+        }
+
+        assertEquals(count, EXPECTED_ENTITY_COUNT);
+    }
+}
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ZipFileResourceTestUtils.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipFileResourceTestUtils.java
index 0ffc3d5..27a6668 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ZipFileResourceTestUtils.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipFileResourceTestUtils.java
@@ -317,7 +317,9 @@ public class ZipFileResourceTestUtils {
     }
 
     public static AtlasImportRequest getDefaultImportRequest() {
-        return new AtlasImportRequest();
+        AtlasImportRequest atlasImportRequest = new AtlasImportRequest();
+        atlasImportRequest.setOption("migration", "true");
+        return atlasImportRequest;
     }
 
 
@@ -336,7 +338,8 @@ public class ZipFileResourceTestUtils {
         final String hostName = "localhost";
         final String userName = "admin";
 
-        AtlasImportResult result = importService.run(inputStream, userName, hostName, requestingIP);
+        AtlasImportRequest request = getDefaultImportRequest();
+        AtlasImportResult result = runImportWithParameters(importService, request, inputStream);
         assertEquals(result.getOperationStatus(), AtlasImportResult.OperationStatus.SUCCESS);
         return result;
     }
diff --git a/repository/src/test/resources/zip-direct-1.zip b/repository/src/test/resources/zip-direct-1.zip
new file mode 100644
index 0000000..15cb0ec
Binary files /dev/null and b/repository/src/test/resources/zip-direct-1.zip differ
diff --git a/repository/src/test/resources/zip-direct-2.zip b/repository/src/test/resources/zip-direct-2.zip
new file mode 100644
index 0000000..e7b8617
Binary files /dev/null and b/repository/src/test/resources/zip-direct-2.zip differ


[atlas] 02/02: DataMigration: Automatic resume.

Posted by am...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

amestry pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git

commit 54042d35b29f91b46fd033a6378dedf1ff47c5d9
Author: Ashutosh Mestry <am...@cloudera.com>
AuthorDate: Thu Feb 20 17:04:49 2020 -0800

    DataMigration: Automatic resume.
---
 addons/models/0000-Area0/0010-base_model.json      |  50 ++++++++++
 .../model/migration/MigrationImportStatus.java     |  98 +++++++++++++++++++
 .../repository/migration/DataMigrationService.java |   4 +-
 .../migration/DataMigrationStatusService.java      | 104 +++++++++++++++++++++
 .../migration/ZipFileMigrationImporter.java        |  33 +++++--
 .../repository/ogm/MigrationImportStatusDTO.java   | 103 ++++++++++++++++++++
 .../store/graph/v2/BulkImporterImpl.java           |   8 +-
 .../store/graph/v2/bulkimport/MigrationImport.java |  13 ++-
 .../v2/bulkimport/pc/EntityCreationManager.java    |  15 ++-
 9 files changed, 404 insertions(+), 24 deletions(-)

diff --git a/addons/models/0000-Area0/0010-base_model.json b/addons/models/0000-Area0/0010-base_model.json
index 6bdd2f7..001bb6c 100644
--- a/addons/models/0000-Area0/0010-base_model.json
+++ b/addons/models/0000-Area0/0010-base_model.json
@@ -256,6 +256,56 @@
       ]
     },
     {
+      "name": "__MigrationImportStatus",
+      "superTypes": [
+        "__internal"
+      ],
+      "serviceType": "atlas_core",
+      "typeVersion": "1.0",
+      "attributeDefs": [
+        {
+          "name": "name",
+          "typeName": "string",
+          "cardinality": "SINGLE",
+          "isIndexable": true,
+          "isOptional": false,
+          "isUnique": true
+        },
+        {
+          "name": "size",
+          "typeName": "int",
+          "cardinality": "SINGLE",
+          "isIndexable": true,
+          "isOptional": true,
+          "isUnique": false
+        },
+        {
+          "name": "position",
+          "typeName": "string",
+          "cardinality": "SINGLE",
+          "isIndexable": true,
+          "isOptional": true,
+          "isUnique": false
+        },
+        {
+          "name": "startTime",
+          "typeName": "long",
+          "cardinality": "SINGLE",
+          "isIndexable": true,
+          "isOptional": true,
+          "isUnique": false
+        },
+        {
+          "name": "endTime",
+          "typeName": "long",
+          "cardinality": "SINGLE",
+          "isIndexable": true,
+          "isOptional": true,
+          "isUnique": false
+        }
+      ]
+    },
+    {
       "name": "__AtlasUserSavedSearch",
       "superTypes": [
         "__internal"
diff --git a/intg/src/main/java/org/apache/atlas/model/migration/MigrationImportStatus.java b/intg/src/main/java/org/apache/atlas/model/migration/MigrationImportStatus.java
new file mode 100644
index 0000000..e3f1326
--- /dev/null
+++ b/intg/src/main/java/org/apache/atlas/model/migration/MigrationImportStatus.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.model.migration;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.atlas.model.AtlasBaseModelObject;
+
+import java.io.Serializable;
+import java.util.Date;
+
+import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
+import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
+
+@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class MigrationImportStatus extends AtlasBaseModelObject implements Serializable {
+    private String name;
+    private int size;
+    private long startTime;
+    private long endTime;
+    private String position;
+
+    public MigrationImportStatus() {
+    }
+
+    public MigrationImportStatus(String name) {
+        this.name = name;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public int getSize() {
+        return size;
+    }
+
+    public void setSize(int size) {
+        this.size = size;
+    }
+
+    public long getStartTime() {
+        return startTime;
+    }
+
+    public void setStartTime(long startTime) {
+        this.startTime = startTime;
+    }
+
+    public long getEndTime() {
+        return endTime;
+    }
+
+    public void setEndTime(long endTime) {
+        this.endTime = endTime;
+    }
+
+    public void setPosition(String position) {
+        this.position = position;
+    }
+
+    public String getPosition() {
+        return this.position;
+    }
+
+    @Override
+    protected StringBuilder toString(StringBuilder sb) {
+        sb.append(", name=").append(name);
+        sb.append(", size=").append(size);
+        sb.append(", startTime=").append(startTime);
+        sb.append(", endTime=").append(endTime);
+        sb.append(", position=").append(position); // previously omitted, although it is a field of this status
+        return sb;
+    }
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationService.java b/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationService.java
index 0a2257e..48f2a2f 100644
--- a/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationService.java
+++ b/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationService.java
@@ -60,14 +60,14 @@ public class DataMigrationService implements Service {
     @Inject
     public DataMigrationService(GraphDBMigrator migrator, AtlasTypeDefStore typeDefStore, Configuration configuration,
                                 GraphBackedSearchIndexer indexer, AtlasTypeDefStoreInitializer storeInitializer,
-                                AtlasTypeRegistry typeRegistry, ImportService importService) {
+                                AtlasTypeRegistry typeRegistry, ImportService importService, DataMigrationStatusService dataMigrationStatusService) {
         this.configuration = configuration;
 
 
         String fileName = getFileName();
         boolean zipFileBasedMigrationImport = StringUtils.endsWithIgnoreCase(fileName, FILE_EXTENSION_ZIP);
         this.thread        = (zipFileBasedMigrationImport)
-            ?  new Thread(new ZipFileMigrationImporter(importService, fileName), "zipFileBasedMigrationImporter")
+            ?  new Thread(new ZipFileMigrationImporter(importService, fileName, dataMigrationStatusService), "zipFileBasedMigrationImporter")
             :  new Thread(new FileImporter(migrator, typeDefStore, typeRegistry, storeInitializer, fileName, indexer));
     }
 
diff --git a/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationStatusService.java b/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationStatusService.java
new file mode 100644
index 0000000..b5285d0
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationStatusService.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.migration;
+
+import org.apache.atlas.annotation.AtlasService;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.migration.MigrationImportStatus;
+import org.apache.atlas.repository.ogm.DataAccess;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.inject.Inject;
+
+@AtlasService
+public class DataMigrationStatusService {
+    private static final Logger LOG = LoggerFactory.getLogger(DataMigrationStatusService.class);
+
+    private final DataAccess dataAccess;
+    private MigrationImportStatus status;
+
+    @Inject
+    public DataMigrationStatusService(DataAccess dataAccess) {
+        this.dataAccess = dataAccess;
+    }
+
+    /**
+     * Loads the persisted status matching the given one and refreshes its size and start time;
+     * when that fails, saves the given status instead.
+     *
+     * @param status status to look up or create
+     * @return the status now tracked by this service; falls back to an unsaved in-memory status if persisting fails
+     */
+    public MigrationImportStatus getCreate(MigrationImportStatus status) {
+        try {
+            this.status = this.dataAccess.load(status);
+            this.status.setSize(status.getSize());
+            this.status.setStartTime(status.getStartTime());
+
+            this.status = dataAccess.save(this.status);
+        } catch (Exception ex) {
+            // NOTE(review): presumably reached when no status exists yet - confirm how DataAccess.load reports "not found".
+            LOG.info("DataMigrationStatusService: Setting status: {}...", status.getName());
+            try {
+                this.status = dataAccess.save(status);
+            } catch (AtlasBaseException e) {
+                // Keep the cause in the log instead of dropping it at INFO level.
+                LOG.error("DataMigrationStatusService: Error saving status: {}...", status.getName(), e);
+                if (this.status == null) {
+                    // Callers dereference the result; hand back the in-memory status rather than null.
+                    this.status = status;
+                }
+            }
+        }
+
+        return this.status;
+    }
+
+    public MigrationImportStatus get() {
+        return this.status;
+    }
+
+    public MigrationImportStatus getByName(String name) throws AtlasBaseException {
+        MigrationImportStatus status = new MigrationImportStatus(name);
+
+        return dataAccess.load(status);
+    }
+
+    public void deleteStatus() throws AtlasBaseException {
+        if (this.status == null) {
+            return;
+        }
+
+        MigrationImportStatus status = getByName(this.status.getName());
+        dataAccess.delete(status.getGuid());
+    }
+
+    /**
+     * Records the given position on the tracked status and persists it.
+     * Does nothing when no status is being tracked, mirroring the guard in deleteStatus().
+     *
+     * @param position position value to store on the status
+     */
+    public void savePosition(String position) {
+        if (this.status == null) {
+            return;
+        }
+
+        this.status.setPosition(position);
+        try {
+            this.dataAccess.saveNoLoad(this.status);
+        } catch (AtlasBaseException e) {
+            LOG.error("Error saving status: {}", position, e);
+        }
+    }
+
+    /**
+     * Stamps the tracked status with the current time as its end time and persists it.
+     * Does nothing when no status is being tracked, mirroring the guard in deleteStatus().
+     */
+    public void setEndTime() {
+        if (this.status == null) {
+            return;
+        }
+
+        this.status.setEndTime(System.currentTimeMillis());
+        try {
+            this.dataAccess.saveNoLoad(this.status);
+        } catch (AtlasBaseException e) {
+            LOG.error("Error saving status: endTime", e);
+        }
+    }
+
+    public MigrationImportStatus createGet(String fileToImport, int streamSize) {
+        MigrationImportStatus status = new MigrationImportStatus(fileToImport);
+        status.setSize(streamSize);
+
+        return getCreate(status);
+    }
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java b/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java
index 69d78cd..72ffab4 100644
--- a/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java
+++ b/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java
@@ -23,6 +23,7 @@ import org.apache.atlas.AtlasException;
 import org.apache.atlas.RequestContext;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.impexp.AtlasImportRequest;
+import org.apache.atlas.model.migration.MigrationImportStatus;
 import org.apache.atlas.repository.impexp.ImportService;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
@@ -50,20 +51,23 @@ public class ZipFileMigrationImporter implements Runnable {
 
     private final ImportService importService;
     private final String fileToImport;
+    private DataMigrationStatusService dataMigrationStatusService;
 
-    public ZipFileMigrationImporter(ImportService importService, String fileName) {
+    public ZipFileMigrationImporter(ImportService importService, String fileName, DataMigrationStatusService dataMigrationStatusService) {
         this.importService = importService;
         this.fileToImport = fileName;
+        this.dataMigrationStatusService = dataMigrationStatusService;
     }
 
     @Override
     public void run() {
         try {
-            FileWatcher fileWatcher = new FileWatcher(fileToImport);
-            fileWatcher.start();
+            detectFileToImport();
 
             int streamSize = getStreamSizeFromComment(fileToImport);
-            performImport(new FileInputStream(new File(fileToImport)), streamSize);
+            MigrationImportStatus status = dataMigrationStatusService.createGet(fileToImport, streamSize);
+            performImport(new FileInputStream(new File(fileToImport)), status.getPosition(), streamSize);
+            dataMigrationStatusService.setEndTime();
         } catch (IOException e) {
             LOG.error("Migration Import: IO Error!", e);
         } catch (AtlasBaseException e) {
@@ -71,6 +75,11 @@ public class ZipFileMigrationImporter implements Runnable {
         }
     }
 
+    private void detectFileToImport() throws IOException {
+        FileWatcher fileWatcher = new FileWatcher(fileToImport);
+        fileWatcher.start();
+    }
+
     private int getStreamSizeFromComment(String fileToImport) {
         int ret = 1;
         try {
@@ -96,13 +105,13 @@ public class ZipFileMigrationImporter implements Runnable {
         return Integer.valueOf(s);
     }
 
-    private void performImport(InputStream fs, int streamSize) throws AtlasBaseException {
+    private void performImport(InputStream fs, String position, int streamSize) throws AtlasBaseException {
         try {
-            LOG.info("Migration Import: {}: Starting...", fileToImport);
+            LOG.info("Migration Import: {}: Position: {}: Starting...", fileToImport, position);
 
             RequestContext.get().setUser(getUserNameFromEnvironment(), null);
 
-            importService.run(fs, getImportRequest(streamSize),
+            importService.run(fs, getImportRequest(streamSize, position),
                     getUserNameFromEnvironment(),
                     InetAddress.getLocalHost().getHostName(),
                     InetAddress.getLocalHost().getHostAddress());
@@ -112,6 +121,7 @@ public class ZipFileMigrationImporter implements Runnable {
             throw new AtlasBaseException(ex);
         } finally {
             LOG.info("Migration Import: {}: Done!", fileToImport);
+            dataMigrationStatusService.deleteStatus();
         }
     }
 
@@ -119,14 +129,19 @@ public class ZipFileMigrationImporter implements Runnable {
         return System.getProperty(ENV_USER_NAME);
     }
 
-    private AtlasImportRequest getImportRequest(int streamSize) throws AtlasException {
+    private AtlasImportRequest getImportRequest(int streamSize, String position) throws AtlasException {
         AtlasImportRequest request = new AtlasImportRequest();
 
         request.setSizeOption(streamSize);
         request.setOption(AtlasImportRequest.OPTION_KEY_MIGRATION, "true");
         request.setOption(AtlasImportRequest.OPTION_KEY_NUM_WORKERS, getPropertyValue(APPLICATION_PROPERTY_MIGRATION_NUMER_OF_WORKERS, DEFAULT_NUMBER_OF_WORKDERS));
         request.setOption(AtlasImportRequest.OPTION_KEY_BATCH_SIZE, getPropertyValue(APPLICATION_PROPERTY_MIGRATION_BATCH_SIZE, DEFAULT_BATCH_SIZE));
-        request.setOption(AtlasImportRequest.START_POSITION_KEY, Integer.toString(MIGRATION_IMPORT_START_POSITION.getInt()));
+        // Use the caller-supplied position when it is non-empty; otherwise fall back to the configured start position.
+        String startPosition = StringUtils.isEmpty(position)
+                ? Integer.toString(MIGRATION_IMPORT_START_POSITION.getInt())
+                : position;
+
+        request.setOption(AtlasImportRequest.START_POSITION_KEY, startPosition);
 
         return request;
     }
diff --git a/repository/src/main/java/org/apache/atlas/repository/ogm/MigrationImportStatusDTO.java b/repository/src/main/java/org/apache/atlas/repository/ogm/MigrationImportStatusDTO.java
new file mode 100644
index 0000000..be541cd
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/ogm/MigrationImportStatusDTO.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.ogm;
+
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.migration.MigrationImportStatus;
+import org.apache.atlas.repository.Constants;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.springframework.stereotype.Component;
+
+import javax.inject.Inject;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+@Component // Converts MigrationImportStatus objects to and from their AtlasEntity form.
+public class MigrationImportStatusDTO extends AbstractDataTransferObject<MigrationImportStatus> {
+    public static final String PROPERTY_NAME = "name";
+    public static final String PROPERTY_SIZE = "size";
+    public static final String PROPERTY_POSITION = "position";
+    public static final String PROPERTY_START_TIME = "startTime";
+    public static final String PROPERTY_END_TIME = "endTime";
+    public static final String PROPERTY_ADDITIONAL_INFO = "additionalInfo"; // listed in ATTRIBUTE_NAMES, but not read or written by from()/toEntity()
+
+    private static final Set<String> ATTRIBUTE_NAMES = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(PROPERTY_NAME,
+            PROPERTY_SIZE, PROPERTY_POSITION,
+            PROPERTY_START_TIME, PROPERTY_END_TIME,
+            PROPERTY_ADDITIONAL_INFO)));
+
+    @Inject
+    public MigrationImportStatusDTO(AtlasTypeRegistry typeRegistry) {
+        super(typeRegistry, MigrationImportStatus.class, Constants.INTERNAL_PROPERTY_KEY_PREFIX + MigrationImportStatus.class.getSimpleName());
+    }
+
+    public static Set<String> getAttributes() { // unmodifiable, so callers cannot alter the shared static set
+        return ATTRIBUTE_NAMES;
+    }
+
+    public static MigrationImportStatus from(String guid, Map<String,Object> attributes) {
+        MigrationImportStatus entry = new MigrationImportStatus();
+        // Numeric attributes are read through Number: a null value maps to 0 and any boxed numeric type is accepted.
+        entry.setGuid(guid);
+        entry.setName((String) attributes.get(PROPERTY_NAME));
+        entry.setSize(attributes.get(PROPERTY_SIZE) == null ? 0 : ((Number) attributes.get(PROPERTY_SIZE)).intValue());
+        entry.setPosition((String) attributes.get(PROPERTY_POSITION));
+        entry.setStartTime(attributes.get(PROPERTY_START_TIME) == null ? 0L : ((Number) attributes.get(PROPERTY_START_TIME)).longValue());
+        entry.setEndTime(attributes.get(PROPERTY_END_TIME) == null ? 0L : ((Number) attributes.get(PROPERTY_END_TIME)).longValue());
+
+        return entry;
+    }
+
+    @Override
+    public MigrationImportStatus from(AtlasEntity entity) {
+        return from(entity.getGuid(), entity.getAttributes());
+    }
+
+    @Override
+    public MigrationImportStatus from(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) {
+        return from(entityWithExtInfo.getEntity());
+    }
+
+    @Override
+    public AtlasEntity toEntity(MigrationImportStatus obj) {
+        AtlasEntity entity = getDefaultAtlasEntity(obj); // not defined in this class; supplies the entity the attributes below are set on
+
+        entity.setAttribute(PROPERTY_NAME, obj.getName());
+        entity.setAttribute(PROPERTY_SIZE, obj.getSize());
+        entity.setAttribute(PROPERTY_POSITION, obj.getPosition());
+        entity.setAttribute(PROPERTY_START_TIME, obj.getStartTime());
+        entity.setAttribute(PROPERTY_END_TIME, obj.getEndTime());
+
+        return entity;
+    }
+
+    @Override
+    public AtlasEntity.AtlasEntityWithExtInfo toEntityWithExtInfo(MigrationImportStatus obj) throws AtlasBaseException {
+        return new AtlasEntity.AtlasEntityWithExtInfo(toEntity(obj));
+    }
+
+    @Override
+    public Map<String, Object> getUniqueAttributes(final MigrationImportStatus obj) { // keyed by name only
+        return Collections.singletonMap(PROPERTY_NAME, obj.getName());
+    }
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java
index 4526002..72b2f4f 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java
@@ -26,8 +26,8 @@ import org.apache.atlas.model.instance.AtlasEntityHeader;
 import org.apache.atlas.model.instance.AtlasObjectId;
 import org.apache.atlas.model.instance.EntityMutationResponse;
 import org.apache.atlas.repository.graph.AtlasGraphProvider;
-import org.apache.atlas.repository.graphdb.AtlasGraph;
 import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.repository.migration.DataMigrationStatusService;
 import org.apache.atlas.repository.store.graph.AtlasEntityStore;
 import org.apache.atlas.repository.store.graph.BulkImporter;
 import org.apache.atlas.repository.store.graph.v2.bulkimport.ImportStrategy;
@@ -53,11 +53,13 @@ public class BulkImporterImpl implements BulkImporter {
 
     private final AtlasEntityStore entityStore;
     private AtlasTypeRegistry typeRegistry;
+    private DataMigrationStatusService dataMigrationStatusService;
 
     @Inject
-    public BulkImporterImpl(AtlasEntityStore entityStore, AtlasTypeRegistry typeRegistry) {
+    public BulkImporterImpl(AtlasEntityStore entityStore, AtlasTypeRegistry typeRegistry, DataMigrationStatusService dataMigrationStatusService) {
         this.entityStore = entityStore;
         this.typeRegistry = typeRegistry;
+        this.dataMigrationStatusService = dataMigrationStatusService; // handed to MigrationImport when the request options contain the migration key
     }
 
     @Override
@@ -65,7 +67,7 @@ public class BulkImporterImpl implements BulkImporter {
         ImportStrategy importStrategy =
                 (importResult.getRequest().getOptions() != null &&
                         importResult.getRequest().getOptions().containsKey(AtlasImportRequest.OPTION_KEY_MIGRATION))
-                ? new MigrationImport(new AtlasGraphProvider(), this.typeRegistry)
+                ? new MigrationImport(new AtlasGraphProvider(), this.typeRegistry, dataMigrationStatusService)
                 : new RegularImport(this.entityStore, this.typeRegistry);
 
         LOG.info("BulkImportImpl: {}", importStrategy.getClass().getSimpleName());
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java
index 8c66656..9819dc2 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java
@@ -26,6 +26,7 @@ import org.apache.atlas.repository.converters.AtlasFormatConverters;
 import org.apache.atlas.repository.converters.AtlasInstanceConverter;
 import org.apache.atlas.repository.graph.AtlasGraphProvider;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.migration.DataMigrationStatusService;
 import org.apache.atlas.repository.store.graph.AtlasEntityStore;
 import org.apache.atlas.repository.store.graph.AtlasRelationshipStore;
 import org.apache.atlas.repository.store.graph.v1.DeleteHandlerDelegate;
@@ -44,14 +45,16 @@ public class MigrationImport extends ImportStrategy {
     private static final Logger LOG = LoggerFactory.getLogger(MigrationImport.class);
 
     private final AtlasTypeRegistry typeRegistry;
+    private final DataMigrationStatusService dataMigrationStatusService;
     private AtlasGraph atlasGraph;
     private EntityGraphRetriever entityGraphRetriever;
     private EntityGraphMapper entityGraphMapper;
     private AtlasEntityStore entityStore;
 
-    public MigrationImport(AtlasGraphProvider atlasGraphProvider, AtlasTypeRegistry typeRegistry) {
+    public MigrationImport(AtlasGraphProvider atlasGraphProvider, AtlasTypeRegistry typeRegistry, DataMigrationStatusService dataMigrationStatusService) {
         this.typeRegistry = typeRegistry;
         setupEntityStore(atlasGraphProvider, typeRegistry);
+        this.dataMigrationStatusService = dataMigrationStatusService; // passed on to createEntityCreationManager() when an import runs
         LOG.info("MigrationImport: Using bulkLoading...");
     }
 
@@ -67,11 +70,11 @@ public class MigrationImport extends ImportStrategy {
         int index = 0;
         int streamSize = entityStream.size();
         EntityMutationResponse ret = new EntityMutationResponse();
-        EntityCreationManager creationManager = createEntityCreationManager(atlasGraph, importResult, streamSize);
+        EntityCreationManager creationManager = createEntityCreationManager(atlasGraph, dataMigrationStatusService, importResult, streamSize);
 
         try {
             LOG.info("Migration Import: Size: {}: Starting...", streamSize);
-            index = creationManager.read(entityStream);
+            index = creationManager.read(entityStream, importResult.getRequest().getStartPosition());
             creationManager.drain();
             creationManager.extractResults();
         } catch (Exception ex) {
@@ -84,14 +87,14 @@ public class MigrationImport extends ImportStrategy {
         return ret;
     }
 
-    private EntityCreationManager createEntityCreationManager(AtlasGraph threadedAtlasGraph, AtlasImportResult importResult, int streamSize) {
+    private EntityCreationManager createEntityCreationManager(AtlasGraph threadedAtlasGraph, DataMigrationStatusService dataMigrationStatusService, AtlasImportResult importResult, int streamSize) {
         int batchSize = importResult.getRequest().getOptionKeyBatchSize();
         int numWorkers = getNumWorkers(importResult.getRequest().getOptionKeyNumWorkers());
 
         EntityConsumerBuilder consumerBuilder =
                 new EntityConsumerBuilder(threadedAtlasGraph, entityStore, entityGraphRetriever, typeRegistry, batchSize);
 
-        return new EntityCreationManager(consumerBuilder, batchSize, numWorkers, importResult, streamSize);
+        return new EntityCreationManager(consumerBuilder, batchSize, numWorkers, dataMigrationStatusService, importResult, streamSize); // parameter shadows the field of the same name; the visible caller passes that field
     }
 
     private static int getNumWorkers(int numWorkersFromOptions) {
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityCreationManager.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityCreationManager.java
index 0051941..89c5429 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityCreationManager.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityCreationManager.java
@@ -22,6 +22,7 @@ import org.apache.atlas.model.impexp.AtlasImportResult;
 import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.pc.WorkItemBuilder;
 import org.apache.atlas.pc.WorkItemManager;
+import org.apache.atlas.repository.migration.DataMigrationStatusService;
 import org.apache.atlas.repository.store.graph.v2.BulkImporterImpl;
 import org.apache.atlas.repository.store.graph.v2.EntityImportStream;
 import org.apache.commons.lang.StringUtils;
@@ -30,25 +31,27 @@ import org.slf4j.LoggerFactory;
 
 public class EntityCreationManager<AtlasEntityWithExtInfo> extends WorkItemManager {
     private static final Logger LOG = LoggerFactory.getLogger(EntityCreationManager.class);
+    private static final long STATUS_REPORT_TIMEOUT_DURATION = 5 * 60 * 1000; // 5 min
     private static final String WORKER_PREFIX = "migration-import";
 
     private final StatusReporter<String, String> statusReporter;
+    private final DataMigrationStatusService dataMigrationStatusService;
     private final AtlasImportResult importResult;
     private final int streamSize;
-    private final long STATUS_REPORT_TIMEOUT_DURATION = 5 * 60 * 1000; // 5 min
     private String currentTypeName;
     private float currentPercent;
 
-    public EntityCreationManager(WorkItemBuilder builder, int batchSize, int numWorkers, AtlasImportResult importResult, int streamSize) {
+    public EntityCreationManager(WorkItemBuilder builder, int batchSize, int numWorkers, DataMigrationStatusService dataMigrationStatusService, AtlasImportResult importResult, int streamSize) {
         super(builder, WORKER_PREFIX, batchSize, numWorkers, true);
+        this.dataMigrationStatusService = dataMigrationStatusService; // receives savePosition() with the current position as results are processed
         this.importResult = importResult;
         this.streamSize = streamSize;
 
         this.statusReporter = new StatusReporter<>(STATUS_REPORT_TIMEOUT_DURATION);
     }
 
-    public int read(EntityImportStream entityStream) {
-        int currentIndex = 0;
+    public int read(EntityImportStream entityStream, String startPosition) {
+        int currentIndex = StringUtils.isEmpty(startPosition) ? 0 : Integer.valueOf(startPosition);
         AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo;
         while ((entityWithExtInfo = entityStream.getNextEntityWithExtInfo()) != null) {
             AtlasEntity entity = entityWithExtInfo != null ? entityWithExtInfo.getEntity() : null;
@@ -103,8 +106,10 @@ public class EntityCreationManager<AtlasEntityWithExtInfo> extends WorkItemManag
             return;
         }
 
+        String currentPosition = split[1];
+        dataMigrationStatusService.savePosition(currentPosition);
         importResult.incrementMeticsCounter(split[0]);
-        this.currentPercent = updateImportMetrics(split[0], Integer.parseInt(split[1]), getStreamSize(), getCurrentPercent());
+        this.currentPercent = updateImportMetrics(split[0], Integer.parseInt(currentPosition), getStreamSize(), getCurrentPercent());
     }
 
     private static float updateImportMetrics(String typeNameGuid, int currentIndex, int streamSize, float currentPercent) {