You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by am...@apache.org on 2020/03/16 17:09:47 UTC

[atlas] branch master updated: ATLAS-3663: ZipFileMigrator: Automatic Resume During Migration.

This is an automated email from the ASF dual-hosted git repository.

amestry pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d0c987  ATLAS-3663: ZipFileMigrator: Automatic Resume During Migration.
3d0c987 is described below

commit 3d0c98779f16c7f0c6faa2b441877458ea152675
Author: Ashutosh Mestry <am...@cloudera.com>
AuthorDate: Mon Mar 16 09:58:27 2020 -0700

    ATLAS-3663: ZipFileMigrator: Automatic Resume During Migration.
---
 .../atlas/model/impexp/AtlasImportRequest.java     |   3 +-
 .../model/migration/MigrationImportStatus.java     |  62 ++++++
 .../java/org/apache/atlas/pc/StatusReporter.java   |  41 +++-
 .../org/apache/atlas/pc/StatusReporterTest.java    |  18 ++
 .../atlas/repository/impexp/ImportService.java     |  10 +-
 .../impexp/MigrationProgressService.java           |  42 +++-
 .../migration/DataMigrationStatusService.java      | 245 +++++++++++++++++++++
 .../migration/ZipFileMigrationImporter.java        |  43 +++-
 .../store/graph/v2/bulkimport/MigrationImport.java |  21 +-
 .../graph/v2/bulkimport/pc/EntityConsumer.java     |   5 +-
 .../v2/bulkimport/pc/EntityCreationManager.java    |  19 +-
 .../impexp/DataMigrationStatusServiceTest.java     |  66 ++++++
 12 files changed, 548 insertions(+), 27 deletions(-)

