Posted to commits@atlas.apache.org by am...@apache.org on 2019/09/09 23:37:21 UTC

[atlas] branch branch-0.8 updated: ATLAS-3396: ZipSourceWithBackingDirectory implementation.

This is an automated email from the ASF dual-hosted git repository.

amestry pushed a commit to branch branch-0.8
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/branch-0.8 by this push:
     new 1778ed9  ATLAS-3396: ZipSourceWithBackingDirectory implementation.
1778ed9 is described below

commit 1778ed9d9f37e59a2f5eecfe8d4bf6384ac46942
Author: Ashutosh Mestry <am...@hortonworks.com>
AuthorDate: Mon Sep 9 14:12:08 2019 -0700

    ATLAS-3396: ZipSourceWithBackingDirectory implementation.
---
 .../java/org/apache/atlas/AtlasConfiguration.java  |   4 +-
 distro/src/conf/atlas-application.properties       |   3 +
 docs/src/site/twiki/Import-API-Options.twiki       |  11 +
 .../atlas/repository/impexp/ImportService.java     |  58 ++--
 .../apache/atlas/repository/impexp/ZipSource.java  |  10 +-
 .../impexp/ZipSourceWithBackingDirectory.java      | 371 +++++++++++++++++++++
 .../store/graph/v1/AtlasEntityStreamForImport.java |  52 +++
 .../store/graph/v1/BulkImporterImpl.java           |  14 +-
 .../store/graph/v1/EntityImportStream.java         |  29 ++
 .../repository/impexp/ExportImportTestBase.java    |   3 -
 .../repository/impexp/ExportIncrementalTest.java   |  68 ++--
 .../repository/impexp/ExportSkipLineageTest.java   |   7 +-
 .../atlas/repository/impexp/ImportServiceTest.java |  68 ++--
 .../impexp/ImportTransformsShaperTest.java         |   2 +-
 .../impexp/ReplicationEntityAttributeTest.java     |  22 +-
 .../impexp/ZipFileResourceTestUtils.java           |  50 +--
 .../atlas/repository/impexp/ZipSourceTest.java     |   4 +-
 .../apache/atlas/web/resources/AdminResource.java  |   7 +-
 18 files changed, 669 insertions(+), 114 deletions(-)

diff --git a/common/src/main/java/org/apache/atlas/AtlasConfiguration.java b/common/src/main/java/org/apache/atlas/AtlasConfiguration.java
index c5357f5..17d67cb 100644
--- a/common/src/main/java/org/apache/atlas/AtlasConfiguration.java
+++ b/common/src/main/java/org/apache/atlas/AtlasConfiguration.java
@@ -47,7 +47,9 @@ public enum AtlasConfiguration {
 
     //search configuration
     SEARCH_MAX_LIMIT("atlas.search.maxlimit", 10000),
-    SEARCH_DEFAULT_LIMIT("atlas.search.defaultlimit", 100);
+    SEARCH_DEFAULT_LIMIT("atlas.search.defaultlimit", 100),
+
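+    //import configuration: a non-empty directory path switches import to the disk-backed ZipSourceWithBackingDirectory; blank uses the in-memory ZipSource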
+    IMPORT_TEMP_DIRECTORY("atlas.import.temp.directory", "");
 
     private static final Configuration APPLICATION_PROPERTIES;
 
diff --git a/distro/src/conf/atlas-application.properties b/distro/src/conf/atlas-application.properties
index 654a930..62dab1f 100755
--- a/distro/src/conf/atlas-application.properties
+++ b/distro/src/conf/atlas-application.properties
@@ -65,6 +65,9 @@ ${titan.index.properties}
 # Solr-specific configuration property
 atlas.graph.index.search.max-result-set-size=150
 
+#########  Import Configs  #########
+#atlas.import.temp.directory=/temp/import
+
 #########  Notification Configs  #########
 atlas.notification.embedded=true
 atlas.kafka.data=${sys:atlas.home}/data/kafka
diff --git a/docs/src/site/twiki/Import-API-Options.twiki b/docs/src/site/twiki/Import-API-Options.twiki
index 4004e70..7f90475 100644
--- a/docs/src/site/twiki/Import-API-Options.twiki
+++ b/docs/src/site/twiki/Import-API-Options.twiki
@@ -26,6 +26,7 @@ Following options are supported for Import process:
    * Specify transforms during import operation.
    * Resume import by specifying starting entity guid.
    * Optionally import type definition.
+   * Handle large imports.
 
 ---++++ Transforms
 
@@ -133,3 +134,13 @@ curl -g -X POST -u adminuser:password -H "Content-Type: application/json"
             -d r@importOptions.json
             "http://localhost:21000/api/atlas/admin/importfile"
 </verbatim>
+
+---++++ Handling Large Imports
+
+By default, the Import Service stores all of the import data in memory. This can be limiting for ZIP files containing a large amount of data.
+
+To configure a temporary directory, use the application property _atlas.import.temp.directory_. If this property is left blank, the default in-memory implementation is used.
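+
+For example, a minimal configuration (the directory path below is illustrative; any location writable by the Atlas process works):
+
+<verbatim>
+atlas.import.temp.directory=/tmp/atlas-import
+</verbatim>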
+
+Please ensure that sufficient disk space is available for the operation.
+
+The directory created as the backing store for the import operation is erased once the operation completes.
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
index b5d8b7c..90e9f5d 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
@@ -18,6 +18,7 @@
 package org.apache.atlas.repository.impexp;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.atlas.AtlasConfiguration;
 import org.apache.atlas.AtlasErrorCode;
 import org.apache.atlas.entitytransform.BaseEntityHandler;
 import org.apache.atlas.entitytransform.TransformerContext;
@@ -26,22 +27,23 @@ import org.apache.atlas.model.impexp.AtlasImportRequest;
 import org.apache.atlas.model.impexp.AtlasImportResult;
 import org.apache.atlas.model.typedef.AtlasTypesDef;
 import org.apache.atlas.repository.store.graph.BulkImporter;
+import org.apache.atlas.repository.store.graph.v1.EntityImportStream;
 import org.apache.atlas.store.AtlasTypeDefStore;
 import org.apache.atlas.type.AtlasType;
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
-import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
 
 import javax.inject.Inject;
-import java.io.ByteArrayInputStream;
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.List;
 
 import static org.apache.atlas.model.impexp.AtlasImportRequest.TRANSFORMERS_KEY;
@@ -71,18 +73,25 @@ public class ImportService {
         this.importTransformsShaper = importTransformsShaper;
     }
 
-    public AtlasImportResult run(ZipSource source, String userName,
+    public AtlasImportResult run(InputStream inputStream, String userName,
                                  String hostName, String requestingIP) throws AtlasBaseException {
-        return run(source, null, userName, hostName, requestingIP);
+        return run(inputStream, null, userName, hostName, requestingIP);
     }
 
 
-    public AtlasImportResult run(ZipSource source, AtlasImportRequest request, String userName,
+    public AtlasImportResult run(InputStream inputStream, AtlasImportRequest request, String userName,
                                  String hostName, String requestingIP) throws AtlasBaseException {
         if (request == null) {
             request = new AtlasImportRequest();
         }
 
+        EntityImportStream source = createZipSource(inputStream, AtlasConfiguration.IMPORT_TEMP_DIRECTORY.getString());
+        return run(source, request, userName, hostName, requestingIP);
+    }
+
+    @VisibleForTesting
+    AtlasImportResult run(EntityImportStream source, AtlasImportRequest request, String userName,
+                                 String hostName, String requestingIP) throws AtlasBaseException {
         AtlasImportResult result = new AtlasImportResult(request, userName, requestingIP, hostName, System.currentTimeMillis());
 
         try {
@@ -107,7 +116,10 @@ public class ImportService {
 
             throw new AtlasBaseException(excp);
         } finally {
-            source.close();
+            if (source != null) {
+                source.close();
+            }
+
             LOG.info("<== import(user={}, from={}): status={}", userName, requestingIP, result.getOperationStatus());
         }
 
@@ -115,7 +127,7 @@ public class ImportService {
     }
 
     @VisibleForTesting
-    void setImportTransform(ZipSource source, String transforms) throws AtlasBaseException {
+    void setImportTransform(EntityImportStream source, String transforms) throws AtlasBaseException {
         ImportTransforms importTransform = ImportTransforms.fromJson(transforms);
         if (importTransform == null) {
             return;
@@ -127,11 +139,10 @@ public class ImportService {
         if(LOG.isDebugEnabled()) {
             debugLog("   => transforms: {}", AtlasType.toJson(importTransform));
         }
-
     }
 
     @VisibleForTesting
-    void setEntityTransformerHandlers(ZipSource source, String transformersJson) throws AtlasBaseException {
+    void setEntityTransformerHandlers(EntityImportStream source, String transformersJson) throws AtlasBaseException {
         if (StringUtils.isEmpty(transformersJson)) {
             return;
         }
@@ -151,7 +162,7 @@ public class ImportService {
         LOG.debug(s, params);
     }
 
-    private void setStartPosition(AtlasImportRequest request, ZipSource source) throws AtlasBaseException {
+    private void setStartPosition(AtlasImportRequest request, EntityImportStream source) throws AtlasBaseException {
         if (request.getStartGuid() != null) {
             source.setPositionUsingEntityGuid(request.getStartGuid());
         } else if (request.getStartPosition() != null) {
@@ -159,8 +170,7 @@ public class ImportService {
         }
     }
 
-    public AtlasImportResult run(AtlasImportRequest request, String userName, String hostName, String requestingIP)
-            throws AtlasBaseException {
+    public AtlasImportResult run(AtlasImportRequest request, String userName, String hostName, String requestingIP) throws AtlasBaseException {
         String fileName = request.getFileName();
 
         if (StringUtils.isBlank(fileName)) {
@@ -168,14 +178,11 @@ public class ImportService {
         }
 
         AtlasImportResult result = null;
-
         try {
             LOG.info("==> import(user={}, from={}, fileName={})", userName, requestingIP, fileName);
 
-            String transforms = MapUtils.isNotEmpty(request.getOptions()) ? request.getOptions().get(TRANSFORMS_KEY) : null;
             File file = new File(fileName);
-            ZipSource source = new ZipSource(new ByteArrayInputStream(FileUtils.readFileToByteArray(file)), ImportTransforms.fromJson(transforms));
-            result = run(source, request, userName, hostName, requestingIP);
+            result = run(new FileInputStream(file), request, userName, hostName, requestingIP);
         } catch (AtlasBaseException excp) {
             LOG.error("import(user={}, from={}, fileName={}): failed", userName, requestingIP, excp);
 
@@ -184,10 +191,6 @@ public class ImportService {
             LOG.error("import(user={}, from={}, fileName={}): file not found", userName, requestingIP, excp);
 
             throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, fileName + ": file not found");
-        } catch (IOException excp) {
-            LOG.error("import(user={}, from={}, fileName={}): cannot read file", userName, requestingIP, excp);
-
-            throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, fileName + ": cannot read file");
         } catch (Exception excp) {
             LOG.error("import(user={}, from={}, fileName={}): failed", userName, requestingIP, excp);
 
@@ -209,7 +212,7 @@ public class ImportService {
         importTypeDefProcessor.processTypes(typeDefinitionMap, result);
     }
 
-    private void processEntities(String userName, ZipSource importSource, AtlasImportResult result) throws AtlasBaseException {
+    private void processEntities(String userName, EntityImportStream importSource, AtlasImportResult result) throws AtlasBaseException {
         this.bulkImporter.bulkImport(importSource, result);
 
         endTimestamp = System.currentTimeMillis();
@@ -223,4 +226,17 @@ public class ImportService {
     private int getDuration(long endTime, long startTime) {
         return (int) (endTime - startTime);
     }
+
+    private EntityImportStream createZipSource(InputStream inputStream, String configuredTemporaryDirectory) throws AtlasBaseException {
+        try {
+            if (StringUtils.isEmpty(configuredTemporaryDirectory)) {
+                return new ZipSource(inputStream);
+            }
+
+            return new ZipSourceWithBackingDirectory(inputStream, configuredTemporaryDirectory);
+        }
+        catch (IOException ex) {
+            throw new AtlasBaseException(ex);
+        }
+    }
 }
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSource.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSource.java
index 016acd7..ce5fe18 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSource.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSource.java
@@ -82,20 +82,25 @@ public class ZipSource implements EntityImportStream {
                                  guidEntityJsonMap.get(key).equals("[]"));
     }
 
+    @Override
     public ImportTransforms getImportTransform() { return this.importTransform; }
 
+    @Override
     public void setImportTransform(ImportTransforms importTransform) {
         this.importTransform = importTransform;
     }
 
+    @Override
     public List<BaseEntityHandler> getEntityHandlers() {
         return entityHandlers;
     }
 
+    @Override
     public void setEntityHandlers(List<BaseEntityHandler> entityHandlers) {
         this.entityHandlers = entityHandlers;
     }
 
+    @Override
     public AtlasTypesDef getTypesDef() throws AtlasBaseException {
         final String fileName = ZipExportFileNames.ATLAS_TYPESDEF_NAME.toString();
 
@@ -103,6 +108,7 @@ public class ZipSource implements EntityImportStream {
         return convertFromJson(AtlasTypesDef.class, s);
     }
 
+    @Override
     public AtlasExportResult getExportResult() throws AtlasBaseException {
         final String fileName = ZipExportFileNames.ATLAS_EXPORT_INFO_NAME.toString();
 
@@ -147,6 +153,7 @@ public class ZipSource implements EntityImportStream {
         zipInputStream.close();
     }
 
+    @Override
     public List<String> getCreationOrder() {
         return this.creationOrder;
     }
@@ -204,6 +211,7 @@ public class ZipSource implements EntityImportStream {
         return s;
     }
 
+    @Override
     public void close() {
         try {
             inputStream.close();
@@ -278,7 +286,7 @@ public class ZipSource implements EntityImportStream {
         currentPosition = index;
         reset();
         for (int i = 0; i < creationOrder.size() && i <= index; i++) {
-            iterator.next();
+            onImportComplete(iterator.next());
         }
     }
 
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceWithBackingDirectory.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceWithBackingDirectory.java
new file mode 100644
index 0000000..7c3199f
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceWithBackingDirectory.java
@@ -0,0 +1,371 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.impexp;
+
+import org.apache.atlas.entitytransform.BaseEntityHandler;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasExportResult;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.typedef.AtlasTypesDef;
+import org.apache.atlas.repository.store.graph.v1.EntityImportStream;
+import org.apache.atlas.type.AtlasType;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+
+import static org.apache.atlas.AtlasErrorCode.IMPORT_ATTEMPTING_EMPTY_ZIP;
+
+public class ZipSourceWithBackingDirectory implements EntityImportStream {
+    private static final Logger LOG = LoggerFactory.getLogger(ZipSourceWithBackingDirectory.class);
+    private static final String TEMPORARY_DIRECTORY_PREFIX = "atlas-import-temp-";
+    private static final String EXT_JSON = ".json";
+
+    private Path tempDirectory;
+
+    private ImportTransforms importTransform;
+    private List<BaseEntityHandler> entityHandlers;
+
+    private ArrayList<String> creationOrder = new ArrayList<>();
+    private int currentPosition;
+    private int numberOfEntries;
+
+    public ZipSourceWithBackingDirectory(InputStream inputStream) throws IOException, AtlasBaseException {
+        this(inputStream, null);
+    }
+
+    public ZipSourceWithBackingDirectory(InputStream inputStream, String backingDirectory) throws IOException, AtlasBaseException {
+        setupBackingStore(inputStream, backingDirectory);
+        if (isZipFileEmpty()) {
+            throw new AtlasBaseException(IMPORT_ATTEMPTING_EMPTY_ZIP, "Attempting to import empty ZIP.");
+        }
+    }
+
+    @Override
+    public ImportTransforms getImportTransform() { return this.importTransform; }
+
+    @Override
+    public void setImportTransform(ImportTransforms importTransform) {
+        this.importTransform = importTransform;
+    }
+
+    @Override
+    public List<BaseEntityHandler> getEntityHandlers() {
+        return entityHandlers;
+    }
+
+    @Override
+    public void setEntityHandlers(List<BaseEntityHandler> entityHandlers) {
+        this.entityHandlers = entityHandlers;
+    }
+
+    @Override
+    public AtlasTypesDef getTypesDef() throws AtlasBaseException {
+        return getJsonFromEntry(ZipExportFileNames.ATLAS_TYPESDEF_NAME.toString(), AtlasTypesDef.class);
+    }
+
+    @Override
+    public AtlasExportResult getExportResult() throws AtlasBaseException {
+        return getJsonFromEntry(ZipExportFileNames.ATLAS_EXPORT_INFO_NAME.toString(), AtlasExportResult.class);
+    }
+
+    @Override
+    public List<String> getCreationOrder() {
+        return creationOrder;
+    }
+
+    @Override
+    public int getPosition() {
+        return currentPosition;
+    }
+
+    @Override
+    public AtlasEntity.AtlasEntityWithExtInfo getEntityWithExtInfo(String guid) throws AtlasBaseException {
+        final File file = getFileFromTemporaryDirectory(guid + EXT_JSON);
+        if (!file.exists()) {
+            return null;
+        }
+
+        String json = getJsonStringForFile(file);
+        if (StringUtils.isEmpty(json)) {
+            return null;
+        }
+
+        AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = convertFromJson(AtlasEntity.AtlasEntityWithExtInfo.class, json);
+
+        if (importTransform != null) {
+            entityWithExtInfo = importTransform.apply(entityWithExtInfo);
+        }
+
+        if (entityHandlers != null) {
+            applyTransformers(entityWithExtInfo);
+        }
+
+        return entityWithExtInfo;
+    }
+
+    @Override
+    public boolean hasNext() {
+        return (currentPosition < numberOfEntries);
+    }
+
+    @Override
+    public AtlasEntity next() {
+        AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = getNextEntityWithExtInfo();
+
+        return entityWithExtInfo != null ? entityWithExtInfo.getEntity() : null;
+    }
+
+    @Override
+    public AtlasEntity.AtlasEntityWithExtInfo getNextEntityWithExtInfo() {
+        try {
+            return getEntityWithExtInfo(moveNext());
+        } catch (AtlasBaseException e) {
+            LOG.error("getNextEntityWithExtInfo", e);
+            return null;
+        }
+    }
+
+    @Override
+    public void reset() {
+        currentPosition = 0;
+    }
+
+    @Override
+    public AtlasEntity getByGuid(String guid) {
+        try {
+            return getEntity(guid);
+        } catch (AtlasBaseException e) {
+            LOG.error("getByGuid: {} failed!", guid, e);
+            return null;
+        }
+    }
+
+
+    @Override
+    public void onImportComplete(String guid) {
+        getFileFromTemporaryDirectory(guid + EXT_JSON).delete();
+    }
+
+    @Override
+    public void setPosition(int index) {
+        reset();
+        for (int i = 0; i < numberOfEntries && i <= index; i++) {
+            onImportComplete(moveNext());
+        }
+    }
+
+    @Override
+    public void setPositionUsingEntityGuid(String guid) {
+        if (StringUtils.isEmpty(guid)) {
+            return;
+        }
+
+        String current;
+        while (currentPosition < numberOfEntries) {
+            current = creationOrder.get(currentPosition);
+            if (current.equals(guid)) {
+                return;
+            }
+
+            moveNext();
+        }
+    }
+
+    @Override
+    public void close() {
+        creationOrder.clear();
+        try {
+            LOG.info("Removing temporary directory: {}", tempDirectory.toString());
+            FileUtils.deleteDirectory(tempDirectory.toFile());
+        } catch (IOException e) {
+            LOG.error("Error deleting: {}", tempDirectory.toString(), e);
+        }
+    }
+
+    private boolean isZipFileEmpty() {
+        return (numberOfEntries == 0);
+    }
+
+    private <T> T getJsonFromEntry(String entryName, Class<T> clazz) throws AtlasBaseException {
+        final File file = getFileFromTemporaryDirectory(entryName + EXT_JSON);
+        if (!file.exists()) {
+            throw new AtlasBaseException(entryName + " not found!");
+        }
+
+        return convertFromJson(clazz, getJsonStringForFile(file));
+    }
+
+    private void setupBackingStore(InputStream inputStream, String backingDirectory) throws AtlasBaseException, IOException {
+        initTempDirectory(backingDirectory);
+        unzipToTempDirectory(inputStream);
+        setupIterator();
+    }
+
+    private void initTempDirectory(String backingDirectory) throws AtlasBaseException {
+        try {
+            tempDirectory = Files.createDirectory(Paths.get(backingDirectory, getChildDirectoryForSession()));
+            if (!permissionChecks(tempDirectory.toFile())) {
+                throw new AtlasBaseException(
+                        String.format("Import: Temporary directory: %s does not have permissions for operation!", tempDirectory.toString()));
+            }
+        }
+        catch(Exception ex) {
+            throw new AtlasBaseException(String.format("Error creating temporary directory under: %s", backingDirectory), ex);
+        }
+    }
+
+    private String getChildDirectoryForSession() {
+        return String.format("%s%s", TEMPORARY_DIRECTORY_PREFIX, UUID.randomUUID());
+    }
+
+    private boolean permissionChecks(File f) {
+        return f.exists() && f.isDirectory() && f.canWrite();
+    }
+
+    private void unzipToTempDirectory(InputStream inputStream) throws IOException {
+        LOG.info("Import: Temporary directory: {}", tempDirectory.toString());
+
+        ZipInputStream zis = new ZipInputStream(inputStream);
+        try {
+            ZipEntry zipEntry = zis.getNextEntry();
+            while (zipEntry != null) {
+                String entryName = zipEntry.getName();
+
+                writeJsonToFile(entryName,  getJsonPayloadFromZipEntryStream(zis));
+                numberOfEntries++;
+
+                zipEntry = zis.getNextEntry();
+            }
+
+            numberOfEntries -= ZipExportFileNames.values().length;
+        }
+        finally {
+            zis.close();
+            inputStream.close();
+        }
+    }
+
+    private void writeJsonToFile(String entryName, byte[] jsonPayload) throws IOException {
+        File f = getFileFromTemporaryDirectory(entryName);
+        Files.write(f.toPath(), jsonPayload);
+    }
+
+    private File getFileFromTemporaryDirectory(String entryName) {
+        return new File(tempDirectory.toFile(), entryName);
+    }
+
+    private void setupIterator() {
+        try {
+            creationOrder = getJsonFromEntry(ZipExportFileNames.ATLAS_EXPORT_ORDER_NAME.toString(), ArrayList.class);
+        } catch (AtlasBaseException e) {
+            LOG.error("Error fetching: {}. Error generating order.", ZipExportFileNames.ATLAS_EXPORT_ORDER_NAME.toString(), e);
+        }
+
+        reset();
+    }
+
+    private byte[] getJsonPayloadFromZipEntryStream(ZipInputStream zipInputStream) {
+        try {
+            byte[] buf = new byte[1024];
+
+            int n = 0;
+            ByteArrayOutputStream bos = new ByteArrayOutputStream();
+            while ((n = zipInputStream.read(buf, 0, 1024)) > -1) {
+                bos.write(buf, 0, n);
+            }
+
+            return bos.toByteArray();
+        } catch (IOException ex) {
+            LOG.error("Error fetching string from entry.", ex);
+        }
+
+        return null;
+    }
+
+    private String getJsonStringForFile(File file) {
+        try {
+            byte[] bytes = Files.readAllBytes(file.toPath());
+            return new String(bytes);
+        } catch (IOException e) {
+            LOG.warn("Error fetching: {}", file.toString(), e);
+            return null;
+        }
+    }
+
+    private void applyTransformers(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) {
+        if (entityWithExtInfo == null) {
+            return;
+        }
+
+        transform(entityWithExtInfo.getEntity());
+
+        if (MapUtils.isNotEmpty(entityWithExtInfo.getReferredEntities())) {
+            for (AtlasEntity e : entityWithExtInfo.getReferredEntities().values()) {
+                transform(e);
+            }
+        }
+    }
+
+    private void transform(AtlasEntity e) {
+        for (BaseEntityHandler handler : entityHandlers) {
+            handler.transform(e);
+        }
+    }
+
+    private <T> T convertFromJson(Class<T> clazz, String jsonData) throws AtlasBaseException {
+        try {
+            return AtlasType.fromJson(jsonData, clazz);
+
+        } catch (Exception e) {
+            throw new AtlasBaseException("Error converting JSON to object.", e);
+        }
+    }
+
+    private AtlasEntity getEntity(String guid) throws AtlasBaseException {
+        AtlasEntity.AtlasEntityWithExtInfo extInfo = getEntityWithExtInfo(guid);
+        return (extInfo != null) ? extInfo.getEntity() : null;
+    }
+
+    public int size() {
+        return numberOfEntries;
+    }
+
+    private String moveNext() {
+        if (currentPosition < numberOfEntries) {
+            return creationOrder.get(currentPosition++);
+        }
+
+        return null;
+    }
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStreamForImport.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStreamForImport.java
index 90ae15d..3c85f0f 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStreamForImport.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStreamForImport.java
@@ -17,8 +17,15 @@
  */
 package org.apache.atlas.repository.store.graph.v1;
 
+import org.apache.atlas.entitytransform.BaseEntityHandler;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasExportResult;
 import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+import org.apache.atlas.model.typedef.AtlasTypesDef;
+import org.apache.atlas.repository.impexp.ImportTransforms;
+
+import java.util.List;
 
 public class AtlasEntityStreamForImport extends AtlasEntityStream implements EntityImportStream {
     private int currentPosition = 0;
@@ -36,6 +43,11 @@ public class AtlasEntityStreamForImport extends AtlasEntityStream implements Ent
     }
 
     @Override
+    public AtlasEntityWithExtInfo getEntityWithExtInfo(String guid) throws AtlasBaseException {
+        return null;
+    }
+
+    @Override
     public AtlasEntity getByGuid(String guid) {
         AtlasEntity ent = super.entitiesWithExtInfo.getEntity(guid);
 
@@ -69,4 +81,44 @@ public class AtlasEntityStreamForImport extends AtlasEntityStream implements Ent
     public void onImportComplete(String guid) {
 
     }
+
+    @Override
+    public void setImportTransform(ImportTransforms importTransform) {
+
+    }
+
+    @Override
+    public ImportTransforms getImportTransform() {
+        return null;
+    }
+
+    @Override
+    public void setEntityHandlers(List<BaseEntityHandler> entityHandlers) {
+
+    }
+
+    @Override
+    public List<BaseEntityHandler> getEntityHandlers() {
+        return null;
+    }
+
+    @Override
+    public AtlasTypesDef getTypesDef() throws AtlasBaseException {
+        return null;
+    }
+
+    @Override
+    public AtlasExportResult getExportResult() throws AtlasBaseException {
+        return null;
+    }
+
+    @Override
+    public List<String> getCreationOrder() {
+        return null;
+    }
+
+    @Override
+    public void close() {
+
+    }
 }
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/BulkImporterImpl.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/BulkImporterImpl.java
index 2606692..17c6ac2 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/BulkImporterImpl.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/BulkImporterImpl.java
@@ -18,7 +18,10 @@
 package org.apache.atlas.repository.store.graph.v1;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasConfiguration;
 import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.AtlasException;
 import org.apache.atlas.RequestContext;
 import org.apache.atlas.RequestContextV1;
 import org.apache.atlas.exception.AtlasBaseException;
@@ -29,6 +32,7 @@ import org.apache.atlas.model.instance.AtlasEntityHeader;
 import org.apache.atlas.model.instance.EntityMutationResponse;
 import org.apache.atlas.repository.store.graph.AtlasEntityStore;
 import org.apache.atlas.repository.store.graph.BulkImporter;
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
@@ -45,10 +49,12 @@ public class BulkImporterImpl implements BulkImporter {
     private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityStoreV1.class);
 
     private final AtlasEntityStore entityStore;
+    private boolean directoryBasedImportConfigured;
 
     @Inject
     public BulkImporterImpl(AtlasEntityStore entityStore) {
         this.entityStore = entityStore;
+        this.directoryBasedImportConfigured = StringUtils.isNotEmpty(AtlasConfiguration.IMPORT_TEMP_DIRECTORY.getString());
     }
 
     @Override
@@ -128,9 +134,11 @@ public class BulkImporterImpl implements BulkImporter {
                                       AtlasImportResult                  importResult,
                                       Set<String>                        processedGuids,
                                       int currentIndex, int streamSize, float currentPercent) {
-        updateImportMetrics("entity:%s:created", resp.getCreatedEntities(), processedGuids, importResult);
-        updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids, importResult);
-        updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids, importResult);
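+        // when a disk-backed import source is configured, skip per-entity metric updates (likely to limit memory growth on large imports)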
+        if (!directoryBasedImportConfigured) {
+            updateImportMetrics("entity:%s:created", resp.getCreatedEntities(), processedGuids, importResult);
+            updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids, importResult);
+            updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids, importResult);
+        }
 
         String lastEntityImported = String.format("entity:last-imported:%s:[%s]:(%s)", currentEntity.getEntity().getTypeName(), currentIndex, currentEntity.getEntity().getGuid());
 
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityImportStream.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityImportStream.java
index d4b6c55..8131922 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityImportStream.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityImportStream.java
@@ -18,17 +18,46 @@
 package org.apache.atlas.repository.store.graph.v1;
 
 
+import org.apache.atlas.entitytransform.BaseEntityHandler;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasExportResult;
+import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+import org.apache.atlas.model.typedef.AtlasTypesDef;
+import org.apache.atlas.repository.impexp.ImportTransforms;
+
+import java.util.List;
 
 public interface EntityImportStream extends EntityStream {
 
     int size();
+
     void setPosition(int position);
+
     int getPosition();
 
     void setPositionUsingEntityGuid(String guid);
 
     AtlasEntityWithExtInfo getNextEntityWithExtInfo();
 
+    AtlasEntity.AtlasEntityWithExtInfo getEntityWithExtInfo(String guid) throws AtlasBaseException;
+
     void onImportComplete(String guid);
+
+    void setImportTransform(ImportTransforms importTransform);
+
+    public ImportTransforms getImportTransform();
+
+    void setEntityHandlers(List<BaseEntityHandler> entityHandlers);
+
+    List<BaseEntityHandler> getEntityHandlers();
+
+    AtlasTypesDef getTypesDef() throws AtlasBaseException;
+
+    AtlasExportResult getExportResult() throws AtlasBaseException;
+
+    List<String> getCreationOrder();
+
+    void close();
 }
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportImportTestBase.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportImportTestBase.java
index a11bcdc..94b7177 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportImportTestBase.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportImportTestBase.java
@@ -18,9 +18,6 @@
 
 package org.apache.atlas.repository.impexp;
 
-import org.apache.atlas.ApplicationProperties;
-import org.apache.atlas.AtlasConstants;
-import org.apache.atlas.AtlasException;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.impexp.ExportImportAuditEntry;
 import org.apache.atlas.model.instance.AtlasEntity;
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java
index 5ec62e5..de4cf15 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java
@@ -33,16 +33,20 @@ import org.apache.atlas.store.AtlasTypeDefStore;
 import org.apache.atlas.type.AtlasClassificationType;
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.atlas.utils.TestResourceFileUtils;
+import org.apache.commons.io.IOUtils;
 import org.testng.ITestContext;
 import org.testng.SkipException;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Guice;
 import org.testng.annotations.Test;
-import org.testng.annotations.DataProvider;
 
 import javax.inject.Inject;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -52,8 +56,8 @@ import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_INCREM
 import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.createTypes;
 import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getEntities;
 import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getZipSource;
-import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runImportWithNoParameters;
 import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runExportWithParameters;
+import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runImportWithNoParameters;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
@@ -106,11 +110,13 @@ public class ExportIncrementalTest extends ExportImportTestBase {
     }
 
     @Test
-    public void atT0_ReturnsAllEntities() throws AtlasBaseException {
+    public void atT0_ReturnsAllEntities() throws AtlasBaseException, IOException {
         final int expectedEntityCount = 2;
 
         AtlasExportRequest request = getIncrementalRequest(0);
-        ZipSource source = runExportWithParameters(exportService, request);
+        InputStream inputStream = runExportWithParameters(exportService, request);
+
+        ZipSource source = getZipSourceFromInputStream(inputStream);
         AtlasEntity.AtlasEntityWithExtInfo entities = getEntities(source, expectedEntityCount);
 
         int count = 0;
@@ -128,13 +134,15 @@ public class ExportIncrementalTest extends ExportImportTestBase {
     }
 
     @Test(dependsOnMethods = "atT0_ReturnsAllEntities")
-    public void atT1_NewClassificationAttachedToTable_ReturnsChangedTable() throws AtlasBaseException {
+    public void atT1_NewClassificationAttachedToTable_ReturnsChangedTable() throws AtlasBaseException, IOException {
         final int expectedEntityCount = 1;
 
         entityStore.addClassifications(TABLE_GUID, ImmutableList.of(classificationTypeT1.createDefaultValue()));
 
         AtlasExportRequest request = getIncrementalRequest(nextTimestamp);
-        ZipSource source = runExportWithParameters(exportService, request);
+        InputStream inputStream = runExportWithParameters(exportService, request);
+
+        ZipSource source = getZipSourceFromInputStream(inputStream);
         AtlasEntity.AtlasEntityWithExtInfo entities = getEntities(source, expectedEntityCount);
 
         AtlasEntity entity = null;
@@ -154,7 +162,7 @@ public class ExportIncrementalTest extends ExportImportTestBase {
     }
 
     @Test(dependsOnMethods = "atT1_NewClassificationAttachedToTable_ReturnsChangedTable")
-    public void atT2_NewClassificationAttachedToColumn_ReturnsChangedColumn() throws AtlasBaseException {
+    public void atT2_NewClassificationAttachedToColumn_ReturnsChangedColumn() throws AtlasBaseException, IOException {
         final int expectedEntityCount = 1;
 
         AtlasEntity.AtlasEntityWithExtInfo tableEntity = entityStore.getById(TABLE_GUID);
@@ -162,7 +170,9 @@ public class ExportIncrementalTest extends ExportImportTestBase {
 
         entityStore.addClassifications(COLUMN_GUID_HIGH, ImmutableList.of(typeRegistry.getClassificationTypeByName("T1").createDefaultValue()));
 
-        ZipSource source = runExportWithParameters(exportService, getIncrementalRequest(nextTimestamp));
+        InputStream inputStream = runExportWithParameters(exportService, getIncrementalRequest(nextTimestamp));
+
+        ZipSource source = getZipSourceFromInputStream(inputStream);
         AtlasEntity.AtlasEntityWithExtInfo entities = getEntities(source, expectedEntityCount);
 
         for (Map.Entry<String, AtlasEntity> entry : entities.getReferredEntities().entrySet()) {
@@ -175,17 +185,26 @@ public class ExportIncrementalTest extends ExportImportTestBase {
         assertEquals(preExportTableEntityTimestamp, postUpdateTableEntityTimestamp);
     }
 
+    private ZipSource getZipSourceFromInputStream(InputStream inputStream) {
+        try {
+            return new ZipSource(inputStream);
+        } catch (IOException | AtlasBaseException e) {
+            return null;
+        }
+    }
+
     @Test(dependsOnMethods = "atT2_NewClassificationAttachedToColumn_ReturnsChangedColumn")
     public void exportingWithSameParameters_Succeeds() {
-        ZipSource source = runExportWithParameters(exportService, getIncrementalRequest(nextTimestamp));
+        InputStream inputStream = runExportWithParameters(exportService, getIncrementalRequest(nextTimestamp));
 
-        assertNotNull(source);
+        assertNotNull(getZipSourceFromInputStream(inputStream));
     }
 
     @Test
     public void connectedExport() {
-        ZipSource source = runExportWithParameters(exportService, getConnected());
+        InputStream inputStream = runExportWithParameters(exportService, getConnected());
 
+        ZipSource source = getZipSourceFromInputStream(inputStream);
         UniqueList<String> creationOrder = new UniqueList<>();
         List<String> zipCreationOrder = source.getCreationOrder();
         creationOrder.addAll(zipCreationOrder);
@@ -199,27 +218,29 @@ public class ExportIncrementalTest extends ExportImportTestBase {
     }
 
     @Test(dataProvider = "hiveDb")
-    public void importHiveDb(ZipSource zipSource) throws AtlasBaseException, IOException {
-        runImportWithNoParameters(importService, zipSource);
+    public void importHiveDb(InputStream stream) throws AtlasBaseException, IOException {
+        runImportWithNoParameters(importService, stream);
     }
 
     @Test(dependsOnMethods = "importHiveDb")
-    public void exportTableInrementalConnected() throws AtlasBaseException {
-        ZipSource source = runExportWithParameters(exportService, getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_LINEAGE, EXPORT_INCREMENTAL, 0, true));
-        verifyExpectedEntities(getFileNames(source), GUID_DB, GUID_TABLE_CTAS_2);
+    public void exportTableInrementalConnected() throws AtlasBaseException, IOException {
+        InputStream source = runExportWithParameters(exportService, getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_LINEAGE, EXPORT_INCREMENTAL, 0, true));
 
-        nextTimestamp = updateTimesampForNextIncrementalExport(source);
+        ZipSource sourceCopy = getZipSourceCopy(source);
+        verifyExpectedEntities(getFileNames(sourceCopy), GUID_DB, GUID_TABLE_CTAS_2);
+
+        nextTimestamp = updateTimesampForNextIncrementalExport(sourceCopy);
 
         try {
             source = runExportWithParameters(exportService, getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_LINEAGE, EXPORT_INCREMENTAL, nextTimestamp, true));
-        }catch (SkipException e){
-
+        } catch (SkipException e) {
+            throw e;
         }
 
         entityStore.addClassifications(GUID_TABLE_CTAS_2, ImmutableList.of(classificationTypeT1.createDefaultValue()));
 
         source = runExportWithParameters(exportService, getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_LINEAGE, EXPORT_INCREMENTAL, nextTimestamp, true));
-        verifyExpectedEntities(getFileNames(source), GUID_TABLE_CTAS_2);
+        verifyExpectedEntities(getFileNames(getZipSourceCopy(source)), GUID_TABLE_CTAS_2);
     }
 
 
@@ -280,4 +301,11 @@ public class ExportIncrementalTest extends ExportImportTestBase {
         }
         return ret;
     }
+
+    private ZipSource getZipSourceCopy(InputStream is) throws IOException, AtlasBaseException {
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        IOUtils.copy(is, baos);
+
+        return new ZipSource(new ByteArrayInputStream(baos.toByteArray()));
+    }
 }
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportSkipLineageTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportSkipLineageTest.java
index eaf4602..cc49ce7 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportSkipLineageTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportSkipLineageTest.java
@@ -37,6 +37,7 @@ import org.testng.annotations.Test;
 
 import javax.inject.Inject;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.Map;
 
 import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadBaseModel;
@@ -86,11 +87,13 @@ public class ExportSkipLineageTest extends ExportImportTestBase {
     }
 
     @Test
-    public void exportWithoutLineage() {
+    public void exportWithoutLineage() throws IOException, AtlasBaseException {
         final int expectedEntityCount = 3;
 
         AtlasExportRequest request = getRequest();
-        ZipSource source = runExportWithParameters(exportService, request);
+        InputStream inputStream = runExportWithParameters(exportService, request);
+
+        ZipSource source = new ZipSource(inputStream);
         AtlasEntity.AtlasEntityWithExtInfo entities = ZipFileResourceTestUtils.getEntities(source, expectedEntityCount);
 
         int count = 0;
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
index 693a163..2aee233 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
@@ -18,15 +18,11 @@
 package org.apache.atlas.repository.impexp;
 
 import com.google.inject.Inject;
-import org.apache.atlas.ApplicationProperties;
-import org.apache.atlas.AtlasConstants;
 import org.apache.atlas.AtlasErrorCode;
-import org.apache.atlas.AtlasException;
 import org.apache.atlas.RequestContextV1;
 import org.apache.atlas.TestModules;
 import org.apache.atlas.TestUtilsV2;
 import org.apache.atlas.exception.AtlasBaseException;
-import org.apache.atlas.model.discovery.AtlasSearchResult;
 import org.apache.atlas.model.impexp.AtlasImportRequest;
 import org.apache.atlas.store.AtlasTypeDefStore;
 import org.apache.atlas.type.AtlasClassificationType;
@@ -43,16 +39,23 @@ import org.testng.annotations.Guice;
 import org.testng.annotations.Test;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.HashMap;
 import java.util.Map;
 
-import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.*;
+import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getDefaultImportRequest;
+import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getZipSource;
+import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadModelFromJson;
+import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadModelFromResourcesJson;
+import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runAndVerifyQuickStart_v1_Import;
+import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runImportWithNoParameters;
+import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runImportWithNoParametersUsingBackingDirectory;
+import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runImportWithParameters;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
 
 @Guice(modules = TestModules.TestOnlyModule.class)
 public class ImportServiceTest extends ExportImportTestBase {
@@ -90,9 +93,9 @@ public class ImportServiceTest extends ExportImportTestBase {
     }
 
     @Test(dataProvider = "sales")
-    public void importDB1(ZipSource zipSource) throws AtlasBaseException, IOException {
+    public void importDB1(InputStream inputStream) throws AtlasBaseException, IOException {
         loadBaseModel();
-        runAndVerifyQuickStart_v1_Import(importService, zipSource);
+        runAndVerifyQuickStart_v1_Import(importService, inputStream);
     }
 
     @DataProvider(name = "reporting")
@@ -101,9 +104,9 @@ public class ImportServiceTest extends ExportImportTestBase {
     }
 
     @Test(dataProvider = "reporting")
-    public void importDB2(ZipSource zipSource) throws AtlasBaseException, IOException {
+    public void importDB2(InputStream inputStream) throws AtlasBaseException, IOException {
         loadBaseModel();
-        runAndVerifyQuickStart_v1_Import(importService, zipSource);
+        runAndVerifyQuickStart_v1_Import(importService, inputStream);
     }
 
     @DataProvider(name = "logging")
@@ -112,9 +115,9 @@ public class ImportServiceTest extends ExportImportTestBase {
     }
 
     @Test(dataProvider = "logging")
-    public void importDB3(ZipSource zipSource) throws AtlasBaseException, IOException {
+    public void importDB3(InputStream inputStream) throws AtlasBaseException, IOException {
         loadBaseModel();
-        runAndVerifyQuickStart_v1_Import(importService, zipSource);
+        runAndVerifyQuickStart_v1_Import(importService, inputStream);
     }
 
     @DataProvider(name = "salesNewTypeAttrs")
@@ -123,9 +126,9 @@ public class ImportServiceTest extends ExportImportTestBase {
     }
 
     @Test(dataProvider = "salesNewTypeAttrs", dependsOnMethods = "importDB1")
-    public void importDB4(ZipSource zipSource) throws AtlasBaseException, IOException {
+    public void importDB4(InputStream inputStream) throws AtlasBaseException, IOException {
         loadBaseModel();
-        runImportWithParameters(importService, getDefaultImportRequest(), zipSource);
+        runImportWithParameters(importService, getDefaultImportRequest(), inputStream);
     }
 
     @DataProvider(name = "salesNewTypeAttrs-next")
@@ -135,7 +138,7 @@ public class ImportServiceTest extends ExportImportTestBase {
 
 
     @Test(dataProvider = "salesNewTypeAttrs-next", dependsOnMethods = "importDB4")
-    public void importDB5(ZipSource zipSource) throws AtlasBaseException, IOException {
+    public void importDB5(InputStream inputStream) throws AtlasBaseException, IOException {
         final String newEnumDefName = "database_action";
 
         assertNotNull(typeDefStore.getEnumDefByName(newEnumDefName));
@@ -145,13 +148,13 @@ public class ImportServiceTest extends ExportImportTestBase {
         options.put("updateTypeDefinition", "false");
         request.setOptions(options);
 
-        runImportWithParameters(importService, request, zipSource);
+        runImportWithParameters(importService, request, inputStream);
         assertNotNull(typeDefStore.getEnumDefByName(newEnumDefName));
         assertEquals(typeDefStore.getEnumDefByName(newEnumDefName).getElementDefs().size(), 4);
     }
 
     @Test(dataProvider = "salesNewTypeAttrs-next", dependsOnMethods = "importDB4")
-    public void importDB6(ZipSource zipSource) throws AtlasBaseException, IOException {
+    public void importDB6(InputStream inputStream) throws AtlasBaseException, IOException {
         final String newEnumDefName = "database_action";
 
         assertNotNull(typeDefStore.getEnumDefByName(newEnumDefName));
@@ -161,7 +164,7 @@ public class ImportServiceTest extends ExportImportTestBase {
         options.put("updateTypeDefinition", "true");
         request.setOptions(options);
 
-        runImportWithParameters(importService, request, zipSource);
+        runImportWithParameters(importService, request, inputStream);
         assertNotNull(typeDefStore.getEnumDefByName(newEnumDefName));
         assertEquals(typeDefStore.getEnumDefByName(newEnumDefName).getElementDefs().size(), 8);
     }
@@ -172,11 +175,11 @@ public class ImportServiceTest extends ExportImportTestBase {
     }
 
     @Test(dataProvider = "ctas")
-    public void importCTAS(ZipSource zipSource) throws IOException, AtlasBaseException {
+    public void importCTAS(InputStream inputStream) throws IOException, AtlasBaseException {
         loadBaseModel();
         loadHiveModel();
 
-        runImportWithNoParameters(importService, zipSource);
+        runImportWithNoParameters(importService, inputStream);
     }
 
     @DataProvider(name = "hdfs_path1")
@@ -186,13 +189,13 @@ public class ImportServiceTest extends ExportImportTestBase {
 
 
     @Test(dataProvider = "hdfs_path1", expectedExceptions = AtlasBaseException.class)
-    public void importHdfs_path1(ZipSource zipSource) throws IOException, AtlasBaseException {
+    public void importHdfs_path1(InputStream inputStream) throws IOException, AtlasBaseException {
         loadBaseModel();
         loadFsModel();
         loadModelFromResourcesJson("tag1.json", typeDefStore, typeRegistry);
 
         try {
-            runImportWithNoParameters(importService, zipSource);
+            runImportWithNoParameters(importService, inputStream);
         } catch (AtlasBaseException e) {
             assertEquals(e.getAtlasErrorCode(), AtlasErrorCode.INVALID_IMPORT_ATTRIBUTE_TYPE_CHANGED);
             AtlasClassificationType tag1 = typeRegistry.getClassificationTypeByName("tag1");
@@ -202,6 +205,14 @@ public class ImportServiceTest extends ExportImportTestBase {
         }
     }
 
+    @Test(dataProvider = "ctas")
+    public void importWithBackingDirectory(InputStream inputStream) throws IOException, AtlasBaseException {
+        loadBaseModel();
+        loadFsModel();
+
+        runImportWithNoParametersUsingBackingDirectory(importService, inputStream);
+    }
+
     @Test
     public void importServiceProcessesIOException() {
         ImportService importService = new ImportService(typeDefStore, typeRegistry, null, null,null);
@@ -238,11 +249,12 @@ public class ImportServiceTest extends ExportImportTestBase {
     }
 
     @Test(dataProvider = "salesNewTypeAttrs-next")
-    public void transformUpdatesForSubTypes(ZipSource zipSource) throws IOException, AtlasBaseException {
+    public void transformUpdatesForSubTypes(InputStream inputStream) throws IOException, AtlasBaseException {
         loadModelFromJson("0010-base_model.json", typeDefStore, typeRegistry);
         loadModelFromJson("0030-hive_model.json", typeDefStore, typeRegistry);
 
         String transformJSON = "{ \"Asset\": { \"qualifiedName\":[ \"lowercase\", \"replace:@cl1:@cl2\" ] } }";
+        ZipSource zipSource = new ZipSource(inputStream);
         importService.setImportTransform(zipSource, transformJSON);
         ImportTransforms importTransforms = zipSource.getImportTransform();
 
@@ -252,11 +264,12 @@ public class ImportServiceTest extends ExportImportTestBase {
     }
 
     @Test(dataProvider = "salesNewTypeAttrs-next")
-    public void transformUpdatesForSubTypesAddsToExistingTransforms(ZipSource zipSource) throws IOException, AtlasBaseException {
+    public void transformUpdatesForSubTypesAddsToExistingTransforms(InputStream inputStream) throws IOException, AtlasBaseException {
         loadModelFromJson("0010-base_model.json", typeDefStore, typeRegistry);
         loadModelFromJson("0030-hive_model.json", typeDefStore, typeRegistry);
 
         String transformJSON = "{ \"Asset\": { \"qualifiedName\":[ \"replace:@cl1:@cl2\" ] }, \"hive_table\": { \"qualifiedName\":[ \"lowercase\" ] } }";
+        ZipSource zipSource = new ZipSource(inputStream);
         importService.setImportTransform(zipSource, transformJSON);
         ImportTransforms importTransforms = zipSource.getImportTransform();
 
@@ -266,13 +279,8 @@ public class ImportServiceTest extends ExportImportTestBase {
         assertEquals(importTransforms.getTransforms().get("hive_table").get("qualifiedName").size(), 2);
     }
 
-    @Test(dataProvider = "empty-zip", expectedExceptions = AtlasBaseException.class)
-    public void importEmptyZip(ZipSource zipSource) {
-
-    }
-
     @Test(expectedExceptions = AtlasBaseException.class)
     public void importEmptyZip() throws IOException, AtlasBaseException {
-        getZipSource("empty.zip");
+        new ZipSource((InputStream) getZipSource("empty.zip")[0][0]);
     }
 }
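
Note: the data providers above now yield a raw InputStream rather than a pre-built
ZipSource, presumably because a ZipSource consumes its stream up front and cannot be
handed out repeatedly; each test now wraps the stream in whichever source it wants to
exercise. A minimal sketch of the new contract (provider and test names are
illustrative, not from this commit):

    @Test(dataProvider = "someZipProvider")   // hypothetical provider returning an InputStream
    public void someImportTest(InputStream inputStream) throws IOException, AtlasBaseException {
        ZipSource zipSource = new ZipSource(inputStream);  // wrap only if ZipSource APIs are needed
        assertNotNull(zipSource.getCreationOrder());       // e.g. inspect the export's creation order
    }
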
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsShaperTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsShaperTest.java
index 06bdaa6..78fdaca 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsShaperTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsShaperTest.java
@@ -66,7 +66,7 @@ public class ImportTransformsShaperTest extends ExportImportTestBase {
     public void newTagIsCreatedAndEntitiesAreTagged() throws AtlasBaseException, IOException {
         AtlasImportResult result = ZipFileResourceTestUtils.runImportWithParameters(importService,
                 getImporRequest(),
-                ZipFileResourceTestUtils.getZipSourceFrom("stocks.zip"));
+                ZipFileResourceTestUtils.getInputStreamFrom("stocks.zip"));
 
         AtlasClassificationType classification = typeRegistry.getClassificationTypeByName(TAG_NAME);
         assertNotNull(classification);
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java
index 5e909e1..5315535 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java
@@ -24,10 +24,10 @@ import org.apache.atlas.RequestContextV1;
 import org.apache.atlas.TestModules;
 import org.apache.atlas.TestUtilsV2;
 import org.apache.atlas.exception.AtlasBaseException;
-import org.apache.atlas.model.impexp.AtlasServer;
 import org.apache.atlas.model.impexp.AtlasExportRequest;
 import org.apache.atlas.model.impexp.AtlasImportRequest;
 import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.atlas.model.impexp.AtlasServer;
 import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.typedef.AtlasEntityDef;
 import org.apache.atlas.repository.Constants;
@@ -37,6 +37,7 @@ import org.apache.atlas.type.AtlasEntityType;
 import org.apache.atlas.type.AtlasType;
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.atlas.utils.TestResourceFileUtils;
+import org.apache.commons.io.IOUtils;
 import org.testng.SkipException;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
@@ -44,7 +45,10 @@ import org.testng.annotations.Guice;
 import org.testng.annotations.Test;
 
 import javax.inject.Inject;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.List;
 
 import static org.apache.atlas.model.impexp.AtlasExportRequest.OPTION_KEY_REPLICATED_TO;
@@ -84,7 +88,7 @@ public class ReplicationEntityAttributeTest extends ExportImportTestBase {
     @Inject
     private AtlasEntityStoreV1 entityStore;
 
-    private ZipSource zipSource;
+    private InputStream inputStream;
 
     @BeforeClass
     public void setup() throws IOException, AtlasBaseException {
@@ -103,13 +107,19 @@ public class ReplicationEntityAttributeTest extends ExportImportTestBase {
     }
 
     @Test
-    public void exportWithReplicationToOption_AddsClusterObjectIdToReplicatedFromAttribute() throws AtlasBaseException {
+    public void exportWithReplicationToOption_AddsClusterObjectIdToReplicatedFromAttribute() throws AtlasBaseException, IOException {
         final int expectedEntityCount = 2;
 
         AtlasExportRequest request = getUpdateMetaInfoUpdateRequest();
-        zipSource = runExportWithParameters(exportService, request);
+        InputStream inputStream = runExportWithParameters(exportService, request);
+
+        assertNotNull(inputStream);
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        IOUtils.copy(inputStream, baos);
+        this.inputStream = new ByteArrayInputStream(baos.toByteArray());
 
-        assertNotNull(zipSource);
+        ZipSource zipSource = new ZipSource(new ByteArrayInputStream(baos.toByteArray()));
         assertNotNull(zipSource.getCreationOrder());
         assertEquals(zipSource.getCreationOrder().size(), expectedEntityCount);
 
@@ -135,7 +145,7 @@ public class ReplicationEntityAttributeTest extends ExportImportTestBase {
     @Test(dependsOnMethods = "exportWithReplicationToOption_AddsClusterObjectIdToReplicatedFromAttribute")
     public void importWithReplicationFromOption_AddsClusterObjectIdToReplicatedFromAttribute() throws AtlasBaseException, IOException {
         AtlasImportRequest request = getImportRequestWithReplicationOption();
-        AtlasImportResult importResult = runImportWithParameters(importService, request, zipSource);
+        AtlasImportResult importResult = runImportWithParameters(importService, request, inputStream);
 
         assertCluster(
                 AuditsWriter.getServerNameFromFullName(REPLICATED_FROM_CLUSTER_NAME),
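
The export test above has to read the export stream twice: once to assert on the zip's
contents via ZipSource, and once more in the dependent import test. Since the stream
returned by runExportWithParameters is one-shot, it is drained into a byte array and
replayed. A condensed sketch of that buffer-once, replay-twice pattern (Apache Commons
IO; exportStream stands in for the value returned by runExportWithParameters):

    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    IOUtils.copy(exportStream, baos);                                     // drain the one-shot stream
    byte[] bytes = baos.toByteArray();
    ZipSource inspected = new ZipSource(new ByteArrayInputStream(bytes)); // first read: assertions
    InputStream replayable = new ByteArrayInputStream(bytes);             // second read: the import
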
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ZipFileResourceTestUtils.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipFileResourceTestUtils.java
index a2a5f58..4b3af4c 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ZipFileResourceTestUtils.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipFileResourceTestUtils.java
@@ -33,12 +33,13 @@ import org.apache.atlas.model.typedef.AtlasTypesDef;
 import org.apache.atlas.repository.store.bootstrap.AtlasTypeDefStoreInitializer;
 import org.apache.atlas.repository.store.graph.v1.AtlasEntityStoreV1;
 import org.apache.atlas.repository.store.graph.v1.AtlasEntityStreamForImport;
+import org.apache.atlas.repository.store.graph.v1.EntityImportStream;
 import org.apache.atlas.store.AtlasTypeDefStore;
-import org.apache.atlas.type.AtlasStructType;
 import org.apache.atlas.type.AtlasType;
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.atlas.utils.TestResourceFileUtils;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,6 +50,8 @@ import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -90,19 +93,11 @@ public class ZipFileResourceTestUtils {
     }
 
     public static Object[][] getZipSource(String fileName) throws IOException, AtlasBaseException {
-        return new Object[][]{{getZipSourceFrom(fileName)}};
+        return new Object[][]{{getInputStreamFrom(fileName)}};
     }
 
-    public static ZipSource getZipSourceFrom(String fileName) throws IOException, AtlasBaseException {
-        FileInputStream fs = ZipFileResourceTestUtils.getFileInputStream(fileName);
-
-        return new ZipSource(fs);
-    }
-
-    private static ZipSource getZipSourceFrom(ByteArrayOutputStream baos) throws IOException, AtlasBaseException {
-        ByteArrayInputStream bis = new ByteArrayInputStream(baos.toByteArray());
-        ZipSource zipSource = new ZipSource(bis);
-        return zipSource;
+    public static InputStream getInputStreamFrom(String fileName) {
+        return ZipFileResourceTestUtils.getFileInputStream(fileName);
     }
 
     public static void verifyImportedEntities(List<String> creationOrder, List<String> processedEntities) {
@@ -163,7 +158,7 @@ public class ZipFileResourceTestUtils {
         }
     }
 
-    public static ZipSource runExportWithParameters(ExportService exportService, AtlasExportRequest request) {
+    public static InputStream runExportWithParameters(ExportService exportService, AtlasExportRequest request) {
         final String requestingIP = "1.0.0.0";
         final String hostName = "localhost";
         final String userName = "admin";
@@ -176,7 +171,7 @@ public class ZipFileResourceTestUtils {
             assertEquals(result.getOperationStatus(), AtlasExportResult.OperationStatus.SUCCESS);
 
             zipSink.close();
-            return getZipSourceFrom(baos);
+            return new ByteArrayInputStream(baos.toByteArray());
         }
         catch(Exception ex) {
             throw new SkipException(String.format("runExportWithParameters: %s: failed!", request.toString()));
@@ -253,27 +248,42 @@ public class ZipFileResourceTestUtils {
     }
 
 
-    public static AtlasImportResult runImportWithParameters(ImportService importService, AtlasImportRequest request, ZipSource source) throws AtlasBaseException, IOException {
+    public static AtlasImportResult runImportWithParameters(ImportService importService, AtlasImportRequest request, InputStream inputStream) throws AtlasBaseException, IOException {
+        final String requestingIP = "1.0.0.0";
+        final String hostName = "localhost";
+        final String userName = "admin";
+
+        AtlasImportResult result = importService.run(inputStream, request, userName, hostName, requestingIP);
+        assertEquals(result.getOperationStatus(), AtlasImportResult.OperationStatus.SUCCESS);
+        return result;
+    }
+
+    public static AtlasImportResult runImportWithNoParameters(ImportService importService, InputStream inputStream) throws AtlasBaseException, IOException {
         final String requestingIP = "1.0.0.0";
         final String hostName = "localhost";
         final String userName = "admin";
 
-        AtlasImportResult result = importService.run(source, request, userName, hostName, requestingIP);
+        AtlasImportResult result = importService.run(inputStream, userName, hostName, requestingIP);
         assertEquals(result.getOperationStatus(), AtlasImportResult.OperationStatus.SUCCESS);
         return result;
     }
 
-    public static AtlasImportResult runImportWithNoParameters(ImportService importService, ZipSource source) throws AtlasBaseException, IOException {
+    public static AtlasImportResult runImportWithNoParametersUsingBackingDirectory(ImportService importService, InputStream inputStream) throws AtlasBaseException, IOException {
         final String requestingIP = "1.0.0.0";
         final String hostName = "localhost";
         final String userName = "admin";
 
-        AtlasImportResult result = importService.run(source, userName, hostName, requestingIP);
+        EntityImportStream sourceWithBackingDirectory = new ZipSourceWithBackingDirectory(inputStream, Files.createTempDirectory("temp").toString());
+        AtlasImportResult result = importService.run(sourceWithBackingDirectory, new AtlasImportRequest(), userName, hostName, requestingIP);
         assertEquals(result.getOperationStatus(), AtlasImportResult.OperationStatus.SUCCESS);
         return result;
     }
 
-    public static void runAndVerifyQuickStart_v1_Import(ImportService importService, ZipSource zipSource) throws AtlasBaseException, IOException {
+    public static void runAndVerifyQuickStart_v1_Import(ImportService importService, InputStream is) throws AtlasBaseException, IOException {
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        IOUtils.copy(is, baos);
+
+        ZipSource zipSource = new ZipSource(new ByteArrayInputStream(baos.toByteArray()));
         AtlasExportResult exportResult = zipSource.getExportResult();
         List<String> creationOrder = zipSource.getCreationOrder();
 
@@ -281,7 +291,7 @@ public class ZipFileResourceTestUtils {
         RequestContextV1.get().setUser(TestUtilsV2.TEST_USER);
 
         AtlasImportRequest request = getDefaultImportRequest();
-        AtlasImportResult result = runImportWithParameters(importService, request, zipSource);
+        AtlasImportResult result = runImportWithParameters(importService, request, new ByteArrayInputStream(baos.toByteArray()));
 
         assertNotNull(result);
         verifyImportedMetrics(exportResult, result);
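
The helper above is the only place in the tests that exercises the new source directly:
ZipSourceWithBackingDirectory takes the upload stream plus a scratch directory, the idea
presumably being to stage the zip's contents on disk rather than hold them on the heap.
A minimal usage sketch, mirroring runImportWithNoParametersUsingBackingDirectory:

    // Stage the import under a temp directory instead of keeping it in memory.
    EntityImportStream source = new ZipSourceWithBackingDirectory(
            inputStream, Files.createTempDirectory("temp").toString());
    AtlasImportResult result = importService.run(
            source, new AtlasImportRequest(), userName, hostName, requestingIP);
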
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ZipSourceTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipSourceTest.java
index 7436dc0..46164e8 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ZipSourceTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipSourceTest.java
@@ -28,6 +28,7 @@ import org.testng.annotations.Test;
 import java.io.ByteArrayInputStream;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.List;
 
 import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getZipSource;
@@ -127,7 +128,8 @@ public class ZipSourceTest {
     }
 
     @Test(dataProvider = "sales")
-    public void iteratorSetPositionBehavor(ZipSource zipSource) throws IOException, AtlasBaseException {
+    public void iteratorSetPositionBehavor(InputStream inputStream) throws IOException, AtlasBaseException {
+        ZipSource zipSource = new ZipSource(inputStream);
         Assert.assertTrue(zipSource.hasNext());
 
         List<String> creationOrder = zipSource.getCreationOrder();
diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
index 8417e7e..d998bd3 100755
--- a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
@@ -19,7 +19,6 @@
 package org.apache.atlas.web.resources;
 
 import com.sun.jersey.multipart.FormDataParam;
-
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasClient;
 import org.apache.atlas.AtlasErrorCode;
@@ -28,11 +27,11 @@ import org.apache.atlas.authorize.AtlasResourceTypes;
 import org.apache.atlas.authorize.simple.AtlasAuthorizationUtils;
 import org.apache.atlas.discovery.SearchContext;
 import org.apache.atlas.exception.AtlasBaseException;
-import org.apache.atlas.model.impexp.AtlasServer;
 import org.apache.atlas.model.impexp.AtlasExportRequest;
 import org.apache.atlas.model.impexp.AtlasExportResult;
 import org.apache.atlas.model.impexp.AtlasImportRequest;
 import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.atlas.model.impexp.AtlasServer;
 import org.apache.atlas.model.impexp.ExportImportAuditEntry;
 import org.apache.atlas.model.instance.AtlasCheckStateRequest;
 import org.apache.atlas.model.instance.AtlasCheckStateResult;
@@ -42,7 +41,6 @@ import org.apache.atlas.repository.impexp.ExportImportAuditService;
 import org.apache.atlas.repository.impexp.ExportService;
 import org.apache.atlas.repository.impexp.ImportService;
 import org.apache.atlas.repository.impexp.ZipSink;
-import org.apache.atlas.repository.impexp.ZipSource;
 import org.apache.atlas.repository.store.graph.AtlasEntityStore;
 import org.apache.atlas.services.MetricsService;
 import org.apache.atlas.type.AtlasType;
@@ -390,9 +388,8 @@ public class AdminResource {
 
         try {
             AtlasImportRequest request = AtlasType.fromJson(jsonData, AtlasImportRequest.class);
-            ZipSource zipSource = new ZipSource(inputStream);
 
-            result = importService.run(zipSource, request, Servlets.getUserName(httpServletRequest),
+            result = importService.run(inputStream, request, Servlets.getUserName(httpServletRequest),
                     Servlets.getHostName(httpServletRequest),
                     AtlasAuthorizationUtils.getRequestIpAddress(httpServletRequest));
         } catch (AtlasBaseException excp) {
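
With this change the REST layer stops choosing the source implementation: AdminResource
hands ImportService the raw upload stream, and the service decides how to consume it.
A hedged sketch of the dispatch ImportService presumably performs (the temp-directory
variable and the exact branching are assumptions, not shown in this hunk):

    // Illustrative only: pick a disk-backed source when a temp directory is configured.
    EntityImportStream source = StringUtils.isEmpty(tempDirectory)
            ? new ZipSource(inputStream)
            : new ZipSourceWithBackingDirectory(inputStream, tempDirectory);
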