You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by am...@apache.org on 2019/09/10 22:10:13 UTC

[atlas] branch master updated: ATLAS-3396: ZipSourceWithBackingDirectory: Implementation. Port to master.

This is an automated email from the ASF dual-hosted git repository.

amestry pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/master by this push:
     new 8bad6b0  ATLAS-3396: ZipSourceWithBackingDirectory: Implementation. Port to master.
8bad6b0 is described below

commit 8bad6b0b6724f15ec6173769bccd33184d842f8e
Author: Ashutosh Mestry <am...@hortonworks.com>
AuthorDate: Tue Sep 10 15:10:00 2019 -0700

    ATLAS-3396: ZipSourceWithBackingDirectory: Implementation. Port to master.
---
 distro/src/conf/atlas-application.properties       |   3 +
 docs/src/site/twiki/Import-API-Options.twiki       |  11 +
 .../java/org/apache/atlas/AtlasConfiguration.java  |   4 +-
 .../atlas/repository/impexp/ImportService.java     |  58 ++--
 .../apache/atlas/repository/impexp/ZipSource.java  |  10 +-
 .../impexp/ZipSourceWithBackingDirectory.java      | 371 +++++++++++++++++++++
 .../store/graph/v2/AtlasEntityStreamForImport.java |  52 +++
 .../store/graph/v2/BulkImporterImpl.java           |  11 +-
 .../store/graph/v2/EntityImportStream.java         |  28 ++
 .../repository/impexp/ExportIncrementalTest.java   |  68 ++--
 .../repository/impexp/ExportSkipLineageTest.java   |   7 +-
 .../atlas/repository/impexp/ImportServiceTest.java |  68 ++--
 .../impexp/ImportTransformsShaperTest.java         |   2 +-
 .../RelationshipAttributesExtractorTest.java       |   5 +-
 .../impexp/ReplicationEntityAttributeTest.java     |  22 +-
 .../impexp/ZipFileResourceTestUtils.java           |  49 +--
 .../atlas/repository/impexp/ZipSourceTest.java     |   4 +-
 .../ClassificationPropagationTest.java             |   6 +-
 .../apache/atlas/services/MetricsServiceTest.java  |   6 +-
 .../apache/atlas/web/resources/AdminResource.java  |   4 +-
 20 files changed, 670 insertions(+), 119 deletions(-)

diff --git a/distro/src/conf/atlas-application.properties b/distro/src/conf/atlas-application.properties
index 471424b..7846452 100755
--- a/distro/src/conf/atlas-application.properties
+++ b/distro/src/conf/atlas-application.properties
@@ -82,6 +82,9 @@ ${graph.index.properties}
 # Solr-specific configuration property
 atlas.graph.index.search.max-result-set-size=150
 
+#########  Import Configs  #########
+#atlas.import.temp.directory=/temp/import
+
 #########  Notification Configs  #########
 atlas.notification.embedded=true
 atlas.kafka.data=${sys:atlas.home}/data/kafka
diff --git a/docs/src/site/twiki/Import-API-Options.twiki b/docs/src/site/twiki/Import-API-Options.twiki
index 4004e70..7f90475 100644
--- a/docs/src/site/twiki/Import-API-Options.twiki
+++ b/docs/src/site/twiki/Import-API-Options.twiki
@@ -26,6 +26,7 @@ Following options are supported for Import process:
    * Specify transforms during import operation.
    * Resume import by specifying starting entity guid.
    * Optionally import type definition.
+   * Handling large imports.
 
 ---++++ Transforms
 
@@ -133,3 +134,13 @@ curl -g -X POST -u adminuser:password -H "Content-Type: application/json"
             -d r@importOptions.json
             "http://localhost:21000/api/atlas/admin/importfile"
 </verbatim>
