You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2017/09/15 15:40:47 UTC

[4/4] atlas git commit: ATLAS-2129: import fix to handle shutdown while in the middle of import

ATLAS-2129: import fix to handle shutdown while in the middle of import

Signed-off-by: Madhan Neethiraj <ma...@apache.org>
(cherry picked from commit 81e5444f4ce9635465632b90ac9d97eec3a16a6b)


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/c623835f
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/c623835f
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/c623835f

Branch: refs/heads/master
Commit: c623835f5376bdb04797ba9cb886da70fbef0f8c
Parents: f1c4646
Author: Ashutosh Mestry <am...@apache.org>
Authored: Fri Sep 15 07:43:55 2017 -0700
Committer: Madhan Neethiraj <ma...@apache.org>
Committed: Fri Sep 15 08:17:40 2017 -0700

----------------------------------------------------------------------
 .../atlas/repository/impexp/ImportService.java  |  10 +-
 .../store/graph/AtlasEntityStore.java           |   6 +-
 .../repository/store/graph/BulkImporter.java    |  34 ++++
 .../store/graph/v1/AtlasEntityStoreV1.java      | 156 +-------------
 .../store/graph/v1/BulkImporterImpl.java        | 202 +++++++++++++++++++
 .../test/java/org/apache/atlas/TestModules.java |   1 +
 .../repository/impexp/ImportServiceTest.java    |  27 ++-
 7 files changed, 258 insertions(+), 178 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/atlas/blob/c623835f/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
index bbe9ed0..650741e 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
@@ -22,7 +22,7 @@ import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.impexp.AtlasImportRequest;
 import org.apache.atlas.model.impexp.AtlasImportResult;
 import org.apache.atlas.model.typedef.AtlasTypesDef;