diff --git a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java
index 09dafdf..2c18704 100644
--- a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java
+++ b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java
@@ -44,12 +44,13 @@ public class AtlasImportRequest implements Serializable {
     public  static final String TRANSFORMS_KEY             = "transforms";
     public  static final String TRANSFORMERS_KEY           = "transformers";
     public  static final String OPTION_KEY_REPLICATED_FROM = "replicatedFrom";
+    public  static final String OPTION_KEY_MIGRATION_FILE_NAME = "migrationFileName";
     public  static final String OPTION_KEY_MIGRATION       = "migration";
     public  static final String OPTION_KEY_NUM_WORKERS     = "numWorkers";
     public  static final String OPTION_KEY_BATCH_SIZE      = "batchSize";
     public  static final String OPTION_KEY_FORMAT          = "format";
     public  static final String OPTION_KEY_FORMAT_ZIP_DIRECT = "zipDirect";
-    private static final String START_POSITION_KEY         = "startPosition";
+    public static final String START_POSITION_KEY          = "startPosition";
     private static final String START_GUID_KEY             = "startGuid";
     private static final String FILE_NAME_KEY              = "fileName";
     private static final String UPDATE_TYPE_DEFINITION_KEY = "updateTypeDefinition";
diff --git a/intg/src/main/java/org/apache/atlas/model/migration/MigrationImportStatus.java b/intg/src/main/java/org/apache/atlas/model/migration/MigrationImportStatus.java
new file mode 100644
index 0000000..3430fda
--- /dev/null
+++ b/intg/src/main/java/org/apache/atlas/model/migration/MigrationImportStatus.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.model.migration;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.atlas.model.AtlasBaseModelObject;
+import org.apache.atlas.model.impexp.MigrationStatus;
+import org.apache.commons.lang.StringUtils;
+
+import java.io.Serializable;
+import java.util.Date;
+
+import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
+import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
+
+@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class MigrationImportStatus extends MigrationStatus { // a MigrationStatus that additionally carries the name of one import
+    private String name; // identifies the import this status belongs to
+
+    public MigrationImportStatus() { // no-arg constructor; presumably needed for JSON deserialization - confirm
+    }
+
+    public MigrationImportStatus(String name) {
+        this.name = name;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append(", name=").append(name); // NOTE(review): the text starts with a dangling ", " - confirm the intended format
+        sb.append(super.toString()); // NOTE(review): appended with no separator; the super format is not visible here
+        return sb.toString();
+    }
+}
diff --git a/intg/src/main/java/org/apache/atlas/pc/StatusReporter.java b/intg/src/main/java/org/apache/atlas/pc/StatusReporter.java
index f84e8d0..7baf973 100644
--- a/intg/src/main/java/org/apache/atlas/pc/StatusReporter.java
+++ b/intg/src/main/java/org/apache/atlas/pc/StatusReporter.java
@@ -18,6 +18,9 @@
 
 package org.apache.atlas.pc;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
@@ -25,8 +28,20 @@ import java.util.Map;
 import java.util.Set;
 
 public class StatusReporter<T, U> {
+    private static final Logger LOG = LoggerFactory.getLogger(StatusReporter.class);
+
     private Map<T,U> producedItems = new LinkedHashMap<>();
     private Set<T> processedSet = new HashSet<>();
+    private long timeoutDuration;
+    private long lastAck;
+
+    public StatusReporter() {
+        this(-1); // -1 fails the "timeoutDuration > -1" test in hasTimeoutDurationReached(), so no timeout applies
+    }
+
+    public StatusReporter(long timeoutDurationInMs) { // enables the ack timeout; compared against System.currentTimeMillis() deltas
+        this.timeoutDuration = timeoutDurationInMs;
+    }
 
     public void produced(T item, U index) {
         this.producedItems.put(item, index);
@@ -44,7 +59,8 @@ public class StatusReporter<T, U> {
         U ack = null;
         U ret;
         do {
-            ret = completionIndex(getFirstElement(this.producedItems));
+            Map.Entry<T, U> firstElement = getFirstElement(this.producedItems);
+            ret = completionIndex(firstElement);
             if (ret != null) {
                 ack = ret;
             }
@@ -63,13 +79,32 @@ public class StatusReporter<T, U> {
 
     private U completionIndex(Map.Entry<T, U> lookFor) {
         U ack = null;
-        if (lookFor == null || !processedSet.contains(lookFor.getKey())) {
+        if (lookFor == null) {
             return ack;
         }
 
-        ack = lookFor.getValue();
+        if (hasTimeoutDurationReached(System.currentTimeMillis())) {
+            LOG.warn("Ack: Timeout: {} - {}", lookFor.getKey(), lookFor.getValue());
+            return acknowledged(lookFor);
+        }
+
+        if (!processedSet.contains(lookFor.getKey())) {
+            return ack;
+        }
+
+        return acknowledged(lookFor);
+    }
+
+    private U acknowledged(Map.Entry<T, U> lookFor) { // forgets the item in both collections and returns its index
+        U ack = lookFor.getValue(); // read the index before the entry is detached from producedItems
+        processedSet.remove(lookFor.getKey()); // the set holds items (T); removing the Map.Entry never matched
+        producedItems.remove(lookFor.getKey());
+        return ack;
+    }
+
+    private boolean hasTimeoutDurationReached(long now) { // true when a timeout is set and it elapsed since the previous check
+        boolean timedOut = (this.timeoutDuration > -1) && (this.lastAck != 0) && ((now - this.lastAck) >= timeoutDuration);
+        lastAck = now; // recorded on every check (not only on a real ack), using the same instant that was compared
+        return timedOut;
+    }
 }
diff --git a/intg/src/test/java/org/apache/atlas/pc/StatusReporterTest.java b/intg/src/test/java/org/apache/atlas/pc/StatusReporterTest.java
index 3e50562..45bdbb0 100644
--- a/intg/src/test/java/org/apache/atlas/pc/StatusReporterTest.java
+++ b/intg/src/test/java/org/apache/atlas/pc/StatusReporterTest.java
@@ -23,6 +23,8 @@ import org.testng.annotations.Test;
 import java.util.concurrent.BlockingQueue;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
 
 public class StatusReporterTest {
     private static class IntegerConsumer extends WorkItemConsumer<Integer> {
@@ -91,4 +93,20 @@ public class StatusReporterTest {
             statusReporter.processed((Integer) result);
         }
     }
+
+    @Test
+    public void reportWithTimeout() throws InterruptedException {
+        StatusReporter<Integer, Integer> statusReporter = new StatusReporter<>(2000); // 2000 ms ack timeout
+        statusReporter.produced(1, 100);
+        statusReporter.produced(2, 200);
+
+        statusReporter.processed(2); // only the second item is marked processed; item 1 stays outstanding
+        Integer ack = statusReporter.ack();
+        assertNull(ack); // the first produced item is still unprocessed, so nothing is acknowledged yet
+
+        Thread.sleep(3000); // wait longer than the configured timeout
+        ack = statusReporter.ack();
+        assertNotNull(ack);
+        assertEquals(ack, Integer.valueOf(200)); // expects the timeout to release item 1 so the ack reaches item 2's index
+    }
 }
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
index c18c4ab..1d29bf8 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
@@ -235,6 +235,10 @@ public class ImportService {
         result.incrementMeticsCounter("duration", getDuration(this.endTimestamp, this.startTimestamp));
 
         result.setOperationStatus(AtlasImportResult.OperationStatus.SUCCESS);
+        if (isMigrationMode(result.getRequest())) {
+            return;
+        }
+
         auditsWriter.write(userName, result, startTimestamp, endTimestamp, importSource.getCreationOrder());
     }
 
@@ -250,7 +254,7 @@ public class ImportService {
 
     private EntityImportStream createZipSource(AtlasImportRequest request, InputStream inputStream, String configuredTemporaryDirectory) throws AtlasBaseException {
         try {
-            if (request.getOptions().containsKey(AtlasImportRequest.OPTION_KEY_MIGRATION) || (request.getOptions().containsKey(AtlasImportRequest.OPTION_KEY_FORMAT) &&
+            if (isMigrationMode(request) || (request.getOptions().containsKey(AtlasImportRequest.OPTION_KEY_FORMAT) &&
                     request.getOptions().get(AtlasImportRequest.OPTION_KEY_FORMAT).equals(AtlasImportRequest.OPTION_KEY_FORMAT_ZIP_DIRECT))) {
                 LOG.info("ZipSource Format: ZipDirect: Size: {}", request.getOptions().get("size"));
                 return getZipDirectEntityImportStream(request, inputStream);
@@ -288,4 +292,8 @@ public class ImportService {
                 exportRequest.getFetchTypeOptionValue().equalsIgnoreCase(AtlasExportRequest.FETCH_TYPE_INCREMENTAL) &&
                 exportRequest.getSkipLineageOptionValue();
     }
+
+    private boolean isMigrationMode(AtlasImportRequest request) { // true when the options contain the migration key, whatever its value
+        return request.getOptions().containsKey(AtlasImportRequest.OPTION_KEY_MIGRATION);
+    }
 }
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/MigrationProgressService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/MigrationProgressService.java
index 54ae32a..6bb5f1e 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/MigrationProgressService.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/MigrationProgressService.java
@@ -19,35 +19,61 @@
 package org.apache.atlas.repository.impexp;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasException;
 import org.apache.atlas.annotation.AtlasService;
 import org.apache.atlas.model.impexp.MigrationStatus;
+import org.apache.atlas.repository.graph.AtlasGraphProvider;
 import org.apache.atlas.repository.graphdb.GraphDBMigrator;
+import org.apache.atlas.repository.migration.DataMigrationStatusService;
 import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.inject.Inject;
 import javax.inject.Singleton;
 
+import static org.apache.atlas.AtlasConstants.ATLAS_MIGRATION_MODE_FILENAME;
+
 @AtlasService
 @Singleton
 public class MigrationProgressService {
     private static final Logger LOG = LoggerFactory.getLogger(MigrationProgressService.class);
+    private static final String FILE_EXTENSION_ZIP = ".zip";
 
     public static final String MIGRATION_QUERY_CACHE_TTL = "atlas.migration.query.cache.ttlInSecs";
 
     @VisibleForTesting
-    static long DEFAULT_CACHE_TTL_IN_SECS = 30 * 1000; // 30 secs
+    static long DEFAULT_CACHE_TTL_IN_SECS = 120 * 1000; // 120 secs
 
     private final long            cacheValidity;
     private final GraphDBMigrator migrator;
     private       MigrationStatus cachedStatus;
     private       long            cacheExpirationTime = 0;
+    private DataMigrationStatusService dataMigrationStatusService;
+    private boolean zipFileBasedMigrationImport;
 
     @Inject
     public MigrationProgressService(Configuration configuration, GraphDBMigrator migrator) {
         this.migrator      = migrator;
         this.cacheValidity = (configuration != null) ? configuration.getLong(MIGRATION_QUERY_CACHE_TTL, DEFAULT_CACHE_TTL_IN_SECS) : DEFAULT_CACHE_TTL_IN_SECS;
+
+        this.zipFileBasedMigrationImport = isZipFileBasedMigrationEnabled();
+        initConditionallyZipFileBasedMigrator();
+    }
+
+    private void initConditionallyZipFileBasedMigrator() { // sets up the status service only for .zip based migration
+        if (!zipFileBasedMigrationImport) {
+            return; // getStatus() then keeps reading from the GraphDBMigrator
+        }
+
+        dataMigrationStatusService = new DataMigrationStatusService(AtlasGraphProvider.getGraphInstance());
+        dataMigrationStatusService.init(getFileNameFromMigrationProperty());
+    }
+
+    private boolean isZipFileBasedMigrationEnabled() {
+        return StringUtils.endsWithIgnoreCase(getFileNameFromMigrationProperty(), FILE_EXTENSION_ZIP);
     }
 
     public MigrationStatus getStatus() {
@@ -58,7 +84,11 @@ public class MigrationProgressService {
         long currentTime = System.currentTimeMillis();
 
         if(resetCache(currentTime)) {
-            cachedStatus = migrator.getMigrationStatus();
+            if (this.zipFileBasedMigrationImport) {
+                cachedStatus = dataMigrationStatusService.getStatus();
+            } else {
+                cachedStatus = migrator.getMigrationStatus();
+            }
         }
 
         return cachedStatus;
@@ -73,4 +103,12 @@ public class MigrationProgressService {
 
         return ret;
     }
+
+    public String getFileNameFromMigrationProperty() { // value of the ATLAS_MIGRATION_MODE_FILENAME property, or "" when unset
+        try {
+            return ApplicationProperties.get().getString(ATLAS_MIGRATION_MODE_FILENAME, StringUtils.EMPTY);
+        } catch (AtlasException e) {
+            return StringUtils.EMPTY; // properties could not be obtained: treated the same as "no file configured"
+        }
+    }
 }
diff --git a/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationStatusService.java b/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationStatusService.java
new file mode 100644
index 0000000..8c7a3a8
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationStatusService.java
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.migration;
+
+import org.apache.atlas.model.migration.MigrationImportStatus;
+import org.apache.atlas.repository.Constants;
+import org.apache.atlas.repository.graph.AtlasGraphProvider;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.graphdb.AtlasIndexQuery;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Date;
+import java.util.Iterator;
+
+import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getEncodedProperty;
+import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.setEncodedProperty;
+import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.encodePropertyKey;
+import static org.apache.atlas.type.Constants.INTERNAL_PROPERTY_KEY_PREFIX;
+
+public class DataMigrationStatusService {
+    private static final Logger LOG = LoggerFactory.getLogger(DataMigrationStatusService.class);
+    private final MigrationStatusVertexManagement migrationStatusVertexManagement;
+
+    private MigrationImportStatus status;
+
+    public DataMigrationStatusService() {
+        this.migrationStatusVertexManagement = new MigrationStatusVertexManagement(AtlasGraphProvider.getGraphInstance());
+    }
+
+    public DataMigrationStatusService(AtlasGraph atlasGraph) {
+        this.migrationStatusVertexManagement = new MigrationStatusVertexManagement(atlasGraph);
+    }
+
+
+    public void init(String fileToImport) { // starts tracking fileToImport, picking up persisted progress when a vertex exists
+        this.status = new MigrationImportStatus(fileToImport);
+        if (!this.migrationStatusVertexManagement.exists(fileToImport)) {
+            return; // nothing persisted yet: keep the fresh in-memory status; no vertex is created here
+        }
+
+        getCreate(fileToImport); // replaces this.status with the values read back from the graph
+    }
+
+    public MigrationImportStatus getCreate(String fileName) {
+        return getCreate(new MigrationImportStatus(fileName));
+    }
+
+    public MigrationImportStatus getCreate(MigrationImportStatus status) { // reads the stored status, creating its vertex first if absent
+        try {
+            this.status = this.migrationStatusVertexManagement.createOrUpdate(status);
+        } catch (Exception ex) {
+            LOG.error("DataMigrationStatusService: Setting status: {}: Resulted in error!", status.getName(), ex); // best effort
+        }
+
+        return this.status; // on failure this is whatever status was held before the call
+    }
+
+    public MigrationImportStatus getStatus() { // re-reads from the graph only while the held status has no operation status
+        if (this.status != null &&
+                StringUtils.isEmpty(this.status.getOperationStatus()) &&
+                this.migrationStatusVertexManagement.exists(this.status.getName())) {
+            return getCreate(this.status); // a vertex exists: refresh this.status from it
+        } else {
+            return this.status; // null after delete()
+        }
+    }
+
+    public MigrationImportStatus getByName(String name) {
+        return this.migrationStatusVertexManagement.findByName(name);
+    }
+
+    // Removes the persisted status vertex, if there is one, and clears the in-memory status.
+    public void delete() {
+        MigrationImportStatus persisted = (this.status != null) ? getByName(this.status.getName()) : null;
+        if (persisted != null) { // getByName() returns null when no vertex is stored; the original dereferenced it anyway
+            this.migrationStatusVertexManagement.delete(persisted.getName());
+        }
+
+        this.status = null;
+    }
+
+    public void savePosition(String position) {
+        this.status.setCurrentIndex(Long.valueOf(position));
+        this.migrationStatusVertexManagement.updateVertexPartial(this.status);
+    }
+
+    public void setStatus(String status) {
+        this.status.setOperationStatus(status);
+        this.migrationStatusVertexManagement.updateVertexPartial(this.status);
+    }
+
+    private static class MigrationStatusVertexManagement {
+        public static final String PROPERTY_KEY_START_TIME = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "migration.startTime");
+        public static final String PROPERTY_KEY_SIZE = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "migration.size");
+        public static final String PROPERTY_KEY_POSITION = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "migration.position");
+        public static final String PROPERTY_KEY_STATUS = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "migration.status");
+
+        private AtlasGraph atlasGraph;
+        private AtlasVertex vertex;
+
+        public MigrationStatusVertexManagement(AtlasGraph atlasGraph) {
+            this.atlasGraph = atlasGraph;
+        }
+
+        public MigrationImportStatus createOrUpdate(MigrationImportStatus status) { // an existing vertex is only read back, not updated
+            this.vertex = findByNameInternal(status.getName());
+
+            if (this.vertex == null) {
+                this.vertex = atlasGraph.addVertex();
+                LOG.info("MigrationStatusVertexManagement: Vertex created!");
+                updateVertex(this.vertex, status); // the values of 'status' are written only on first creation
+            }
+
+            return to(this.vertex); // the result always reflects what is stored on the vertex
+        }
+
+        public boolean exists(String name) {
+            return findByNameInternal(name) != null;
+        }
+
+        public MigrationImportStatus findByName(String name) {
+            if (this.vertex != null) {
+                return to(this.vertex);
+            }
+
+            AtlasVertex v = findByNameInternal(name);
+            if (v == null) {
+                return null;
+            }
+
+            this.vertex = v;
+            LOG.info("MigrationImportStatus: Vertex found!");
+            return to(v);
+        }
+
+        public void delete(String name) { // removes the vertex stored under 'name', if any, and drops the cached one
+            try {
+                AtlasVertex found = findByNameInternal(name); // null when the lookup finds nothing or fails
+                if (found != null) { atlasGraph.removeVertex(found); }
+                this.vertex = null;
+            } finally {
+                atlasGraph.commit();
+            }
+        }
+
+        private AtlasVertex findByNameInternal(String name) { // first indexed vertex whose GUID property equals 'name', else null
+            try {
+                String idxQueryString = String.format("%s\"%s\":\"%s\"", AtlasGraphUtilsV2.getIndexSearchPrefix(), Constants.GUID_PROPERTY_KEY, name); // updateVertex() stores the name under the GUID key
+                AtlasIndexQuery idxQuery = atlasGraph.indexQuery(Constants.VERTEX_INDEX, idxQueryString);
+                Iterator<AtlasIndexQuery.Result<Object, Object>> results = idxQuery.vertices();
+
+                AtlasIndexQuery.Result<?, ?> qryResult = results.hasNext() ? results.next() : null; // only the first hit is used
+                if (qryResult != null) {
+                    return qryResult.getVertex();
+                } else {
+                    return null;
+                }
+            } catch (Exception e) {
+                LOG.error("MigrationStatusVertexManagement.findByNameInternal: Failed!", e); // failures are reported to callers as "not found"
+            } finally {
+                atlasGraph.commit(); // runs on every path, including the early returns above
+            }
+
+            return null;
+        }
+
+        // Persists the mutable progress fields; savePosition() and setStatus() both write through this method.
+        public void updateVertexPartial(MigrationImportStatus status) {
+            try {
+                setEncodedProperty(vertex, PROPERTY_KEY_POSITION, status.getCurrentIndex());
+                setEncodedProperty(vertex, PROPERTY_KEY_STATUS, status.getOperationStatus()); // was never written before
+            } catch (Exception e) { // best effort: a failed write must not abort the import
+                LOG.warn("Error updating status. Please rely on log messages.", e);
+            } finally { atlasGraph.commit(); }
+        }
+
+        private void updateVertex(AtlasVertex vertex, MigrationImportStatus status) {
+            try {
+                setEncodedProperty(vertex, Constants.GUID_PROPERTY_KEY, status.getName());
+
+                setEncodedProperty(vertex, PROPERTY_KEY_START_TIME,
+                        (status.getStartTime() != null)
+                                ? status.getStartTime().getTime()
+                                : System.currentTimeMillis());
+
+                setEncodedProperty(vertex, PROPERTY_KEY_SIZE, status.getTotalCount());
+                setEncodedProperty(vertex, PROPERTY_KEY_POSITION, status.getCurrentIndex());
+                setEncodedProperty(vertex, PROPERTY_KEY_STATUS, status.getOperationStatus());
+            } catch (Exception ex) {
+                LOG.error("Error updating MigrationImportStatus vertex. Status may not be persisted correctly.", ex);
+            } finally {
+                atlasGraph.commit();
+            }
+        }
+
+        private static MigrationImportStatus to(AtlasVertex vertex) {
+            MigrationImportStatus ret = new MigrationImportStatus();
+
+            try {
+                ret.setName(getEncodedProperty(vertex, Constants.GUID_PROPERTY_KEY, String.class));
+
+                Long dateValue = getEncodedProperty(vertex, PROPERTY_KEY_START_TIME, Long.class);
+                if (dateValue != null) {
+                    ret.setStartTime(new Date(dateValue));
+                }
+
+                Long size = getEncodedProperty(vertex, PROPERTY_KEY_SIZE, Long.class);
+                if (size != null) {
+                    ret.setTotalCount(size);
+                }
+
+                Long position = getEncodedProperty(vertex, PROPERTY_KEY_POSITION, Long.class);
+                if (position != null) {
+                    ret.setCurrentIndex(position);
+                }
+
+                ret.setOperationStatus(getEncodedProperty(vertex, PROPERTY_KEY_STATUS, String.class));
+            } catch (Exception ex) {
+                LOG.error("Error converting to MigrationImportStatus. Will proceed with default values.", ex);
+            }
+
+            return ret;
+        }
+    }
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java b/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java
index 35a76ea..f44f2a8 100644
--- a/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java
+++ b/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java
@@ -23,6 +23,8 @@ import org.apache.atlas.AtlasException;
 import org.apache.atlas.RequestContext;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.impexp.AtlasImportRequest;
+import org.apache.atlas.model.migration.MigrationImportStatus;
+import org.apache.atlas.repository.graph.AtlasGraphProvider;
 import org.apache.atlas.repository.impexp.ImportService;
 import org.apache.atlas.type.AtlasType;
 import org.apache.commons.lang.StringUtils;
@@ -41,7 +43,7 @@ public class ZipFileMigrationImporter implements Runnable {
     private static final Logger LOG = LoggerFactory.getLogger(ZipFileMigrationImporter.class);
 
     private static final String APPLICATION_PROPERTY_MIGRATION_NUMER_OF_WORKERS = "atlas.migration.mode.workers";
-    private static final String APPLICATION_PROPERTY_MIGRATION_BATCH_SIZE       = "atlas.migration.mode.batch.size";
+    private static final String APPLICATION_PROPERTY_MIGRATION_BATCH_SIZE = "atlas.migration.mode.batch.size";
     private static final String DEFAULT_NUMBER_OF_WORKERS = "4";
     private static final String DEFAULT_BATCH_SIZE = "100";
     private static final String ZIP_FILE_COMMENT_ENTITIES_COUNT = "entitiesCount";
@@ -51,20 +53,24 @@ public class ZipFileMigrationImporter implements Runnable {
 
     private final ImportService importService;
     private final String fileToImport;
+    private DataMigrationStatusService dataMigrationStatusService;
+    private MigrationImportStatus migrationImportStatus;
 
     public ZipFileMigrationImporter(ImportService importService, String fileName) {
         this.importService = importService;
         this.fileToImport = fileName;
+        this.dataMigrationStatusService = new DataMigrationStatusService(AtlasGraphProvider.getGraphInstance());
     }
 
     @Override
     public void run() {
         try {
-            FileWatcher fileWatcher = new FileWatcher(fileToImport);
-            fileWatcher.start();
+            detectFileToImport();
 
             int streamSize = getStreamSizeFromComment(fileToImport);
-            performImport(new FileInputStream(new File(fileToImport)), streamSize);
+            migrationImportStatus = getCreateMigrationStatus(fileToImport, streamSize);
+            performImport(fileToImport, streamSize, Long.toString(migrationImportStatus.getCurrentIndex()));
+            dataMigrationStatusService.setStatus("DONE");
         } catch (IOException e) {
             LOG.error("Migration Import: IO Error!", e);
         } catch (AtlasBaseException e) {
@@ -72,6 +78,22 @@ public class ZipFileMigrationImporter implements Runnable {
         }
     }
 
+    private MigrationImportStatus getCreateMigrationStatus(String fileName, int streamSize) { // stored status for fileName, created if absent
+        MigrationImportStatus status = new MigrationImportStatus(fileName);
+        status.setTotalCount(streamSize);
+
+        MigrationImportStatus statusRetrieved = dataMigrationStatusService.getCreate(status);
+
+        LOG.info("DataMigrationStatusService: Position: {}", statusRetrieved.getCurrentIndex()); // run() passes this index on as the start position
+        dataMigrationStatusService.setStatus("STARTED");
+        return statusRetrieved;
+    }
+
+    private void detectFileToImport() throws IOException {
+        FileWatcher fileWatcher = new FileWatcher(fileToImport);
+        fileWatcher.start();
+    }
+
     private int getStreamSizeFromComment(String fileToImport) {
         int ret = 1;
         try {
@@ -99,13 +121,13 @@ public class ZipFileMigrationImporter implements Runnable {
         return entitiesCount;
     }
 
-    private void performImport(InputStream fs, int streamSize) throws AtlasBaseException {
+    private void performImport(String fileToImport, int streamSize, String startPosition) throws AtlasBaseException {
         try {
-            LOG.info("Migration Import: {}: Starting...", fileToImport);
-
+            LOG.info("Migration Import: {}: Starting at: {}...", fileToImport, startPosition);
+            InputStream fs = new FileInputStream(new File(fileToImport));
             RequestContext.get().setUser(getUserNameFromEnvironment(), null);
 
-            importService.run(fs, getImportRequest(streamSize),
+            importService.run(fs, getImportRequest(fileToImport, streamSize, startPosition),
                     getUserNameFromEnvironment(),
                     InetAddress.getLocalHost().getHostName(),
                     InetAddress.getLocalHost().getHostAddress());
@@ -122,16 +144,19 @@ public class ZipFileMigrationImporter implements Runnable {
         return System.getProperty(ENV_USER_NAME);
     }
 
-    private AtlasImportRequest getImportRequest(int streamSize) throws AtlasException {
+    private AtlasImportRequest getImportRequest(String fileToImport, int streamSize, String position) throws AtlasException {
         AtlasImportRequest request = new AtlasImportRequest();
 
+        request.setOption(AtlasImportRequest.OPTION_KEY_MIGRATION_FILE_NAME, fileToImport);
         request.setSizeOption(streamSize);
         request.setOption(AtlasImportRequest.OPTION_KEY_MIGRATION, "true");
         request.setOption(AtlasImportRequest.OPTION_KEY_NUM_WORKERS, getPropertyValue(APPLICATION_PROPERTY_MIGRATION_NUMER_OF_WORKERS, DEFAULT_NUMBER_OF_WORKERS));
         request.setOption(AtlasImportRequest.OPTION_KEY_BATCH_SIZE, getPropertyValue(APPLICATION_PROPERTY_MIGRATION_BATCH_SIZE, DEFAULT_BATCH_SIZE));
+        request.setOption(AtlasImportRequest.START_POSITION_KEY, (StringUtils.isEmpty(position) ? "0" : position));
 
         return request;
     }
+
     private String getPropertyValue(String property, String defaultValue) throws AtlasException {
         return ApplicationProperties.get().getString(property, defaultValue);
     }
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java
index 4c912fd..ff55e40 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java
@@ -20,12 +20,14 @@ package org.apache.atlas.repository.store.graph.v2.bulkimport;
 
 import org.apache.atlas.AtlasErrorCode;
 import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasImportRequest;
 import org.apache.atlas.model.impexp.AtlasImportResult;
 import org.apache.atlas.model.instance.EntityMutationResponse;
 import org.apache.atlas.repository.converters.AtlasFormatConverters;
 import org.apache.atlas.repository.converters.AtlasInstanceConverter;
 import org.apache.atlas.repository.graph.AtlasGraphProvider;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.migration.DataMigrationStatusService;
 import org.apache.atlas.repository.store.graph.AtlasEntityStore;
 import org.apache.atlas.repository.store.graph.AtlasRelationshipStore;
 import org.apache.atlas.repository.store.graph.v1.DeleteHandlerDelegate;
@@ -65,10 +67,12 @@ public class MigrationImport extends ImportStrategy {
             throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "importResult should contain request");
         }
 
-        int index = 0;
+        DataMigrationStatusService dataMigrationStatusService = createMigrationStatusService(importResult);
+
+        long index = 0;
         int streamSize = entityStream.size();
         EntityMutationResponse ret = new EntityMutationResponse();
-        EntityCreationManager creationManager = createEntityCreationManager(atlasGraph, importResult);
+        EntityCreationManager creationManager = createEntityCreationManager(atlasGraph, importResult, dataMigrationStatusService);
 
         try {
             LOG.info("Migration Import: Size: {}: Starting...", streamSize);
@@ -85,14 +89,23 @@ public class MigrationImport extends ImportStrategy {
         return ret;
     }
 
-    private EntityCreationManager createEntityCreationManager(AtlasGraph threadedAtlasGraph, AtlasImportResult importResult) {
+    private DataMigrationStatusService createMigrationStatusService(AtlasImportResult importResult) {
+        DataMigrationStatusService dataMigrationStatusService = new DataMigrationStatusService();
+        dataMigrationStatusService.init(importResult.getRequest().getOptions().get(AtlasImportRequest.OPTION_KEY_MIGRATION_FILE_NAME));
+        return dataMigrationStatusService;
+    }
+
+    private EntityCreationManager createEntityCreationManager(AtlasGraph threadedAtlasGraph,
+                                                              AtlasImportResult importResult,
+                                                              DataMigrationStatusService dataMigrationStatusService) {
+        atlasGraph = threadedAtlasGraph;
         int batchSize = importResult.getRequest().getOptionKeyBatchSize();
         int numWorkers = getNumWorkers(importResult.getRequest().getOptionKeyNumWorkers());
 
         EntityConsumerBuilder consumerBuilder =
                 new EntityConsumerBuilder(threadedAtlasGraph, entityStore, entityGraphRetriever, typeRegistry, batchSize);
 
-        return new EntityCreationManager(consumerBuilder, batchSize, numWorkers, importResult);
+        return new EntityCreationManager(consumerBuilder, batchSize, numWorkers, importResult, dataMigrationStatusService);
     }
 
     private static int getNumWorkers(int numWorkersFromOptions) {
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java
index e8f4b02..d0fac10 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java
@@ -155,7 +155,10 @@ public class EntityConsumer extends WorkItemConsumer<AtlasEntity.AtlasEntityWith
 
         LOG.error("Rollback: Done! Buffer: {}: Counter: {}: Retry count: {}", entityBuffer.size(), counter.get(), retryCount);
         pause(retryCount);
-        LOG.warn("Commit error! Will pause and retry: Buffer: {}: Counter: {}: Retry count: {}", entityBuffer.size(), counter.get(), retryCount, ex);
+        String exceptionClass = ex.getClass().getSimpleName();
+        if (!exceptionClass.equals("JanusGraphException") && !exceptionClass.equals("PermanentLockingException")) {
+            LOG.warn("Commit error! Will pause and retry: Buffer: {}: Counter: {}: Retry count: {}", entityBuffer.size(), counter.get(), retryCount, ex);
+        }
         retryProcessEntity(retryCount);
     }
 
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityCreationManager.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityCreationManager.java
index 16bb49e..177b563 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityCreationManager.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityCreationManager.java
@@ -23,6 +23,7 @@ import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.pc.StatusReporter;
 import org.apache.atlas.pc.WorkItemBuilder;
 import org.apache.atlas.pc.WorkItemManager;
+import org.apache.atlas.repository.migration.DataMigrationStatusService;
 import org.apache.atlas.repository.store.graph.v2.BulkImporterImpl;
 import org.apache.atlas.repository.store.graph.v2.EntityImportStream;
 import org.apache.commons.lang.StringUtils;
@@ -32,24 +33,28 @@ import org.slf4j.LoggerFactory;
 public class EntityCreationManager<AtlasEntityWithExtInfo> extends WorkItemManager {
     private static final Logger LOG = LoggerFactory.getLogger(EntityCreationManager.class);
     private static final String WORKER_PREFIX = "migration-import";
+    private static final long STATUS_REPORT_TIMEOUT_DURATION = 1 * 60 * 1000; // 1 min (assuming the value is in milliseconds - TODO confirm against StatusReporter)
 
     private final StatusReporter<String, String> statusReporter;
     private final AtlasImportResult importResult;
+    private final DataMigrationStatusService dataMigrationStatusService;
     private String currentTypeName;
     private float currentPercent;
     private EntityImportStream entityImportStream;
 
-    public EntityCreationManager(WorkItemBuilder builder, int batchSize, int numWorkers, AtlasImportResult importResult) {
+    public EntityCreationManager(WorkItemBuilder builder, int batchSize, int numWorkers, AtlasImportResult importResult, DataMigrationStatusService dataMigrationStatusService) {
         super(builder, WORKER_PREFIX, batchSize, numWorkers, true);
         this.importResult = importResult;
+        this.dataMigrationStatusService = dataMigrationStatusService;
 
-        this.statusReporter = new StatusReporter<>();
+        this.statusReporter = new StatusReporter<>(STATUS_REPORT_TIMEOUT_DURATION);
     }
 
-    public int read(EntityImportStream entityStream) {
-        int currentIndex = 0;
+    public long read(EntityImportStream entityStream) {
+        long currentIndex = this.dataMigrationStatusService.getStatus().getCurrentIndex();
         AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo;
         this.entityImportStream = entityStream;
+        this.dataMigrationStatusService.setStatus("IN_PROGRESS");
         while ((entityWithExtInfo = entityStream.getNextEntityWithExtInfo()) != null) {
             AtlasEntity entity = entityWithExtInfo != null ? entityWithExtInfo.getEntity() : null;
             if (entity == null) {
@@ -66,7 +71,7 @@ public class EntityCreationManager<AtlasEntityWithExtInfo> extends WorkItemManag
         return currentIndex;
     }
 
-    private void produce(int currentIndex, String typeName, AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) {
+    private void produce(long currentIndex, String typeName, AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) {
         String previousTypeName = getCurrentTypeName();
 
         if (StringUtils.isNotEmpty(typeName)
@@ -104,7 +109,9 @@ public class EntityCreationManager<AtlasEntityWithExtInfo> extends WorkItemManag
         }
 
         importResult.incrementMeticsCounter(split[0]);
-        this.currentPercent = updateImportMetrics(split[0], Integer.parseInt(split[1]), this.entityImportStream.size(), getCurrentPercent());
+        String currentPosition = split[1];
+        dataMigrationStatusService.savePosition(currentPosition);
+        this.currentPercent = updateImportMetrics(split[0], Integer.parseInt(currentPosition), this.entityImportStream.size(), getCurrentPercent());
     }
 
     private static float updateImportMetrics(String typeNameGuid, int currentIndex, int streamSize, float currentPercent) {
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/DataMigrationStatusServiceTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/DataMigrationStatusServiceTest.java
new file mode 100644
index 0000000..bf1d9a0
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/DataMigrationStatusServiceTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.impexp;
+
+import com.google.inject.Inject;
+import org.apache.atlas.TestModules;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.migration.MigrationImportStatus;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.migration.DataMigrationStatusService;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import java.util.Date;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+
+@Guice(modules = TestModules.TestOnlyModule.class)
+public class DataMigrationStatusServiceTest {
+    @Inject
+    AtlasGraph atlasGraph;
+
+    @Test
+    public void createUpdateDelete() throws AtlasBaseException {
+        DataMigrationStatusService dataMigrationStatusService = new DataMigrationStatusService(atlasGraph);
+
+        MigrationImportStatus expected = new MigrationImportStatus("/tmp/defg.zip");
+        expected.setTotalCount(3333);
+        expected.setCurrentIndex(20);
+        expected.setStartTime(new Date());
+
+        MigrationImportStatus ret = dataMigrationStatusService.getCreate(expected);
+
+        assertNotNull(ret);
+        assertEquals(ret.getName(), expected.getName());
+        assertEquals(ret.getStartTime(), expected.getStartTime());
+        assertEquals(ret.getTotalCount(), expected.getTotalCount());
+        assertEquals(ret.getCurrentIndex(), expected.getCurrentIndex());
+        // Compare the index by value: assertNotNull(value, "100") would only use "100" as a failure message.
+        dataMigrationStatusService.savePosition("100");
+        assertNotNull(dataMigrationStatusService.getStatus());
+        assertEquals(dataMigrationStatusService.getStatus().getCurrentIndex(), 100L);
+        assertEquals(dataMigrationStatusService.getCreate(expected).getCurrentIndex(), 100L);
+
+        dataMigrationStatusService.delete();
+        assertNull(dataMigrationStatusService.getStatus());
+        assertNull(dataMigrationStatusService.getByName(ret.getName()));
+    }
+}