+
+---++++ Handling Large Imports
+
+By default, the Import Service stores all of the data in memory. This may be limiting for ZIPs containing a large amount of data.
+
+To configure a temporary directory, use the application property _atlas.import.temp.directory_. If this property is left blank, the default in-memory implementation is used.
+
+Please ensure that there is sufficient disk space available for the operation.
+
+The contents of the directory created as backing store for the import operation will be erased after the operation is over.
diff --git a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
index 7b71e51..345b105 100644
--- a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
+++ b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
@@ -55,7 +55,9 @@ public enum AtlasConfiguration {
 
     //search configuration
     SEARCH_MAX_LIMIT("atlas.search.maxlimit", 10000),
-    SEARCH_DEFAULT_LIMIT("atlas.search.defaultlimit", 100);
+    SEARCH_DEFAULT_LIMIT("atlas.search.defaultlimit", 100),
+
+    IMPORT_TEMP_DIRECTORY("atlas.import.temp.directory", "");
 
     private static final Configuration APPLICATION_PROPERTIES;
 
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
index 3ded798..df49ae1 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
@@ -18,6 +18,7 @@
 package org.apache.atlas.repository.impexp;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.atlas.AtlasConfiguration;
 import org.apache.atlas.AtlasErrorCode;
 import org.apache.atlas.RequestContext;
 import org.apache.atlas.entitytransform.BaseEntityHandler;
@@ -27,22 +28,23 @@ import org.apache.atlas.model.impexp.AtlasImportRequest;
 import org.apache.atlas.model.impexp.AtlasImportResult;
 import org.apache.atlas.model.typedef.AtlasTypesDef;
 import org.apache.atlas.repository.store.graph.BulkImporter;
+import org.apache.atlas.repository.store.graph.v2.EntityImportStream;
 import org.apache.atlas.store.AtlasTypeDefStore;
 import org.apache.atlas.type.AtlasType;
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
-import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
 
 import javax.inject.Inject;
-import java.io.ByteArrayInputStream;
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.List;
 
 import static org.apache.atlas.model.impexp.AtlasImportRequest.TRANSFORMERS_KEY;
@@ -72,18 +74,25 @@ public class ImportService {
         this.importTransformsShaper = importTransformsShaper;
     }
 
-    public AtlasImportResult run(ZipSource source, String userName,
+    public AtlasImportResult run(InputStream inputStream, String userName,
                                  String hostName, String requestingIP) throws AtlasBaseException {
-        return run(source, null, userName, hostName, requestingIP);
+        return run(inputStream, null, userName, hostName, requestingIP);
     }
 
 
-    public AtlasImportResult run(ZipSource source, AtlasImportRequest request, String userName,
+    public AtlasImportResult run(InputStream inputStream, AtlasImportRequest request, String userName,
                                  String hostName, String requestingIP) throws AtlasBaseException {
         if (request == null) {
             request = new AtlasImportRequest();
         }
 
+        EntityImportStream source = createZipSource(inputStream, AtlasConfiguration.IMPORT_TEMP_DIRECTORY.getString());
+        return run(source, request, userName, hostName, requestingIP);
+    }
+
+    @VisibleForTesting
+    AtlasImportResult run(EntityImportStream source, AtlasImportRequest request, String userName,
+                                 String hostName, String requestingIP) throws AtlasBaseException {
         AtlasImportResult result = new AtlasImportResult(request, userName, requestingIP, hostName, System.currentTimeMillis());
 
         try {
@@ -112,7 +121,10 @@ public class ImportService {
         } finally {
             RequestContext.get().setImportInProgress(false);
 
-            source.close();
+            if (source != null) {
+                source.close();
+            }
+
             LOG.info("<== import(user={}, from={}): status={}", userName, requestingIP, result.getOperationStatus());
         }
 
@@ -120,7 +132,7 @@ public class ImportService {
     }
 
     @VisibleForTesting
-    void setImportTransform(ZipSource source, String transforms) throws AtlasBaseException {
+    void setImportTransform(EntityImportStream source, String transforms) throws AtlasBaseException {
         ImportTransforms importTransform = ImportTransforms.fromJson(transforms);
         if (importTransform == null) {
             return;
@@ -132,11 +144,10 @@ public class ImportService {
         if(LOG.isDebugEnabled()) {
             debugLog("   => transforms: {}", AtlasType.toJson(importTransform));
         }
-
     }
 
     @VisibleForTesting
-    void setEntityTransformerHandlers(ZipSource source, String transformersJson) throws AtlasBaseException {
+    void setEntityTransformerHandlers(EntityImportStream source, String transformersJson) throws AtlasBaseException {
         if (StringUtils.isEmpty(transformersJson)) {
             return;
         }
@@ -156,7 +167,7 @@ public class ImportService {
         LOG.debug(s, params);
     }
 
-    private void setStartPosition(AtlasImportRequest request, ZipSource source) throws AtlasBaseException {
+    private void setStartPosition(AtlasImportRequest request, EntityImportStream source) throws AtlasBaseException {
         if (request.getStartGuid() != null) {
             source.setPositionUsingEntityGuid(request.getStartGuid());
         } else if (request.getStartPosition() != null) {
@@ -164,8 +175,7 @@ public class ImportService {
         }
     }
 
-    public AtlasImportResult run(AtlasImportRequest request, String userName, String hostName, String requestingIP)
-            throws AtlasBaseException {
+    public AtlasImportResult run(AtlasImportRequest request, String userName, String hostName, String requestingIP) throws AtlasBaseException {
         String fileName = request.getFileName();
 
         if (StringUtils.isBlank(fileName)) {
@@ -173,14 +183,11 @@ public class ImportService {
         }
 
         AtlasImportResult result = null;
-
         try {
             LOG.info("==> import(user={}, from={}, fileName={})", userName, requestingIP, fileName);
 
-            String transforms = MapUtils.isNotEmpty(request.getOptions()) ? request.getOptions().get(TRANSFORMS_KEY) : null;
             File file = new File(fileName);
-            ZipSource source = new ZipSource(new ByteArrayInputStream(FileUtils.readFileToByteArray(file)), ImportTransforms.fromJson(transforms));
-            result = run(source, request, userName, hostName, requestingIP);
+            result = run(new FileInputStream(file), request, userName, hostName, requestingIP);
         } catch (AtlasBaseException excp) {
             LOG.error("import(user={}, from={}, fileName={}): failed", userName, requestingIP, excp);
 
@@ -189,10 +196,6 @@ public class ImportService {
             LOG.error("import(user={}, from={}, fileName={}): file not found", userName, requestingIP, excp);
 
             throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, fileName + ": file not found");
-        } catch (IOException excp) {
-            LOG.error("import(user={}, from={}, fileName={}): cannot read file", userName, requestingIP, excp);
-
-            throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, fileName + ": cannot read file");
         } catch (Exception excp) {
             LOG.error("import(user={}, from={}, fileName={}): failed", userName, requestingIP, excp);
 
@@ -214,7 +217,7 @@ public class ImportService {
         importTypeDefProcessor.processTypes(typeDefinitionMap, result);
     }
 
-    private void processEntities(String userName, ZipSource importSource, AtlasImportResult result) throws AtlasBaseException {
+    private void processEntities(String userName, EntityImportStream importSource, AtlasImportResult result) throws AtlasBaseException {
         result.setExportResult(importSource.getExportResult());
         this.bulkImporter.bulkImport(importSource, result);
 
@@ -228,4 +231,17 @@ public class ImportService {
     private int getDuration(long endTime, long startTime) {
         return (int) (endTime - startTime);
     }
+
+    private EntityImportStream createZipSource(InputStream inputStream, String configuredTemporaryDirectory) throws AtlasBaseException {
+        try {
+            if (StringUtils.isEmpty(configuredTemporaryDirectory)) {
+                return new ZipSource(inputStream);
+            }
+
+            return new ZipSourceWithBackingDirectory(inputStream, configuredTemporaryDirectory);
+        }
+        catch (IOException ex) {
+            throw new AtlasBaseException(ex);
+        }
+    }
 }
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSource.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSource.java
index 1ce96a8..812add9 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSource.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSource.java
@@ -82,20 +82,25 @@ public class ZipSource implements EntityImportStream {
                                  guidEntityJsonMap.get(key).equals("[]"));
     }
 
+    @Override
     public ImportTransforms getImportTransform() { return this.importTransform; }
 
+    @Override
     public void setImportTransform(ImportTransforms importTransform) {
         this.importTransform = importTransform;
     }
 
+    @Override
     public List<BaseEntityHandler> getEntityHandlers() {
         return entityHandlers;
     }
 
+    @Override
     public void setEntityHandlers(List<BaseEntityHandler> entityHandlers) {
         this.entityHandlers = entityHandlers;
     }
 
+    @Override
     public AtlasTypesDef getTypesDef() throws AtlasBaseException {
         final String fileName = ZipExportFileNames.ATLAS_TYPESDEF_NAME.toString();
 
@@ -103,6 +108,7 @@ public class ZipSource implements EntityImportStream {
         return convertFromJson(AtlasTypesDef.class, s);
     }
 
+    @Override
     public AtlasExportResult getExportResult() throws AtlasBaseException {
         final String fileName = ZipExportFileNames.ATLAS_EXPORT_INFO_NAME.toString();
 
@@ -147,6 +153,7 @@ public class ZipSource implements EntityImportStream {
         zipInputStream.close();
     }
 
+    @Override
     public List<String> getCreationOrder() {
         return this.creationOrder;
     }
@@ -210,6 +217,7 @@ public class ZipSource implements EntityImportStream {
         return s;
     }
 
+    @Override
     public void close() {
         try {
             inputStream.close();
@@ -284,7 +292,7 @@ public class ZipSource implements EntityImportStream {
         currentPosition = index;
         reset();
         for (int i = 0; i < creationOrder.size() && i <= index; i++) {
-            iterator.next();
+            onImportComplete(iterator.next());
         }
     }
 
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceWithBackingDirectory.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceWithBackingDirectory.java
new file mode 100644
index 0000000..7963800
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceWithBackingDirectory.java
@@ -0,0 +1,371 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.impexp;
+
+import org.apache.atlas.entitytransform.BaseEntityHandler;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasExportResult;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.typedef.AtlasTypesDef;
+import org.apache.atlas.repository.store.graph.v2.EntityImportStream;
+import org.apache.atlas.type.AtlasType;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+
+import static org.apache.atlas.AtlasErrorCode.IMPORT_ATTEMPTING_EMPTY_ZIP;
+
+public class ZipSourceWithBackingDirectory implements EntityImportStream {
+    private static final Logger LOG = LoggerFactory.getLogger(ZipSourceWithBackingDirectory.class);
+    private static final String TEMPORARY_DIRECTORY_PREFIX = "atlas-import-temp-";
+    private static final String EXT_JSON = ".json";
+
+    private Path tempDirectory;
+
+    private ImportTransforms importTransform;
+    private List<BaseEntityHandler> entityHandlers;
+
+    private ArrayList<String> creationOrder = new ArrayList<>();
+    private int currentPosition;
+    private int numberOfEntries;
+
+    public ZipSourceWithBackingDirectory(InputStream inputStream) throws IOException, AtlasBaseException {
+        this(inputStream, null);
+    }
+
+    public ZipSourceWithBackingDirectory(InputStream inputStream, String backingDirectory) throws IOException, AtlasBaseException {
+        setupBackingStore(inputStream, backingDirectory);
+        if (isZipFileEmpty()) {
+            throw new AtlasBaseException(IMPORT_ATTEMPTING_EMPTY_ZIP, "Attempting to import empty ZIP.");
+        }
+    }
+
+    @Override
+    public ImportTransforms getImportTransform() { return this.importTransform; }
+
+    @Override
+    public void setImportTransform(ImportTransforms importTransform) {
+        this.importTransform = importTransform;
+    }
+
+    @Override
+    public List<BaseEntityHandler> getEntityHandlers() {
+        return entityHandlers;
+    }
+
+    @Override
+    public void setEntityHandlers(List<BaseEntityHandler> entityHandlers) {
+        this.entityHandlers = entityHandlers;
+    }
+
+    @Override
+    public AtlasTypesDef getTypesDef() throws AtlasBaseException {
+        return getJsonFromEntry(ZipExportFileNames.ATLAS_TYPESDEF_NAME.toString(), AtlasTypesDef.class);
+    }
+
+    @Override
+    public AtlasExportResult getExportResult() throws AtlasBaseException {
+        return getJsonFromEntry(ZipExportFileNames.ATLAS_EXPORT_INFO_NAME.toString(), AtlasExportResult.class);
+    }
+
+    @Override
+    public List<String> getCreationOrder() {
+        return creationOrder;
+    }
+
+    @Override
+    public int getPosition() {
+        return currentPosition;
+    }
+
+    @Override
+    public AtlasEntity.AtlasEntityWithExtInfo getEntityWithExtInfo(String guid) throws AtlasBaseException {
+        final File file = getFileFromTemporaryDirectory(guid + EXT_JSON);
+        if (!file.exists()) {
+            return null;
+        }
+
+        String json = getJsonStringForFile(file);
+        if (StringUtils.isEmpty(json)) {
+            return null;
+        }
+
+        AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = convertFromJson(AtlasEntity.AtlasEntityWithExtInfo.class, json);
+
+        if (importTransform != null) {
+            entityWithExtInfo = importTransform.apply(entityWithExtInfo);
+        }
+
+        if (entityHandlers != null) {
+            applyTransformers(entityWithExtInfo);
+        }
+
+        return entityWithExtInfo;
+    }
+
+    @Override
+    public boolean hasNext() {
+        return (currentPosition < numberOfEntries);
+    }
+
+    @Override
+    public AtlasEntity next() {
+        AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = getNextEntityWithExtInfo();
+
+        return entityWithExtInfo != null ? entityWithExtInfo.getEntity() : null;
+    }
+
+    @Override
+    public AtlasEntity.AtlasEntityWithExtInfo getNextEntityWithExtInfo() {
+        try {
+            return getEntityWithExtInfo(moveNext());
+        } catch (AtlasBaseException e) {
+            LOG.error("getNextEntityWithExtInfo", e);
+            return null;
+        }
+    }
+
+    @Override
+    public void reset() {
+        currentPosition = 0;
+    }
+
+    @Override
+    public AtlasEntity getByGuid(String guid) {
+        try {
+            return getEntity(guid);
+        } catch (AtlasBaseException e) {
+            LOG.error("getByGuid: {} failed!", guid, e);
+            return null;
+        }
+    }
+
+
+    @Override
+    public void onImportComplete(String guid) {
+        getFileFromTemporaryDirectory(guid + EXT_JSON).delete();
+    }
+
+    @Override
+    public void setPosition(int index) {
+        reset();
+        for (int i = 0; i < numberOfEntries && i <= index; i++) {
+            onImportComplete(moveNext());
+        }
+    }
+
+    @Override
+    public void setPositionUsingEntityGuid(String guid) {
+        if (StringUtils.isEmpty(guid)) {
+            return;
+        }
+
+        String current;
+        while (currentPosition < numberOfEntries) {
+            current = creationOrder.get(currentPosition);
+            if (current.equals(guid)) {
+                return;
+            }
+
+            moveNext();
+        }
+    }
+
+    @Override
+    public void close() {
+        creationOrder.clear();
+        try {
+            LOG.error("Import: Removing temporary directory: {}", tempDirectory.toString());
+            FileUtils.deleteDirectory(tempDirectory.toFile());
+        } catch (IOException e) {
+            LOG.error("Import: Error deleting: {}", tempDirectory.toString(), e);
+        }
+    }
+
+    private boolean isZipFileEmpty() {
+        return (numberOfEntries == 0);
+    }
+
+    private <T> T getJsonFromEntry(String entryName, Class<T> clazz) throws AtlasBaseException {
+        final File file = getFileFromTemporaryDirectory(entryName + EXT_JSON);
+        if (!file.exists()) {
+            throw new AtlasBaseException(entryName + " not found!");
+        }
+
+        return convertFromJson(clazz, getJsonStringForFile(file));
+    }
+
+    private void setupBackingStore(InputStream inputStream, String backingDirectory) throws AtlasBaseException, IOException {
+        initTempDirectory(backingDirectory);
+        unzipToTempDirectory(inputStream);
+        setupIterator();
+    }
+
+    // Creates the session's child directory (named by getChildDirectoryForSession()) under backingDirectory.
+    private void initTempDirectory(String backingDirectory) throws AtlasBaseException {
+        try {
+            tempDirectory = Files.createDirectory(Paths.get(backingDirectory, getChildDirectoryForSession()));
+            if (!permissionChecks(tempDirectory.toFile())) {
+                throw new AtlasBaseException(String.format("Import: Temporary directory: %s does not have permissions for operation!", tempDirectory.toString()));
+            }
+        } catch (Exception ex) {
+            // tempDirectory is still null if creation itself failed; report the configured parent instead of throwing an NPE.
+            throw new AtlasBaseException(String.format("Error fetching temporary directory: %s", (tempDirectory != null) ? tempDirectory.toString() : backingDirectory), ex);
+        }
+    }
+
+    private String getChildDirectoryForSession() {
+        return String.format("%s%s", TEMPORARY_DIRECTORY_PREFIX, UUID.randomUUID());
+    }
+
+    private boolean permissionChecks(File f) {
+        return f.exists() && f.isDirectory() && f.canWrite();
+    }
+
+    private void unzipToTempDirectory(InputStream inputStream) throws IOException {
+        LOG.info("Import: Temporary directory: {}", tempDirectory.toString());
+
+        ZipInputStream zis = new ZipInputStream(inputStream);
+        try {
+            ZipEntry zipEntry = zis.getNextEntry();
+            while (zipEntry != null) {
+                String entryName = zipEntry.getName();
+
+                writeJsonToFile(entryName,  getJsonPayloadFromZipEntryStream(zis));
+                numberOfEntries++;
+
+                zipEntry = zis.getNextEntry();
+            }
+
+            numberOfEntries -= ZipExportFileNames.values().length;
+        }
+        finally {
+            zis.close();
+            inputStream.close();
+        }
+    }
+
+    private void writeJsonToFile(String entryName, byte[] jsonPayload) throws IOException {
+        File f = getFileFromTemporaryDirectory(entryName);
+        Files.write(f.toPath(), jsonPayload);
+    }
+
+    private File getFileFromTemporaryDirectory(String entryName) {
+        return new File(tempDirectory.toFile(), entryName);
+    }
+
+    private void setupIterator() {
+        try {
+            creationOrder = getJsonFromEntry(ZipExportFileNames.ATLAS_EXPORT_ORDER_NAME.toString(), ArrayList.class);
+        } catch (AtlasBaseException e) {
+            LOG.error("Error fetching: {}. Error generating order.", ZipExportFileNames.ATLAS_EXPORT_ORDER_NAME.toString(), e);
+        }
+
+        reset();
+    }
+
+    private byte[] getJsonPayloadFromZipEntryStream(ZipInputStream zipInputStream) {
+        try {
+            byte[] buf = new byte[1024];
+
+            int n = 0;
+            ByteArrayOutputStream bos = new ByteArrayOutputStream();
+            while ((n = zipInputStream.read(buf, 0, 1024)) > -1) {
+                bos.write(buf, 0, n);
+            }
+
+            return bos.toByteArray();
+        } catch (IOException ex) {
+            LOG.error("Error fetching string from entry.", ex);
+        }
+
+        return null;
+    }
+
+    private String getJsonStringForFile(File file) {
+        try {
+            byte[] bytes = Files.readAllBytes(file.toPath());
+            return new String(bytes);
+        } catch (IOException e) {
+            LOG.warn("Error fetching: {}", file.toString(), e);
+            return null;
+        }
+    }
+
+    private void applyTransformers(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) {
+        if (entityWithExtInfo == null) {
+            return;
+        }
+
+        transform(entityWithExtInfo.getEntity());
+
+        if (MapUtils.isNotEmpty(entityWithExtInfo.getReferredEntities())) {
+            for (AtlasEntity e : entityWithExtInfo.getReferredEntities().values()) {
+                transform(e);
+            }
+        }
+    }
+
+    private void transform(AtlasEntity e) {
+        for (BaseEntityHandler handler : entityHandlers) {
+            handler.transform(e);
+        }
+    }
+
+    private <T> T convertFromJson(Class<T> clazz, String jsonData) throws AtlasBaseException {
+        try {
+            return AtlasType.fromJson(jsonData, clazz);
+
+        } catch (Exception e) {
+            throw new AtlasBaseException("Error converting file to JSON.", e);
+        }
+    }
+
+    private AtlasEntity getEntity(String guid) throws AtlasBaseException {
+        AtlasEntity.AtlasEntityWithExtInfo extInfo = getEntityWithExtInfo(guid);
+        return (extInfo != null) ? extInfo.getEntity() : null;
+    }
+
+    public int size() {
+        return numberOfEntries;
+    }
+
+    private String moveNext() {
+        if (currentPosition < numberOfEntries) {
+            return creationOrder.get(currentPosition++);
+        }
+
+        return null;
+    }
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStreamForImport.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStreamForImport.java
index 6bf962e..5ad9d60 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStreamForImport.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStreamForImport.java
@@ -17,8 +17,15 @@
  */
 package org.apache.atlas.repository.store.graph.v2;
 
+import org.apache.atlas.entitytransform.BaseEntityHandler;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasExportResult;
 import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+import org.apache.atlas.model.typedef.AtlasTypesDef;
+import org.apache.atlas.repository.impexp.ImportTransforms;
+
+import java.util.List;
 
 public class AtlasEntityStreamForImport extends AtlasEntityStream implements EntityImportStream {
     private int currentPosition = 0;
@@ -36,6 +43,11 @@ public class AtlasEntityStreamForImport extends AtlasEntityStream implements Ent
     }
 
     @Override
+    public AtlasEntityWithExtInfo getEntityWithExtInfo(String guid) throws AtlasBaseException {
+        return null;
+    }
+
+    @Override
     public AtlasEntity getByGuid(String guid) {
         AtlasEntity ent = super.entitiesWithExtInfo.getEntity(guid);
 
@@ -69,4 +81,44 @@ public class AtlasEntityStreamForImport extends AtlasEntityStream implements Ent
     public void onImportComplete(String guid) {
 
     }
+
+    @Override
+    public void setImportTransform(ImportTransforms importTransform) {
+
+    }
+
+    @Override
+    public ImportTransforms getImportTransform() {
+        return null;
+    }
+
+    @Override
+    public void setEntityHandlers(List<BaseEntityHandler> entityHandlers) {
+
+    }
+
+    @Override
+    public List<BaseEntityHandler> getEntityHandlers() {
+        return null;
+    }
+
+    @Override
+    public AtlasTypesDef getTypesDef() throws AtlasBaseException {
+        return null;
+    }
+
+    @Override
+    public AtlasExportResult getExportResult() throws AtlasBaseException {
+        return null;
+    }
+
+    @Override
+    public List<String> getCreationOrder() {
+        return null;
+    }
+
+    @Override
+    public void close() {
+
+    }
 }
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java
index 2f330c0..54c32c5 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java
@@ -18,6 +18,7 @@
 package org.apache.atlas.repository.store.graph.v2;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.atlas.AtlasConfiguration;
 import org.apache.atlas.AtlasErrorCode;
 import org.apache.atlas.RequestContext;
 import org.apache.atlas.annotation.GraphTransaction;
@@ -57,12 +58,14 @@ public class BulkImporterImpl implements BulkImporter {
     private final EntityGraphRetriever entityGraphRetriever;
     private AtlasTypeRegistry typeRegistry;
     private final int MAX_ATTEMPTS = 2;
+    private boolean directoryBasedImportConfigured;
 
     @Inject
     public BulkImporterImpl(AtlasEntityStore entityStore, AtlasTypeRegistry typeRegistry) {
         this.entityStore = entityStore;
         this.entityGraphRetriever = new EntityGraphRetriever(typeRegistry);
         this.typeRegistry = typeRegistry;
+        this.directoryBasedImportConfigured = StringUtils.isNotEmpty(AtlasConfiguration.IMPORT_TEMP_DIRECTORY.getString());
     }
 
     @Override
@@ -205,9 +208,11 @@ public class BulkImporterImpl implements BulkImporter {
                                       AtlasImportResult                  importResult,
                                       Set<String>                        processedGuids,
                                       int currentIndex, int streamSize, float currentPercent) {
-        updateImportMetrics("entity:%s:created", resp.getCreatedEntities(), processedGuids, importResult);
-        updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids, importResult);
-        updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids, importResult);
+        if (!directoryBasedImportConfigured) {
+            updateImportMetrics("entity:%s:created", resp.getCreatedEntities(), processedGuids, importResult);
+            updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids, importResult);
+            updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids, importResult);
+        }
 
         String lastEntityImported = String.format("entity:last-imported:%s:[%s]:(%s)", currentEntity.getEntity().getTypeName(), currentIndex, currentEntity.getEntity().getGuid());
 
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityImportStream.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityImportStream.java
index cf7ac28..c43a04e 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityImportStream.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityImportStream.java
@@ -18,17 +18,45 @@
 package org.apache.atlas.repository.store.graph.v2;
 
 
+import org.apache.atlas.entitytransform.BaseEntityHandler;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasExportResult;
+import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+import org.apache.atlas.model.typedef.AtlasTypesDef;
+import org.apache.atlas.repository.impexp.ImportTransforms;
+
+import java.util.List;
 
 public interface EntityImportStream extends EntityStream {
 
     int size();
+
     void setPosition(int position);
+
     int getPosition();
 
     void setPositionUsingEntityGuid(String guid);
 
     AtlasEntityWithExtInfo getNextEntityWithExtInfo();
 
+    AtlasEntity.AtlasEntityWithExtInfo getEntityWithExtInfo(String guid) throws AtlasBaseException; // same wrapper type as getNextEntityWithExtInfo()
+
     void onImportComplete(String guid);
+
+    void setImportTransform(ImportTransforms importTransform);
+
+    ImportTransforms getImportTransform();
+
+    void setEntityHandlers(List<BaseEntityHandler> entityHandlers);
+
+    List<BaseEntityHandler> getEntityHandlers();
+
+    AtlasTypesDef getTypesDef() throws AtlasBaseException;
+
+    AtlasExportResult getExportResult() throws AtlasBaseException;
+
+    List<String> getCreationOrder();
+
+    void close(); // NOTE(review): presumably releases resources held by the stream; confirm against implementations
 }
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java
index 7aeb6a7..4d43852 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java
@@ -33,16 +33,20 @@ import org.apache.atlas.store.AtlasTypeDefStore;
 import org.apache.atlas.type.AtlasClassificationType;
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.atlas.utils.TestResourceFileUtils;
+import org.apache.commons.io.IOUtils;
 import org.testng.ITestContext;
 import org.testng.SkipException;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Guice;
 import org.testng.annotations.Test;
-import org.testng.annotations.DataProvider;
 
 import javax.inject.Inject;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -52,8 +56,8 @@ import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_INCREM
 import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.createTypes;
 import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getEntities;
 import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getZipSource;
-import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runImportWithNoParameters;
 import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runExportWithParameters;
+import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runImportWithNoParameters;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
@@ -107,11 +111,13 @@ public class ExportIncrementalTest extends ExportImportTestBase {
     }
 
     @Test
-    public void atT0_ReturnsAllEntities() throws AtlasBaseException {
+    public void atT0_ReturnsAllEntities() throws AtlasBaseException, IOException {
         final int expectedEntityCount = 2;
 
         AtlasExportRequest request = getIncrementalRequest(0);
-        ZipSource source = runExportWithParameters(exportService, request);
+        InputStream inputStream = runExportWithParameters(exportService, request);
+
+        ZipSource source = getZipSourceFromInputStream(inputStream);
         AtlasEntity.AtlasEntityWithExtInfo entities = getEntities(source, expectedEntityCount);
 
         int count = 0;
@@ -129,13 +135,15 @@ public class ExportIncrementalTest extends ExportImportTestBase {
     }
 
     @Test(dependsOnMethods = "atT0_ReturnsAllEntities")
-    public void atT1_NewClassificationAttachedToTable_ReturnsChangedTable() throws AtlasBaseException {
+    public void atT1_NewClassificationAttachedToTable_ReturnsChangedTable() throws AtlasBaseException, IOException {
         final int expectedEntityCount = 1;
 
         entityStore.addClassifications(TABLE_GUID, ImmutableList.of(classificationTypeT1.createDefaultValue()));
 
         AtlasExportRequest request = getIncrementalRequest(nextTimestamp);
-        ZipSource source = runExportWithParameters(exportService, request);
+        InputStream inputStream = runExportWithParameters(exportService, request);
+
+        ZipSource source = getZipSourceFromInputStream(inputStream);
         AtlasEntity.AtlasEntityWithExtInfo entities = getEntities(source, expectedEntityCount);
 
         AtlasEntity entity = null;
@@ -155,7 +163,7 @@ public class ExportIncrementalTest extends ExportImportTestBase {
     }
 
     @Test(dependsOnMethods = "atT1_NewClassificationAttachedToTable_ReturnsChangedTable")
-    public void atT2_NewClassificationAttachedToColumn_ReturnsChangedColumn() throws AtlasBaseException {
+    public void atT2_NewClassificationAttachedToColumn_ReturnsChangedColumn() throws AtlasBaseException, IOException {
         final int expectedEntityCount = 1;
 
         AtlasEntity.AtlasEntityWithExtInfo tableEntity = entityStore.getById(TABLE_GUID);
@@ -163,7 +171,9 @@ public class ExportIncrementalTest extends ExportImportTestBase {
 
         entityStore.addClassifications(COLUMN_GUID_HIGH, ImmutableList.of(typeRegistry.getClassificationTypeByName("T1").createDefaultValue()));
 
-        ZipSource source = runExportWithParameters(exportService, getIncrementalRequest(nextTimestamp));
+        InputStream inputStream = runExportWithParameters(exportService, getIncrementalRequest(nextTimestamp));
+
+        ZipSource source = getZipSourceFromInputStream(inputStream);
         AtlasEntity.AtlasEntityWithExtInfo entities = getEntities(source, expectedEntityCount);
 
         for (Map.Entry<String, AtlasEntity> entry : entities.getReferredEntities().entrySet()) {
@@ -176,17 +186,26 @@ public class ExportIncrementalTest extends ExportImportTestBase {
         assertEquals(preExportTableEntityTimestamp, postUpdateTableEntityTimestamp);
     }
 
+    private ZipSource getZipSourceFromInputStream(InputStream inputStream) {
+        try {
+            return new ZipSource(inputStream);
+        } catch (IOException | AtlasBaseException e) { // fail the test with the real cause rather than handing callers a null source
+            throw new AssertionError("Unable to create ZipSource from exported stream", e);
+        }
+    }
+
     @Test(dependsOnMethods = "atT2_NewClassificationAttachedToColumn_ReturnsChangedColumn")
     public void exportingWithSameParameters_Succeeds() {
-        ZipSource source = runExportWithParameters(exportService, getIncrementalRequest(nextTimestamp));
+        InputStream inputStream = runExportWithParameters(exportService, getIncrementalRequest(nextTimestamp));
 
-        assertNotNull(source);
+        assertNotNull(getZipSourceFromInputStream(inputStream));
     }
 
     @Test
     public void connectedExport() {
-        ZipSource source = runExportWithParameters(exportService, getConnected());
+        InputStream inputStream = runExportWithParameters(exportService, getConnected());
 
+        ZipSource source = getZipSourceFromInputStream(inputStream);
         UniqueList<String> creationOrder = new UniqueList<>();
         List<String> zipCreationOrder = source.getCreationOrder();
         creationOrder.addAll(zipCreationOrder);
@@ -200,27 +219,29 @@ public class ExportIncrementalTest extends ExportImportTestBase {
     }
 
     @Test(dataProvider = "hiveDb")
-    public void importHiveDb(ZipSource zipSource) throws AtlasBaseException, IOException {
-        runImportWithNoParameters(importService, zipSource);
+    public void importHiveDb(InputStream stream) throws AtlasBaseException, IOException {
+        runImportWithNoParameters(importService, stream);
     }
 
     @Test(dependsOnMethods = "importHiveDb")
-    public void exportTableInrementalConnected() throws AtlasBaseException {
-        ZipSource source = runExportWithParameters(exportService, getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_LINEAGE, EXPORT_INCREMENTAL, 0, true));
-        verifyExpectedEntities(getFileNames(source), GUID_DB, GUID_TABLE_CTAS_2);
+    public void exportTableInrementalConnected() throws AtlasBaseException, IOException {
+        InputStream source = runExportWithParameters(exportService, getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_LINEAGE, EXPORT_INCREMENTAL, 0, true));
 
-        nextTimestamp = updateTimesampForNextIncrementalExport(source);
+        ZipSource sourceCopy = getZipSourceCopy(source); // one ZipSource over the export, reused for both checks below
+        verifyExpectedEntities(getFileNames(sourceCopy), GUID_DB, GUID_TABLE_CTAS_2);
+
+        nextTimestamp = updateTimesampForNextIncrementalExport(sourceCopy);
 
         try {
             source = runExportWithParameters(exportService, getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_LINEAGE, EXPORT_INCREMENTAL, nextTimestamp, true));
-        }catch (SkipException e){
-
+        } catch (SkipException e) {
+            throw e; // NOTE(review): catch-and-rethrow is equivalent to having no try/catch; the previous code swallowed SkipException here
         }
 
         entityStore.addClassifications(GUID_TABLE_CTAS_2, ImmutableList.of(classificationTypeT1.createDefaultValue()));
 
         source = runExportWithParameters(exportService, getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_LINEAGE, EXPORT_INCREMENTAL, nextTimestamp, true));
-        verifyExpectedEntities(getFileNames(source), GUID_TABLE_CTAS_2);
+        verifyExpectedEntities(getFileNames(getZipSourceCopy(source)), GUID_TABLE_CTAS_2);
     }
 
 
@@ -281,4 +302,11 @@ public class ExportIncrementalTest extends ExportImportTestBase {
         }
         return ret;
     }
+
+    private ZipSource getZipSourceCopy(InputStream is) throws IOException, AtlasBaseException {
+        // Drain the stream into memory, then hand ZipSource a fresh in-memory stream over those bytes.
+        final byte[] exportedBytes = IOUtils.toByteArray(is);
+
+        return new ZipSource(new ByteArrayInputStream(exportedBytes));
+    }
 }
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportSkipLineageTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportSkipLineageTest.java
index 18b4a30..25e0a53 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportSkipLineageTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportSkipLineageTest.java
@@ -40,6 +40,7 @@ import org.testng.annotations.Test;
 
 import javax.inject.Inject;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.Map;
 
 import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadBaseModel;
@@ -87,11 +88,13 @@ public class ExportSkipLineageTest extends ExportImportTestBase {
     }
 
     @Test
-    public void exportWithoutLineage() {
+    public void exportWithoutLineage() throws IOException, AtlasBaseException {
         final int expectedEntityCount = 3;
 
         AtlasExportRequest request = getRequest();
-        ZipSource source = runExportWithParameters(exportService, request);
+        InputStream inputStream = runExportWithParameters(exportService, request);
+
+        ZipSource source = new ZipSource(inputStream);
         AtlasEntity.AtlasEntityWithExtInfo entities = ZipFileResourceTestUtils.getEntities(source, expectedEntityCount);
 
         int count = 0;
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
index 7044243..33fe0ad 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
@@ -52,6 +52,7 @@ import org.testng.annotations.Guice;
 import org.testng.annotations.Test;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -74,7 +75,6 @@ import static org.testng.Assert.assertTrue;
 
 @Guice(modules = TestModules.TestOnlyModule.class)
 public class ImportServiceTest extends ExportImportTestBase {
-    private static final Logger LOG = LoggerFactory.getLogger(ImportServiceTest.class);
     private static final int DEFAULT_LIMIT = 25;
     private final ImportService importService;
 
@@ -124,9 +124,9 @@ public class ImportServiceTest extends ExportImportTestBase {
     }
 
     @Test(dataProvider = "sales")
-    public void importDB1(ZipSource zipSource) throws AtlasBaseException, IOException {
+    public void importDB1(InputStream inputStream) throws AtlasBaseException, IOException {
         loadBaseModel();
-        runAndVerifyQuickStart_v1_Import(importService, zipSource);
+        runAndVerifyQuickStart_v1_Import(importService, inputStream);
 
         assertEntityCount("DB_v1", "bfe88eb8-7556-403c-8210-647013f44a44", 1);
 
@@ -141,9 +141,9 @@ public class ImportServiceTest extends ExportImportTestBase {
     }
 
     @Test(dataProvider = "reporting")
-    public void importDB2(ZipSource zipSource) throws AtlasBaseException, IOException {
+    public void importDB2(InputStream inputStream) throws AtlasBaseException, IOException {
         loadBaseModel();
-        runAndVerifyQuickStart_v1_Import(importService, zipSource);
+        runAndVerifyQuickStart_v1_Import(importService, inputStream);
     }
 
     @DataProvider(name = "logging")
@@ -152,9 +152,9 @@ public class ImportServiceTest extends ExportImportTestBase {
     }
 
     @Test(dataProvider = "logging")
-    public void importDB3(ZipSource zipSource) throws AtlasBaseException, IOException {
+    public void importDB3(InputStream inputStream) throws AtlasBaseException, IOException {
         loadBaseModel();
-        runAndVerifyQuickStart_v1_Import(importService, zipSource);
+        runAndVerifyQuickStart_v1_Import(importService, inputStream);
     }
 
     @DataProvider(name = "salesNewTypeAttrs")
@@ -163,9 +163,9 @@ public class ImportServiceTest extends ExportImportTestBase {
     }
 
     @Test(dataProvider = "salesNewTypeAttrs", dependsOnMethods = "importDB1")
-    public void importDB4(ZipSource zipSource) throws AtlasBaseException, IOException {
+    public void importDB4(InputStream inputStream) throws AtlasBaseException, IOException {
         loadBaseModel();
-        runImportWithParameters(importService, getDefaultImportRequest(), zipSource);
+        runImportWithParameters(importService, getDefaultImportRequest(), inputStream);
     }
 
     @DataProvider(name = "salesNewTypeAttrs-next")
@@ -174,7 +174,7 @@ public class ImportServiceTest extends ExportImportTestBase {
     }
 
     @Test(dataProvider = "salesNewTypeAttrs-next", dependsOnMethods = "importDB4")
-    public void importDB5(ZipSource zipSource) throws AtlasBaseException, IOException {
+    public void importDB5(InputStream inputStream) throws AtlasBaseException, IOException {
         final String newEnumDefName = "database_action";
 
         assertNotNull(typeDefStore.getEnumDefByName(newEnumDefName));
@@ -184,13 +184,13 @@ public class ImportServiceTest extends ExportImportTestBase {
         options.put("updateTypeDefinition", "false");
         request.setOptions(options);
 
-        runImportWithParameters(importService, request, zipSource);
+        runImportWithParameters(importService, request, inputStream);
         assertNotNull(typeDefStore.getEnumDefByName(newEnumDefName));
         assertEquals(typeDefStore.getEnumDefByName(newEnumDefName).getElementDefs().size(), 4);
     }
 
     @Test(dataProvider = "salesNewTypeAttrs-next", dependsOnMethods = "importDB4")
-    public void importDB6(ZipSource zipSource) throws AtlasBaseException, IOException {
+    public void importDB6(InputStream inputStream) throws AtlasBaseException, IOException {
         final String newEnumDefName = "database_action";
 
         assertNotNull(typeDefStore.getEnumDefByName(newEnumDefName));
@@ -200,7 +200,7 @@ public class ImportServiceTest extends ExportImportTestBase {
         options.put("updateTypeDefinition", "true");
         request.setOptions(options);
 
-        runImportWithParameters(importService, request, zipSource);
+        runImportWithParameters(importService, request, inputStream);
         assertNotNull(typeDefStore.getEnumDefByName(newEnumDefName));
         assertEquals(typeDefStore.getEnumDefByName(newEnumDefName).getElementDefs().size(), 8);
     }
@@ -211,11 +211,11 @@ public class ImportServiceTest extends ExportImportTestBase {
     }
 
     @Test(dataProvider = "ctas")
-    public void importCTAS(ZipSource zipSource) throws IOException, AtlasBaseException {
+    public void importCTAS(InputStream inputStream) throws IOException, AtlasBaseException {
         loadBaseModel();
         loadHiveModel();
 
-        runImportWithNoParameters(importService, zipSource);
+        runImportWithNoParameters(importService, inputStream);
     }
 
     @DataProvider(name = "stocks-legacy")
@@ -224,12 +224,12 @@ public class ImportServiceTest extends ExportImportTestBase {
     }
 
     @Test(dataProvider = "stocks-legacy")
-    public void importLegacy(ZipSource zipSource) throws IOException, AtlasBaseException {
+    public void importLegacy(InputStream inputStream) throws IOException, AtlasBaseException {
         loadBaseModel();
         loadFsModel();
         loadHiveModel();
 
-        runImportWithNoParameters(importService, zipSource);
+        runImportWithNoParameters(importService, inputStream);
         List<AtlasEntityHeader> result = getImportedEntities("hive_db", "886c5e9c-3ac6-40be-8201-fb0cebb64783");
         assertEquals(result.size(), 1);
 
@@ -244,12 +244,12 @@ public class ImportServiceTest extends ExportImportTestBase {
     }
 
     @Test(dataProvider = "tag-prop-2")
-    public void importTagProp2(ZipSource zipSource) throws IOException, AtlasBaseException {
+    public void importTagProp2(InputStream inputStream) throws IOException, AtlasBaseException {
         loadBaseModel();
         loadFsModel();
         loadHiveModel();
 
-        runImportWithNoParameters(importService, zipSource);
+        runImportWithNoParameters(importService, inputStream);
         assertEntityCount("hive_db", "7d7d5a18-d992-457e-83c0-e36f5b95ebdb", 1);
         assertEntityCount("hive_table", "dbe729bb-c614-4e23-b845-3258efdf7a58", 1);
         AtlasEntity entity = assertEntity("hive_table", "092e9888-de96-4908-8be3-925ee72e3395");
@@ -260,7 +260,7 @@ public class ImportServiceTest extends ExportImportTestBase {
     }
 
     @Test(dataProvider = "stocks-legacy")
-    public void importExistingTopLevelEntity(ZipSource zipSource) throws IOException, AtlasBaseException{
+    public void importExistingTopLevelEntity(InputStream inputStream) throws IOException, AtlasBaseException{
         loadBaseModel();
         loadFsModel();
         loadHiveModel();
@@ -277,7 +277,7 @@ public class ImportServiceTest extends ExportImportTestBase {
         assertNotNull(createResponse);
 
         String preImportGuid = createResponse.getCreatedEntities().get(0).getGuid();
-        runImportWithNoParameters(importService, zipSource);
+        runImportWithNoParameters(importService, inputStream);
 
         AtlasVertex v = AtlasGraphUtilsV2.findByGuid("886c5e9c-3ac6-40be-8201-fb0cebb64783");
         assertNotNull(v);
@@ -295,10 +295,10 @@ public class ImportServiceTest extends ExportImportTestBase {
     }
 
     @Test(dataProvider = "stocks-glossary")
-    public void importGlossary(ZipSource zipSource) throws IOException, AtlasBaseException {
+    public void importGlossary(InputStream inputStream) throws IOException, AtlasBaseException {
         loadBaseModel();
         loadGlossary();
-        runImportWithNoParameters(importService, zipSource);
+        runImportWithNoParameters(importService, inputStream);
 
         assertEntityCount("AtlasGlossary", "40c80052-3129-4f7c-8f2f-391677935416", 1);
         assertEntityCount("AtlasGlossaryTerm", "e93ac426-de04-4d54-a7c9-d76c1e96369b", 1);
@@ -317,13 +317,13 @@ public class ImportServiceTest extends ExportImportTestBase {
     }
 
     @Test(dataProvider = "hdfs_path1", expectedExceptions = AtlasBaseException.class)
-    public void importHdfs_path1(ZipSource zipSource) throws IOException, AtlasBaseException {
+    public void importHdfs_path1(InputStream inputStream) throws IOException, AtlasBaseException {
         loadBaseModel();
         loadFsModel();
         loadModelFromResourcesJson("tag1.json", typeDefStore, typeRegistry);
 
         try {
-            runImportWithNoParameters(importService, zipSource);
+            runImportWithNoParameters(importService, inputStream);
         } catch (AtlasBaseException e) {
             assertEquals(e.getAtlasErrorCode(), AtlasErrorCode.INVALID_IMPORT_ATTRIBUTE_TYPE_CHANGED);
             AtlasClassificationType tag1 = typeRegistry.getClassificationTypeByName("tag1");
@@ -344,11 +344,11 @@ public class ImportServiceTest extends ExportImportTestBase {
     }
 
     @Test(dataProvider = "relationshipLineage")
-    public void importDB8(ZipSource zipSource) throws AtlasBaseException, IOException {
+    public void importDB8(InputStream inputStream) throws AtlasBaseException, IOException {
         loadBaseModel();
         loadHiveModel();
         AtlasImportRequest request = getDefaultImportRequest();
-        runImportWithParameters(importService, request, zipSource);
+        runImportWithParameters(importService, request, inputStream);
     }
 
     @DataProvider(name = "relationship")
@@ -357,11 +357,11 @@ public class ImportServiceTest extends ExportImportTestBase {
     }
 
     @Test(dataProvider = "relationship")
-    public void importDB7(ZipSource zipSource) throws AtlasBaseException, IOException {
+    public void importDB7(InputStream inputStream) throws AtlasBaseException, IOException {
         loadBaseModel();
         loadHiveModel();
         AtlasImportRequest request = getDefaultImportRequest();
-        runImportWithParameters(importService, request, zipSource);
+        runImportWithParameters(importService, request, inputStream);
 
         assertEntityCount("hive_db", "d7dc0848-fbba-4d63-9264-a460798361f5", 1);
         assertEntityCount("hive_table", "2fb31eaa-4bb2-4eb8-b333-a888ba7c84fe", 1);
@@ -430,11 +430,12 @@ public class ImportServiceTest extends ExportImportTestBase {
     }
 
     @Test(dataProvider = "salesNewTypeAttrs-next")
-    public void transformUpdatesForSubTypes(ZipSource zipSource) throws IOException, AtlasBaseException {
+    public void transformUpdatesForSubTypes(InputStream inputStream) throws IOException, AtlasBaseException {
         loadBaseModel();
         loadHiveModel();
 
         String transformJSON = "{ \"Asset\": { \"qualifiedName\":[ \"lowercase\", \"replace:@cl1:@cl2\" ] } }";
+        ZipSource zipSource = new ZipSource(inputStream);
         importService.setImportTransform(zipSource, transformJSON);
         ImportTransforms importTransforms = zipSource.getImportTransform();
 
@@ -444,11 +445,12 @@ public class ImportServiceTest extends ExportImportTestBase {
     }
 
     @Test(dataProvider = "salesNewTypeAttrs-next")
-    public void transformUpdatesForSubTypesAddsToExistingTransforms(ZipSource zipSource) throws IOException, AtlasBaseException {
-        loadBaseModel();
-        loadHiveModel();
+    public void transformUpdatesForSubTypesAddsToExistingTransforms(InputStream inputStream) throws IOException, AtlasBaseException {
+            loadBaseModel();
+            loadHiveModel();
 
         String transformJSON = "{ \"Asset\": { \"qualifiedName\":[ \"replace:@cl1:@cl2\" ] }, \"hive_table\": { \"qualifiedName\":[ \"lowercase\" ] } }";
+        ZipSource zipSource = new ZipSource(inputStream);
         importService.setImportTransform(zipSource, transformJSON);
         ImportTransforms importTransforms = zipSource.getImportTransform();
 
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsShaperTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsShaperTest.java
index 06bdaa6..78fdaca 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsShaperTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsShaperTest.java
@@ -66,7 +66,7 @@ public class ImportTransformsShaperTest extends ExportImportTestBase {
     public void newTagIsCreatedAndEntitiesAreTagged() throws AtlasBaseException, IOException {
         AtlasImportResult result = ZipFileResourceTestUtils.runImportWithParameters(importService,
                 getImporRequest(),
-                ZipFileResourceTestUtils.getZipSourceFrom("stocks.zip"));
+                ZipFileResourceTestUtils.getInputStreamFrom("stocks.zip"));
 
         AtlasClassificationType classification = typeRegistry.getClassificationTypeByName(TAG_NAME);
         assertNotNull(classification);
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/RelationshipAttributesExtractorTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/RelationshipAttributesExtractorTest.java
index 03d50f1..920fc28 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/RelationshipAttributesExtractorTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/RelationshipAttributesExtractorTest.java
@@ -41,6 +41,7 @@ import javax.inject.Inject;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.List;
 import java.util.Map;
 import java.util.ArrayList;
@@ -106,8 +107,8 @@ public class RelationshipAttributesExtractorTest {
     }
 
     @Test(dataProvider = "hiveDb")
-    public void importHiveDb(ZipSource zipSource) throws AtlasBaseException, IOException {
-        runImportWithNoParameters(importService, zipSource);
+    public void importHiveDb(InputStream inputStream) throws AtlasBaseException, IOException {
+        runImportWithNoParameters(importService, inputStream);
     }
 
     @Test(dependsOnMethods = "importHiveDb")
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java
index 92d4fb0..7a1ed18 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java
@@ -24,10 +24,10 @@ import org.apache.atlas.RequestContext;
 import org.apache.atlas.TestModules;
 import org.apache.atlas.TestUtilsV2;
 import org.apache.atlas.exception.AtlasBaseException;
-import org.apache.atlas.model.impexp.AtlasServer;
 import org.apache.atlas.model.impexp.AtlasExportRequest;
 import org.apache.atlas.model.impexp.AtlasImportRequest;
 import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.atlas.model.impexp.AtlasServer;
 import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.typedef.AtlasEntityDef;
 import org.apache.atlas.repository.Constants;
@@ -40,6 +40,7 @@ import org.apache.atlas.type.AtlasEntityType;
 import org.apache.atlas.type.AtlasType;
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.atlas.utils.TestResourceFileUtils;
+import org.apache.commons.io.IOUtils;
 import org.testng.SkipException;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
@@ -47,7 +48,10 @@ import org.testng.annotations.Guice;
 import org.testng.annotations.Test;
 
 import javax.inject.Inject;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.List;
 
 import static org.apache.atlas.model.impexp.AtlasExportRequest.OPTION_KEY_REPLICATED_TO;
@@ -88,7 +92,7 @@ public class ReplicationEntityAttributeTest extends ExportImportTestBase {
     @Inject
     private AtlasEntityStoreV2 entityStore;
 
-    private ZipSource zipSource;
+    private InputStream inputStream;
 
     @BeforeClass
     public void setup() throws IOException, AtlasBaseException {
@@ -107,13 +111,19 @@ public class ReplicationEntityAttributeTest extends ExportImportTestBase {
     }
 
     @Test
-    public void exportWithReplicationToOption_AddsClusterObjectIdToReplicatedFromAttribute() throws AtlasBaseException {
+    public void exportWithReplicationToOption_AddsClusterObjectIdToReplicatedFromAttribute() throws AtlasBaseException, IOException {
         final int expectedEntityCount = 2;
 
         AtlasExportRequest request = getUpdateMetaInfoUpdateRequest();
-        zipSource = runExportWithParameters(exportService, request);
+        InputStream inputStream = runExportWithParameters(exportService, request);
+
+        assertNotNull(inputStream);
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        IOUtils.copy(inputStream, baos);
+        this.inputStream = new ByteArrayInputStream(baos.toByteArray());
 
-        assertNotNull(zipSource);
+        ZipSource zipSource = new ZipSource(new ByteArrayInputStream(baos.toByteArray()));
         assertNotNull(zipSource.getCreationOrder());
         assertEquals(zipSource.getCreationOrder().size(), expectedEntityCount);
 
@@ -139,7 +149,7 @@ public class ReplicationEntityAttributeTest extends ExportImportTestBase {
     @Test(dependsOnMethods = "exportWithReplicationToOption_AddsClusterObjectIdToReplicatedFromAttribute")
     public void importWithReplicationFromOption_AddsClusterObjectIdToReplicatedFromAttribute() throws AtlasBaseException, IOException {
         AtlasImportRequest request = getImportRequestWithReplicationOption();
-        AtlasImportResult importResult = runImportWithParameters(importService, request, zipSource);
+        AtlasImportResult importResult = runImportWithParameters(importService, request, inputStream);
 
         assertCluster(
                 AuditsWriter.getServerNameFromFullName(REPLICATED_FROM_CLUSTER_NAME),
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ZipFileResourceTestUtils.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipFileResourceTestUtils.java
index 76b423e..f4e84b2 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ZipFileResourceTestUtils.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipFileResourceTestUtils.java
@@ -33,12 +33,14 @@ import org.apache.atlas.model.typedef.AtlasTypesDef;
 import org.apache.atlas.repository.store.bootstrap.AtlasTypeDefStoreInitializer;
 import org.apache.atlas.repository.store.graph.v2.AtlasEntityStoreV2;
 import org.apache.atlas.repository.store.graph.v2.AtlasEntityStreamForImport;
+import org.apache.atlas.repository.store.graph.v2.EntityImportStream;
 import org.apache.atlas.store.AtlasTypeDefStore;
 import org.apache.atlas.type.AtlasType;
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.atlas.utils.AtlasJson;
 import org.apache.atlas.utils.TestResourceFileUtils;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,6 +51,8 @@ import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -151,19 +155,11 @@ public class ZipFileResourceTestUtils {
     }
 
     public static Object[][] getZipSource(String fileName) throws IOException, AtlasBaseException {
-        return new Object[][]{{getZipSourceFrom(fileName)}};
+        return new Object[][]{{getInputStreamFrom(fileName)}};
     }
 
-    public static ZipSource getZipSourceFrom(String fileName) throws IOException, AtlasBaseException {
-        FileInputStream fs = ZipFileResourceTestUtils.getFileInputStream(fileName);
-
-        return new ZipSource(fs);
-    }
-
-    private static ZipSource getZipSourceFrom(ByteArrayOutputStream baos) throws IOException, AtlasBaseException {
-        ByteArrayInputStream bis = new ByteArrayInputStream(baos.toByteArray());
-        ZipSource zipSource = new ZipSource(bis);
-        return zipSource;
+    public static InputStream getInputStreamFrom(String fileName) {
+        return ZipFileResourceTestUtils.getFileInputStream(fileName);
     }
 
     public static void verifyImportedEntities(List<String> creationOrder, List<String> processedEntities) {
@@ -224,7 +220,7 @@ public class ZipFileResourceTestUtils {
         }
     }
 
-    public static ZipSource runExportWithParameters(ExportService exportService, AtlasExportRequest request) {
+    public static InputStream runExportWithParameters(ExportService exportService, AtlasExportRequest request) {
         final String requestingIP = "1.0.0.0";
         final String hostName = "localhost";
         final String userName = "admin";
@@ -237,7 +233,7 @@ public class ZipFileResourceTestUtils {
             assertEquals(result.getOperationStatus(), AtlasExportResult.OperationStatus.SUCCESS);
 
             zipSink.close();
-            return getZipSourceFrom(baos);
+            return new ByteArrayInputStream(baos.toByteArray());
         }
         catch(Exception ex) {
             throw new SkipException(String.format("runExportWithParameters: %s: failed!", request.toString()));
@@ -325,27 +321,42 @@ public class ZipFileResourceTestUtils {
     }
 
 
-    public static AtlasImportResult runImportWithParameters(ImportService importService, AtlasImportRequest request, ZipSource source) throws AtlasBaseException, IOException {
+    public static AtlasImportResult runImportWithParameters(ImportService importService, AtlasImportRequest request, InputStream inputStream) throws AtlasBaseException, IOException {
+        final String requestingIP = "1.0.0.0";
+        final String hostName = "localhost";
+        final String userName = "admin";
+
+        AtlasImportResult result = importService.run(inputStream, request, userName, hostName, requestingIP);
+        assertEquals(result.getOperationStatus(), AtlasImportResult.OperationStatus.SUCCESS);
+        return result;
+    }
+
+    public static AtlasImportResult runImportWithNoParameters(ImportService importService, InputStream inputStream) throws AtlasBaseException, IOException {
         final String requestingIP = "1.0.0.0";
         final String hostName = "localhost";
         final String userName = "admin";
 
-        AtlasImportResult result = importService.run(source, request, userName, hostName, requestingIP);
+        AtlasImportResult result = importService.run(inputStream, userName, hostName, requestingIP);
         assertEquals(result.getOperationStatus(), AtlasImportResult.OperationStatus.SUCCESS);
         return result;
     }
 
-    public static AtlasImportResult runImportWithNoParameters(ImportService importService, ZipSource source) throws AtlasBaseException, IOException {
+    public static AtlasImportResult runImportWithNoParametersUsingBackingDirectory(ImportService importService, InputStream inputStream) throws AtlasBaseException, IOException {
         final String requestingIP = "1.0.0.0";
         final String hostName = "localhost";
         final String userName = "admin";
 
-        AtlasImportResult result = importService.run(source, userName, hostName, requestingIP);
+        EntityImportStream sourceWithBackingDirectory = new ZipSourceWithBackingDirectory(inputStream, Files.createTempDirectory("temp").toString());
+        AtlasImportResult result = importService.run(sourceWithBackingDirectory,  new AtlasImportRequest(), userName, hostName, requestingIP);
         assertEquals(result.getOperationStatus(), AtlasImportResult.OperationStatus.SUCCESS);
         return result;
     }
 
-    public static void runAndVerifyQuickStart_v1_Import(ImportService importService, ZipSource zipSource) throws AtlasBaseException, IOException {
+    public static void runAndVerifyQuickStart_v1_Import(ImportService importService, InputStream is) throws AtlasBaseException, IOException {
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        IOUtils.copy(is, baos);
+
+        ZipSource zipSource = new ZipSource(new ByteArrayInputStream(baos.toByteArray()));
         AtlasExportResult exportResult = zipSource.getExportResult();
         List<String> creationOrder = zipSource.getCreationOrder();
 
@@ -353,7 +364,7 @@ public class ZipFileResourceTestUtils {
         RequestContext.get().setUser(TestUtilsV2.TEST_USER, null);
 
         AtlasImportRequest request = getDefaultImportRequest();
-        AtlasImportResult result = runImportWithParameters(importService, request, zipSource);
+        AtlasImportResult result = runImportWithParameters(importService, request, new ByteArrayInputStream(baos.toByteArray()));
 
         assertNotNull(result);
         verifyImportedMetrics(exportResult, result);
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ZipSourceTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipSourceTest.java
index 7436dc0..46164e8 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ZipSourceTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipSourceTest.java
@@ -28,6 +28,7 @@ import org.testng.annotations.Test;
 import java.io.ByteArrayInputStream;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.List;
 
 import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getZipSource;
@@ -127,7 +128,8 @@ public class ZipSourceTest {
     }
 
     @Test(dataProvider = "sales")
-    public void iteratorSetPositionBehavor(ZipSource zipSource) throws IOException, AtlasBaseException {
+    public void iteratorSetPositionBehavor(InputStream inputStream) throws IOException, AtlasBaseException {
+        ZipSource zipSource = new ZipSource(inputStream);
         Assert.assertTrue(zipSource.hasNext());
 
         List<String> creationOrder = zipSource.getCreationOrder();
diff --git a/repository/src/test/java/org/apache/atlas/repository/tagpropagation/ClassificationPropagationTest.java b/repository/src/test/java/org/apache/atlas/repository/tagpropagation/ClassificationPropagationTest.java
index 9d6d057..6f9c05e 100644
--- a/repository/src/test/java/org/apache/atlas/repository/tagpropagation/ClassificationPropagationTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/tagpropagation/ClassificationPropagationTest.java
@@ -51,6 +51,7 @@ import org.testng.annotations.Test;
 import javax.inject.Inject;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -601,9 +602,8 @@ public class ClassificationPropagationTest {
         }
     }
 
-    public static ZipSource getZipSource(String fileName) throws IOException, AtlasBaseException {
-        FileInputStream fs = ZipFileResourceTestUtils.getFileInputStream(fileName);
-        return new ZipSource(fs);
+    public static InputStream getZipSource(String fileName) throws IOException {
+        return ZipFileResourceTestUtils.getFileInputStream(fileName);
     }
 
     private void loadSampleClassificationDefs() throws AtlasBaseException {
diff --git a/repository/src/test/java/org/apache/atlas/services/MetricsServiceTest.java b/repository/src/test/java/org/apache/atlas/services/MetricsServiceTest.java
index b2f2633..baeafd4 100644
--- a/repository/src/test/java/org/apache/atlas/services/MetricsServiceTest.java
+++ b/repository/src/test/java/org/apache/atlas/services/MetricsServiceTest.java
@@ -39,6 +39,7 @@ import org.testng.annotations.Test;
 import javax.inject.Inject;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.time.Clock;
 import java.time.Instant;
 import java.time.ZoneId;
@@ -246,9 +247,8 @@ public class MetricsServiceTest {
         }
     }
 
-    public static ZipSource getZipSource(String fileName) throws IOException, AtlasBaseException {
-        FileInputStream fs = ZipFileResourceTestUtils.getFileInputStream(fileName);
-        return new ZipSource(fs);
+    public static InputStream getZipSource(String fileName) throws AtlasBaseException {
+        return ZipFileResourceTestUtils.getFileInputStream(fileName);
     }
 
     private static class TestClock extends Clock {
diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
index e78fcb6..464d46f 100755
--- a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
@@ -45,7 +45,6 @@ import org.apache.atlas.repository.impexp.ExportService;
 import org.apache.atlas.repository.impexp.ImportService;
 import org.apache.atlas.repository.impexp.MigrationProgressService;
 import org.apache.atlas.repository.impexp.ZipSink;
-import org.apache.atlas.repository.impexp.ZipSource;
 import org.apache.atlas.repository.patches.AtlasPatchManager;
 import org.apache.atlas.repository.store.graph.AtlasEntityStore;
 import org.apache.atlas.services.MetricsService;
@@ -404,9 +403,8 @@ public class AdminResource {
 
         try {
             AtlasImportRequest request = AtlasType.fromJson(jsonData, AtlasImportRequest.class);
-            ZipSource zipSource = new ZipSource(inputStream);
 
-            result = importService.run(zipSource, request, AtlasAuthorizationUtils.getCurrentUserName(),
+            result = importService.run(inputStream, request, Servlets.getUserName(httpServletRequest),
                     Servlets.getHostName(httpServletRequest),
                     AtlasAuthorizationUtils.getRequestIpAddress(httpServletRequest));
         } catch (AtlasBaseException excp) {