-import org.apache.atlas.repository.store.graph.AtlasEntityStore;
+import org.apache.atlas.repository.store.graph.BulkImporter;
 import org.apache.atlas.store.AtlasTypeDefStore;
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.commons.collections.MapUtils;
@@ -42,17 +42,17 @@ public class ImportService {
     private static final Logger LOG = LoggerFactory.getLogger(ImportService.class);
 
     private final AtlasTypeDefStore typeDefStore;
-    private final AtlasEntityStore entityStore;
     private final AtlasTypeRegistry typeRegistry;
+    private final BulkImporter bulkImporter;
 
     private long startTimestamp;
     private long endTimestamp;
 
     @Inject
-    public ImportService(final AtlasTypeDefStore typeDefStore, final AtlasEntityStore entityStore, AtlasTypeRegistry typeRegistry) {
+    public ImportService(AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry, BulkImporter bulkImporter) {
         this.typeDefStore = typeDefStore;
-        this.entityStore = entityStore;
         this.typeRegistry = typeRegistry;
+        this.bulkImporter = bulkImporter;
     }
 
     public AtlasImportResult run(ZipSource source, String userName,
@@ -154,7 +154,7 @@ public class ImportService {
     }
 
     private void processEntities(ZipSource importSource, AtlasImportResult result) throws AtlasBaseException {
-        this.entityStore.bulkImport(importSource, result);
+        this.bulkImporter.bulkImport(importSource, result);
 
         endTimestamp = System.currentTimeMillis();
         result.incrementMeticsCounter("duration", (int) (this.endTimestamp - this.startTimestamp));

http://git-wip-us.apache.org/repos/asf/atlas/blob/c623835f/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java
index a4163f2..5a0b74e 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java
@@ -18,12 +18,10 @@
 package org.apache.atlas.repository.store.graph;
 
 import org.apache.atlas.exception.AtlasBaseException;
-import org.apache.atlas.model.impexp.AtlasImportResult;
 import org.apache.atlas.model.instance.AtlasClassification;
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
 import org.apache.atlas.model.instance.EntityMutationResponse;
-import org.apache.atlas.repository.store.graph.v1.EntityImportStream;
 import org.apache.atlas.repository.store.graph.v1.EntityStream;
 import org.apache.atlas.type.AtlasEntityType;
 
@@ -69,12 +67,12 @@ public interface AtlasEntityStore {
     EntityMutationResponse createOrUpdate(EntityStream entityStream, boolean isPartialUpdate) throws AtlasBaseException;
 
     /**
-     * Create or update  entities in the stream using repeated commits of connected entities
+     * Create or update  entities with parameters necessary for import process
      * @param entityStream AtlasEntityStream
      * @return EntityMutationResponse Entity mutations operations with the corresponding set of entities on which these operations were performed
      * @throws AtlasBaseException
      */
-    EntityMutationResponse bulkImport(EntityImportStream entityStream, AtlasImportResult importResult) throws AtlasBaseException;
+    EntityMutationResponse createOrUpdateForImport(EntityStream entityStream) throws AtlasBaseException;
 
     /**
      * Update a single entity

http://git-wip-us.apache.org/repos/asf/atlas/blob/c623835f/repository/src/main/java/org/apache/atlas/repository/store/graph/BulkImporter.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/BulkImporter.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/BulkImporter.java
new file mode 100644
index 0000000..7139eac
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/BulkImporter.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.store.graph;
+
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.repository.store.graph.v1.EntityImportStream;
+
+public interface BulkImporter {
+
+    /**
+     * Create or update  entities in the stream using repeated commits of connected entities
+     * @param entityStream AtlasEntityStream
+     * @return EntityMutationResponse Entity mutations operations with the corresponding set of entities on which these operations were performed
+     * @throws AtlasBaseException
+     */
+    EntityMutationResponse bulkImport(EntityImportStream entityStream, AtlasImportResult importResult) throws AtlasBaseException;
+}

http://git-wip-us.apache.org/repos/asf/atlas/blob/c623835f/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
index 3c19d4d..a5db81b 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
@@ -23,12 +23,10 @@ import org.apache.atlas.GraphTransactionInterceptor;
 import org.apache.atlas.RequestContextV1;
 import org.apache.atlas.annotation.GraphTransaction;
 import org.apache.atlas.exception.AtlasBaseException;
-import org.apache.atlas.model.impexp.AtlasImportResult;
 import org.apache.atlas.model.instance.AtlasClassification;
 import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
-import org.apache.atlas.model.instance.AtlasEntityHeader;
 import org.apache.atlas.model.instance.AtlasObjectId;
 import org.apache.atlas.model.instance.EntityMutationResponse;
 import org.apache.atlas.repository.graphdb.AtlasVertex;
@@ -52,11 +50,8 @@ import javax.inject.Inject;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.DELETE;
 import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.UPDATE;
@@ -146,115 +141,6 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
         return ret;
     }
 
-    @Override
-    public EntityMutationResponse bulkImport(EntityImportStream entityStream, AtlasImportResult importResult) throws AtlasBaseException {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("==> bulkImport()");
-        }
-
-        if (entityStream == null || !entityStream.hasNext()) {
-            throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "no entities to create/update.");
-        }
-
-        EntityMutationResponse ret = new EntityMutationResponse();
-        ret.setGuidAssignments(new HashMap<String, String>());
-
-        Set<String> processedGuids = new HashSet<>();
-        float       currentPercent = 0f;
-
-        List<String> residualList = new ArrayList<>();
-        EntityImportStreamWithResidualList entityImportStreamWithResidualList = new EntityImportStreamWithResidualList(entityStream, residualList);
-        while (entityImportStreamWithResidualList.hasNext()) {
-            AtlasEntityWithExtInfo entityWithExtInfo = entityImportStreamWithResidualList.getNextEntityWithExtInfo();
-            AtlasEntity            entity            = entityWithExtInfo != null ? entityWithExtInfo.getEntity() : null;
-
-            if (entity == null || processedGuids.contains(entity.getGuid())) {
-                continue;
-            }
-
-            AtlasEntityStreamForImport oneEntityStream = new AtlasEntityStreamForImport(entityWithExtInfo, entityStream);
-            try {
-                EntityMutationResponse resp = createOrUpdateForImport(oneEntityStream);
-
-                if (resp.getGuidAssignments() != null) {
-                    ret.getGuidAssignments().putAll(resp.getGuidAssignments());
-                }
-
-                currentPercent = updateImportMetrics(entityWithExtInfo, resp, importResult, processedGuids, entityStream.getPosition(),
-                                                     entityImportStreamWithResidualList.getStreamSize(), currentPercent);
-
-                entityStream.onImportComplete(entity.getGuid());
-            } catch (AtlasBaseException e) {
-                if (!updateResidualList(e, residualList, entityWithExtInfo.getEntity().getGuid())) {
-                    throw e;
-                }
-            }
-        }
-
-        importResult.getProcessedEntities().addAll(processedGuids);
-        LOG.info("bulkImport(): done. Total number of entities (including referred entities) imported: {}", processedGuids.size());
-
-        return ret;
-    }
-
-    private boolean updateResidualList(AtlasBaseException e, List<String> lineageList, String guid) {
-        if (!e.getAtlasErrorCode().getErrorCode().equals(AtlasErrorCode.INVALID_OBJECT_ID.getErrorCode())) {
-            return false;
-        }
-
-        lineageList.add(guid);
-        return true;
-    }
-
-    private float updateImportMetrics(AtlasEntityWithExtInfo currentEntity,
-                                      EntityMutationResponse resp,
-                                      AtlasImportResult importResult,
-                                      Set<String> processedGuids,
-                                      int currentIndex, int streamSize, float currentPercent) {
-        updateImportMetrics("entity:%s:created", resp.getCreatedEntities(), processedGuids, importResult);
-        updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids, importResult);
-        updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids, importResult);
-
-        String lastEntityImported = String.format("entity:last-imported:%s:[%s]:(%s)",
-                currentEntity.getEntity().getTypeName(),
-                currentIndex,
-                currentEntity.getEntity().getGuid());
-
-        return updateImportProgress(LOG, currentIndex + 1, streamSize, currentPercent, lastEntityImported);
-    }
-
-    private static float updateImportProgress(Logger log, int currentIndex, int streamSize, float currentPercent,
-                                              String additionalInfo) {
-        final double tolerance = 0.000001;
-        final int MAX_PERCENT = 100;
-
-        float percent = (float) ((currentIndex * MAX_PERCENT) / streamSize);
-        boolean updateLog = Double.compare(percent, currentPercent) > tolerance;
-        float updatedPercent = (MAX_PERCENT < streamSize) ? percent :
-                ((updateLog) ? ++currentPercent : currentPercent);
-
-        if (updateLog) {
-            log.info("bulkImport(): progress: {}% (of {}) - {}", (int) Math.ceil(percent), streamSize, additionalInfo);
-        }
-
-        return updatedPercent;
-    }
-
-    private static void updateImportMetrics(String prefix, List<AtlasEntityHeader> list, Set<String> processedGuids, AtlasImportResult importResult) {
-        if (list == null) {
-            return;
-        }
-
-        for (AtlasEntityHeader h : list) {
-            if (processedGuids.contains(h.getGuid())) {
-                continue;
-            }
-
-            processedGuids.add(h.getGuid());
-            importResult.incrementMeticsCounter(String.format(prefix, h.getTypeName()));
-        }
-    }
-
     private EntityMutationResponse createOrUpdate(EntityStream entityStream, boolean isPartialUpdate, boolean replaceClassifications) throws AtlasBaseException {
         if (LOG.isDebugEnabled()) {
             LOG.debug("==> createOrUpdate()");
@@ -287,8 +173,9 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
         return createOrUpdate(entityStream, isPartialUpdate, false);
     }
 
+    @Override
     @GraphTransaction
-    private EntityMutationResponse createOrUpdateForImport(EntityStream entityStream) throws AtlasBaseException {
+    public EntityMutationResponse createOrUpdateForImport(EntityStream entityStream) throws AtlasBaseException {
         return createOrUpdate(entityStream, false, true);
     }
 
@@ -763,43 +650,4 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
 
         return ret;
     }
-
-    private static class EntityImportStreamWithResidualList {
-        private final EntityImportStream stream;
-        private final List<String>       residualList;
-        private       boolean            navigateResidualList;
-        private       int                currentResidualListIndex;
-
-
-        public EntityImportStreamWithResidualList(EntityImportStream stream, List<String> residualList) {
-            this.stream                   = stream;
-            this.residualList             = residualList;
-            this.navigateResidualList     = false;
-            this.currentResidualListIndex = 0;
-        }
-
-        public AtlasEntityWithExtInfo getNextEntityWithExtInfo() {
-            if (navigateResidualList == false) {
-                return stream.getNextEntityWithExtInfo();
-            } else {
-                stream.setPositionUsingEntityGuid(residualList.get(currentResidualListIndex++));
-                return stream.getNextEntityWithExtInfo();
-            }
-        }
-
-        public boolean hasNext() {
-            if (!navigateResidualList) {
-                boolean streamHasNext = stream.hasNext();
-                navigateResidualList = (streamHasNext == false);
-                return streamHasNext ? streamHasNext : (currentResidualListIndex < residualList.size());
-            } else {
-                return (currentResidualListIndex < residualList.size());
-            }
-        }
-
-        public int getStreamSize() {
-            return stream.size() + residualList.size();
-        }
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/atlas/blob/c623835f/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/BulkImporterImpl.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/BulkImporterImpl.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/BulkImporterImpl.java
new file mode 100644
index 0000000..e929d7f
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/BulkImporterImpl.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.store.graph.v1;
+
+import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
+import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.repository.store.graph.AtlasEntityStore;
+import org.apache.atlas.repository.store.graph.BulkImporter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@Component
+public class BulkImporterImpl implements BulkImporter {
+    private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityStoreV1.class);
+
+    private final AtlasEntityStore entityStore;
+
+    @Inject
+    public BulkImporterImpl(AtlasEntityStore entityStore) {
+        this.entityStore = entityStore;
+    }
+
+    @Override
+    public EntityMutationResponse bulkImport(EntityImportStream entityStream, AtlasImportResult importResult) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> bulkImport()");
+        }
+
+        if (entityStream == null || !entityStream.hasNext()) {
+            throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "no entities to create/update.");
+        }
+
+        EntityMutationResponse ret = new EntityMutationResponse();
+        ret.setGuidAssignments(new HashMap<String, String>());
+
+        Set<String>  processedGuids = new HashSet<>();
+        float        currentPercent = 0f;
+        List<String> residualList   = new ArrayList<>();
+
+        EntityImportStreamWithResidualList entityImportStreamWithResidualList = new EntityImportStreamWithResidualList(entityStream, residualList);
+
+        while (entityImportStreamWithResidualList.hasNext()) {
+            AtlasEntityWithExtInfo entityWithExtInfo = entityImportStreamWithResidualList.getNextEntityWithExtInfo();
+            AtlasEntity            entity            = entityWithExtInfo != null ? entityWithExtInfo.getEntity() : null;
+
+            if (entity == null || processedGuids.contains(entity.getGuid())) {
+                continue;
+            }
+
+            AtlasEntityStreamForImport oneEntityStream = new AtlasEntityStreamForImport(entityWithExtInfo, entityStream);
+
+            try {
+                EntityMutationResponse resp = entityStore.createOrUpdateForImport(oneEntityStream);
+
+                if (resp.getGuidAssignments() != null) {
+                    ret.getGuidAssignments().putAll(resp.getGuidAssignments());
+                }
+
+                currentPercent = updateImportMetrics(entityWithExtInfo, resp, importResult, processedGuids, entityStream.getPosition(), entityImportStreamWithResidualList.getStreamSize(), currentPercent);
+
+                entityStream.onImportComplete(entity.getGuid());
+            } catch (AtlasBaseException e) {
+                if (!updateResidualList(e, residualList, entityWithExtInfo.getEntity().getGuid())) {
+                    throw e;
+                }
+            } catch (Throwable e) {
+                AtlasBaseException abe = new AtlasBaseException(e);
+
+                if (!updateResidualList(abe, residualList, entityWithExtInfo.getEntity().getGuid())) {
+                    throw abe;
+                }
+            }
+        }
+
+        importResult.getProcessedEntities().addAll(processedGuids);
+        LOG.info("bulkImport(): done. Total number of entities (including referred entities) imported: {}", processedGuids.size());
+
+        return ret;
+    }
+
+
+    private boolean updateResidualList(AtlasBaseException e, List<String> lineageList, String guid) {
+        if (!e.getAtlasErrorCode().getErrorCode().equals(AtlasErrorCode.INVALID_OBJECT_ID.getErrorCode())) {
+            return false;
+        }
+
+        lineageList.add(guid);
+
+        return true;
+    }
+
+    private float updateImportMetrics(AtlasEntity.AtlasEntityWithExtInfo currentEntity,
+                                      EntityMutationResponse             resp,
+                                      AtlasImportResult                  importResult,
+                                      Set<String>                        processedGuids,
+                                      int currentIndex, int streamSize, float currentPercent) {
+        updateImportMetrics("entity:%s:created", resp.getCreatedEntities(), processedGuids, importResult);
+        updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids, importResult);
+        updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids, importResult);
+
+        String lastEntityImported = String.format("entity:last-imported:%s:[%s]:(%s)", currentEntity.getEntity().getTypeName(), currentIndex, currentEntity.getEntity().getGuid());
+
+        return updateImportProgress(LOG, currentIndex + 1, streamSize, currentPercent, lastEntityImported);
+    }
+
+    private static float updateImportProgress(Logger log, int currentIndex, int streamSize, float currentPercent,
+                                              String additionalInfo) {
+        final double tolerance   = 0.000001;
+        final int    MAX_PERCENT = 100;
+
+        float   percent        = (float) ((currentIndex * MAX_PERCENT) / streamSize);
+        boolean updateLog      = Double.compare(percent, currentPercent) > tolerance;
+        float   updatedPercent = (MAX_PERCENT < streamSize) ? percent : ((updateLog) ? ++currentPercent : currentPercent);
+
+        if (updateLog) {
+            log.info("bulkImport(): progress: {}% (of {}) - {}", (int) Math.ceil(percent), streamSize, additionalInfo);
+        }
+
+        return updatedPercent;
+    }
+
+    private static void updateImportMetrics(String prefix, List<AtlasEntityHeader> list, Set<String> processedGuids, AtlasImportResult importResult) {
+        if (list == null) {
+            return;
+        }
+
+        for (AtlasEntityHeader h : list) {
+            if (processedGuids.contains(h.getGuid())) {
+                continue;
+            }
+
+            processedGuids.add(h.getGuid());
+            importResult.incrementMeticsCounter(String.format(prefix, h.getTypeName()));
+        }
+    }
+
+    private static class EntityImportStreamWithResidualList {
+        private final EntityImportStream stream;
+        private final List<String>       residualList;
+        private       boolean            navigateResidualList;
+        private       int                currentResidualListIndex;
+
+
+        public EntityImportStreamWithResidualList(EntityImportStream stream, List<String> residualList) {
+            this.stream                   = stream;
+            this.residualList             = residualList;
+            this.navigateResidualList     = false;
+            this.currentResidualListIndex = 0;
+        }
+
+        public AtlasEntity.AtlasEntityWithExtInfo getNextEntityWithExtInfo() {
+            if (navigateResidualList == false) {
+                return stream.getNextEntityWithExtInfo();
+            } else {
+                stream.setPositionUsingEntityGuid(residualList.get(currentResidualListIndex++));
+                return stream.getNextEntityWithExtInfo();
+            }
+        }
+
+        public boolean hasNext() {
+            if (!navigateResidualList) {
+                boolean streamHasNext = stream.hasNext();
+                navigateResidualList = (streamHasNext == false);
+                return streamHasNext ? streamHasNext : (currentResidualListIndex < residualList.size());
+            } else {
+                return (currentResidualListIndex < residualList.size());
+            }
+        }
+
+        public int getStreamSize() {
+            return stream.size() + residualList.size();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/atlas/blob/c623835f/repository/src/test/java/org/apache/atlas/TestModules.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/TestModules.java b/repository/src/test/java/org/apache/atlas/TestModules.java
index a442a01..e107556 100644
--- a/repository/src/test/java/org/apache/atlas/TestModules.java
+++ b/repository/src/test/java/org/apache/atlas/TestModules.java
@@ -151,6 +151,7 @@ public class TestModules {
 
             bind(LineageService.class).to(DataSetLineageService.class).asEagerSingleton();
             bind(AtlasLineageService.class).to(EntityLineageService.class).asEagerSingleton();
+            bind(BulkImporter.class).to(BulkImporterImpl.class).asEagerSingleton();
 
             bindTypeCache();
 

http://git-wip-us.apache.org/repos/asf/atlas/blob/c623835f/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
index 3359850..8ec37e3 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
@@ -24,7 +24,6 @@ import org.apache.atlas.TestModules;
 import org.apache.atlas.TestUtilsV2;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.impexp.AtlasImportRequest;
-import org.apache.atlas.repository.store.graph.AtlasEntityStore;
 import org.apache.atlas.store.AtlasTypeDefStore;
 import org.apache.atlas.type.AtlasClassificationType;
 import org.apache.atlas.type.AtlasTypeRegistry;
@@ -43,11 +42,11 @@ import java.util.Map;
 import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.*;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertTrue;
 
 @Guice(modules = TestModules.TestOnlyModule.class)
 public class ImportServiceTest {
     private static final Logger LOG = LoggerFactory.getLogger(ImportServiceTest.class);
+    private final ImportService importService;
 
     @Inject
     AtlasTypeRegistry typeRegistry;
@@ -56,7 +55,9 @@ public class ImportServiceTest {
     private AtlasTypeDefStore typeDefStore;
 
     @Inject
-    private AtlasEntityStore entityStore;
+    public ImportServiceTest(ImportService importService) {
+        this.importService = importService;
+    }
 
     @BeforeTest
     public void setupTest() {
@@ -72,7 +73,7 @@ public class ImportServiceTest {
     @Test(dataProvider = "sales")
     public void importDB1(ZipSource zipSource) throws AtlasBaseException, IOException {
         loadModelFromJson("0000-Area0/0010-base_model.json", typeDefStore, typeRegistry);
-        runAndVerifyQuickStart_v1_Import(new ImportService(typeDefStore, entityStore, typeRegistry), zipSource);
+        runAndVerifyQuickStart_v1_Import(importService, zipSource);
     }
 
     @DataProvider(name = "reporting")
@@ -83,7 +84,7 @@ public class ImportServiceTest {
     @Test(dataProvider = "reporting")
     public void importDB2(ZipSource zipSource) throws AtlasBaseException, IOException {
         loadModelFromJson("0000-Area0/0010-base_model.json", typeDefStore, typeRegistry);
-        runAndVerifyQuickStart_v1_Import(new ImportService(typeDefStore, entityStore, typeRegistry), zipSource);
+        runAndVerifyQuickStart_v1_Import(importService, zipSource);
     }
 
     @DataProvider(name = "logging")
@@ -94,7 +95,7 @@ public class ImportServiceTest {
     @Test(dataProvider = "logging")
     public void importDB3(ZipSource zipSource) throws AtlasBaseException, IOException {
         loadModelFromJson("0000-Area0/0010-base_model.json", typeDefStore, typeRegistry);
-        runAndVerifyQuickStart_v1_Import(new ImportService(typeDefStore, entityStore, typeRegistry), zipSource);
+        runAndVerifyQuickStart_v1_Import(importService, zipSource);
     }
 
     @DataProvider(name = "salesNewTypeAttrs")
@@ -105,7 +106,7 @@ public class ImportServiceTest {
     @Test(dataProvider = "salesNewTypeAttrs", dependsOnMethods = "importDB1")
     public void importDB4(ZipSource zipSource) throws AtlasBaseException, IOException {
         loadModelFromJson("0000-Area0/0010-base_model.json", typeDefStore, typeRegistry);
-        runImportWithParameters(new ImportService(typeDefStore, entityStore, typeRegistry), getDefaultImportRequest(), zipSource);
+        runImportWithParameters(importService, getDefaultImportRequest(), zipSource);
     }
 
     @DataProvider(name = "salesNewTypeAttrs-next")
@@ -125,7 +126,7 @@ public class ImportServiceTest {
         options.put("updateTypeDefinition", "false");
         request.setOptions(options);
 
-        runImportWithParameters(new ImportService(typeDefStore, entityStore, typeRegistry), request, zipSource);
+        runImportWithParameters(importService, request, zipSource);
         assertNotNull(typeDefStore.getEnumDefByName(newEnumDefName));
         assertEquals(typeDefStore.getEnumDefByName(newEnumDefName).getElementDefs().size(), 4);
     }
@@ -141,7 +142,7 @@ public class ImportServiceTest {
         options.put("updateTypeDefinition", "true");
         request.setOptions(options);
 
-        runImportWithParameters(new ImportService(typeDefStore, entityStore, typeRegistry), request, zipSource);
+        runImportWithParameters(importService, request, zipSource);
         assertNotNull(typeDefStore.getEnumDefByName(newEnumDefName));
         assertEquals(typeDefStore.getEnumDefByName(newEnumDefName).getElementDefs().size(), 8);
     }
@@ -156,7 +157,7 @@ public class ImportServiceTest {
         loadModelFromJson("0000-Area0/0010-base_model.json", typeDefStore, typeRegistry);
         loadModelFromJson("1000-Hadoop/1030-hive_model.json", typeDefStore, typeRegistry);
 
-        runImportWithNoParameters(getImportService(), zipSource);
+        runImportWithNoParameters(importService, zipSource);
     }
 
     @DataProvider(name = "hdfs_path1")
@@ -172,7 +173,7 @@ public class ImportServiceTest {
         loadModelFromResourcesJson("tag1.json", typeDefStore, typeRegistry);
 
         try {
-            runImportWithNoParameters(getImportService(), zipSource);
+            runImportWithNoParameters(importService, zipSource);
         } catch (AtlasBaseException e) {
             assertEquals(e.getAtlasErrorCode(), AtlasErrorCode.INVALID_IMPORT_ATTRIBUTE_TYPE_CHANGED);
             AtlasClassificationType tag1 = typeRegistry.getClassificationTypeByName("tag1");
@@ -181,8 +182,4 @@ public class ImportServiceTest {
             throw e;
         }
     }
-
-    private ImportService getImportService() {
-        return new ImportService(typeDefStore, entityStore, typeRegistry);
-    }
 }