You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by am...@apache.org on 2019/09/10 22:10:13 UTC
[atlas] branch master updated: ATLAS-3396:
ZipSourceWithBackingDirectory: Implementation. Port to master.
This is an automated email from the ASF dual-hosted git repository.
amestry pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new 8bad6b0 ATLAS-3396: ZipSourceWithBackingDirectory: Implementation. Port to master.
8bad6b0 is described below
commit 8bad6b0b6724f15ec6173769bccd33184d842f8e
Author: Ashutosh Mestry <am...@hortonworks.com>
AuthorDate: Tue Sep 10 15:10:00 2019 -0700
ATLAS-3396: ZipSourceWithBackingDirectory: Implementation. Port to master.
---
distro/src/conf/atlas-application.properties | 3 +
docs/src/site/twiki/Import-API-Options.twiki | 11 +
.../java/org/apache/atlas/AtlasConfiguration.java | 4 +-
.../atlas/repository/impexp/ImportService.java | 58 ++--
.../apache/atlas/repository/impexp/ZipSource.java | 10 +-
.../impexp/ZipSourceWithBackingDirectory.java | 371 +++++++++++++++++++++
.../store/graph/v2/AtlasEntityStreamForImport.java | 52 +++
.../store/graph/v2/BulkImporterImpl.java | 11 +-
.../store/graph/v2/EntityImportStream.java | 28 ++
.../repository/impexp/ExportIncrementalTest.java | 68 ++--
.../repository/impexp/ExportSkipLineageTest.java | 7 +-
.../atlas/repository/impexp/ImportServiceTest.java | 68 ++--
.../impexp/ImportTransformsShaperTest.java | 2 +-
.../RelationshipAttributesExtractorTest.java | 5 +-
.../impexp/ReplicationEntityAttributeTest.java | 22 +-
.../impexp/ZipFileResourceTestUtils.java | 49 +--
.../atlas/repository/impexp/ZipSourceTest.java | 4 +-
.../ClassificationPropagationTest.java | 6 +-
.../apache/atlas/services/MetricsServiceTest.java | 6 +-
.../apache/atlas/web/resources/AdminResource.java | 4 +-
20 files changed, 670 insertions(+), 119 deletions(-)
diff --git a/distro/src/conf/atlas-application.properties b/distro/src/conf/atlas-application.properties
index 471424b..7846452 100755
--- a/distro/src/conf/atlas-application.properties
+++ b/distro/src/conf/atlas-application.properties
@@ -82,6 +82,9 @@ ${graph.index.properties}
# Solr-specific configuration property
atlas.graph.index.search.max-result-set-size=150
+######### Import Configs #########
+#atlas.import.temp.directory=/temp/import
+
######### Notification Configs #########
atlas.notification.embedded=true
atlas.kafka.data=${sys:atlas.home}/data/kafka
diff --git a/docs/src/site/twiki/Import-API-Options.twiki b/docs/src/site/twiki/Import-API-Options.twiki
index 4004e70..7f90475 100644
--- a/docs/src/site/twiki/Import-API-Options.twiki
+++ b/docs/src/site/twiki/Import-API-Options.twiki
@@ -26,6 +26,7 @@ Following options are supported for Import process:
* Specify transforms during import operation.
* Resume import by specifying starting entity guid.
* Optionally import type definition.
+ * Handling large imports.
---++++ Transforms
@@ -133,3 +134,13 @@ curl -g -X POST -u adminuser:password -H "Content-Type: application/json"
-d r@importOptions.json
"http://localhost:21000/api/atlas/admin/importfile"
</verbatim>
+
+---++++ Handling Large Imports
+
+By default, the Import Service holds all of the data in memory. This can be limiting for ZIP files that contain a large amount of data.
+
+To use a disk-backed import instead, set the application property _atlas.import.temp.directory_ to an existing, writable directory. If this property is left blank, the default in-memory implementation is used.
+
+Please ensure that there is sufficient disk space available for the operation.
+
+The contents of the directory created as backing store for the import operation will be erased after the operation is over.
diff --git a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
index 7b71e51..345b105 100644
--- a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
+++ b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
@@ -55,7 +55,9 @@ public enum AtlasConfiguration {
//search configuration
SEARCH_MAX_LIMIT("atlas.search.maxlimit", 10000),
- SEARCH_DEFAULT_LIMIT("atlas.search.defaultlimit", 100);
+ SEARCH_DEFAULT_LIMIT("atlas.search.defaultlimit", 100),
+
+ IMPORT_TEMP_DIRECTORY("atlas.import.temp.directory", "");
private static final Configuration APPLICATION_PROPERTIES;
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
index 3ded798..df49ae1 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
@@ -18,6 +18,7 @@
package org.apache.atlas.repository.impexp;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.RequestContext;
import org.apache.atlas.entitytransform.BaseEntityHandler;
@@ -27,22 +28,23 @@ import org.apache.atlas.model.impexp.AtlasImportRequest;
import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.repository.store.graph.BulkImporter;
+import org.apache.atlas.repository.store.graph.v2.EntityImportStream;
import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
-import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.inject.Inject;
-import java.io.ByteArrayInputStream;
import java.io.File;
+import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.io.InputStream;
import java.util.List;
import static org.apache.atlas.model.impexp.AtlasImportRequest.TRANSFORMERS_KEY;
@@ -72,18 +74,25 @@ public class ImportService {
this.importTransformsShaper = importTransformsShaper;
}
- public AtlasImportResult run(ZipSource source, String userName,
+ public AtlasImportResult run(InputStream inputStream, String userName,
String hostName, String requestingIP) throws AtlasBaseException {
- return run(source, null, userName, hostName, requestingIP);
+ return run(inputStream, null, userName, hostName, requestingIP);
}
- public AtlasImportResult run(ZipSource source, AtlasImportRequest request, String userName,
+ public AtlasImportResult run(InputStream inputStream, AtlasImportRequest request, String userName,
String hostName, String requestingIP) throws AtlasBaseException {
if (request == null) {
request = new AtlasImportRequest();
}
+ EntityImportStream source = createZipSource(inputStream, AtlasConfiguration.IMPORT_TEMP_DIRECTORY.getString());
+ return run(source, request, userName, hostName, requestingIP);
+ }
+
+ @VisibleForTesting
+ AtlasImportResult run(EntityImportStream source, AtlasImportRequest request, String userName,
+ String hostName, String requestingIP) throws AtlasBaseException {
AtlasImportResult result = new AtlasImportResult(request, userName, requestingIP, hostName, System.currentTimeMillis());
try {
@@ -112,7 +121,10 @@ public class ImportService {
} finally {
RequestContext.get().setImportInProgress(false);
- source.close();
+ if (source != null) {
+ source.close();
+ }
+
LOG.info("<== import(user={}, from={}): status={}", userName, requestingIP, result.getOperationStatus());
}
@@ -120,7 +132,7 @@ public class ImportService {
}
@VisibleForTesting
- void setImportTransform(ZipSource source, String transforms) throws AtlasBaseException {
+ void setImportTransform(EntityImportStream source, String transforms) throws AtlasBaseException {
ImportTransforms importTransform = ImportTransforms.fromJson(transforms);
if (importTransform == null) {
return;
@@ -132,11 +144,10 @@ public class ImportService {
if(LOG.isDebugEnabled()) {
debugLog(" => transforms: {}", AtlasType.toJson(importTransform));
}
-
}
@VisibleForTesting
- void setEntityTransformerHandlers(ZipSource source, String transformersJson) throws AtlasBaseException {
+ void setEntityTransformerHandlers(EntityImportStream source, String transformersJson) throws AtlasBaseException {
if (StringUtils.isEmpty(transformersJson)) {
return;
}
@@ -156,7 +167,7 @@ public class ImportService {
LOG.debug(s, params);
}
- private void setStartPosition(AtlasImportRequest request, ZipSource source) throws AtlasBaseException {
+ private void setStartPosition(AtlasImportRequest request, EntityImportStream source) throws AtlasBaseException {
if (request.getStartGuid() != null) {
source.setPositionUsingEntityGuid(request.getStartGuid());
} else if (request.getStartPosition() != null) {
@@ -164,8 +175,7 @@ public class ImportService {
}
}
- public AtlasImportResult run(AtlasImportRequest request, String userName, String hostName, String requestingIP)
- throws AtlasBaseException {
+ public AtlasImportResult run(AtlasImportRequest request, String userName, String hostName, String requestingIP) throws AtlasBaseException {
String fileName = request.getFileName();
if (StringUtils.isBlank(fileName)) {
@@ -173,14 +183,11 @@ public class ImportService {
}
AtlasImportResult result = null;
-
try {
LOG.info("==> import(user={}, from={}, fileName={})", userName, requestingIP, fileName);
- String transforms = MapUtils.isNotEmpty(request.getOptions()) ? request.getOptions().get(TRANSFORMS_KEY) : null;
File file = new File(fileName);
- ZipSource source = new ZipSource(new ByteArrayInputStream(FileUtils.readFileToByteArray(file)), ImportTransforms.fromJson(transforms));
- result = run(source, request, userName, hostName, requestingIP);
+ result = run(new FileInputStream(file), request, userName, hostName, requestingIP);
} catch (AtlasBaseException excp) {
LOG.error("import(user={}, from={}, fileName={}): failed", userName, requestingIP, excp);
@@ -189,10 +196,6 @@ public class ImportService {
LOG.error("import(user={}, from={}, fileName={}): file not found", userName, requestingIP, excp);
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, fileName + ": file not found");
- } catch (IOException excp) {
- LOG.error("import(user={}, from={}, fileName={}): cannot read file", userName, requestingIP, excp);
-
- throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, fileName + ": cannot read file");
} catch (Exception excp) {
LOG.error("import(user={}, from={}, fileName={}): failed", userName, requestingIP, excp);
@@ -214,7 +217,7 @@ public class ImportService {
importTypeDefProcessor.processTypes(typeDefinitionMap, result);
}
- private void processEntities(String userName, ZipSource importSource, AtlasImportResult result) throws AtlasBaseException {
+ private void processEntities(String userName, EntityImportStream importSource, AtlasImportResult result) throws AtlasBaseException {
result.setExportResult(importSource.getExportResult());
this.bulkImporter.bulkImport(importSource, result);
@@ -228,4 +231,17 @@ public class ImportService {
private int getDuration(long endTime, long startTime) {
return (int) (endTime - startTime);
}
+
+ private EntityImportStream createZipSource(InputStream inputStream, String configuredTemporaryDirectory) throws AtlasBaseException {
+ try {
+ if (StringUtils.isEmpty(configuredTemporaryDirectory)) {
+ return new ZipSource(inputStream);
+ }
+
+ return new ZipSourceWithBackingDirectory(inputStream, configuredTemporaryDirectory);
+ }
+ catch (IOException ex) {
+ throw new AtlasBaseException(ex);
+ }
+ }
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSource.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSource.java
index 1ce96a8..812add9 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSource.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSource.java
@@ -82,20 +82,25 @@ public class ZipSource implements EntityImportStream {
guidEntityJsonMap.get(key).equals("[]"));
}
+ @Override
public ImportTransforms getImportTransform() { return this.importTransform; }
+ @Override
public void setImportTransform(ImportTransforms importTransform) {
this.importTransform = importTransform;
}
+ @Override
public List<BaseEntityHandler> getEntityHandlers() {
return entityHandlers;
}
+ @Override
public void setEntityHandlers(List<BaseEntityHandler> entityHandlers) {
this.entityHandlers = entityHandlers;
}
+ @Override
public AtlasTypesDef getTypesDef() throws AtlasBaseException {
final String fileName = ZipExportFileNames.ATLAS_TYPESDEF_NAME.toString();
@@ -103,6 +108,7 @@ public class ZipSource implements EntityImportStream {
return convertFromJson(AtlasTypesDef.class, s);
}
+ @Override
public AtlasExportResult getExportResult() throws AtlasBaseException {
final String fileName = ZipExportFileNames.ATLAS_EXPORT_INFO_NAME.toString();
@@ -147,6 +153,7 @@ public class ZipSource implements EntityImportStream {
zipInputStream.close();
}
+ @Override
public List<String> getCreationOrder() {
return this.creationOrder;
}
@@ -210,6 +217,7 @@ public class ZipSource implements EntityImportStream {
return s;
}
+ @Override
public void close() {
try {
inputStream.close();
@@ -284,7 +292,7 @@ public class ZipSource implements EntityImportStream {
currentPosition = index;
reset();
for (int i = 0; i < creationOrder.size() && i <= index; i++) {
- iterator.next();
+ onImportComplete(iterator.next());
}
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceWithBackingDirectory.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceWithBackingDirectory.java
new file mode 100644
index 0000000..7963800
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceWithBackingDirectory.java
@@ -0,0 +1,371 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.impexp;
+
+import org.apache.atlas.entitytransform.BaseEntityHandler;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasExportResult;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.typedef.AtlasTypesDef;
+import org.apache.atlas.repository.store.graph.v2.EntityImportStream;
+import org.apache.atlas.type.AtlasType;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+
+import static org.apache.atlas.AtlasErrorCode.IMPORT_ATTEMPTING_EMPTY_ZIP;
+
+public class ZipSourceWithBackingDirectory implements EntityImportStream {
+ private static final Logger LOG = LoggerFactory.getLogger(ZipSourceWithBackingDirectory.class);
+ private static final String TEMPORARY_DIRECTORY_PREFIX = "atlas-import-temp-";
+ private static final String EXT_JSON = ".json";
+
+ private Path tempDirectory;
+
+ private ImportTransforms importTransform;
+ private List<BaseEntityHandler> entityHandlers;
+
+ private ArrayList<String> creationOrder = new ArrayList<>();
+ private int currentPosition;
+ private int numberOfEntries;
+
+ public ZipSourceWithBackingDirectory(InputStream inputStream) throws IOException, AtlasBaseException {
+ this(inputStream, null);
+ }
+
+    /**
+     * Unpacks the ZIP read from {@code inputStream} into a new session directory
+     * created under {@code backingDirectory} and prepares iteration over it.
+     *
+     * @param inputStream      stream positioned at the start of the ZIP
+     * @param backingDirectory parent directory for the session directory
+     * @throws IOException        if the archive cannot be read or unpacked
+     * @throws AtlasBaseException if the session directory cannot be set up, or the
+     *                            archive holds no entities
+     */
+    public ZipSourceWithBackingDirectory(InputStream inputStream, String backingDirectory) throws IOException, AtlasBaseException {
+        setupBackingStore(inputStream, backingDirectory);
+        if (isZipFileEmpty()) {
+            // No instance is handed back when the constructor throws, so close()
+            // can never run for it: remove the session directory here.
+            FileUtils.deleteQuietly(tempDirectory.toFile());
+
+            throw new AtlasBaseException(IMPORT_ATTEMPTING_EMPTY_ZIP, "Attempting to import empty ZIP.");
+        }
+    }
+
+ @Override
+ public ImportTransforms getImportTransform() { return this.importTransform; }
+
+ @Override
+ public void setImportTransform(ImportTransforms importTransform) {
+ this.importTransform = importTransform;
+ }
+
+ @Override
+ public List<BaseEntityHandler> getEntityHandlers() {
+ return entityHandlers;
+ }
+
+ @Override
+ public void setEntityHandlers(List<BaseEntityHandler> entityHandlers) {
+ this.entityHandlers = entityHandlers;
+ }
+
+ @Override
+ public AtlasTypesDef getTypesDef() throws AtlasBaseException {
+ return getJsonFromEntry(ZipExportFileNames.ATLAS_TYPESDEF_NAME.toString(), AtlasTypesDef.class);
+ }
+
+ @Override
+ public AtlasExportResult getExportResult() throws AtlasBaseException {
+ return getJsonFromEntry(ZipExportFileNames.ATLAS_EXPORT_INFO_NAME.toString(), AtlasExportResult.class);
+ }
+
+ @Override
+ public List<String> getCreationOrder() {
+ return creationOrder;
+ }
+
+ @Override
+ public int getPosition() {
+ return currentPosition;
+ }
+
+ @Override
+ public AtlasEntity.AtlasEntityWithExtInfo getEntityWithExtInfo(String guid) throws AtlasBaseException {
+ final File file = getFileFromTemporaryDirectory(guid + EXT_JSON);
+ if (!file.exists()) {
+ return null;
+ }
+
+ String json = getJsonStringForFile(file);
+ if (StringUtils.isEmpty(json)) {
+ return null;
+ }
+
+ AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = convertFromJson(AtlasEntity.AtlasEntityWithExtInfo.class, json);
+
+ if (importTransform != null) {
+ entityWithExtInfo = importTransform.apply(entityWithExtInfo);
+ }
+
+ if (entityHandlers != null) {
+ applyTransformers(entityWithExtInfo);
+ }
+
+ return entityWithExtInfo;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return (currentPosition < numberOfEntries);
+ }
+
+ @Override
+ public AtlasEntity next() {
+ AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = getNextEntityWithExtInfo();
+
+ return entityWithExtInfo != null ? entityWithExtInfo.getEntity() : null;
+ }
+
+ @Override
+ public AtlasEntity.AtlasEntityWithExtInfo getNextEntityWithExtInfo() {
+ try {
+ return getEntityWithExtInfo(moveNext());
+ } catch (AtlasBaseException e) {
+ LOG.error("getNextEntityWithExtInfo", e);
+ return null;
+ }
+ }
+
+ @Override
+ public void reset() {
+ currentPosition = 0;
+ }
+
+ @Override
+ public AtlasEntity getByGuid(String guid) {
+ try {
+ return getEntity(guid);
+ } catch (AtlasBaseException e) {
+ LOG.error("getByGuid: {} failed!", guid, e);
+ return null;
+ }
+ }
+
+
+ @Override
+ public void onImportComplete(String guid) {
+ getFileFromTemporaryDirectory(guid + EXT_JSON).delete();
+ }
+
+ @Override
+ public void setPosition(int index) {
+ reset();
+ for (int i = 0; i < numberOfEntries && i <= index; i++) {
+ onImportComplete(moveNext());
+ }
+ }
+
+ @Override
+ public void setPositionUsingEntityGuid(String guid) {
+ if (StringUtils.isEmpty(guid)) {
+ return;
+ }
+
+ String current;
+ while (currentPosition < numberOfEntries) {
+ current = creationOrder.get(currentPosition);
+ if (current.equals(guid)) {
+ return;
+ }
+
+ moveNext();
+ }
+ }
+
+ @Override
+ public void close() {
+ creationOrder.clear();
+ try {
+ LOG.error("Import: Removing temporary directory: {}", tempDirectory.toString());
+ FileUtils.deleteDirectory(tempDirectory.toFile());
+ } catch (IOException e) {
+ LOG.error("Import: Error deleting: {}", tempDirectory.toString(), e);
+ }
+ }
+
+ private boolean isZipFileEmpty() {
+ return (numberOfEntries == 0);
+ }
+
+ private <T> T getJsonFromEntry(String entryName, Class<T> clazz) throws AtlasBaseException {
+ final File file = getFileFromTemporaryDirectory(entryName + EXT_JSON);
+ if (!file.exists()) {
+ throw new AtlasBaseException(entryName + " not found!");
+ }
+
+ return convertFromJson(clazz, getJsonStringForFile(file));
+ }
+
+    /**
+     * Creates the session directory, unpacks the archive into it and loads the
+     * creation order.
+     *
+     * @throws AtlasBaseException if the session directory cannot be created or used
+     * @throws IOException        if the archive cannot be read or unpacked
+     */
+    private void setupBackingStore(InputStream inputStream, String backingDirectory) throws AtlasBaseException, IOException {
+        initTempDirectory(backingDirectory);
+
+        try {
+            unzipToTempDirectory(inputStream);
+            setupIterator();
+        } catch (IOException | RuntimeException ex) {
+            // Construction is about to fail, so nobody will ever call close():
+            // do not leave a half-populated directory behind.
+            FileUtils.deleteQuietly(tempDirectory.toFile());
+
+            throw ex;
+        }
+    }
+
+    /**
+     * Creates a uniquely named session directory under {@code backingDirectory}
+     * and stores it in {@code tempDirectory}.
+     *
+     * @param backingDirectory parent directory for the session directory
+     * @throws AtlasBaseException if no directory is given, the session directory
+     *                            cannot be created, or it fails the permission checks
+     */
+    private void initTempDirectory(String backingDirectory) throws AtlasBaseException {
+        if (StringUtils.isEmpty(backingDirectory)) {
+            throw new AtlasBaseException("Import: Temporary directory is not configured!");
+        }
+
+        try {
+            tempDirectory = Files.createDirectory(Paths.get(backingDirectory, getChildDirectoryForSession()));
+        } catch (Exception ex) {
+            // tempDirectory is still unassigned here; report the configured parent instead.
+            throw new AtlasBaseException(String.format("Error fetching temporary directory: %s", backingDirectory), ex);
+        }
+
+        // Checked outside the try so this exception is not caught and re-wrapped.
+        if (!permissionChecks(tempDirectory.toFile())) {
+            throw new AtlasBaseException(
+                    String.format("Import: Temporary directory: %s does not have permissions for operation!", tempDirectory.toString()));
+        }
+    }
+
+ private String getChildDirectoryForSession() {
+ return String.format("%s%s", TEMPORARY_DIRECTORY_PREFIX, UUID.randomUUID());
+ }
+
+ private boolean permissionChecks(File f) {
+ return f.exists() && f.isDirectory() && f.canWrite();
+ }
+
+ private void unzipToTempDirectory(InputStream inputStream) throws IOException {
+ LOG.info("Import: Temporary directory: {}", tempDirectory.toString());
+
+ ZipInputStream zis = new ZipInputStream(inputStream);
+ try {
+ ZipEntry zipEntry = zis.getNextEntry();
+ while (zipEntry != null) {
+ String entryName = zipEntry.getName();
+
+ writeJsonToFile(entryName, getJsonPayloadFromZipEntryStream(zis));
+ numberOfEntries++;
+
+ zipEntry = zis.getNextEntry();
+ }
+
+ numberOfEntries -= ZipExportFileNames.values().length;
+ }
+ finally {
+ zis.close();
+ inputStream.close();
+ }
+ }
+
+ private void writeJsonToFile(String entryName, byte[] jsonPayload) throws IOException {
+ File f = getFileFromTemporaryDirectory(entryName);
+ Files.write(f.toPath(), jsonPayload);
+ }
+
+ private File getFileFromTemporaryDirectory(String entryName) {
+ return new File(tempDirectory.toFile(), entryName);
+ }
+
+ private void setupIterator() {
+ try {
+ creationOrder = getJsonFromEntry(ZipExportFileNames.ATLAS_EXPORT_ORDER_NAME.toString(), ArrayList.class);
+ } catch (AtlasBaseException e) {
+ LOG.error("Error fetching: {}. Error generating order.", ZipExportFileNames.ATLAS_EXPORT_ORDER_NAME.toString(), e);
+ }
+
+ reset();
+ }
+
+    /**
+     * Reads the remaining bytes of the current ZIP entry.
+     *
+     * @param zipInputStream stream positioned inside an entry
+     * @return the entry's content; never {@code null}
+     * @throws IOException if the entry cannot be read; propagated so the caller
+     *                     does not try to write a missing payload to disk
+     */
+    private byte[] getJsonPayloadFromZipEntryStream(ZipInputStream zipInputStream) throws IOException {
+        byte[] buf = new byte[1024];
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+
+        int n;
+        while ((n = zipInputStream.read(buf, 0, buf.length)) > -1) {
+            bos.write(buf, 0, n);
+        }
+
+        return bos.toByteArray();
+    }
+
+ private String getJsonStringForFile(File file) {
+ try {
+ byte[] bytes = Files.readAllBytes(file.toPath());
+ return new String(bytes);
+ } catch (IOException e) {
+ LOG.warn("Error fetching: {}", file.toString(), e);
+ return null;
+ }
+ }
+
+ private void applyTransformers(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) {
+ if (entityWithExtInfo == null) {
+ return;
+ }
+
+ transform(entityWithExtInfo.getEntity());
+
+ if (MapUtils.isNotEmpty(entityWithExtInfo.getReferredEntities())) {
+ for (AtlasEntity e : entityWithExtInfo.getReferredEntities().values()) {
+ transform(e);
+ }
+ }
+ }
+
+ private void transform(AtlasEntity e) {
+ for (BaseEntityHandler handler : entityHandlers) {
+ handler.transform(e);
+ }
+ }
+
+ private <T> T convertFromJson(Class<T> clazz, String jsonData) throws AtlasBaseException {
+ try {
+ return AtlasType.fromJson(jsonData, clazz);
+
+ } catch (Exception e) {
+ throw new AtlasBaseException("Error converting file to JSON.", e);
+ }
+ }
+
+ private AtlasEntity getEntity(String guid) throws AtlasBaseException {
+ AtlasEntity.AtlasEntityWithExtInfo extInfo = getEntityWithExtInfo(guid);
+ return (extInfo != null) ? extInfo.getEntity() : null;
+ }
+
+ public int size() {
+ return numberOfEntries;
+ }
+
+ private String moveNext() {
+ if (currentPosition < numberOfEntries) {
+ return creationOrder.get(currentPosition++);
+ }
+
+ return null;
+ }
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStreamForImport.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStreamForImport.java
index 6bf962e..5ad9d60 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStreamForImport.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStreamForImport.java
@@ -17,8 +17,15 @@
*/
package org.apache.atlas.repository.store.graph.v2;
+import org.apache.atlas.entitytransform.BaseEntityHandler;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasExportResult;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+import org.apache.atlas.model.typedef.AtlasTypesDef;
+import org.apache.atlas.repository.impexp.ImportTransforms;
+
+import java.util.List;
public class AtlasEntityStreamForImport extends AtlasEntityStream implements EntityImportStream {
private int currentPosition = 0;
@@ -36,6 +43,11 @@ public class AtlasEntityStreamForImport extends AtlasEntityStream implements Ent
}
@Override
+ public AtlasEntityWithExtInfo getEntityWithExtInfo(String guid) throws AtlasBaseException {
+ return null;
+ }
+
+ @Override
public AtlasEntity getByGuid(String guid) {
AtlasEntity ent = super.entitiesWithExtInfo.getEntity(guid);
@@ -69,4 +81,44 @@ public class AtlasEntityStreamForImport extends AtlasEntityStream implements Ent
public void onImportComplete(String guid) {
}
+
+ @Override
+ public void setImportTransform(ImportTransforms importTransform) {
+
+ }
+
+ @Override
+ public ImportTransforms getImportTransform() {
+ return null;
+ }
+
+ @Override
+ public void setEntityHandlers(List<BaseEntityHandler> entityHandlers) {
+
+ }
+
+ @Override
+ public List<BaseEntityHandler> getEntityHandlers() {
+ return null;
+ }
+
+ @Override
+ public AtlasTypesDef getTypesDef() throws AtlasBaseException {
+ return null;
+ }
+
+ @Override
+ public AtlasExportResult getExportResult() throws AtlasBaseException {
+ return null;
+ }
+
+ @Override
+ public List<String> getCreationOrder() {
+ return null;
+ }
+
+ @Override
+ public void close() {
+
+ }
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java
index 2f330c0..54c32c5 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java
@@ -18,6 +18,7 @@
package org.apache.atlas.repository.store.graph.v2;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.RequestContext;
import org.apache.atlas.annotation.GraphTransaction;
@@ -57,12 +58,14 @@ public class BulkImporterImpl implements BulkImporter {
private final EntityGraphRetriever entityGraphRetriever;
private AtlasTypeRegistry typeRegistry;
private final int MAX_ATTEMPTS = 2;
+ private boolean directoryBasedImportConfigured;
@Inject
public BulkImporterImpl(AtlasEntityStore entityStore, AtlasTypeRegistry typeRegistry) {
this.entityStore = entityStore;
this.entityGraphRetriever = new EntityGraphRetriever(typeRegistry);
this.typeRegistry = typeRegistry;
+ this.directoryBasedImportConfigured = StringUtils.isNotEmpty(AtlasConfiguration.IMPORT_TEMP_DIRECTORY.getString());
}
@Override
@@ -205,9 +208,11 @@ public class BulkImporterImpl implements BulkImporter {
AtlasImportResult importResult,
Set<String> processedGuids,
int currentIndex, int streamSize, float currentPercent) {
- updateImportMetrics("entity:%s:created", resp.getCreatedEntities(), processedGuids, importResult);
- updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids, importResult);
- updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids, importResult);
+ if (!directoryBasedImportConfigured) {
+ updateImportMetrics("entity:%s:created", resp.getCreatedEntities(), processedGuids, importResult);
+ updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids, importResult);
+ updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids, importResult);
+ }
String lastEntityImported = String.format("entity:last-imported:%s:[%s]:(%s)", currentEntity.getEntity().getTypeName(), currentIndex, currentEntity.getEntity().getGuid());
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityImportStream.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityImportStream.java
index cf7ac28..c43a04e 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityImportStream.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityImportStream.java
@@ -18,17 +18,45 @@
package org.apache.atlas.repository.store.graph.v2;
+import org.apache.atlas.entitytransform.BaseEntityHandler;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasExportResult;
+import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+import org.apache.atlas.model.typedef.AtlasTypesDef;
+import org.apache.atlas.repository.impexp.ImportTransforms;
+
+import java.util.List;
public interface EntityImportStream extends EntityStream {
int size();
+
void setPosition(int position);
+
int getPosition();
void setPositionUsingEntityGuid(String guid);
AtlasEntityWithExtInfo getNextEntityWithExtInfo();
+ AtlasEntity.AtlasEntityWithExtInfo getEntityWithExtInfo(String guid) throws AtlasBaseException;
+
void onImportComplete(String guid);
+
+ void setImportTransform(ImportTransforms importTransform);
+
+ public ImportTransforms getImportTransform();
+
+ void setEntityHandlers(List<BaseEntityHandler> entityHandlers);
+
+ List<BaseEntityHandler> getEntityHandlers();
+
+ AtlasTypesDef getTypesDef() throws AtlasBaseException;
+
+ AtlasExportResult getExportResult() throws AtlasBaseException;
+
+ List<String> getCreationOrder();
+
+ void close();
}
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java
index 7aeb6a7..4d43852 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java
@@ -33,16 +33,20 @@ import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasClassificationType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.utils.TestResourceFileUtils;
+import org.apache.commons.io.IOUtils;
import org.testng.ITestContext;
import org.testng.SkipException;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
-import org.testng.annotations.DataProvider;
import javax.inject.Inject;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -52,8 +56,8 @@ import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_INCREM
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.createTypes;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getEntities;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getZipSource;
-import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runImportWithNoParameters;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runExportWithParameters;
+import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runImportWithNoParameters;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
@@ -107,11 +111,13 @@ public class ExportIncrementalTest extends ExportImportTestBase {
}
@Test
- public void atT0_ReturnsAllEntities() throws AtlasBaseException {
+ public void atT0_ReturnsAllEntities() throws AtlasBaseException, IOException {
final int expectedEntityCount = 2;
AtlasExportRequest request = getIncrementalRequest(0);
- ZipSource source = runExportWithParameters(exportService, request);
+ InputStream inputStream = runExportWithParameters(exportService, request);
+
+ ZipSource source = getZipSourceFromInputStream(inputStream);
AtlasEntity.AtlasEntityWithExtInfo entities = getEntities(source, expectedEntityCount);
int count = 0;
@@ -129,13 +135,15 @@ public class ExportIncrementalTest extends ExportImportTestBase {
}
@Test(dependsOnMethods = "atT0_ReturnsAllEntities")
- public void atT1_NewClassificationAttachedToTable_ReturnsChangedTable() throws AtlasBaseException {
+ public void atT1_NewClassificationAttachedToTable_ReturnsChangedTable() throws AtlasBaseException, IOException {
final int expectedEntityCount = 1;
entityStore.addClassifications(TABLE_GUID, ImmutableList.of(classificationTypeT1.createDefaultValue()));
AtlasExportRequest request = getIncrementalRequest(nextTimestamp);
- ZipSource source = runExportWithParameters(exportService, request);
+ InputStream inputStream = runExportWithParameters(exportService, request);
+
+ ZipSource source = getZipSourceFromInputStream(inputStream);
AtlasEntity.AtlasEntityWithExtInfo entities = getEntities(source, expectedEntityCount);
AtlasEntity entity = null;
@@ -155,7 +163,7 @@ public class ExportIncrementalTest extends ExportImportTestBase {
}
@Test(dependsOnMethods = "atT1_NewClassificationAttachedToTable_ReturnsChangedTable")
- public void atT2_NewClassificationAttachedToColumn_ReturnsChangedColumn() throws AtlasBaseException {
+ public void atT2_NewClassificationAttachedToColumn_ReturnsChangedColumn() throws AtlasBaseException, IOException {
final int expectedEntityCount = 1;
AtlasEntity.AtlasEntityWithExtInfo tableEntity = entityStore.getById(TABLE_GUID);
@@ -163,7 +171,9 @@ public class ExportIncrementalTest extends ExportImportTestBase {
entityStore.addClassifications(COLUMN_GUID_HIGH, ImmutableList.of(typeRegistry.getClassificationTypeByName("T1").createDefaultValue()));
- ZipSource source = runExportWithParameters(exportService, getIncrementalRequest(nextTimestamp));
+ InputStream inputStream = runExportWithParameters(exportService, getIncrementalRequest(nextTimestamp));
+
+ ZipSource source = getZipSourceFromInputStream(inputStream);
AtlasEntity.AtlasEntityWithExtInfo entities = getEntities(source, expectedEntityCount);
for (Map.Entry<String, AtlasEntity> entry : entities.getReferredEntities().entrySet()) {
@@ -176,17 +186,26 @@ public class ExportIncrementalTest extends ExportImportTestBase {
assertEquals(preExportTableEntityTimestamp, postUpdateTableEntityTimestamp);
}
+ private ZipSource getZipSourceFromInputStream(InputStream inputStream) { // unchecked wrapper: callers such as connectedExport() declare no throws clause
+ try {
+ return new ZipSource(inputStream);
+ } catch (IOException | AtlasBaseException e) {
+ throw new AssertionError("getZipSourceFromInputStream: could not create ZipSource from exported stream", e); // fail with the cause instead of returning null
+ }
+ }
+
@Test(dependsOnMethods = "atT2_NewClassificationAttachedToColumn_ReturnsChangedColumn")
public void exportingWithSameParameters_Succeeds() {
- ZipSource source = runExportWithParameters(exportService, getIncrementalRequest(nextTimestamp));
+ InputStream inputStream = runExportWithParameters(exportService, getIncrementalRequest(nextTimestamp));
- assertNotNull(source);
+ assertNotNull(getZipSourceFromInputStream(inputStream));
}
@Test
public void connectedExport() {
- ZipSource source = runExportWithParameters(exportService, getConnected());
+ InputStream inputStream = runExportWithParameters(exportService, getConnected());
+ ZipSource source = getZipSourceFromInputStream(inputStream);
UniqueList<String> creationOrder = new UniqueList<>();
List<String> zipCreationOrder = source.getCreationOrder();
creationOrder.addAll(zipCreationOrder);
@@ -200,27 +219,29 @@ public class ExportIncrementalTest extends ExportImportTestBase {
}
@Test(dataProvider = "hiveDb")
- public void importHiveDb(ZipSource zipSource) throws AtlasBaseException, IOException {
- runImportWithNoParameters(importService, zipSource);
+ public void importHiveDb(InputStream stream) throws AtlasBaseException, IOException {
+ runImportWithNoParameters(importService, stream);
}
@Test(dependsOnMethods = "importHiveDb")
- public void exportTableInrementalConnected() throws AtlasBaseException {
- ZipSource source = runExportWithParameters(exportService, getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_LINEAGE, EXPORT_INCREMENTAL, 0, true));
- verifyExpectedEntities(getFileNames(source), GUID_DB, GUID_TABLE_CTAS_2);
+ public void exportTableInrementalConnected() throws AtlasBaseException, IOException {
+ InputStream source = runExportWithParameters(exportService, getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_LINEAGE, EXPORT_INCREMENTAL, 0, true));
- nextTimestamp = updateTimesampForNextIncrementalExport(source);
+ ZipSource sourceCopy = getZipSourceCopy(source); // the export stream is buffered into memory and parsed as a ZipSource
+ verifyExpectedEntities(getFileNames(sourceCopy), GUID_DB, GUID_TABLE_CTAS_2);
+
+ nextTimestamp = updateTimesampForNextIncrementalExport(sourceCopy);
try {
source = runExportWithParameters(exportService, getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_LINEAGE, EXPORT_INCREMENTAL, nextTimestamp, true));
- }catch (SkipException e){
-
+ } catch (SkipException e) {
+ throw e; // NOTE(review): rethrowing makes this try/catch a no-op; the removed catch block was empty and swallowed the SkipException
}
entityStore.addClassifications(GUID_TABLE_CTAS_2, ImmutableList.of(classificationTypeT1.createDefaultValue()));
source = runExportWithParameters(exportService, getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_LINEAGE, EXPORT_INCREMENTAL, nextTimestamp, true));
- verifyExpectedEntities(getFileNames(source), GUID_TABLE_CTAS_2);
+ verifyExpectedEntities(getFileNames(getZipSourceCopy(source)), GUID_TABLE_CTAS_2);
}
@@ -281,4 +302,11 @@ public class ExportIncrementalTest extends ExportImportTestBase {
}
return ret;
}
+
+ private ZipSource getZipSourceCopy(InputStream is) throws IOException, AtlasBaseException {
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ IOUtils.copy(is, baos);
+ // 'is' has been copied into memory; the ZipSource is built over a new in-memory stream of those bytes.
+ return new ZipSource(new ByteArrayInputStream(baos.toByteArray()));
+ }
}
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportSkipLineageTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportSkipLineageTest.java
index 18b4a30..25e0a53 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportSkipLineageTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportSkipLineageTest.java
@@ -40,6 +40,7 @@ import org.testng.annotations.Test;
import javax.inject.Inject;
import java.io.IOException;
+import java.io.InputStream;
import java.util.Map;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadBaseModel;
@@ -87,11 +88,13 @@ public class ExportSkipLineageTest extends ExportImportTestBase {
}
@Test
- public void exportWithoutLineage() {
+ public void exportWithoutLineage() throws IOException, AtlasBaseException {
final int expectedEntityCount = 3;
AtlasExportRequest request = getRequest();
- ZipSource source = runExportWithParameters(exportService, request);
+ InputStream inputStream = runExportWithParameters(exportService, request);
+
+ ZipSource source = new ZipSource(inputStream);
AtlasEntity.AtlasEntityWithExtInfo entities = ZipFileResourceTestUtils.getEntities(source, expectedEntityCount);
int count = 0;
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
index 7044243..33fe0ad 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
@@ -52,6 +52,7 @@ import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import java.io.IOException;
+import java.io.InputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -74,7 +75,6 @@ import static org.testng.Assert.assertTrue;
@Guice(modules = TestModules.TestOnlyModule.class)
public class ImportServiceTest extends ExportImportTestBase {
- private static final Logger LOG = LoggerFactory.getLogger(ImportServiceTest.class);
private static final int DEFAULT_LIMIT = 25;
private final ImportService importService;
@@ -124,9 +124,9 @@ public class ImportServiceTest extends ExportImportTestBase {
}
@Test(dataProvider = "sales")
- public void importDB1(ZipSource zipSource) throws AtlasBaseException, IOException {
+ public void importDB1(InputStream inputStream) throws AtlasBaseException, IOException {
loadBaseModel();
- runAndVerifyQuickStart_v1_Import(importService, zipSource);
+ runAndVerifyQuickStart_v1_Import(importService, inputStream);
assertEntityCount("DB_v1", "bfe88eb8-7556-403c-8210-647013f44a44", 1);
@@ -141,9 +141,9 @@ public class ImportServiceTest extends ExportImportTestBase {
}
@Test(dataProvider = "reporting")
- public void importDB2(ZipSource zipSource) throws AtlasBaseException, IOException {
+ public void importDB2(InputStream inputStream) throws AtlasBaseException, IOException {
loadBaseModel();
- runAndVerifyQuickStart_v1_Import(importService, zipSource);
+ runAndVerifyQuickStart_v1_Import(importService, inputStream);
}
@DataProvider(name = "logging")
@@ -152,9 +152,9 @@ public class ImportServiceTest extends ExportImportTestBase {
}
@Test(dataProvider = "logging")
- public void importDB3(ZipSource zipSource) throws AtlasBaseException, IOException {
+ public void importDB3(InputStream inputStream) throws AtlasBaseException, IOException {
loadBaseModel();
- runAndVerifyQuickStart_v1_Import(importService, zipSource);
+ runAndVerifyQuickStart_v1_Import(importService, inputStream);
}
@DataProvider(name = "salesNewTypeAttrs")
@@ -163,9 +163,9 @@ public class ImportServiceTest extends ExportImportTestBase {
}
@Test(dataProvider = "salesNewTypeAttrs", dependsOnMethods = "importDB1")
- public void importDB4(ZipSource zipSource) throws AtlasBaseException, IOException {
+ public void importDB4(InputStream inputStream) throws AtlasBaseException, IOException {
loadBaseModel();
- runImportWithParameters(importService, getDefaultImportRequest(), zipSource);
+ runImportWithParameters(importService, getDefaultImportRequest(), inputStream);
}
@DataProvider(name = "salesNewTypeAttrs-next")
@@ -174,7 +174,7 @@ public class ImportServiceTest extends ExportImportTestBase {
}
@Test(dataProvider = "salesNewTypeAttrs-next", dependsOnMethods = "importDB4")
- public void importDB5(ZipSource zipSource) throws AtlasBaseException, IOException {
+ public void importDB5(InputStream inputStream) throws AtlasBaseException, IOException {
final String newEnumDefName = "database_action";
assertNotNull(typeDefStore.getEnumDefByName(newEnumDefName));
@@ -184,13 +184,13 @@ public class ImportServiceTest extends ExportImportTestBase {
options.put("updateTypeDefinition", "false");
request.setOptions(options);
- runImportWithParameters(importService, request, zipSource);
+ runImportWithParameters(importService, request, inputStream);
assertNotNull(typeDefStore.getEnumDefByName(newEnumDefName));
assertEquals(typeDefStore.getEnumDefByName(newEnumDefName).getElementDefs().size(), 4);
}
@Test(dataProvider = "salesNewTypeAttrs-next", dependsOnMethods = "importDB4")
- public void importDB6(ZipSource zipSource) throws AtlasBaseException, IOException {
+ public void importDB6(InputStream inputStream) throws AtlasBaseException, IOException {
final String newEnumDefName = "database_action";
assertNotNull(typeDefStore.getEnumDefByName(newEnumDefName));
@@ -200,7 +200,7 @@ public class ImportServiceTest extends ExportImportTestBase {
options.put("updateTypeDefinition", "true");
request.setOptions(options);
- runImportWithParameters(importService, request, zipSource);
+ runImportWithParameters(importService, request, inputStream);
assertNotNull(typeDefStore.getEnumDefByName(newEnumDefName));
assertEquals(typeDefStore.getEnumDefByName(newEnumDefName).getElementDefs().size(), 8);
}
@@ -211,11 +211,11 @@ public class ImportServiceTest extends ExportImportTestBase {
}
@Test(dataProvider = "ctas")
- public void importCTAS(ZipSource zipSource) throws IOException, AtlasBaseException {
+ public void importCTAS(InputStream inputStream) throws IOException, AtlasBaseException {
loadBaseModel();
loadHiveModel();
- runImportWithNoParameters(importService, zipSource);
+ runImportWithNoParameters(importService, inputStream);
}
@DataProvider(name = "stocks-legacy")
@@ -224,12 +224,12 @@ public class ImportServiceTest extends ExportImportTestBase {
}
@Test(dataProvider = "stocks-legacy")
- public void importLegacy(ZipSource zipSource) throws IOException, AtlasBaseException {
+ public void importLegacy(InputStream inputStream) throws IOException, AtlasBaseException {
loadBaseModel();
loadFsModel();
loadHiveModel();
- runImportWithNoParameters(importService, zipSource);
+ runImportWithNoParameters(importService, inputStream);
List<AtlasEntityHeader> result = getImportedEntities("hive_db", "886c5e9c-3ac6-40be-8201-fb0cebb64783");
assertEquals(result.size(), 1);
@@ -244,12 +244,12 @@ public class ImportServiceTest extends ExportImportTestBase {
}
@Test(dataProvider = "tag-prop-2")
- public void importTagProp2(ZipSource zipSource) throws IOException, AtlasBaseException {
+ public void importTagProp2(InputStream inputStream) throws IOException, AtlasBaseException {
loadBaseModel();
loadFsModel();
loadHiveModel();
- runImportWithNoParameters(importService, zipSource);
+ runImportWithNoParameters(importService, inputStream);
assertEntityCount("hive_db", "7d7d5a18-d992-457e-83c0-e36f5b95ebdb", 1);
assertEntityCount("hive_table", "dbe729bb-c614-4e23-b845-3258efdf7a58", 1);
AtlasEntity entity = assertEntity("hive_table", "092e9888-de96-4908-8be3-925ee72e3395");
@@ -260,7 +260,7 @@ public class ImportServiceTest extends ExportImportTestBase {
}
@Test(dataProvider = "stocks-legacy")
- public void importExistingTopLevelEntity(ZipSource zipSource) throws IOException, AtlasBaseException{
+ public void importExistingTopLevelEntity(InputStream inputStream) throws IOException, AtlasBaseException{
loadBaseModel();
loadFsModel();
loadHiveModel();
@@ -277,7 +277,7 @@ public class ImportServiceTest extends ExportImportTestBase {
assertNotNull(createResponse);
String preImportGuid = createResponse.getCreatedEntities().get(0).getGuid();
- runImportWithNoParameters(importService, zipSource);
+ runImportWithNoParameters(importService, inputStream);
AtlasVertex v = AtlasGraphUtilsV2.findByGuid("886c5e9c-3ac6-40be-8201-fb0cebb64783");
assertNotNull(v);
@@ -295,10 +295,10 @@ public class ImportServiceTest extends ExportImportTestBase {
}
@Test(dataProvider = "stocks-glossary")
- public void importGlossary(ZipSource zipSource) throws IOException, AtlasBaseException {
+ public void importGlossary(InputStream inputStream) throws IOException, AtlasBaseException {
loadBaseModel();
loadGlossary();
- runImportWithNoParameters(importService, zipSource);
+ runImportWithNoParameters(importService, inputStream);
assertEntityCount("AtlasGlossary", "40c80052-3129-4f7c-8f2f-391677935416", 1);
assertEntityCount("AtlasGlossaryTerm", "e93ac426-de04-4d54-a7c9-d76c1e96369b", 1);
@@ -317,13 +317,13 @@ public class ImportServiceTest extends ExportImportTestBase {
}
@Test(dataProvider = "hdfs_path1", expectedExceptions = AtlasBaseException.class)
- public void importHdfs_path1(ZipSource zipSource) throws IOException, AtlasBaseException {
+ public void importHdfs_path1(InputStream inputStream) throws IOException, AtlasBaseException {
loadBaseModel();
loadFsModel();
loadModelFromResourcesJson("tag1.json", typeDefStore, typeRegistry);
try {
- runImportWithNoParameters(importService, zipSource);
+ runImportWithNoParameters(importService, inputStream);
} catch (AtlasBaseException e) {
assertEquals(e.getAtlasErrorCode(), AtlasErrorCode.INVALID_IMPORT_ATTRIBUTE_TYPE_CHANGED);
AtlasClassificationType tag1 = typeRegistry.getClassificationTypeByName("tag1");
@@ -344,11 +344,11 @@ public class ImportServiceTest extends ExportImportTestBase {
}
@Test(dataProvider = "relationshipLineage")
- public void importDB8(ZipSource zipSource) throws AtlasBaseException, IOException {
+ public void importDB8(InputStream inputStream) throws AtlasBaseException, IOException {
loadBaseModel();
loadHiveModel();
AtlasImportRequest request = getDefaultImportRequest();
- runImportWithParameters(importService, request, zipSource);
+ runImportWithParameters(importService, request, inputStream);
}
@DataProvider(name = "relationship")
@@ -357,11 +357,11 @@ public class ImportServiceTest extends ExportImportTestBase {
}
@Test(dataProvider = "relationship")
- public void importDB7(ZipSource zipSource) throws AtlasBaseException, IOException {
+ public void importDB7(InputStream inputStream) throws AtlasBaseException, IOException {
loadBaseModel();
loadHiveModel();
AtlasImportRequest request = getDefaultImportRequest();
- runImportWithParameters(importService, request, zipSource);
+ runImportWithParameters(importService, request, inputStream);
assertEntityCount("hive_db", "d7dc0848-fbba-4d63-9264-a460798361f5", 1);
assertEntityCount("hive_table", "2fb31eaa-4bb2-4eb8-b333-a888ba7c84fe", 1);
@@ -430,11 +430,12 @@ public class ImportServiceTest extends ExportImportTestBase {
}
@Test(dataProvider = "salesNewTypeAttrs-next")
- public void transformUpdatesForSubTypes(ZipSource zipSource) throws IOException, AtlasBaseException {
+ public void transformUpdatesForSubTypes(InputStream inputStream) throws IOException, AtlasBaseException {
loadBaseModel();
loadHiveModel();
String transformJSON = "{ \"Asset\": { \"qualifiedName\":[ \"lowercase\", \"replace:@cl1:@cl2\" ] } }";
+ ZipSource zipSource = new ZipSource(inputStream);
importService.setImportTransform(zipSource, transformJSON);
ImportTransforms importTransforms = zipSource.getImportTransform();
@@ -444,11 +445,12 @@ public class ImportServiceTest extends ExportImportTestBase {
}
@Test(dataProvider = "salesNewTypeAttrs-next")
- public void transformUpdatesForSubTypesAddsToExistingTransforms(ZipSource zipSource) throws IOException, AtlasBaseException {
- loadBaseModel();
- loadHiveModel();
+ public void transformUpdatesForSubTypesAddsToExistingTransforms(InputStream inputStream) throws IOException, AtlasBaseException {
+ loadBaseModel();
+ loadHiveModel();
String transformJSON = "{ \"Asset\": { \"qualifiedName\":[ \"replace:@cl1:@cl2\" ] }, \"hive_table\": { \"qualifiedName\":[ \"lowercase\" ] } }";
+ ZipSource zipSource = new ZipSource(inputStream);
importService.setImportTransform(zipSource, transformJSON);
ImportTransforms importTransforms = zipSource.getImportTransform();
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsShaperTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsShaperTest.java
index 06bdaa6..78fdaca 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsShaperTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsShaperTest.java
@@ -66,7 +66,7 @@ public class ImportTransformsShaperTest extends ExportImportTestBase {
public void newTagIsCreatedAndEntitiesAreTagged() throws AtlasBaseException, IOException {
AtlasImportResult result = ZipFileResourceTestUtils.runImportWithParameters(importService,
getImporRequest(),
- ZipFileResourceTestUtils.getZipSourceFrom("stocks.zip"));
+ ZipFileResourceTestUtils.getInputStreamFrom("stocks.zip"));
AtlasClassificationType classification = typeRegistry.getClassificationTypeByName(TAG_NAME);
assertNotNull(classification);
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/RelationshipAttributesExtractorTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/RelationshipAttributesExtractorTest.java
index 03d50f1..920fc28 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/RelationshipAttributesExtractorTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/RelationshipAttributesExtractorTest.java
@@ -41,6 +41,7 @@ import javax.inject.Inject;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.util.List;
import java.util.Map;
import java.util.ArrayList;
@@ -106,8 +107,8 @@ public class RelationshipAttributesExtractorTest {
}
@Test(dataProvider = "hiveDb")
- public void importHiveDb(ZipSource zipSource) throws AtlasBaseException, IOException {
- runImportWithNoParameters(importService, zipSource);
+ public void importHiveDb(InputStream inputStream) throws AtlasBaseException, IOException {
+ runImportWithNoParameters(importService, inputStream);
}
@Test(dependsOnMethods = "importHiveDb")
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java
index 92d4fb0..7a1ed18 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java
@@ -24,10 +24,10 @@ import org.apache.atlas.RequestContext;
import org.apache.atlas.TestModules;
import org.apache.atlas.TestUtilsV2;
import org.apache.atlas.exception.AtlasBaseException;
-import org.apache.atlas.model.impexp.AtlasServer;
import org.apache.atlas.model.impexp.AtlasExportRequest;
import org.apache.atlas.model.impexp.AtlasImportRequest;
import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.atlas.model.impexp.AtlasServer;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.typedef.AtlasEntityDef;
import org.apache.atlas.repository.Constants;
@@ -40,6 +40,7 @@ import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.utils.TestResourceFileUtils;
+import org.apache.commons.io.IOUtils;
import org.testng.SkipException;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
@@ -47,7 +48,10 @@ import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import javax.inject.Inject;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.util.List;
import static org.apache.atlas.model.impexp.AtlasExportRequest.OPTION_KEY_REPLICATED_TO;
@@ -88,7 +92,7 @@ public class ReplicationEntityAttributeTest extends ExportImportTestBase {
@Inject
private AtlasEntityStoreV2 entityStore;
- private ZipSource zipSource;
+ private InputStream inputStream;
@BeforeClass
public void setup() throws IOException, AtlasBaseException {
@@ -107,13 +111,19 @@ public class ReplicationEntityAttributeTest extends ExportImportTestBase {
}
@Test
- public void exportWithReplicationToOption_AddsClusterObjectIdToReplicatedFromAttribute() throws AtlasBaseException {
+ public void exportWithReplicationToOption_AddsClusterObjectIdToReplicatedFromAttribute() throws AtlasBaseException, IOException {
final int expectedEntityCount = 2;
AtlasExportRequest request = getUpdateMetaInfoUpdateRequest();
- zipSource = runExportWithParameters(exportService, request);
+ InputStream inputStream = runExportWithParameters(exportService, request);
+
+ assertNotNull(inputStream);
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ IOUtils.copy(inputStream, baos);
+ this.inputStream = new ByteArrayInputStream(baos.toByteArray());
- assertNotNull(zipSource);
+ ZipSource zipSource = new ZipSource(new ByteArrayInputStream(baos.toByteArray()));
assertNotNull(zipSource.getCreationOrder());
assertEquals(zipSource.getCreationOrder().size(), expectedEntityCount);
@@ -139,7 +149,7 @@ public class ReplicationEntityAttributeTest extends ExportImportTestBase {
@Test(dependsOnMethods = "exportWithReplicationToOption_AddsClusterObjectIdToReplicatedFromAttribute")
public void importWithReplicationFromOption_AddsClusterObjectIdToReplicatedFromAttribute() throws AtlasBaseException, IOException {
AtlasImportRequest request = getImportRequestWithReplicationOption();
- AtlasImportResult importResult = runImportWithParameters(importService, request, zipSource);
+ AtlasImportResult importResult = runImportWithParameters(importService, request, inputStream);
assertCluster(
AuditsWriter.getServerNameFromFullName(REPLICATED_FROM_CLUSTER_NAME),
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ZipFileResourceTestUtils.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipFileResourceTestUtils.java
index 76b423e..f4e84b2 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ZipFileResourceTestUtils.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipFileResourceTestUtils.java
@@ -33,12 +33,14 @@ import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.repository.store.bootstrap.AtlasTypeDefStoreInitializer;
import org.apache.atlas.repository.store.graph.v2.AtlasEntityStoreV2;
import org.apache.atlas.repository.store.graph.v2.AtlasEntityStreamForImport;
+import org.apache.atlas.repository.store.graph.v2.EntityImportStream;
import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.utils.AtlasJson;
import org.apache.atlas.utils.TestResourceFileUtils;
import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,6 +51,8 @@ import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -151,19 +155,11 @@ public class ZipFileResourceTestUtils {
}
public static Object[][] getZipSource(String fileName) throws IOException, AtlasBaseException {
- return new Object[][]{{getZipSourceFrom(fileName)}};
+ return new Object[][]{{getInputStreamFrom(fileName)}};
}
- public static ZipSource getZipSourceFrom(String fileName) throws IOException, AtlasBaseException {
- FileInputStream fs = ZipFileResourceTestUtils.getFileInputStream(fileName);
-
- return new ZipSource(fs);
- }
-
- private static ZipSource getZipSourceFrom(ByteArrayOutputStream baos) throws IOException, AtlasBaseException {
- ByteArrayInputStream bis = new ByteArrayInputStream(baos.toByteArray());
- ZipSource zipSource = new ZipSource(bis);
- return zipSource;
+ public static InputStream getInputStreamFrom(String fileName) {
+ return ZipFileResourceTestUtils.getFileInputStream(fileName);
}
public static void verifyImportedEntities(List<String> creationOrder, List<String> processedEntities) {
@@ -224,7 +220,7 @@ public class ZipFileResourceTestUtils {
}
}
- public static ZipSource runExportWithParameters(ExportService exportService, AtlasExportRequest request) {
+ public static InputStream runExportWithParameters(ExportService exportService, AtlasExportRequest request) {
final String requestingIP = "1.0.0.0";
final String hostName = "localhost";
final String userName = "admin";
@@ -237,7 +233,7 @@ public class ZipFileResourceTestUtils {
assertEquals(result.getOperationStatus(), AtlasExportResult.OperationStatus.SUCCESS);
zipSink.close();
- return getZipSourceFrom(baos);
+ return new ByteArrayInputStream(baos.toByteArray());
}
catch(Exception ex) {
throw new SkipException(String.format("runExportWithParameters: %s: failed!", request.toString()));
@@ -325,27 +321,42 @@ public class ZipFileResourceTestUtils {
}
- public static AtlasImportResult runImportWithParameters(ImportService importService, AtlasImportRequest request, ZipSource source) throws AtlasBaseException, IOException {
+ public static AtlasImportResult runImportWithParameters(ImportService importService, AtlasImportRequest request, InputStream inputStream) throws AtlasBaseException, IOException {
+ final String requestingIP = "1.0.0.0";
+ final String hostName = "localhost";
+ final String userName = "admin";
+
+ AtlasImportResult result = importService.run(inputStream, request, userName, hostName, requestingIP);
+ assertEquals(result.getOperationStatus(), AtlasImportResult.OperationStatus.SUCCESS);
+ return result;
+ }
+
+ public static AtlasImportResult runImportWithNoParameters(ImportService importService, InputStream inputStream) throws AtlasBaseException, IOException {
final String requestingIP = "1.0.0.0";
final String hostName = "localhost";
final String userName = "admin";
- AtlasImportResult result = importService.run(source, request, userName, hostName, requestingIP);
+ AtlasImportResult result = importService.run(inputStream, userName, hostName, requestingIP);
assertEquals(result.getOperationStatus(), AtlasImportResult.OperationStatus.SUCCESS);
return result;
}
- public static AtlasImportResult runImportWithNoParameters(ImportService importService, ZipSource source) throws AtlasBaseException, IOException {
+ public static AtlasImportResult runImportWithNoParametersUsingBackingDirectory(ImportService importService, InputStream inputStream) throws AtlasBaseException, IOException {
final String requestingIP = "1.0.0.0";
final String hostName = "localhost";
final String userName = "admin";
- AtlasImportResult result = importService.run(source, userName, hostName, requestingIP);
+ EntityImportStream sourceWithBackingDirectory = new ZipSourceWithBackingDirectory(inputStream, Files.createTempDirectory("temp").toString());
+ AtlasImportResult result = importService.run(sourceWithBackingDirectory, new AtlasImportRequest(), userName, hostName, requestingIP);
assertEquals(result.getOperationStatus(), AtlasImportResult.OperationStatus.SUCCESS);
return result;
}
- public static void runAndVerifyQuickStart_v1_Import(ImportService importService, ZipSource zipSource) throws AtlasBaseException, IOException {
+ public static void runAndVerifyQuickStart_v1_Import(ImportService importService, InputStream is) throws AtlasBaseException, IOException {
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ IOUtils.copy(is, baos);
+
+ ZipSource zipSource = new ZipSource(new ByteArrayInputStream(baos.toByteArray()));
AtlasExportResult exportResult = zipSource.getExportResult();
List<String> creationOrder = zipSource.getCreationOrder();
@@ -353,7 +364,7 @@ public class ZipFileResourceTestUtils {
RequestContext.get().setUser(TestUtilsV2.TEST_USER, null);
AtlasImportRequest request = getDefaultImportRequest();
- AtlasImportResult result = runImportWithParameters(importService, request, zipSource);
+ AtlasImportResult result = runImportWithParameters(importService, request, new ByteArrayInputStream(baos.toByteArray()));
assertNotNull(result);
verifyImportedMetrics(exportResult, result);
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ZipSourceTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipSourceTest.java
index 7436dc0..46164e8 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ZipSourceTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipSourceTest.java
@@ -28,6 +28,7 @@ import org.testng.annotations.Test;
import java.io.ByteArrayInputStream;
import java.io.FileInputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.util.List;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getZipSource;
@@ -127,7 +128,8 @@ public class ZipSourceTest {
}
@Test(dataProvider = "sales")
- public void iteratorSetPositionBehavor(ZipSource zipSource) throws IOException, AtlasBaseException {
+ public void iteratorSetPositionBehavor(InputStream inputStream) throws IOException, AtlasBaseException {
+ ZipSource zipSource = new ZipSource(inputStream);
Assert.assertTrue(zipSource.hasNext());
List<String> creationOrder = zipSource.getCreationOrder();
diff --git a/repository/src/test/java/org/apache/atlas/repository/tagpropagation/ClassificationPropagationTest.java b/repository/src/test/java/org/apache/atlas/repository/tagpropagation/ClassificationPropagationTest.java
index 9d6d057..6f9c05e 100644
--- a/repository/src/test/java/org/apache/atlas/repository/tagpropagation/ClassificationPropagationTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/tagpropagation/ClassificationPropagationTest.java
@@ -51,6 +51,7 @@ import org.testng.annotations.Test;
import javax.inject.Inject;
import java.io.FileInputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -601,9 +602,8 @@ public class ClassificationPropagationTest {
}
}
- public static ZipSource getZipSource(String fileName) throws IOException, AtlasBaseException {
- FileInputStream fs = ZipFileResourceTestUtils.getFileInputStream(fileName);
- return new ZipSource(fs);
+ public static InputStream getZipSource(String fileName) throws IOException {
+ return ZipFileResourceTestUtils.getFileInputStream(fileName);
}
private void loadSampleClassificationDefs() throws AtlasBaseException {
diff --git a/repository/src/test/java/org/apache/atlas/services/MetricsServiceTest.java b/repository/src/test/java/org/apache/atlas/services/MetricsServiceTest.java
index b2f2633..baeafd4 100644
--- a/repository/src/test/java/org/apache/atlas/services/MetricsServiceTest.java
+++ b/repository/src/test/java/org/apache/atlas/services/MetricsServiceTest.java
@@ -39,6 +39,7 @@ import org.testng.annotations.Test;
import javax.inject.Inject;
import java.io.FileInputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneId;
@@ -246,9 +247,8 @@ public class MetricsServiceTest {
}
}
- public static ZipSource getZipSource(String fileName) throws IOException, AtlasBaseException {
- FileInputStream fs = ZipFileResourceTestUtils.getFileInputStream(fileName);
- return new ZipSource(fs);
+ public static InputStream getZipSource(String fileName) throws AtlasBaseException {
+ return ZipFileResourceTestUtils.getFileInputStream(fileName);
}
private static class TestClock extends Clock {
diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
index e78fcb6..464d46f 100755
--- a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
@@ -45,7 +45,6 @@ import org.apache.atlas.repository.impexp.ExportService;
import org.apache.atlas.repository.impexp.ImportService;
import org.apache.atlas.repository.impexp.MigrationProgressService;
import org.apache.atlas.repository.impexp.ZipSink;
-import org.apache.atlas.repository.impexp.ZipSource;
import org.apache.atlas.repository.patches.AtlasPatchManager;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.services.MetricsService;
@@ -404,9 +403,8 @@ public class AdminResource {
try {
AtlasImportRequest request = AtlasType.fromJson(jsonData, AtlasImportRequest.class);
- ZipSource zipSource = new ZipSource(inputStream);
- result = importService.run(zipSource, request, AtlasAuthorizationUtils.getCurrentUserName(),
+ result = importService.run(inputStream, request, Servlets.getUserName(httpServletRequest),
Servlets.getHostName(httpServletRequest),
AtlasAuthorizationUtils.getRequestIpAddress(httpServletRequest));
} catch (AtlasBaseException excp) {