You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by am...@apache.org on 2019/09/09 23:37:21 UTC
[atlas] branch branch-0.8 updated: ATLAS-3396:
ZipSourceWithBackingDirectory implementation.
This is an automated email from the ASF dual-hosted git repository.
amestry pushed a commit to branch branch-0.8
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/branch-0.8 by this push:
new 1778ed9 ATLAS-3396: ZipSourceWithBackingDirectory implementation.
1778ed9 is described below
commit 1778ed9d9f37e59a2f5eecfe8d4bf6384ac46942
Author: Ashutosh Mestry <am...@hortonworks.com>
AuthorDate: Mon Sep 9 14:12:08 2019 -0700
ATLAS-3396: ZipSourceWithBackingDirectory implementation.
---
.../java/org/apache/atlas/AtlasConfiguration.java | 4 +-
distro/src/conf/atlas-application.properties | 3 +
docs/src/site/twiki/Import-API-Options.twiki | 11 +
.../atlas/repository/impexp/ImportService.java | 58 ++--
.../apache/atlas/repository/impexp/ZipSource.java | 10 +-
.../impexp/ZipSourceWithBackingDirectory.java | 371 +++++++++++++++++++++
.../store/graph/v1/AtlasEntityStreamForImport.java | 52 +++
.../store/graph/v1/BulkImporterImpl.java | 14 +-
.../store/graph/v1/EntityImportStream.java | 29 ++
.../repository/impexp/ExportImportTestBase.java | 3 -
.../repository/impexp/ExportIncrementalTest.java | 68 ++--
.../repository/impexp/ExportSkipLineageTest.java | 7 +-
.../atlas/repository/impexp/ImportServiceTest.java | 68 ++--
.../impexp/ImportTransformsShaperTest.java | 2 +-
.../impexp/ReplicationEntityAttributeTest.java | 22 +-
.../impexp/ZipFileResourceTestUtils.java | 50 +--
.../atlas/repository/impexp/ZipSourceTest.java | 4 +-
.../apache/atlas/web/resources/AdminResource.java | 7 +-
18 files changed, 669 insertions(+), 114 deletions(-)
diff --git a/common/src/main/java/org/apache/atlas/AtlasConfiguration.java b/common/src/main/java/org/apache/atlas/AtlasConfiguration.java
index c5357f5..17d67cb 100644
--- a/common/src/main/java/org/apache/atlas/AtlasConfiguration.java
+++ b/common/src/main/java/org/apache/atlas/AtlasConfiguration.java
@@ -47,7 +47,9 @@ public enum AtlasConfiguration {
//search configuration
SEARCH_MAX_LIMIT("atlas.search.maxlimit", 10000),
- SEARCH_DEFAULT_LIMIT("atlas.search.defaultlimit", 100);
+ SEARCH_DEFAULT_LIMIT("atlas.search.defaultlimit", 100),
+
+ IMPORT_TEMP_DIRECTORY("atlas.import.temp.directory", "");
private static final Configuration APPLICATION_PROPERTIES;
diff --git a/distro/src/conf/atlas-application.properties b/distro/src/conf/atlas-application.properties
index 654a930..62dab1f 100755
--- a/distro/src/conf/atlas-application.properties
+++ b/distro/src/conf/atlas-application.properties
@@ -65,6 +65,9 @@ ${titan.index.properties}
# Solr-specific configuration property
atlas.graph.index.search.max-result-set-size=150
+######### Import Configs #########
+#atlas.import.temp.directory=/temp/import
+
######### Notification Configs #########
atlas.notification.embedded=true
atlas.kafka.data=${sys:atlas.home}/data/kafka
diff --git a/docs/src/site/twiki/Import-API-Options.twiki b/docs/src/site/twiki/Import-API-Options.twiki
index 4004e70..7f90475 100644
--- a/docs/src/site/twiki/Import-API-Options.twiki
+++ b/docs/src/site/twiki/Import-API-Options.twiki
@@ -26,6 +26,7 @@ Following options are supported for Import process:
* Specify transforms during import operation.
* Resume import by specifying starting entity guid.
* Optionally import type definition.
+ * Handling large imports.
---++++ Transforms
@@ -133,3 +134,13 @@ curl -g -X POST -u adminuser:password -H "Content-Type: application/json"
-d r@importOptions.json
"http://localhost:21000/api/atlas/admin/importfile"
</verbatim>
+
+---++++ Handling Large Imports
+
+By default, the Import Service stores all of the data in memory. This may be limiting for ZIP files containing a large amount of data.
+
+To configure a temporary directory, use the application property _atlas.import.temp.directory_. If this property is left blank, the default in-memory implementation is used.
+
+Please ensure that there is sufficient disk space available for the operation.
+
+The contents of the directory created as backing store for the import operation will be erased after the operation is over.
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
index b5d8b7c..90e9f5d 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
@@ -18,6 +18,7 @@
package org.apache.atlas.repository.impexp;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.entitytransform.BaseEntityHandler;
import org.apache.atlas.entitytransform.TransformerContext;
@@ -26,22 +27,23 @@ import org.apache.atlas.model.impexp.AtlasImportRequest;
import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.repository.store.graph.BulkImporter;
+import org.apache.atlas.repository.store.graph.v1.EntityImportStream;
import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
-import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.inject.Inject;
-import java.io.ByteArrayInputStream;
import java.io.File;
+import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.io.InputStream;
import java.util.List;
import static org.apache.atlas.model.impexp.AtlasImportRequest.TRANSFORMERS_KEY;
@@ -71,18 +73,25 @@ public class ImportService {
this.importTransformsShaper = importTransformsShaper;
}
- public AtlasImportResult run(ZipSource source, String userName,
+ public AtlasImportResult run(InputStream inputStream, String userName,
String hostName, String requestingIP) throws AtlasBaseException {
- return run(source, null, userName, hostName, requestingIP);
+ return run(inputStream, null, userName, hostName, requestingIP);
}
- public AtlasImportResult run(ZipSource source, AtlasImportRequest request, String userName,
+ public AtlasImportResult run(InputStream inputStream, AtlasImportRequest request, String userName,
String hostName, String requestingIP) throws AtlasBaseException {
if (request == null) {
request = new AtlasImportRequest();
}
+ EntityImportStream source = createZipSource(inputStream, AtlasConfiguration.IMPORT_TEMP_DIRECTORY.getString());
+ return run(source, request, userName, hostName, requestingIP);
+ }
+
+ @VisibleForTesting
+ AtlasImportResult run(EntityImportStream source, AtlasImportRequest request, String userName,
+ String hostName, String requestingIP) throws AtlasBaseException {
AtlasImportResult result = new AtlasImportResult(request, userName, requestingIP, hostName, System.currentTimeMillis());
try {
@@ -107,7 +116,10 @@ public class ImportService {
throw new AtlasBaseException(excp);
} finally {
- source.close();
+ if (source != null) {
+ source.close();
+ }
+
LOG.info("<== import(user={}, from={}): status={}", userName, requestingIP, result.getOperationStatus());
}
@@ -115,7 +127,7 @@ public class ImportService {
}
@VisibleForTesting
- void setImportTransform(ZipSource source, String transforms) throws AtlasBaseException {
+ void setImportTransform(EntityImportStream source, String transforms) throws AtlasBaseException {
ImportTransforms importTransform = ImportTransforms.fromJson(transforms);
if (importTransform == null) {
return;
@@ -127,11 +139,10 @@ public class ImportService {
if(LOG.isDebugEnabled()) {
debugLog(" => transforms: {}", AtlasType.toJson(importTransform));
}
-
}
@VisibleForTesting
- void setEntityTransformerHandlers(ZipSource source, String transformersJson) throws AtlasBaseException {
+ void setEntityTransformerHandlers(EntityImportStream source, String transformersJson) throws AtlasBaseException {
if (StringUtils.isEmpty(transformersJson)) {
return;
}
@@ -151,7 +162,7 @@ public class ImportService {
LOG.debug(s, params);
}
- private void setStartPosition(AtlasImportRequest request, ZipSource source) throws AtlasBaseException {
+ private void setStartPosition(AtlasImportRequest request, EntityImportStream source) throws AtlasBaseException {
if (request.getStartGuid() != null) {
source.setPositionUsingEntityGuid(request.getStartGuid());
} else if (request.getStartPosition() != null) {
@@ -159,8 +170,7 @@ public class ImportService {
}
}
- public AtlasImportResult run(AtlasImportRequest request, String userName, String hostName, String requestingIP)
- throws AtlasBaseException {
+ public AtlasImportResult run(AtlasImportRequest request, String userName, String hostName, String requestingIP) throws AtlasBaseException {
String fileName = request.getFileName();
if (StringUtils.isBlank(fileName)) {
@@ -168,14 +178,11 @@ public class ImportService {
}
AtlasImportResult result = null;
-
try {
LOG.info("==> import(user={}, from={}, fileName={})", userName, requestingIP, fileName);
- String transforms = MapUtils.isNotEmpty(request.getOptions()) ? request.getOptions().get(TRANSFORMS_KEY) : null;
File file = new File(fileName);
- ZipSource source = new ZipSource(new ByteArrayInputStream(FileUtils.readFileToByteArray(file)), ImportTransforms.fromJson(transforms));
- result = run(source, request, userName, hostName, requestingIP);
+ result = run(new FileInputStream(file), request, userName, hostName, requestingIP);
} catch (AtlasBaseException excp) {
LOG.error("import(user={}, from={}, fileName={}): failed", userName, requestingIP, excp);
@@ -184,10 +191,6 @@ public class ImportService {
LOG.error("import(user={}, from={}, fileName={}): file not found", userName, requestingIP, excp);
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, fileName + ": file not found");
- } catch (IOException excp) {
- LOG.error("import(user={}, from={}, fileName={}): cannot read file", userName, requestingIP, excp);
-
- throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, fileName + ": cannot read file");
} catch (Exception excp) {
LOG.error("import(user={}, from={}, fileName={}): failed", userName, requestingIP, excp);
@@ -209,7 +212,7 @@ public class ImportService {
importTypeDefProcessor.processTypes(typeDefinitionMap, result);
}
- private void processEntities(String userName, ZipSource importSource, AtlasImportResult result) throws AtlasBaseException {
+ private void processEntities(String userName, EntityImportStream importSource, AtlasImportResult result) throws AtlasBaseException {
this.bulkImporter.bulkImport(importSource, result);
endTimestamp = System.currentTimeMillis();
@@ -223,4 +226,17 @@ public class ImportService {
private int getDuration(long endTime, long startTime) {
return (int) (endTime - startTime);
}
+
+ private EntityImportStream createZipSource(InputStream inputStream, String configuredTemporaryDirectory) throws AtlasBaseException {
+ try {
+ if (StringUtils.isEmpty(configuredTemporaryDirectory)) {
+ return new ZipSource(inputStream);
+ }
+
+ return new ZipSourceWithBackingDirectory(inputStream, configuredTemporaryDirectory);
+ }
+ catch (IOException ex) {
+ throw new AtlasBaseException(ex);
+ }
+ }
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSource.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSource.java
index 016acd7..ce5fe18 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSource.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSource.java
@@ -82,20 +82,25 @@ public class ZipSource implements EntityImportStream {
guidEntityJsonMap.get(key).equals("[]"));
}
+ @Override
public ImportTransforms getImportTransform() { return this.importTransform; }
+ @Override
public void setImportTransform(ImportTransforms importTransform) {
this.importTransform = importTransform;
}
+ @Override
public List<BaseEntityHandler> getEntityHandlers() {
return entityHandlers;
}
+ @Override
public void setEntityHandlers(List<BaseEntityHandler> entityHandlers) {
this.entityHandlers = entityHandlers;
}
+ @Override
public AtlasTypesDef getTypesDef() throws AtlasBaseException {
final String fileName = ZipExportFileNames.ATLAS_TYPESDEF_NAME.toString();
@@ -103,6 +108,7 @@ public class ZipSource implements EntityImportStream {
return convertFromJson(AtlasTypesDef.class, s);
}
+ @Override
public AtlasExportResult getExportResult() throws AtlasBaseException {
final String fileName = ZipExportFileNames.ATLAS_EXPORT_INFO_NAME.toString();
@@ -147,6 +153,7 @@ public class ZipSource implements EntityImportStream {
zipInputStream.close();
}
+ @Override
public List<String> getCreationOrder() {
return this.creationOrder;
}
@@ -204,6 +211,7 @@ public class ZipSource implements EntityImportStream {
return s;
}
+ @Override
public void close() {
try {
inputStream.close();
@@ -278,7 +286,7 @@ public class ZipSource implements EntityImportStream {
currentPosition = index;
reset();
for (int i = 0; i < creationOrder.size() && i <= index; i++) {
- iterator.next();
+ onImportComplete(iterator.next());
}
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceWithBackingDirectory.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceWithBackingDirectory.java
new file mode 100644
index 0000000..7c3199f
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceWithBackingDirectory.java
@@ -0,0 +1,371 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.impexp;
+
+import org.apache.atlas.entitytransform.BaseEntityHandler;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasExportResult;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.typedef.AtlasTypesDef;
+import org.apache.atlas.repository.store.graph.v1.EntityImportStream;
+import org.apache.atlas.type.AtlasType;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+
+import static org.apache.atlas.AtlasErrorCode.IMPORT_ATTEMPTING_EMPTY_ZIP;
+
+/**
+ * {@link EntityImportStream} that unpacks an export ZIP into a per-import temporary
+ * directory and reads entities back from those files on demand, instead of holding
+ * the whole payload in memory.
+ *
+ * Every ZIP entry is written to the temporary directory under its entry name. The
+ * metadata entries are looked up as {@code ZipExportFileNames.toString() + ".json"} and
+ * entities as {@code <guid>.json}. The temporary directory is removed by {@link #close()}.
+ */
+public class ZipSourceWithBackingDirectory implements EntityImportStream {
+    private static final Logger LOG = LoggerFactory.getLogger(ZipSourceWithBackingDirectory.class);
+    private static final String TEMPORARY_DIRECTORY_PREFIX = "atlas-import-temp-";
+    private static final String EXT_JSON = ".json";
+
+    // Absolute, normalized path of the directory that holds the unpacked entries.
+    private Path tempDirectory;
+
+    private ImportTransforms importTransform;
+    private List<BaseEntityHandler> entityHandlers;
+
+    private List<String> creationOrder = new ArrayList<>();
+    private int currentPosition;
+    // Number of entity entries that can be iterated (metadata entries excluded).
+    private int numberOfEntries;
+
+    /**
+     * Creates a source backed by a fresh directory under the system temporary location.
+     *
+     * @param inputStream ZIP content; it is closed once unpacking finishes
+     * @throws IOException        if unpacking fails
+     * @throws AtlasBaseException if the directory cannot be prepared or the ZIP has no entities
+     */
+    public ZipSourceWithBackingDirectory(InputStream inputStream) throws IOException, AtlasBaseException {
+        this(inputStream, null);
+    }
+
+    /**
+     * Creates a source backed by a fresh, uniquely named child of {@code backingDirectory}.
+     *
+     * @param inputStream      ZIP content; it is closed once unpacking finishes
+     * @param backingDirectory parent directory for the temporary store; when null or empty
+     *                         the system temporary location is used
+     * @throws IOException        if unpacking fails
+     * @throws AtlasBaseException if the directory cannot be prepared or the ZIP has no entities
+     */
+    public ZipSourceWithBackingDirectory(InputStream inputStream, String backingDirectory) throws IOException, AtlasBaseException {
+        try {
+            setupBackingStore(inputStream, backingDirectory);
+            if (isZipFileEmpty()) {
+                throw new AtlasBaseException(IMPORT_ATTEMPTING_EMPTY_ZIP, "Attempting to import empty ZIP.");
+            }
+        } catch (IOException | AtlasBaseException | RuntimeException e) {
+            // The caller never receives an instance to close(), so clean up here.
+            removeTempDirectory();
+            throw e;
+        }
+    }
+
+    @Override
+    public ImportTransforms getImportTransform() {
+        return this.importTransform;
+    }
+
+    @Override
+    public void setImportTransform(ImportTransforms importTransform) {
+        this.importTransform = importTransform;
+    }
+
+    @Override
+    public List<BaseEntityHandler> getEntityHandlers() {
+        return entityHandlers;
+    }
+
+    @Override
+    public void setEntityHandlers(List<BaseEntityHandler> entityHandlers) {
+        this.entityHandlers = entityHandlers;
+    }
+
+    @Override
+    public AtlasTypesDef getTypesDef() throws AtlasBaseException {
+        return getJsonFromEntry(ZipExportFileNames.ATLAS_TYPESDEF_NAME.toString(), AtlasTypesDef.class);
+    }
+
+    @Override
+    public AtlasExportResult getExportResult() throws AtlasBaseException {
+        return getJsonFromEntry(ZipExportFileNames.ATLAS_EXPORT_INFO_NAME.toString(), AtlasExportResult.class);
+    }
+
+    @Override
+    public List<String> getCreationOrder() {
+        return creationOrder;
+    }
+
+    @Override
+    public int getPosition() {
+        return currentPosition;
+    }
+
+    /**
+     * Loads the entity stored as {@code <guid>.json} and applies the configured import
+     * transform and entity handlers to it.
+     *
+     * @param guid entity GUID
+     * @return the entity with its referred entities, or null when no file exists for the
+     *         GUID or the file could not be read
+     * @throws AtlasBaseException if the JSON cannot be converted or a transform fails
+     */
+    @Override
+    public AtlasEntity.AtlasEntityWithExtInfo getEntityWithExtInfo(String guid) throws AtlasBaseException {
+        final File file = getFileFromTemporaryDirectory(guid + EXT_JSON);
+        if (!file.exists()) {
+            return null;
+        }
+
+        String json = getJsonStringForFile(file);
+        if (StringUtils.isEmpty(json)) {
+            return null;
+        }
+
+        AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = convertFromJson(AtlasEntity.AtlasEntityWithExtInfo.class, json);
+
+        if (importTransform != null) {
+            entityWithExtInfo = importTransform.apply(entityWithExtInfo);
+        }
+
+        if (entityHandlers != null) {
+            applyTransformers(entityWithExtInfo);
+        }
+
+        return entityWithExtInfo;
+    }
+
+    @Override
+    public boolean hasNext() {
+        return (currentPosition < numberOfEntries);
+    }
+
+    @Override
+    public AtlasEntity next() {
+        AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = getNextEntityWithExtInfo();
+
+        return entityWithExtInfo != null ? entityWithExtInfo.getEntity() : null;
+    }
+
+    /**
+     * Advances the position and returns the entity at the previous position.
+     *
+     * @return the next entity, or null when loading it failed (the failure is logged)
+     */
+    @Override
+    public AtlasEntity.AtlasEntityWithExtInfo getNextEntityWithExtInfo() {
+        try {
+            return getEntityWithExtInfo(moveNext());
+        } catch (AtlasBaseException e) {
+            LOG.error("getNextEntityWithExtInfo", e);
+            return null;
+        }
+    }
+
+    @Override
+    public void reset() {
+        currentPosition = 0;
+    }
+
+    @Override
+    public AtlasEntity getByGuid(String guid) {
+        try {
+            return getEntity(guid);
+        } catch (AtlasBaseException e) {
+            LOG.error("getByGuid: {} failed!", guid, e);
+            return null;
+        }
+    }
+
+    /**
+     * Deletes the backing file of an entity that no longer needs to be read.
+     * Deletion is best effort; a null GUID is ignored.
+     */
+    @Override
+    public void onImportComplete(String guid) {
+        if (guid == null) {
+            return;
+        }
+
+        getFileFromTemporaryDirectory(guid + EXT_JSON).delete();
+    }
+
+    /**
+     * Restarts iteration and skips entries 0..index (inclusive). The backing files of
+     * the skipped entities are deleted, so they can no longer be fetched by GUID.
+     */
+    @Override
+    public void setPosition(int index) {
+        reset();
+        for (int i = 0; i < numberOfEntries && i <= index; i++) {
+            onImportComplete(moveNext());
+        }
+    }
+
+    /**
+     * Advances from the current position until the entry with the given GUID is the
+     * next one to be returned. If the GUID is not found the stream ends up exhausted.
+     */
+    @Override
+    public void setPositionUsingEntityGuid(String guid) {
+        if (StringUtils.isEmpty(guid)) {
+            return;
+        }
+
+        while (currentPosition < numberOfEntries) {
+            if (creationOrder.get(currentPosition).equals(guid)) {
+                return;
+            }
+
+            moveNext();
+        }
+    }
+
+    /** Clears the creation order and removes the temporary directory with its contents. */
+    @Override
+    public void close() {
+        creationOrder.clear();
+        removeTempDirectory();
+    }
+
+    @Override
+    public int size() {
+        return numberOfEntries;
+    }
+
+    /** Deletes the temporary directory if one was created; failures are logged, not thrown. */
+    private void removeTempDirectory() {
+        if (tempDirectory == null) {
+            return;
+        }
+
+        try {
+            // Routine cleanup, so this is informational rather than an error.
+            LOG.info("Removing temporary directory: {}", tempDirectory.toString());
+            FileUtils.deleteDirectory(tempDirectory.toFile());
+        } catch (IOException e) {
+            LOG.error("Error deleting: {}", tempDirectory.toString(), e);
+        }
+    }
+
+    private boolean isZipFileEmpty() {
+        return (numberOfEntries == 0);
+    }
+
+    /** Reads {@code <entryName>.json} from the temporary directory and converts it to {@code clazz}. */
+    private <T> T getJsonFromEntry(String entryName, Class<T> clazz) throws AtlasBaseException {
+        final File file = getFileFromTemporaryDirectory(entryName + EXT_JSON);
+        if (!file.exists()) {
+            throw new AtlasBaseException(entryName + " not found!");
+        }
+
+        return convertFromJson(clazz, getJsonStringForFile(file));
+    }
+
+    private void setupBackingStore(InputStream inputStream, String backingDirectory) throws AtlasBaseException, IOException {
+        initTempDirectory(backingDirectory);
+        unzipToTempDirectory(inputStream);
+        setupIterator();
+    }
+
+    /**
+     * Creates the session directory and verifies it is a writable directory.
+     *
+     * @param backingDirectory parent directory, or null/empty for the system temporary location
+     * @throws AtlasBaseException if the directory cannot be created or is not writable
+     */
+    private void initTempDirectory(String backingDirectory) throws AtlasBaseException {
+        try {
+            Path created = StringUtils.isEmpty(backingDirectory)
+                    ? Files.createTempDirectory(TEMPORARY_DIRECTORY_PREFIX)
+                    : Files.createDirectory(Paths.get(backingDirectory, getChildDirectoryForSession()));
+
+            // Normalized so the containment check on entry names compares like with like.
+            tempDirectory = created.toAbsolutePath().normalize();
+        } catch (IOException | RuntimeException ex) {
+            // tempDirectory is still null here, so report the requested parent instead.
+            throw new AtlasBaseException(String.format("Error fetching temporary directory: %s", backingDirectory), ex);
+        }
+
+        if (!permissionChecks(tempDirectory.toFile())) {
+            throw new AtlasBaseException(
+                    String.format("Import: Temporary directory: %s does not have permissions for operation!", tempDirectory.toString()));
+        }
+    }
+
+    private String getChildDirectoryForSession() {
+        return String.format("%s%s", TEMPORARY_DIRECTORY_PREFIX, UUID.randomUUID());
+    }
+
+    private boolean permissionChecks(File f) {
+        return f.exists() && f.isDirectory() && f.canWrite();
+    }
+
+    /**
+     * Writes every file entry of the ZIP into the temporary directory and counts the
+     * entries that are not export metadata. Both streams are closed when done.
+     */
+    private void unzipToTempDirectory(InputStream inputStream) throws IOException {
+        LOG.info("Import: Temporary directory: {}", tempDirectory.toString());
+
+        // Closing the ZipInputStream also closes the wrapped input stream.
+        try (ZipInputStream zis = new ZipInputStream(inputStream)) {
+            ZipEntry zipEntry;
+            while ((zipEntry = zis.getNextEntry()) != null) {
+                if (zipEntry.isDirectory()) {
+                    continue;
+                }
+
+                String entryName = zipEntry.getName();
+                writeJsonToFile(entryName, getJsonPayloadFromZipEntryStream(zis));
+
+                // Count by name rather than subtracting a fixed number, so a ZIP that
+                // lacks one of the metadata entries does not lose entities.
+                if (!isMetadataEntry(entryName)) {
+                    numberOfEntries++;
+                }
+            }
+        }
+    }
+
+    /** Returns true when the entry name is one of the {@link ZipExportFileNames} files. */
+    private boolean isMetadataEntry(String entryName) {
+        for (ZipExportFileNames name : ZipExportFileNames.values()) {
+            if ((name.toString() + EXT_JSON).equals(entryName)) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * Writes one unpacked entry into the temporary directory.
+     *
+     * @throws IOException if the entry name resolves outside the temporary directory
+     *                     or the file cannot be written
+     */
+    private void writeJsonToFile(String entryName, byte[] jsonPayload) throws IOException {
+        Path target = tempDirectory.resolve(entryName).normalize();
+
+        // Entry names come from the uploaded archive; refuse "../" style escapes.
+        if (!target.startsWith(tempDirectory)) {
+            throw new IOException(String.format("Import: ZIP entry: %s is outside of the temporary directory!", entryName));
+        }
+
+        Files.write(target, jsonPayload);
+    }
+
+    private File getFileFromTemporaryDirectory(String entryName) {
+        return new File(tempDirectory.toFile(), entryName);
+    }
+
+    /**
+     * Loads the creation order and positions the stream at the start. A missing or
+     * unreadable order is logged and leaves the order empty.
+     */
+    @SuppressWarnings("unchecked") // the order entry is a JSON array of GUID strings
+    private void setupIterator() {
+        try {
+            List<String> order = getJsonFromEntry(ZipExportFileNames.ATLAS_EXPORT_ORDER_NAME.toString(), ArrayList.class);
+            if (order != null) {
+                creationOrder = order;
+            }
+        } catch (AtlasBaseException e) {
+            LOG.error("Error fetching: {}. Error generating order.", ZipExportFileNames.ATLAS_EXPORT_ORDER_NAME.toString(), e);
+        }
+
+        // Iteration indexes into creationOrder, so never advertise more entries than it holds.
+        if (creationOrder.size() < numberOfEntries) {
+            LOG.warn("Import: creation order has {} entries, ZIP has {} entity files.", creationOrder.size(), numberOfEntries);
+            numberOfEntries = creationOrder.size();
+        }
+
+        reset();
+    }
+
+    /** Reads the remaining bytes of the current ZIP entry. */
+    private byte[] getJsonPayloadFromZipEntryStream(ZipInputStream zipInputStream) throws IOException {
+        byte[] buf = new byte[1024];
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+
+        int n;
+        while ((n = zipInputStream.read(buf, 0, buf.length)) > -1) {
+            bos.write(buf, 0, n);
+        }
+
+        return bos.toByteArray();
+    }
+
+    /** Returns the file content as a string, or null when the file cannot be read. */
+    private String getJsonStringForFile(File file) {
+        try {
+            byte[] bytes = Files.readAllBytes(file.toPath());
+            // NOTE(review): decodes with the platform default charset - confirm it matches the exporter.
+            return new String(bytes);
+        } catch (IOException e) {
+            LOG.warn("Error fetching: {}", file.toString(), e);
+            return null;
+        }
+    }
+
+    private void applyTransformers(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) {
+        if (entityWithExtInfo == null) {
+            return;
+        }
+
+        transform(entityWithExtInfo.getEntity());
+
+        if (MapUtils.isNotEmpty(entityWithExtInfo.getReferredEntities())) {
+            for (AtlasEntity e : entityWithExtInfo.getReferredEntities().values()) {
+                transform(e);
+            }
+        }
+    }
+
+    private void transform(AtlasEntity e) {
+        for (BaseEntityHandler handler : entityHandlers) {
+            handler.transform(e);
+        }
+    }
+
+    private <T> T convertFromJson(Class<T> clazz, String jsonData) throws AtlasBaseException {
+        try {
+            return AtlasType.fromJson(jsonData, clazz);
+        } catch (Exception e) {
+            throw new AtlasBaseException("Error converting file to JSON.", e);
+        }
+    }
+
+    private AtlasEntity getEntity(String guid) throws AtlasBaseException {
+        AtlasEntity.AtlasEntityWithExtInfo extInfo = getEntityWithExtInfo(guid);
+        return (extInfo != null) ? extInfo.getEntity() : null;
+    }
+
+    /** Returns the GUID at the current position and advances, or null when exhausted. */
+    private String moveNext() {
+        if (currentPosition < numberOfEntries) {
+            return creationOrder.get(currentPosition++);
+        }
+
+        return null;
+    }
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStreamForImport.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStreamForImport.java
index 90ae15d..3c85f0f 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStreamForImport.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStreamForImport.java
@@ -17,8 +17,15 @@
*/
package org.apache.atlas.repository.store.graph.v1;
+import org.apache.atlas.entitytransform.BaseEntityHandler;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasExportResult;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+import org.apache.atlas.model.typedef.AtlasTypesDef;
+import org.apache.atlas.repository.impexp.ImportTransforms;
+
+import java.util.List;
public class AtlasEntityStreamForImport extends AtlasEntityStream implements EntityImportStream {
private int currentPosition = 0;
@@ -36,6 +43,11 @@ public class AtlasEntityStreamForImport extends AtlasEntityStream implements Ent
}
@Override
+ public AtlasEntityWithExtInfo getEntityWithExtInfo(String guid) throws AtlasBaseException {
+ return null;
+ }
+
+ @Override
public AtlasEntity getByGuid(String guid) {
AtlasEntity ent = super.entitiesWithExtInfo.getEntity(guid);
@@ -69,4 +81,44 @@ public class AtlasEntityStreamForImport extends AtlasEntityStream implements Ent
public void onImportComplete(String guid) {
}
+
+ @Override
+ public void setImportTransform(ImportTransforms importTransform) {
+
+ }
+
+ @Override
+ public ImportTransforms getImportTransform() {
+ return null;
+ }
+
+ @Override
+ public void setEntityHandlers(List<BaseEntityHandler> entityHandlers) {
+
+ }
+
+ @Override
+ public List<BaseEntityHandler> getEntityHandlers() {
+ return null;
+ }
+
+ @Override
+ public AtlasTypesDef getTypesDef() throws AtlasBaseException {
+ return null;
+ }
+
+ @Override
+ public AtlasExportResult getExportResult() throws AtlasBaseException {
+ return null;
+ }
+
+ @Override
+ public List<String> getCreationOrder() {
+ return null;
+ }
+
+ @Override
+ public void close() {
+
+ }
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/BulkImporterImpl.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/BulkImporterImpl.java
index 2606692..17c6ac2 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/BulkImporterImpl.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/BulkImporterImpl.java
@@ -18,7 +18,10 @@
package org.apache.atlas.repository.store.graph.v1;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.AtlasException;
import org.apache.atlas.RequestContext;
import org.apache.atlas.RequestContextV1;
import org.apache.atlas.exception.AtlasBaseException;
@@ -29,6 +32,7 @@ import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.BulkImporter;
+import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@@ -45,10 +49,12 @@ public class BulkImporterImpl implements BulkImporter {
private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityStoreV1.class);
private final AtlasEntityStore entityStore;
+ private boolean directoryBasedImportConfigured;
@Inject
public BulkImporterImpl(AtlasEntityStore entityStore) {
this.entityStore = entityStore;
+ this.directoryBasedImportConfigured = StringUtils.isNotEmpty(AtlasConfiguration.IMPORT_TEMP_DIRECTORY.getString());
}
@Override
@@ -128,9 +134,11 @@ public class BulkImporterImpl implements BulkImporter {
AtlasImportResult importResult,
Set<String> processedGuids,
int currentIndex, int streamSize, float currentPercent) {
- updateImportMetrics("entity:%s:created", resp.getCreatedEntities(), processedGuids, importResult);
- updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids, importResult);
- updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids, importResult);
+ if (!directoryBasedImportConfigured) {
+ updateImportMetrics("entity:%s:created", resp.getCreatedEntities(), processedGuids, importResult);
+ updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids, importResult);
+ updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids, importResult);
+ }
String lastEntityImported = String.format("entity:last-imported:%s:[%s]:(%s)", currentEntity.getEntity().getTypeName(), currentIndex, currentEntity.getEntity().getGuid());
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityImportStream.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityImportStream.java
index d4b6c55..8131922 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityImportStream.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityImportStream.java
@@ -18,17 +18,46 @@
package org.apache.atlas.repository.store.graph.v1;
+import org.apache.atlas.entitytransform.BaseEntityHandler;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasExportResult;
+import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+import org.apache.atlas.model.typedef.AtlasTypesDef;
+import org.apache.atlas.repository.impexp.ImportTransforms;
+
+import java.util.List;
public interface EntityImportStream extends EntityStream {
int size();
+
void setPosition(int position);
+
int getPosition();
void setPositionUsingEntityGuid(String guid);
AtlasEntityWithExtInfo getNextEntityWithExtInfo();
+ AtlasEntity.AtlasEntityWithExtInfo getEntityWithExtInfo(String guid) throws AtlasBaseException;
+
void onImportComplete(String guid);
+
+ void setImportTransform(ImportTransforms importTransform);
+
+ public ImportTransforms getImportTransform();
+
+ void setEntityHandlers(List<BaseEntityHandler> entityHandlers);
+
+ List<BaseEntityHandler> getEntityHandlers();
+
+ AtlasTypesDef getTypesDef() throws AtlasBaseException;
+
+ AtlasExportResult getExportResult() throws AtlasBaseException;
+
+ List<String> getCreationOrder();
+
+ void close();
}
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportImportTestBase.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportImportTestBase.java
index a11bcdc..94b7177 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportImportTestBase.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportImportTestBase.java
@@ -18,9 +18,6 @@
package org.apache.atlas.repository.impexp;
-import org.apache.atlas.ApplicationProperties;
-import org.apache.atlas.AtlasConstants;
-import org.apache.atlas.AtlasException;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.ExportImportAuditEntry;
import org.apache.atlas.model.instance.AtlasEntity;
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java
index 5ec62e5..de4cf15 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java
@@ -33,16 +33,20 @@ import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasClassificationType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.utils.TestResourceFileUtils;
+import org.apache.commons.io.IOUtils;
import org.testng.ITestContext;
import org.testng.SkipException;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
-import org.testng.annotations.DataProvider;
import javax.inject.Inject;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -52,8 +56,8 @@ import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_INCREM
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.createTypes;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getEntities;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getZipSource;
-import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runImportWithNoParameters;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runExportWithParameters;
+import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runImportWithNoParameters;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
@@ -106,11 +110,13 @@ public class ExportIncrementalTest extends ExportImportTestBase {
}
@Test
- public void atT0_ReturnsAllEntities() throws AtlasBaseException {
+ public void atT0_ReturnsAllEntities() throws AtlasBaseException, IOException {
final int expectedEntityCount = 2;
AtlasExportRequest request = getIncrementalRequest(0);
- ZipSource source = runExportWithParameters(exportService, request);
+ InputStream inputStream = runExportWithParameters(exportService, request);
+
+ ZipSource source = getZipSourceFromInputStream(inputStream);
AtlasEntity.AtlasEntityWithExtInfo entities = getEntities(source, expectedEntityCount);
int count = 0;
@@ -128,13 +134,15 @@ public class ExportIncrementalTest extends ExportImportTestBase {
}
@Test(dependsOnMethods = "atT0_ReturnsAllEntities")
- public void atT1_NewClassificationAttachedToTable_ReturnsChangedTable() throws AtlasBaseException {
+ public void atT1_NewClassificationAttachedToTable_ReturnsChangedTable() throws AtlasBaseException, IOException {
final int expectedEntityCount = 1;
entityStore.addClassifications(TABLE_GUID, ImmutableList.of(classificationTypeT1.createDefaultValue()));
AtlasExportRequest request = getIncrementalRequest(nextTimestamp);
- ZipSource source = runExportWithParameters(exportService, request);
+ InputStream inputStream = runExportWithParameters(exportService, request);
+
+ ZipSource source = getZipSourceFromInputStream(inputStream);
AtlasEntity.AtlasEntityWithExtInfo entities = getEntities(source, expectedEntityCount);
AtlasEntity entity = null;
@@ -154,7 +162,7 @@ public class ExportIncrementalTest extends ExportImportTestBase {
}
@Test(dependsOnMethods = "atT1_NewClassificationAttachedToTable_ReturnsChangedTable")
- public void atT2_NewClassificationAttachedToColumn_ReturnsChangedColumn() throws AtlasBaseException {
+ public void atT2_NewClassificationAttachedToColumn_ReturnsChangedColumn() throws AtlasBaseException, IOException {
final int expectedEntityCount = 1;
AtlasEntity.AtlasEntityWithExtInfo tableEntity = entityStore.getById(TABLE_GUID);
@@ -162,7 +170,9 @@ public class ExportIncrementalTest extends ExportImportTestBase {
entityStore.addClassifications(COLUMN_GUID_HIGH, ImmutableList.of(typeRegistry.getClassificationTypeByName("T1").createDefaultValue()));
- ZipSource source = runExportWithParameters(exportService, getIncrementalRequest(nextTimestamp));
+ InputStream inputStream = runExportWithParameters(exportService, getIncrementalRequest(nextTimestamp));
+
+ ZipSource source = getZipSourceFromInputStream(inputStream);
AtlasEntity.AtlasEntityWithExtInfo entities = getEntities(source, expectedEntityCount);
for (Map.Entry<String, AtlasEntity> entry : entities.getReferredEntities().entrySet()) {
@@ -175,17 +185,26 @@ public class ExportIncrementalTest extends ExportImportTestBase {
assertEquals(preExportTableEntityTimestamp, postUpdateTableEntityTimestamp);
}
+ private ZipSource getZipSourceFromInputStream(InputStream inputStream) { // wraps the exported stream in a ZipSource; never returns null
+ try {
+ return new ZipSource(inputStream);
+ } catch (IOException | AtlasBaseException e) {
+ throw new AssertionError("getZipSourceFromInputStream: could not create ZipSource from export stream", e); // fail here with the cause instead of handing callers a null
+ }
+ }
+
@Test(dependsOnMethods = "atT2_NewClassificationAttachedToColumn_ReturnsChangedColumn")
public void exportingWithSameParameters_Succeeds() {
- ZipSource source = runExportWithParameters(exportService, getIncrementalRequest(nextTimestamp));
+ InputStream inputStream = runExportWithParameters(exportService, getIncrementalRequest(nextTimestamp));
- assertNotNull(source);
+ assertNotNull(getZipSourceFromInputStream(inputStream));
}
@Test
public void connectedExport() {
- ZipSource source = runExportWithParameters(exportService, getConnected());
+ InputStream inputStream = runExportWithParameters(exportService, getConnected());
+ ZipSource source = getZipSourceFromInputStream(inputStream);
UniqueList<String> creationOrder = new UniqueList<>();
List<String> zipCreationOrder = source.getCreationOrder();
creationOrder.addAll(zipCreationOrder);
@@ -199,27 +218,29 @@ public class ExportIncrementalTest extends ExportImportTestBase {
}
@Test(dataProvider = "hiveDb")
- public void importHiveDb(ZipSource zipSource) throws AtlasBaseException, IOException {
- runImportWithNoParameters(importService, zipSource);
+ public void importHiveDb(InputStream stream) throws AtlasBaseException, IOException {
+ runImportWithNoParameters(importService, stream);
}
@Test(dependsOnMethods = "importHiveDb")
- public void exportTableInrementalConnected() throws AtlasBaseException {
- ZipSource source = runExportWithParameters(exportService, getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_LINEAGE, EXPORT_INCREMENTAL, 0, true));
- verifyExpectedEntities(getFileNames(source), GUID_DB, GUID_TABLE_CTAS_2);
+ public void exportTableInrementalConnected() throws AtlasBaseException, IOException { // exports the lineage table incrementally three times and verifies the file names of the first and last export
+ InputStream source = runExportWithParameters(exportService, getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_LINEAGE, EXPORT_INCREMENTAL, 0, true));
- nextTimestamp = updateTimesampForNextIncrementalExport(source);
+ ZipSource sourceCopy = getZipSourceCopy(source); // one ZipSource instance is shared by the verification and the timestamp update below
+ verifyExpectedEntities(getFileNames(sourceCopy), GUID_DB, GUID_TABLE_CTAS_2);
+
+ nextTimestamp = updateTimesampForNextIncrementalExport(sourceCopy);
try {
source = runExportWithParameters(exportService, getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_LINEAGE, EXPORT_INCREMENTAL, nextTimestamp, true));
- }catch (SkipException e){
-
+ } catch (SkipException e) {
+ throw e; // propagate the SkipException; the removed empty catch silently ignored it
}
entityStore.addClassifications(GUID_TABLE_CTAS_2, ImmutableList.of(classificationTypeT1.createDefaultValue()));
source = runExportWithParameters(exportService, getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_LINEAGE, EXPORT_INCREMENTAL, nextTimestamp, true));
- verifyExpectedEntities(getFileNames(source), GUID_TABLE_CTAS_2);
+ verifyExpectedEntities(getFileNames(getZipSourceCopy(source)), GUID_TABLE_CTAS_2);
}
@@ -280,4 +301,11 @@ public class ExportIncrementalTest extends ExportImportTestBase {
}
return ret;
}
+
+ private ZipSource getZipSourceCopy(InputStream is) throws IOException, AtlasBaseException { // builds a ZipSource over an in-memory copy of 'is'
+ final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+ IOUtils.copy(is, buffer); // drain the incoming stream into the buffer
+ final byte[] copiedBytes = buffer.toByteArray();
+ return new ZipSource(new ByteArrayInputStream(copiedBytes));
+ }
}
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportSkipLineageTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportSkipLineageTest.java
index eaf4602..cc49ce7 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportSkipLineageTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportSkipLineageTest.java
@@ -37,6 +37,7 @@ import org.testng.annotations.Test;
import javax.inject.Inject;
import java.io.IOException;
+import java.io.InputStream;
import java.util.Map;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadBaseModel;
@@ -86,11 +87,13 @@ public class ExportSkipLineageTest extends ExportImportTestBase {
}
@Test
- public void exportWithoutLineage() {
+ public void exportWithoutLineage() throws IOException, AtlasBaseException {
final int expectedEntityCount = 3;
AtlasExportRequest request = getRequest();
- ZipSource source = runExportWithParameters(exportService, request);
+ InputStream inputStream = runExportWithParameters(exportService, request);
+
+ ZipSource source = new ZipSource(inputStream);
AtlasEntity.AtlasEntityWithExtInfo entities = ZipFileResourceTestUtils.getEntities(source, expectedEntityCount);
int count = 0;
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
index 693a163..2aee233 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
@@ -18,15 +18,11 @@
package org.apache.atlas.repository.impexp;
import com.google.inject.Inject;
-import org.apache.atlas.ApplicationProperties;
-import org.apache.atlas.AtlasConstants;
import org.apache.atlas.AtlasErrorCode;
-import org.apache.atlas.AtlasException;
import org.apache.atlas.RequestContextV1;
import org.apache.atlas.TestModules;
import org.apache.atlas.TestUtilsV2;
import org.apache.atlas.exception.AtlasBaseException;
-import org.apache.atlas.model.discovery.AtlasSearchResult;
import org.apache.atlas.model.impexp.AtlasImportRequest;
import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasClassificationType;
@@ -43,16 +39,23 @@ import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import java.io.IOException;
+import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
-import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.*;
+import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getDefaultImportRequest;
+import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getZipSource;
+import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadModelFromJson;
+import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadModelFromResourcesJson;
+import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runAndVerifyQuickStart_v1_Import;
+import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runImportWithNoParameters;
+import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runImportWithNoParametersUsingBackingDirectory;
+import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runImportWithParameters;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
@Guice(modules = TestModules.TestOnlyModule.class)
public class ImportServiceTest extends ExportImportTestBase {
@@ -90,9 +93,9 @@ public class ImportServiceTest extends ExportImportTestBase {
}
@Test(dataProvider = "sales")
- public void importDB1(ZipSource zipSource) throws AtlasBaseException, IOException {
+ public void importDB1(InputStream inputStream) throws AtlasBaseException, IOException {
loadBaseModel();
- runAndVerifyQuickStart_v1_Import(importService, zipSource);
+ runAndVerifyQuickStart_v1_Import(importService, inputStream);
}
@DataProvider(name = "reporting")
@@ -101,9 +104,9 @@ public class ImportServiceTest extends ExportImportTestBase {
}
@Test(dataProvider = "reporting")
- public void importDB2(ZipSource zipSource) throws AtlasBaseException, IOException {
+ public void importDB2(InputStream inputStream) throws AtlasBaseException, IOException {
loadBaseModel();
- runAndVerifyQuickStart_v1_Import(importService, zipSource);
+ runAndVerifyQuickStart_v1_Import(importService, inputStream);
}
@DataProvider(name = "logging")
@@ -112,9 +115,9 @@ public class ImportServiceTest extends ExportImportTestBase {
}
@Test(dataProvider = "logging")
- public void importDB3(ZipSource zipSource) throws AtlasBaseException, IOException {
+ public void importDB3(InputStream inputStream) throws AtlasBaseException, IOException {
loadBaseModel();
- runAndVerifyQuickStart_v1_Import(importService, zipSource);
+ runAndVerifyQuickStart_v1_Import(importService, inputStream);
}
@DataProvider(name = "salesNewTypeAttrs")
@@ -123,9 +126,9 @@ public class ImportServiceTest extends ExportImportTestBase {
}
@Test(dataProvider = "salesNewTypeAttrs", dependsOnMethods = "importDB1")
- public void importDB4(ZipSource zipSource) throws AtlasBaseException, IOException {
+ public void importDB4(InputStream inputStream) throws AtlasBaseException, IOException {
loadBaseModel();
- runImportWithParameters(importService, getDefaultImportRequest(), zipSource);
+ runImportWithParameters(importService, getDefaultImportRequest(), inputStream);
}
@DataProvider(name = "salesNewTypeAttrs-next")
@@ -135,7 +138,7 @@ public class ImportServiceTest extends ExportImportTestBase {
@Test(dataProvider = "salesNewTypeAttrs-next", dependsOnMethods = "importDB4")
- public void importDB5(ZipSource zipSource) throws AtlasBaseException, IOException {
+ public void importDB5(InputStream inputStream) throws AtlasBaseException, IOException {
final String newEnumDefName = "database_action";
assertNotNull(typeDefStore.getEnumDefByName(newEnumDefName));
@@ -145,13 +148,13 @@ public class ImportServiceTest extends ExportImportTestBase {
options.put("updateTypeDefinition", "false");
request.setOptions(options);
- runImportWithParameters(importService, request, zipSource);
+ runImportWithParameters(importService, request, inputStream);
assertNotNull(typeDefStore.getEnumDefByName(newEnumDefName));
assertEquals(typeDefStore.getEnumDefByName(newEnumDefName).getElementDefs().size(), 4);
}
@Test(dataProvider = "salesNewTypeAttrs-next", dependsOnMethods = "importDB4")
- public void importDB6(ZipSource zipSource) throws AtlasBaseException, IOException {
+ public void importDB6(InputStream inputStream) throws AtlasBaseException, IOException {
final String newEnumDefName = "database_action";
assertNotNull(typeDefStore.getEnumDefByName(newEnumDefName));
@@ -161,7 +164,7 @@ public class ImportServiceTest extends ExportImportTestBase {
options.put("updateTypeDefinition", "true");
request.setOptions(options);
- runImportWithParameters(importService, request, zipSource);
+ runImportWithParameters(importService, request, inputStream);
assertNotNull(typeDefStore.getEnumDefByName(newEnumDefName));
assertEquals(typeDefStore.getEnumDefByName(newEnumDefName).getElementDefs().size(), 8);
}
@@ -172,11 +175,11 @@ public class ImportServiceTest extends ExportImportTestBase {
}
@Test(dataProvider = "ctas")
- public void importCTAS(ZipSource zipSource) throws IOException, AtlasBaseException {
+ public void importCTAS(InputStream inputStream) throws IOException, AtlasBaseException {
loadBaseModel();
loadHiveModel();
- runImportWithNoParameters(importService, zipSource);
+ runImportWithNoParameters(importService, inputStream);
}
@DataProvider(name = "hdfs_path1")
@@ -186,13 +189,13 @@ public class ImportServiceTest extends ExportImportTestBase {
@Test(dataProvider = "hdfs_path1", expectedExceptions = AtlasBaseException.class)
- public void importHdfs_path1(ZipSource zipSource) throws IOException, AtlasBaseException {
+ public void importHdfs_path1(InputStream inputStream) throws IOException, AtlasBaseException {
loadBaseModel();
loadFsModel();
loadModelFromResourcesJson("tag1.json", typeDefStore, typeRegistry);
try {
- runImportWithNoParameters(importService, zipSource);
+ runImportWithNoParameters(importService, inputStream);
} catch (AtlasBaseException e) {
assertEquals(e.getAtlasErrorCode(), AtlasErrorCode.INVALID_IMPORT_ATTRIBUTE_TYPE_CHANGED);
AtlasClassificationType tag1 = typeRegistry.getClassificationTypeByName("tag1");
@@ -202,6 +205,14 @@ public class ImportServiceTest extends ExportImportTestBase {
}
}
+ @Test(dataProvider = "ctas")
+ public void importWithBackingDirectory(InputStream inputStream) throws IOException, AtlasBaseException {
+ loadBaseModel();
+ loadFsModel();
+
+ runImportWithNoParametersUsingBackingDirectory(importService, inputStream);
+ }
+
@Test
public void importServiceProcessesIOException() {
ImportService importService = new ImportService(typeDefStore, typeRegistry, null, null,null);
@@ -238,11 +249,12 @@ public class ImportServiceTest extends ExportImportTestBase {
}
@Test(dataProvider = "salesNewTypeAttrs-next")
- public void transformUpdatesForSubTypes(ZipSource zipSource) throws IOException, AtlasBaseException {
+ public void transformUpdatesForSubTypes(InputStream inputStream) throws IOException, AtlasBaseException {
loadModelFromJson("0010-base_model.json", typeDefStore, typeRegistry);
loadModelFromJson("0030-hive_model.json", typeDefStore, typeRegistry);
String transformJSON = "{ \"Asset\": { \"qualifiedName\":[ \"lowercase\", \"replace:@cl1:@cl2\" ] } }";
+ ZipSource zipSource = new ZipSource(inputStream);
importService.setImportTransform(zipSource, transformJSON);
ImportTransforms importTransforms = zipSource.getImportTransform();
@@ -252,11 +264,12 @@ public class ImportServiceTest extends ExportImportTestBase {
}
@Test(dataProvider = "salesNewTypeAttrs-next")
- public void transformUpdatesForSubTypesAddsToExistingTransforms(ZipSource zipSource) throws IOException, AtlasBaseException {
+ public void transformUpdatesForSubTypesAddsToExistingTransforms(InputStream inputStream) throws IOException, AtlasBaseException {
loadModelFromJson("0010-base_model.json", typeDefStore, typeRegistry);
loadModelFromJson("0030-hive_model.json", typeDefStore, typeRegistry);
String transformJSON = "{ \"Asset\": { \"qualifiedName\":[ \"replace:@cl1:@cl2\" ] }, \"hive_table\": { \"qualifiedName\":[ \"lowercase\" ] } }";
+ ZipSource zipSource = new ZipSource(inputStream);
importService.setImportTransform(zipSource, transformJSON);
ImportTransforms importTransforms = zipSource.getImportTransform();
@@ -266,13 +279,8 @@ public class ImportServiceTest extends ExportImportTestBase {
assertEquals(importTransforms.getTransforms().get("hive_table").get("qualifiedName").size(), 2);
}
- @Test(dataProvider = "empty-zip", expectedExceptions = AtlasBaseException.class)
- public void importEmptyZip(ZipSource zipSource) {
-
- }
-
@Test(expectedExceptions = AtlasBaseException.class)
public void importEmptyZip() throws IOException, AtlasBaseException {
- getZipSource("empty.zip");
+ new ZipSource((InputStream) getZipSource("empty.zip")[0][0]);
}
}
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsShaperTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsShaperTest.java
index 06bdaa6..78fdaca 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsShaperTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsShaperTest.java
@@ -66,7 +66,7 @@ public class ImportTransformsShaperTest extends ExportImportTestBase {
public void newTagIsCreatedAndEntitiesAreTagged() throws AtlasBaseException, IOException {
AtlasImportResult result = ZipFileResourceTestUtils.runImportWithParameters(importService,
getImporRequest(),
- ZipFileResourceTestUtils.getZipSourceFrom("stocks.zip"));
+ ZipFileResourceTestUtils.getInputStreamFrom("stocks.zip"));
AtlasClassificationType classification = typeRegistry.getClassificationTypeByName(TAG_NAME);
assertNotNull(classification);
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java
index 5e909e1..5315535 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java
@@ -24,10 +24,10 @@ import org.apache.atlas.RequestContextV1;
import org.apache.atlas.TestModules;
import org.apache.atlas.TestUtilsV2;
import org.apache.atlas.exception.AtlasBaseException;
-import org.apache.atlas.model.impexp.AtlasServer;
import org.apache.atlas.model.impexp.AtlasExportRequest;
import org.apache.atlas.model.impexp.AtlasImportRequest;
import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.atlas.model.impexp.AtlasServer;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.typedef.AtlasEntityDef;
import org.apache.atlas.repository.Constants;
@@ -37,6 +37,7 @@ import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.utils.TestResourceFileUtils;
+import org.apache.commons.io.IOUtils;
import org.testng.SkipException;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
@@ -44,7 +45,10 @@ import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import javax.inject.Inject;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.util.List;
import static org.apache.atlas.model.impexp.AtlasExportRequest.OPTION_KEY_REPLICATED_TO;
@@ -84,7 +88,7 @@ public class ReplicationEntityAttributeTest extends ExportImportTestBase {
@Inject
private AtlasEntityStoreV1 entityStore;
- private ZipSource zipSource;
+ private InputStream inputStream;
@BeforeClass
public void setup() throws IOException, AtlasBaseException {
@@ -103,13 +107,19 @@ public class ReplicationEntityAttributeTest extends ExportImportTestBase {
}
@Test
- public void exportWithReplicationToOption_AddsClusterObjectIdToReplicatedFromAttribute() throws AtlasBaseException {
+ public void exportWithReplicationToOption_AddsClusterObjectIdToReplicatedFromAttribute() throws AtlasBaseException, IOException {
final int expectedEntityCount = 2;
AtlasExportRequest request = getUpdateMetaInfoUpdateRequest();
- zipSource = runExportWithParameters(exportService, request);
+ InputStream inputStream = runExportWithParameters(exportService, request);
+
+ assertNotNull(inputStream);
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ IOUtils.copy(inputStream, baos);
+ this.inputStream = new ByteArrayInputStream(baos.toByteArray());
- assertNotNull(zipSource);
+ ZipSource zipSource = new ZipSource(new ByteArrayInputStream(baos.toByteArray()));
assertNotNull(zipSource.getCreationOrder());
assertEquals(zipSource.getCreationOrder().size(), expectedEntityCount);
@@ -135,7 +145,7 @@ public class ReplicationEntityAttributeTest extends ExportImportTestBase {
@Test(dependsOnMethods = "exportWithReplicationToOption_AddsClusterObjectIdToReplicatedFromAttribute")
public void importWithReplicationFromOption_AddsClusterObjectIdToReplicatedFromAttribute() throws AtlasBaseException, IOException {
AtlasImportRequest request = getImportRequestWithReplicationOption();
- AtlasImportResult importResult = runImportWithParameters(importService, request, zipSource);
+ AtlasImportResult importResult = runImportWithParameters(importService, request, inputStream);
assertCluster(
AuditsWriter.getServerNameFromFullName(REPLICATED_FROM_CLUSTER_NAME),
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ZipFileResourceTestUtils.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipFileResourceTestUtils.java
index a2a5f58..4b3af4c 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ZipFileResourceTestUtils.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipFileResourceTestUtils.java
@@ -33,12 +33,13 @@ import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.repository.store.bootstrap.AtlasTypeDefStoreInitializer;
import org.apache.atlas.repository.store.graph.v1.AtlasEntityStoreV1;
import org.apache.atlas.repository.store.graph.v1.AtlasEntityStreamForImport;
+import org.apache.atlas.repository.store.graph.v1.EntityImportStream;
import org.apache.atlas.store.AtlasTypeDefStore;
-import org.apache.atlas.type.AtlasStructType;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.utils.TestResourceFileUtils;
import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,6 +50,8 @@ import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -90,19 +93,11 @@ public class ZipFileResourceTestUtils {
}
public static Object[][] getZipSource(String fileName) throws IOException, AtlasBaseException {
- return new Object[][]{{getZipSourceFrom(fileName)}};
+ return new Object[][]{{getInputStreamFrom(fileName)}};
}
- public static ZipSource getZipSourceFrom(String fileName) throws IOException, AtlasBaseException {
- FileInputStream fs = ZipFileResourceTestUtils.getFileInputStream(fileName);
-
- return new ZipSource(fs);
- }
-
- private static ZipSource getZipSourceFrom(ByteArrayOutputStream baos) throws IOException, AtlasBaseException {
- ByteArrayInputStream bis = new ByteArrayInputStream(baos.toByteArray());
- ZipSource zipSource = new ZipSource(bis);
- return zipSource;
+ public static InputStream getInputStreamFrom(String fileName) {
+ return ZipFileResourceTestUtils.getFileInputStream(fileName);
}
public static void verifyImportedEntities(List<String> creationOrder, List<String> processedEntities) {
@@ -163,7 +158,7 @@ public class ZipFileResourceTestUtils {
}
}
- public static ZipSource runExportWithParameters(ExportService exportService, AtlasExportRequest request) {
+ public static InputStream runExportWithParameters(ExportService exportService, AtlasExportRequest request) {
final String requestingIP = "1.0.0.0";
final String hostName = "localhost";
final String userName = "admin";
@@ -176,7 +171,7 @@ public class ZipFileResourceTestUtils {
assertEquals(result.getOperationStatus(), AtlasExportResult.OperationStatus.SUCCESS);
zipSink.close();
- return getZipSourceFrom(baos);
+ return new ByteArrayInputStream(baos.toByteArray());
}
catch(Exception ex) {
throw new SkipException(String.format("runExportWithParameters: %s: failed!", request.toString()));
@@ -253,27 +248,42 @@ public class ZipFileResourceTestUtils {
}
- public static AtlasImportResult runImportWithParameters(ImportService importService, AtlasImportRequest request, ZipSource source) throws AtlasBaseException, IOException {
+ public static AtlasImportResult runImportWithParameters(ImportService importService, AtlasImportRequest request, InputStream inputStream) throws AtlasBaseException, IOException {
+ final String requestingIP = "1.0.0.0";
+ final String hostName = "localhost";
+ final String userName = "admin";
+
+ AtlasImportResult result = importService.run(inputStream, request, userName, hostName, requestingIP);
+ assertEquals(result.getOperationStatus(), AtlasImportResult.OperationStatus.SUCCESS);
+ return result;
+ }
+
+ public static AtlasImportResult runImportWithNoParameters(ImportService importService, InputStream inputStream) throws AtlasBaseException, IOException {
final String requestingIP = "1.0.0.0";
final String hostName = "localhost";
final String userName = "admin";
- AtlasImportResult result = importService.run(source, request, userName, hostName, requestingIP);
+ AtlasImportResult result = importService.run(inputStream, userName, hostName, requestingIP);
assertEquals(result.getOperationStatus(), AtlasImportResult.OperationStatus.SUCCESS);
return result;
}
- public static AtlasImportResult runImportWithNoParameters(ImportService importService, ZipSource source) throws AtlasBaseException, IOException {
+ public static AtlasImportResult runImportWithNoParametersUsingBackingDirectory(ImportService importService, InputStream inputStream) throws AtlasBaseException, IOException {
final String requestingIP = "1.0.0.0";
final String hostName = "localhost";
final String userName = "admin";
- AtlasImportResult result = importService.run(source, userName, hostName, requestingIP);
+ EntityImportStream sourceWithBackingDirectory = new ZipSourceWithBackingDirectory(inputStream, Files.createTempDirectory("temp").toString());
+ AtlasImportResult result = importService.run(sourceWithBackingDirectory, new AtlasImportRequest(), userName, hostName, requestingIP);
assertEquals(result.getOperationStatus(), AtlasImportResult.OperationStatus.SUCCESS);
return result;
}
- public static void runAndVerifyQuickStart_v1_Import(ImportService importService, ZipSource zipSource) throws AtlasBaseException, IOException {
+ public static void runAndVerifyQuickStart_v1_Import(ImportService importService, InputStream is) throws AtlasBaseException, IOException {
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ IOUtils.copy(is, baos);
+
+ ZipSource zipSource = new ZipSource(new ByteArrayInputStream(baos.toByteArray()));
AtlasExportResult exportResult = zipSource.getExportResult();
List<String> creationOrder = zipSource.getCreationOrder();
@@ -281,7 +291,7 @@ public class ZipFileResourceTestUtils {
RequestContextV1.get().setUser(TestUtilsV2.TEST_USER);
AtlasImportRequest request = getDefaultImportRequest();
- AtlasImportResult result = runImportWithParameters(importService, request, zipSource);
+ AtlasImportResult result = runImportWithParameters(importService, request, new ByteArrayInputStream(baos.toByteArray()));
assertNotNull(result);
verifyImportedMetrics(exportResult, result);
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ZipSourceTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipSourceTest.java
index 7436dc0..46164e8 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ZipSourceTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipSourceTest.java
@@ -28,6 +28,7 @@ import org.testng.annotations.Test;
import java.io.ByteArrayInputStream;
import java.io.FileInputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.util.List;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getZipSource;
@@ -127,7 +128,8 @@ public class ZipSourceTest {
}
@Test(dataProvider = "sales")
- public void iteratorSetPositionBehavor(ZipSource zipSource) throws IOException, AtlasBaseException {
+ public void iteratorSetPositionBehavor(InputStream inputStream) throws IOException, AtlasBaseException {
+ ZipSource zipSource = new ZipSource(inputStream);
Assert.assertTrue(zipSource.hasNext());
List<String> creationOrder = zipSource.getCreationOrder();
diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
index 8417e7e..d998bd3 100755
--- a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
@@ -19,7 +19,6 @@
package org.apache.atlas.web.resources;
import com.sun.jersey.multipart.FormDataParam;
-
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasErrorCode;
@@ -28,11 +27,11 @@ import org.apache.atlas.authorize.AtlasResourceTypes;
import org.apache.atlas.authorize.simple.AtlasAuthorizationUtils;
import org.apache.atlas.discovery.SearchContext;
import org.apache.atlas.exception.AtlasBaseException;
-import org.apache.atlas.model.impexp.AtlasServer;
import org.apache.atlas.model.impexp.AtlasExportRequest;
import org.apache.atlas.model.impexp.AtlasExportResult;
import org.apache.atlas.model.impexp.AtlasImportRequest;
import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.atlas.model.impexp.AtlasServer;
import org.apache.atlas.model.impexp.ExportImportAuditEntry;
import org.apache.atlas.model.instance.AtlasCheckStateRequest;
import org.apache.atlas.model.instance.AtlasCheckStateResult;
@@ -42,7 +41,6 @@ import org.apache.atlas.repository.impexp.ExportImportAuditService;
import org.apache.atlas.repository.impexp.ExportService;
import org.apache.atlas.repository.impexp.ImportService;
import org.apache.atlas.repository.impexp.ZipSink;
-import org.apache.atlas.repository.impexp.ZipSource;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.services.MetricsService;
import org.apache.atlas.type.AtlasType;
@@ -390,9 +388,8 @@ public class AdminResource {
try {
AtlasImportRequest request = AtlasType.fromJson(jsonData, AtlasImportRequest.class);
- ZipSource zipSource = new ZipSource(inputStream);
- result = importService.run(zipSource, request, Servlets.getUserName(httpServletRequest),
+ result = importService.run(inputStream, request, Servlets.getUserName(httpServletRequest),
Servlets.getHostName(httpServletRequest),
AtlasAuthorizationUtils.getRequestIpAddress(httpServletRequest));
} catch (AtlasBaseException excp) {