You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by sa...@apache.org on 2020/04/10 20:46:23 UTC
[atlas] branch master updated: ATLAS-3679 : Bulk import Business
Metadata attribute.
This is an automated email from the ASF dual-hosted git repository.
sarath pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new fa2e5b4 ATLAS-3679 : Bulk import Business Metadata attribute.
fa2e5b4 is described below
commit fa2e5b49aa0d71464e850d631bef2640b2e4ef55
Author: mayanknj <ma...@freestoneinfotech.com>
AuthorDate: Mon Mar 30 10:11:41 2020 +0530
ATLAS-3679 : Bulk import Business Metadata attribute.
Signed-off-by: Sarath Subramanian <sa...@apache.org>
---
.../org/apache/atlas/repository/Constants.java | 4 +-
.../main/java/org/apache/atlas/AtlasErrorCode.java | 2 +
.../atlas/bulkimport/BulkImportResponse.java | 154 +++++++++++++
.../test/java/org/apache/atlas/TestUtilsV2.java | 44 +++-
.../repository/store/graph/AtlasEntityStore.java | 11 +
.../store/graph/v2/AtlasEntityStoreV2.java | 243 ++++++++++++++++++++-
.../store/graph/v2/AtlasGraphUtilsV2.java | 131 ++++++++++-
.../main/java/org/apache/atlas/util/FileUtils.java | 22 +-
.../store/graph/v2/AtlasEntityStoreV2Test.java | 131 ++++++++---
.../test/resources/csvFiles/incorrectFile_2.csv | 2 +
.../src/test/resources/csvFiles/template_1.csv | 2 +-
.../src/test/resources/csvFiles/template_2.csv | 2 +
.../java/org/apache/atlas/web/rest/EntityREST.java | 51 ++++-
13 files changed, 752 insertions(+), 47 deletions(-)
diff --git a/common/src/main/java/org/apache/atlas/repository/Constants.java b/common/src/main/java/org/apache/atlas/repository/Constants.java
index 42600f2..a71787b 100644
--- a/common/src/main/java/org/apache/atlas/repository/Constants.java
+++ b/common/src/main/java/org/apache/atlas/repository/Constants.java
@@ -202,9 +202,9 @@ public final class Constants {
public static final Integer INCOMPLETE_ENTITY_VALUE = Integer.valueOf(1);
/*
- * All supported file-format extensions for AtlasGlossaryTerms file upload
+ * All supported file-format extensions for Bulk Imports through file upload
*/
- public enum GlossaryImportSupportedFileExtensions { XLSX, XLS, CSV }
+ public enum SupportedFileExtensions { XLSX, XLS, CSV }
private Constants() {
}
diff --git a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
index ff56402..16947ec 100644
--- a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
+++ b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
@@ -168,6 +168,7 @@ public enum AtlasErrorCode {
BUSINESS_METADATA_ATTRIBUTE_DOES_NOT_EXIST(400, "ATLAS-400-00-096", "Business-metadata attribute does not exist in entity: {0}"),
BUSINESS_METADATA_ATTRIBUTE_ALREADY_EXISTS(400, "ATLAS-400-00-097", "Business-metadata attribute already exists in entity: {0}"),
INVALID_FILE_TYPE(400, "ATLAS-400-00-98", "The provided file type {0} is not supported."),
+ INVALID_BUSINESS_ATTRIBUTES_IMPORT_DATA(400, "ATLAS-400-00-99","The uploaded file was not processed due to following errors : {0}"),
UNAUTHORIZED_ACCESS(403, "ATLAS-403-00-001", "{0} is not authorized to perform {1}"),
@@ -192,6 +193,7 @@ public enum AtlasErrorCode {
INSTANCE_GUID_DELETED(404, "ATLAS-404-00-012", "Given instance guid {0} has been deleted"),
NO_PROPAGATED_CLASSIFICATIONS_FOUND_FOR_ENTITY(404, "ATLAS-404-00-013", "No propagated classifications associated with entity: {0}"),
NO_DATA_FOUND(404, "ATLAS-404-00-014", "No data found in the uploaded file"),
+ FILE_NAME_NOT_FOUND(404, "ATLAS-404-00-015", "File name should not be blank"),
// All data conflict errors go here
TYPE_ALREADY_EXISTS(409, "ATLAS-409-00-001", "Given type {0} already exists"),
diff --git a/intg/src/main/java/org/apache/atlas/bulkimport/BulkImportResponse.java b/intg/src/main/java/org/apache/atlas/bulkimport/BulkImportResponse.java
new file mode 100644
index 0000000..0ee54e9
--- /dev/null
+++ b/intg/src/main/java/org/apache/atlas/bulkimport/BulkImportResponse.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.bulkimport;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class BulkImportResponse {
+
+ private List<ImportInfo> failedImportInfoList = new ArrayList<ImportInfo>();
+ private List<ImportInfo> successImportInfoList = new ArrayList<ImportInfo>();
+
+ public BulkImportResponse() {}
+
+ public List<ImportInfo> getFailedImportInfoList() {
+ return failedImportInfoList;
+ }
+
+ public void setFailedImportInfoList(List<ImportInfo> failedImportInfoList){
+ this.failedImportInfoList = failedImportInfoList;
+ }
+
+ public void setFailedImportInfoList(ImportInfo importInfo){
+ List<ImportInfo> failedImportInfoList = this.failedImportInfoList;
+
+ if (failedImportInfoList == null) {
+ failedImportInfoList = new ArrayList<>();
+ }
+ failedImportInfoList.add(importInfo);
+ setFailedImportInfoList(failedImportInfoList);
+ }
+
+ public List<ImportInfo> getSuccessImportInfoList() {
+ return successImportInfoList;
+ }
+
+ public void setSuccessImportInfoList(List<ImportInfo> successImportInfoList){
+ this.successImportInfoList = successImportInfoList;
+ }
+
+ public void setSuccessImportInfoList(ImportInfo importInfo){
+ List<ImportInfo> successImportInfoList = this.successImportInfoList;
+
+ if (successImportInfoList == null) {
+ successImportInfoList = new ArrayList<>();
+ }
+ successImportInfoList.add(importInfo);
+ setSuccessImportInfoList(successImportInfoList);
+ }
+
+ public enum ImportStatus {
+ SUCCESS, FAILED
+ }
+
+ @Override
+ public String toString() {
+ return "BulkImportResponse{" +
+ "failedImportInfoList=" + failedImportInfoList +
+ ", successImportInfoList=" + successImportInfoList +
+ '}';
+ }
+
+ static public class ImportInfo {
+
+ private String parentObjectName;
+ private String childObjectName;
+ private ImportStatus importStatus;
+ private String remarks;
+ private Integer rowNumber;
+
+ public ImportInfo(String parentObjectName, String childObjectName, ImportStatus importStatus, String remarks, Integer rowNumber) {
+ this.parentObjectName = parentObjectName;
+ this.childObjectName = childObjectName;
+ this.importStatus = importStatus;
+ this.remarks = remarks;
+ this.rowNumber = rowNumber;
+ }
+
+ public ImportInfo(String parentObjectName, String childObjectName, ImportStatus importStatus) {
+ this(parentObjectName, childObjectName, importStatus, "",-1);
+ }
+
+ public ImportInfo( ImportStatus importStatus, String remarks, Integer rowNumber) {
+ this("","", importStatus, remarks, rowNumber);
+ }
+
+ public ImportInfo(String parentObjectName, String childObjectName) {
+ this(parentObjectName,childObjectName, ImportStatus.SUCCESS, "", -1);
+ }
+
+ public ImportInfo(String parentObjectName, String childObjectName, ImportStatus importStatus, String remarks) {
+ this(parentObjectName, childObjectName, importStatus, remarks, -1);
+ }
+
+ public String getParentObjectName() {
+ return parentObjectName;
+ }
+
+ public void setParentObjectName(String parentObjectName) {
+ this.parentObjectName = parentObjectName;
+ }
+
+ public String getChildObjectName() {
+ return childObjectName;
+ }
+
+ public void setChildObjectName(String childObjectName) {
+ this.childObjectName = childObjectName;
+ }
+
+ public String getRemarks() {
+ return remarks;
+ }
+
+ public void setRemarks(String remarks) {
+ this.remarks = remarks;
+ }
+
+ public ImportStatus getImportStatus() {
+ return importStatus;
+ }
+
+ public void setImportStatus(ImportStatus importStatus) {
+ this.importStatus = importStatus;
+ }
+
+ @Override
+ public String toString() {
+ return "ImportInfo{" +
+ "parentObjectName='" + parentObjectName + '\'' +
+ ", childObjectName='" + childObjectName + '\'' +
+ ", remarks='" + remarks + '\'' +
+ ", importStatus=" + importStatus +
+ ", rowNumber=" + rowNumber +
+ '}';
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/intg/src/test/java/org/apache/atlas/TestUtilsV2.java b/intg/src/test/java/org/apache/atlas/TestUtilsV2.java
index 55497fc..2b9cf6e 100755
--- a/intg/src/test/java/org/apache/atlas/TestUtilsV2.java
+++ b/intg/src/test/java/org/apache/atlas/TestUtilsV2.java
@@ -35,8 +35,17 @@ import org.apache.atlas.model.typedef.AtlasStructDef.AtlasConstraintDef;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.type.AtlasTypeUtil;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.RandomStringUtils;
-
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.ArrayList;
@@ -66,7 +75,7 @@ import static org.apache.atlas.type.AtlasTypeUtil.createBusinessMetadataDef;
* Test utility class.
*/
public final class TestUtilsV2 {
-
+ private static final Logger LOG = LoggerFactory.getLogger(TestUtilsV2.class);
public static final long TEST_DATE_IN_LONG = 1418265358440L;
public static final String TEST_USER = "testUser";
@@ -1545,4 +1554,35 @@ public final class TestUtilsV2 {
typeDef.setCreatedBy(TestUtilsV2.TEST_USER);
typeDef.setUpdatedBy(TestUtilsV2.TEST_USER);
}
+
+ public static InputStream getFile(String subDir, String fileName){
+ final String userDir = System.getProperty("user.dir");
+ String filePath = getTestFilePath(userDir, subDir, fileName);
+ File file = new File(filePath);
+ InputStream fs = null;
+
+ try {
+ fs = new FileInputStream(file);
+ } catch (FileNotFoundException e) {
+ LOG.error("File could not be found at: {}", filePath, e);
+ }
+
+ return fs;
+ }
+
+ public static String getFileData(String subDir, String fileName)throws IOException {
+ final String userDir = System.getProperty("user.dir");
+ String filePath = getTestFilePath(userDir, subDir, fileName);
+ File f = new File(filePath);
+ String ret = FileUtils.readFileToString(f, "UTF-8");
+ return ret;
+ }
+
+ private static String getTestFilePath(String startPath, String subDir, String fileName) {
+ if (StringUtils.isNotEmpty(subDir)) {
+ return startPath + "/src/test/resources/" + subDir + "/" + fileName;
+ } else {
+ return startPath + "/src/test/resources/" + fileName;
+ }
+ }
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java
index 834c9d1..7b9455e 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java
@@ -29,7 +29,9 @@ import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.repository.store.graph.v2.EntityStream;
import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.bulkimport.BulkImportResponse;
+import java.io.InputStream;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -282,4 +284,13 @@ public interface AtlasEntityStore {
* Add given labels to the given entity, if labels is null/empty, no labels will be added.
*/
void addLabels(String guid, Set<String> labels) throws AtlasBaseException;
+
+ /**
+ *
+ * @param inputStream
+ * @param fileName
+ * @throws AtlasBaseException
+ *
+ */
+ BulkImportResponse bulkCreateOrUpdateBusinessAttributes(InputStream inputStream, String fileName) throws AtlasBaseException;
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
index 195c32b..379150b 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
@@ -40,19 +40,26 @@ import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.AtlasEntityHeaders;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
+import org.apache.atlas.repository.Constants;
+import org.apache.atlas.repository.graph.GraphHelper;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.EntityGraphDiscovery;
import org.apache.atlas.repository.store.graph.EntityGraphDiscoveryContext;
import org.apache.atlas.repository.store.graph.v1.DeleteHandlerDelegate;
import org.apache.atlas.store.DeleteType;
+import org.apache.atlas.type.AtlasArrayType;
+import org.apache.atlas.type.AtlasBusinessMetadataType.AtlasBusinessAttribute;
import org.apache.atlas.type.AtlasClassificationType;
import org.apache.atlas.type.AtlasEntityType;
-import org.apache.atlas.type.AtlasBusinessMetadataType.AtlasBusinessAttribute;
+import org.apache.atlas.type.AtlasEnumType;
import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.type.AtlasTypeUtil;
+import org.apache.atlas.bulkimport.BulkImportResponse;
+import org.apache.atlas.util.FileUtils;
import org.apache.atlas.utils.AtlasEntityUtil;
import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
import org.apache.atlas.utils.AtlasPerfTracer;
@@ -64,9 +71,13 @@ import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.inject.Inject;
+import java.io.IOException;
+import java.io.InputStream;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -74,7 +85,9 @@ import java.util.Objects;
import java.util.Set;
import static java.lang.Boolean.FALSE;
-import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.*;
+import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.DELETE;
+import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.PURGE;
+import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.UPDATE;
import static org.apache.atlas.repository.Constants.IS_INCOMPLETE_PROPERTY_KEY;
import static org.apache.atlas.repository.graph.GraphHelper.getCustomAttributes;
import static org.apache.atlas.repository.graph.GraphHelper.getTypeName;
@@ -94,6 +107,7 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
private final EntityGraphMapper entityGraphMapper;
private final EntityGraphRetriever entityRetriever;
+
@Inject
public AtlasEntityStoreV2(DeleteHandlerDelegate deleteDelegate, AtlasTypeRegistry typeRegistry,
IAtlasEntityChangeNotifier entityChangeNotifier, EntityGraphMapper entityGraphMapper) {
@@ -1531,4 +1545,229 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
throw new AtlasBaseException(AtlasErrorCode.INSTANCE_CRUD_INVALID_PARAMS, messages);
}
}
+
+ @Override
+ @GraphTransaction
+ public BulkImportResponse bulkCreateOrUpdateBusinessAttributes(InputStream inputStream, String fileName) throws AtlasBaseException {
+ BulkImportResponse ret = new BulkImportResponse();
+ try {
+ if (StringUtils.isBlank(fileName)) {
+ throw new AtlasBaseException(AtlasErrorCode.FILE_NAME_NOT_FOUND, fileName);
+ }
+ List<String[]> fileData = FileUtils.readFileData(fileName, inputStream);
+
+ Map<String, AtlasEntity> attributesToAssociate = getBusinessMetadataDefList(fileData, ret);
+
+ for (Map.Entry<String, AtlasEntity> entry : attributesToAssociate.entrySet()) {
+ AtlasEntity entity = entry.getValue();
+ try{
+ addOrUpdateBusinessAttributes(entity.getGuid(), entity.getBusinessAttributes(), true);
+ BulkImportResponse.ImportInfo successImportInfo = new BulkImportResponse.ImportInfo(entity.getAttribute(Constants.QUALIFIED_NAME).toString(), entity.getBusinessAttributes().toString());
+ ret.setSuccessImportInfoList(successImportInfo);
+ }catch(Exception e){
+ LOG.error("Error occurred while updating BusinessMetadata Attributes for Entity "+entity.getAttribute(Constants.QUALIFIED_NAME).toString());
+ BulkImportResponse.ImportInfo failedImportInfo = new BulkImportResponse.ImportInfo(entity.getAttribute(Constants.QUALIFIED_NAME).toString(), entity.getBusinessAttributes().toString(), BulkImportResponse.ImportStatus.FAILED, e.getMessage());
+ ret.setFailedImportInfoList(failedImportInfo);
+ }
+ }
+ } catch (IOException e) {
+ LOG.error("An Exception occurred while uploading the file : "+e.getMessage());
+ throw new AtlasBaseException(AtlasErrorCode.FAILED_TO_UPLOAD, e);
+ }
+
+ return ret;
+ }
+
+ private Map<String, AtlasEntity> getBusinessMetadataDefList(List<String[]> fileData, BulkImportResponse bulkImportResponse) throws AtlasBaseException {
+ Map<String, AtlasEntity> ret = new HashMap<>();
+ Map<String, Map<String, Object>> newBMAttributes = new HashMap<>();
+ Map<String, AtlasVertex> vertexCache = new HashMap<>();
+ Map<String, Object> attribute = new HashMap<>();
+
+ for (int lineIndex = 0; lineIndex < fileData.size(); lineIndex++) {
+ List<String> failedTermMsgList = new ArrayList<>();
+ AtlasEntity atlasEntity = new AtlasEntity();
+ String[] record = fileData.get(lineIndex);
+ if (missingFieldsCheck(record, bulkImportResponse, lineIndex+1)) {
+ continue;
+ }
+ String typeName = record[FileUtils.TYPENAME_COLUMN_INDEX];
+ String uniqueAttrValue = record[FileUtils.UNIQUE_ATTR_VALUE_COLUMN_INDEX];
+ String bmAttribute = record[FileUtils.BM_ATTR_NAME_COLUMN_INDEX];
+ String bmAttributeValue = record[FileUtils.BM_ATTR_VALUE_COLUMN_INDEX];
+ String uniqueAttrName = Constants.QUALIFIED_NAME;
+ if (record.length > FileUtils.UNIQUE_ATTR_NAME_COLUMN_INDEX) {
+ uniqueAttrName = typeName+"."+record[FileUtils.UNIQUE_ATTR_NAME_COLUMN_INDEX];
+ }
+
+ if (validateTypeName(typeName, bulkImportResponse, lineIndex+1)) {
+ continue;
+ }
+
+ String vertexKey = typeName + "_" + uniqueAttrName + "_" + uniqueAttrValue;
+ AtlasVertex atlasVertex = vertexCache.get(vertexKey);
+ if (atlasVertex == null) {
+ atlasVertex = AtlasGraphUtilsV2.findByTypeAndUniquePropertyName(typeName, uniqueAttrName, uniqueAttrValue);
+ }
+
+ if (atlasVertex == null) {
+ LOG.error("Provided UniqueAttributeValue is not valid : " + uniqueAttrValue + " at line #" + (lineIndex + 1));
+ failedTermMsgList.add("Provided UniqueAttributeValue is not valid : " + uniqueAttrValue + " at line #" + (lineIndex + 1));
+ }
+
+ vertexCache.put(vertexKey, atlasVertex);
+ String[] businessAttributeName = bmAttribute.split(FileUtils.ESCAPE_CHARACTER + ".");
+ if (validateBMAttributeName(uniqueAttrValue,bmAttribute,bulkImportResponse,lineIndex+1)) {
+ continue;
+ }
+
+ String bMName = businessAttributeName[0];
+ String bMAttributeName = businessAttributeName[1];
+ AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName);
+ if (validateBMAttribute(uniqueAttrValue, businessAttributeName, entityType, bulkImportResponse,lineIndex+1)) {
+ continue;
+ }
+
+ AtlasBusinessAttribute atlasBusinessAttribute = entityType.getBusinessAttributes().get(bMName).get(bMAttributeName);
+ if (atlasBusinessAttribute.getAttributeType().getTypeCategory() == TypeCategory.ARRAY) {
+ AtlasArrayType arrayType = (AtlasArrayType) atlasBusinessAttribute.getAttributeType();
+ List attributeValueData;
+
+ if(arrayType.getElementType() instanceof AtlasEnumType){
+ attributeValueData = AtlasGraphUtilsV2.assignEnumValues(bmAttributeValue, (AtlasEnumType) arrayType.getElementType(), failedTermMsgList, lineIndex+1);
+ }else{
+ attributeValueData = assignMultipleValues(bmAttributeValue, arrayType.getElementTypeName(), failedTermMsgList, lineIndex+1);
+ }
+ attribute.put(bmAttribute, attributeValueData);
+ } else {
+ attribute.put(bmAttribute, bmAttributeValue);
+ }
+
+ if(failedMsgCheck(uniqueAttrValue,bmAttribute, failedTermMsgList, bulkImportResponse, lineIndex+1)) {
+ continue;
+ }
+
+ if(ret.containsKey(vertexKey)) {
+ atlasEntity = ret.get(vertexKey);
+ atlasEntity.setBusinessAttribute(bMName, bMAttributeName, attribute.get(bmAttribute));
+ ret.put(vertexKey, atlasEntity);
+ } else {
+ String guid = GraphHelper.getGuid(atlasVertex);
+ atlasEntity.setGuid(guid);
+ atlasEntity.setTypeName(typeName);
+ atlasEntity.setAttribute(Constants.QUALIFIED_NAME,uniqueAttrValue);
+ newBMAttributes = entityRetriever.getBusinessMetadata(atlasVertex) != null ? entityRetriever.getBusinessMetadata(atlasVertex) : newBMAttributes;
+ atlasEntity.setBusinessAttributes(newBMAttributes);
+ atlasEntity.setBusinessAttribute(bMName, bMAttributeName, attribute.get(bmAttribute));
+ ret.put(vertexKey, atlasEntity);
+ }
+ }
+ return ret;
+ }
+
+ private boolean validateTypeName(String typeName, BulkImportResponse bulkImportResponse, int lineIndex) {
+ boolean ret = false;
+
+ if(!typeRegistry.getAllEntityDefNames().contains(typeName)){
+ ret = true;
+ LOG.error("Invalid entity-type: " + typeName + " at line #" + lineIndex);
+ String failedTermMsgs = "Invalid entity-type: " + typeName + " at line #" + lineIndex;
+ BulkImportResponse.ImportInfo importInfo = new BulkImportResponse.ImportInfo(BulkImportResponse.ImportStatus.FAILED, failedTermMsgs, lineIndex);
+ bulkImportResponse.getFailedImportInfoList().add(importInfo);
+ }
+ return ret;
+ }
+
+ private List assignMultipleValues(String bmAttributeValues, String elementTypeName, List failedTermMsgList, int lineIndex) {
+
+ String[] arr = bmAttributeValues.split(FileUtils.ESCAPE_CHARACTER + FileUtils.PIPE_CHARACTER);
+ try {
+ switch (elementTypeName) {
+
+ case AtlasBaseTypeDef.ATLAS_TYPE_FLOAT:
+ return AtlasGraphUtilsV2.floatParser(arr, failedTermMsgList, lineIndex);
+
+ case AtlasBaseTypeDef.ATLAS_TYPE_INT:
+ return AtlasGraphUtilsV2.intParser(arr, failedTermMsgList, lineIndex);
+
+ case AtlasBaseTypeDef.ATLAS_TYPE_LONG:
+ return AtlasGraphUtilsV2.longParser(arr, failedTermMsgList, lineIndex);
+
+ case AtlasBaseTypeDef.ATLAS_TYPE_SHORT:
+ return AtlasGraphUtilsV2.shortParser(arr, failedTermMsgList, lineIndex);
+
+ case AtlasBaseTypeDef.ATLAS_TYPE_DOUBLE:
+ return AtlasGraphUtilsV2.doubleParser(arr, failedTermMsgList, lineIndex);
+
+ case AtlasBaseTypeDef.ATLAS_TYPE_DATE:
+ return AtlasGraphUtilsV2.dateParser(arr, failedTermMsgList, lineIndex);
+
+ case AtlasBaseTypeDef.ATLAS_TYPE_BOOLEAN:
+ return AtlasGraphUtilsV2.booleanParser(arr, failedTermMsgList, lineIndex);
+
+ default:
+ return Arrays.asList(arr);
+ }
+ } catch (Exception e) {
+ LOG.error("On line index " + lineIndex + "the provided BusinessMetadata AttributeValue " + bmAttributeValues + " are not of type - " + elementTypeName);
+ failedTermMsgList.add("On line index " + lineIndex + "the provided BusinessMetadata AttributeValue " + bmAttributeValues + " are not of type - " + elementTypeName);
+ }
+ return null;
+ }
+
+ private boolean missingFieldsCheck(String[] record, BulkImportResponse bulkImportResponse, int lineIndex){
+ boolean missingFieldsCheck = (record.length < FileUtils.UNIQUE_ATTR_NAME_COLUMN_INDEX) ||
+ StringUtils.isBlank(record[FileUtils.TYPENAME_COLUMN_INDEX]) ||
+ StringUtils.isBlank(record[FileUtils.UNIQUE_ATTR_VALUE_COLUMN_INDEX]) ||
+ StringUtils.isBlank(record[FileUtils.BM_ATTR_NAME_COLUMN_INDEX]) ||
+ StringUtils.isBlank(record[FileUtils.BM_ATTR_VALUE_COLUMN_INDEX]);
+
+ if(missingFieldsCheck){
+ LOG.error("Missing fields: " + Arrays.toString(record) + " at line #" + lineIndex);
+ String failedTermMsgs = "Missing fields: " + Arrays.toString(record) + " at line #" + lineIndex;
+ BulkImportResponse.ImportInfo importInfo = new BulkImportResponse.ImportInfo(BulkImportResponse.ImportStatus.FAILED, failedTermMsgs, lineIndex);
+ bulkImportResponse.getFailedImportInfoList().add(importInfo);
+ }
+ return missingFieldsCheck;
+ }
+
+ private boolean validateBMAttributeName(String uniqueAttrValue, String bmAttribute, BulkImportResponse bulkImportResponse, int lineIndex) {
+ boolean ret = false;
+ String[] businessAttributeName = bmAttribute.split(FileUtils.ESCAPE_CHARACTER + ".");
+ if(businessAttributeName.length < 2){
+ LOG.error("Provided businessAttributeName is not in proper format : " + bmAttribute + " at line #" + lineIndex);
+ String failedTermMsgs = "Provided businessAttributeName is not in proper format : " + bmAttribute + " at line #" + lineIndex;
+ BulkImportResponse.ImportInfo importInfo = new BulkImportResponse.ImportInfo(uniqueAttrValue, bmAttribute, BulkImportResponse.ImportStatus.FAILED, failedTermMsgs, lineIndex);
+ bulkImportResponse.getFailedImportInfoList().add(importInfo);
+ ret = true;
+ }
+ return ret;
+ }
+
+ private boolean validateBMAttribute(String uniqueAttrValue,String[] businessAttributeName, AtlasEntityType entityType, BulkImportResponse bulkImportResponse, int lineIndex) {
+ boolean ret = false;
+ String bMName = businessAttributeName[0];
+ String bMAttributeName = businessAttributeName[1];
+
+ if(entityType.getBusinessAttributes(bMName) == null ||
+ entityType.getBusinessAttributes(bMName).get(bMAttributeName) == null){
+ ret = true;
+ LOG.error("Provided businessAttributeName is not valid : " + bMName+"."+bMAttributeName + " at line #" + lineIndex);
+ String failedTermMsgs = "Provided businessAttributeName is not valid : " + bMName+"."+bMAttributeName + " at line #" + lineIndex;
+ BulkImportResponse.ImportInfo importInfo = new BulkImportResponse.ImportInfo(uniqueAttrValue, bMName+"."+bMAttributeName, BulkImportResponse.ImportStatus.FAILED, failedTermMsgs, lineIndex);
+ bulkImportResponse.getFailedImportInfoList().add(importInfo);
+ }
+ return ret;
+ }
+
+ private boolean failedMsgCheck(String uniqueAttrValue, String bmAttribute, List<String> failedTermMsgList,BulkImportResponse bulkImportResponse,int lineIndex) {
+ boolean ret = false;
+ if(!failedTermMsgList.isEmpty()){
+ ret = true;
+ String failedTermMsg = StringUtils.join(failedTermMsgList, "\n");
+ BulkImportResponse.ImportInfo importInfo = new BulkImportResponse.ImportInfo(uniqueAttrValue, bmAttribute, BulkImportResponse.ImportStatus.FAILED, failedTermMsg, lineIndex);
+ bulkImportResponse.getFailedImportInfoList().add(importInfo);
+ }
+ return ret;
+ }
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java
index bf13338..f393e51 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java
@@ -29,6 +29,7 @@ import org.apache.atlas.model.TypeCategory;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.Status;
import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
+import org.apache.atlas.model.typedef.AtlasEnumDef;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.graph.GraphHelper;
import org.apache.atlas.repository.graphdb.AtlasEdge;
@@ -39,9 +40,11 @@ import org.apache.atlas.repository.graphdb.AtlasIndexQuery;
import org.apache.atlas.repository.graphdb.AtlasIndexQuery.Result;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasEnumType;
import org.apache.atlas.type.AtlasStructType;
import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
import org.apache.atlas.type.AtlasType;
+import org.apache.atlas.util.FileUtils;
import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.configuration.Configuration;
@@ -49,6 +52,7 @@ import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -62,11 +66,10 @@ import static org.apache.atlas.repository.Constants.CLASSIFICATION_NAMES_KEY;
import static org.apache.atlas.repository.Constants.ENTITY_TYPE_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.INDEX_SEARCH_VERTEX_PREFIX_DEFAULT;
import static org.apache.atlas.repository.Constants.INDEX_SEARCH_VERTEX_PREFIX_PROPERTY;
-import static org.apache.atlas.repository.Constants.LABELS_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.PROPAGATED_CLASSIFICATION_NAMES_KEY;
import static org.apache.atlas.repository.Constants.STATE_PROPERTY_KEY;
-import static org.apache.atlas.repository.Constants.TYPE_NAME_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.TYPENAME_PROPERTY_KEY;
+import static org.apache.atlas.repository.Constants.TYPE_NAME_PROPERTY_KEY;
import static org.apache.atlas.repository.graph.AtlasGraphProvider.getGraphInstance;
import static org.apache.atlas.repository.graphdb.AtlasGraphQuery.SortOrder.ASC;
import static org.apache.atlas.repository.graphdb.AtlasGraphQuery.SortOrder.DESC;
@@ -674,4 +677,128 @@ public class AtlasGraphUtilsV2 {
}
return classificationNames;
}
+
+ public static List<Date> dateParser(String[] arr, List failedTermMsgList, int lineIndex) {
+
+ List<Date> ret = new ArrayList();
+ for (String s : arr) {
+ try{
+ SimpleDateFormat formatter = new SimpleDateFormat("MM/dd/yyyy");
+ Date date = formatter.parse(s);
+ ret.add(date);
+ }
+ catch(Exception e){
+ LOG.error("Provided value "+s+" is not of Date type at line #"+lineIndex);
+ failedTermMsgList.add("Provided value "+s+" is not of Date type at line #"+lineIndex);
+ }
+ }
+ return ret;
+ }
+
+ public static List<Boolean> booleanParser(String[] arr, List failedTermMsgList, int lineIndex) {
+
+ List<Boolean> ret = new ArrayList();
+ for (String s : arr) {
+ try{
+ ret.add(Boolean.parseBoolean(s));
+ }
+ catch(Exception e){
+ LOG.error("Provided value "+s+" is not of Boolean type at line #"+lineIndex);
+ failedTermMsgList.add("Provided value "+s+" is not of Boolean type at line #"+lineIndex);
+ }
+ }
+ return ret;
+ }
+
+ public static List<Double> doubleParser(String[] arr, List failedTermMsgList, int lineIndex) {
+
+ List<Double> ret = new ArrayList();
+ for (String s : arr) {
+ try{
+ ret.add(Double.parseDouble(s));
+ }
+ catch(Exception e){
+ LOG.error("Provided value "+s+" is not of Double type at line #"+lineIndex);
+ failedTermMsgList.add("Provided value "+s+" is not of Double type at line #"+lineIndex);
+ }
+ }
+ return ret;
+ }
+
+ public static List<Short> shortParser(String[] arr, List failedTermMsgList, int lineIndex) {
+
+ List<Short> ret = new ArrayList();
+ for (String s : arr) {
+ try{
+ ret.add(Short.parseShort(s));
+ }
+ catch(Exception e){
+ LOG.error("Provided value "+s+" is not of Short type at line #"+lineIndex);
+ failedTermMsgList.add("Provided value "+s+" is not of Short type at line #"+lineIndex);
+ }
+ }
+ return ret;
+ }
+
+ public static List<Long> longParser(String[] arr, List failedTermMsgList, int lineIndex) {
+
+ List<Long> ret = new ArrayList();
+ for (String s : arr) {
+ try{
+ ret.add(Long.parseLong(s));
+ }
+ catch(Exception e){
+ LOG.error("Provided value "+s+" is not of Long type at line #"+lineIndex);
+ failedTermMsgList.add("Provided value "+s+" is not of Long type at line #"+lineIndex);
+ }
+ }
+ return ret;
+ }
+
+ public static List<Integer> intParser(String[] arr, List failedTermMsgList, int lineIndex) {
+
+ List<Integer> ret = new ArrayList();
+ for (String s : arr) {
+ try{
+ ret.add(Integer.parseInt(s));
+ }
+ catch(Exception e){
+ LOG.error("Provided value "+s+" is not of Integer type at line #"+lineIndex);
+ failedTermMsgList.add("Provided value "+s+" is Integer of Long type at line #"+lineIndex);
+ }
+ }
+ return ret;
+ }
+
+ public static List<Float> floatParser(String[] arr, List failedTermMsgList, int lineIndex) {
+
+ List<Float> ret = new ArrayList();
+ for (String s : arr) {
+ try{
+ ret.add(Float.parseFloat(s));
+ }
+ catch(Exception e){
+ LOG.error("Provided value "+s+" is Float of Long type at line #"+lineIndex);
+ failedTermMsgList.add("Provided value "+s+" is Float of Long type at line #"+lineIndex);
+ }
+ }
+ return ret;
+ }
+
+ public static List assignEnumValues(String bmAttributeValues, AtlasEnumType enumType, List<String> failedTermMsgList, int lineIndex) {
+ List<String> ret = new ArrayList<>();
+ String[] arr = bmAttributeValues.split(FileUtils.ESCAPE_CHARACTER + FileUtils.PIPE_CHARACTER);
+ AtlasEnumDef.AtlasEnumElementDef atlasEnumDef;
+ for(String s : arr){
+ atlasEnumDef = enumType.getEnumElementDef(s);
+ if(atlasEnumDef==null){
+ LOG.error("Provided value "+s+" is Enumeration of Long type at line #"+lineIndex);
+ failedTermMsgList.add("Provided value "+s+" is Enumeration of Long type at line #"+lineIndex);
+ }else{
+ ret.add(s);
+ }
+ }
+ return ret;
+ }
+
}
\ No newline at end of file
diff --git a/repository/src/main/java/org/apache/atlas/util/FileUtils.java b/repository/src/main/java/org/apache/atlas/util/FileUtils.java
index 7b992ce..66ade26 100644
--- a/repository/src/main/java/org/apache/atlas/util/FileUtils.java
+++ b/repository/src/main/java/org/apache/atlas/util/FileUtils.java
@@ -22,6 +22,7 @@ import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.FilenameUtils;
+import org.apache.commons.lang.StringUtils;
import org.apache.poi.hssf.usermodel.HSSFWorkbook;
import org.apache.poi.ss.usermodel.Cell;
import org.apache.poi.ss.usermodel.Row;
@@ -36,13 +37,20 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
-import static org.apache.atlas.repository.Constants.GlossaryImportSupportedFileExtensions.*;
+import static org.apache.atlas.repository.Constants.SupportedFileExtensions.*;
public class FileUtils {
public static final String PIPE_CHARACTER = "|";
public static final String COLON_CHARACTER = ":";
public static final String ESCAPE_CHARACTER = "\\";
+ //BusinessMetadata attributes association uploads
+ public static final int TYPENAME_COLUMN_INDEX = 0;
+ public static final int UNIQUE_ATTR_VALUE_COLUMN_INDEX = 1;
+ public static final int BM_ATTR_NAME_COLUMN_INDEX = 2;
+ public static final int BM_ATTR_VALUE_COLUMN_INDEX = 3;
+ public static final int UNIQUE_ATTR_NAME_COLUMN_INDEX = 4;
+
public static List<String[]> readFileData(String fileName, InputStream inputStream) throws IOException, AtlasBaseException {
List<String[]> ret;
String extension = FilenameUtils.getExtension(fileName);
@@ -123,4 +131,16 @@ public class FileUtils {
return true;
}
+
+ public static String getBusinessMetadataHeaders() {
+ List<String> bMHeader = new ArrayList<>();
+
+ bMHeader.add("EntityType");
+ bMHeader.add("EntityUniqueAttributeValue");
+ bMHeader.add("BusinessAttributeName");
+ bMHeader.add("BusinessAttributeValue");
+ bMHeader.add("EntityUniqueAttributeName[optional]");
+
+ return StringUtils.join(bMHeader, ",");
+ }
}
\ No newline at end of file
diff --git a/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2Test.java b/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2Test.java
index 225b72c..38228a8 100644
--- a/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2Test.java
+++ b/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2Test.java
@@ -22,6 +22,7 @@ import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.RequestContext;
import org.apache.atlas.TestModules;
import org.apache.atlas.TestUtilsV2;
+import org.apache.atlas.bulkimport.BulkImportResponse;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity;
@@ -37,6 +38,7 @@ import org.apache.atlas.model.typedef.AtlasClassificationDef;
import org.apache.atlas.model.typedef.AtlasEntityDef;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.type.AtlasTypeUtil;
+import org.apache.atlas.util.FileUtils;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,6 +49,9 @@ import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import javax.inject.Inject;
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -55,20 +60,27 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-import static org.apache.atlas.AtlasErrorCode.*;
+import static org.apache.atlas.AtlasErrorCode.INVALID_CUSTOM_ATTRIBUTE_KEY_CHARACTERS;
+import static org.apache.atlas.AtlasErrorCode.INVALID_CUSTOM_ATTRIBUTE_KEY_LENGTH;
+import static org.apache.atlas.AtlasErrorCode.INVALID_CUSTOM_ATTRIBUTE_VALUE;
+import static org.apache.atlas.AtlasErrorCode.INVALID_LABEL_CHARACTERS;
+import static org.apache.atlas.AtlasErrorCode.INVALID_LABEL_LENGTH;
import static org.apache.atlas.TestUtilsV2.COLUMNS_ATTR_NAME;
import static org.apache.atlas.TestUtilsV2.COLUMN_TYPE;
import static org.apache.atlas.TestUtilsV2.NAME;
import static org.apache.atlas.TestUtilsV2.TABLE_TYPE;
+import static org.apache.atlas.TestUtilsV2.getFile;
import static org.apache.commons.lang.RandomStringUtils.randomAlphanumeric;
import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
@Guice(modules = TestModules.TestOnlyModule.class)
public class AtlasEntityStoreV2Test extends AtlasEntityTestBase {
private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityStoreV2Test.class);
+ public static final String CSV_FILES = "/csvFiles/";
private AtlasEntitiesWithExtInfo deptEntity;
private AtlasEntityWithExtInfo dbEntity;
@@ -208,15 +220,15 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase {
AtlasEntitiesWithExtInfo entitiesInfo = new AtlasEntitiesWithExtInfo(tableEntity);
AtlasEntity col1 = TestUtilsV2.createColumnEntity(tableEntity);
- col1.setAttribute(TestUtilsV2.NAME, "col1");
+ col1.setAttribute(NAME, "col1");
AtlasEntity col2 = TestUtilsV2.createColumnEntity(tableEntity);
- col2.setAttribute(TestUtilsV2.NAME, "col2");
+ col2.setAttribute(NAME, "col2");
columns.add(AtlasTypeUtil.getAtlasObjectId(col1));
columns.add(AtlasTypeUtil.getAtlasObjectId(col2));
- tableEntity.setAttribute(TestUtilsV2.COLUMNS_ATTR_NAME, columns);
+ tableEntity.setAttribute(COLUMNS_ATTR_NAME, columns);
entitiesInfo.addReferredEntity(dbEntity.getEntity());
entitiesInfo.addReferredEntity(col1);
@@ -230,14 +242,14 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase {
//Complete update. Add array elements - col3,col4
AtlasEntity col3 = TestUtilsV2.createColumnEntity(tableEntity);
- col3.setAttribute(TestUtilsV2.NAME, "col3");
+ col3.setAttribute(NAME, "col3");
AtlasEntity col4 = TestUtilsV2.createColumnEntity(tableEntity);
- col4.setAttribute(TestUtilsV2.NAME, "col4");
+ col4.setAttribute(NAME, "col4");
columns.add(AtlasTypeUtil.getAtlasObjectId(col3));
columns.add(AtlasTypeUtil.getAtlasObjectId(col4));
- tableEntity.setAttribute(TestUtilsV2.COLUMNS_ATTR_NAME, columns);
+ tableEntity.setAttribute(COLUMNS_ATTR_NAME, columns);
entitiesInfo.addReferredEntity(col3);
entitiesInfo.addReferredEntity(col4);
@@ -252,7 +264,7 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase {
columns.clear();
columns.add(AtlasTypeUtil.getAtlasObjectId(col4));
columns.add(AtlasTypeUtil.getAtlasObjectId(col3));
- tableEntity.setAttribute(TestUtilsV2.COLUMNS_ATTR_NAME, columns);
+ tableEntity.setAttribute(COLUMNS_ATTR_NAME, columns);
init();
response = entityStore.createOrUpdate(new AtlasEntityStream(entitiesInfo), false);
@@ -280,7 +292,7 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase {
AtlasEntity tableEntity = new AtlasEntity(tblEntity.getEntity());
AtlasEntitiesWithExtInfo entitiesInfo = new AtlasEntitiesWithExtInfo(tableEntity);
Map<String, AtlasStruct> partsMap = new HashMap<>();
- partsMap.put("part0", new AtlasStruct(TestUtilsV2.PARTITION_STRUCT_TYPE, TestUtilsV2.NAME, "test"));
+ partsMap.put("part0", new AtlasStruct(TestUtilsV2.PARTITION_STRUCT_TYPE, NAME, "test"));
tableEntity.setAttribute("partitionsMap", partsMap);
@@ -294,7 +306,7 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase {
Assert.assertTrue(partsMap.get("part0").equals(((Map<String, AtlasStruct>) updatedTableDef1.getAttribute("partitionsMap")).get("part0")));
//update map - add a map key
- partsMap.put("part1", new AtlasStruct(TestUtilsV2.PARTITION_STRUCT_TYPE, TestUtilsV2.NAME, "test1"));
+ partsMap.put("part1", new AtlasStruct(TestUtilsV2.PARTITION_STRUCT_TYPE, NAME, "test1"));
tableEntity.setAttribute("partitionsMap", partsMap);
init();
@@ -309,7 +321,7 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase {
//update map - remove a key and add another key
partsMap.remove("part0");
- partsMap.put("part2", new AtlasStruct(TestUtilsV2.PARTITION_STRUCT_TYPE, TestUtilsV2.NAME, "test2"));
+ partsMap.put("part2", new AtlasStruct(TestUtilsV2.PARTITION_STRUCT_TYPE, NAME, "test2"));
tableEntity.setAttribute("partitionsMap", partsMap);
init();
@@ -325,7 +337,7 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase {
//update struct value for existing map key
AtlasStruct partition2 = partsMap.get("part2");
- partition2.setAttribute(TestUtilsV2.NAME, "test2Updated");
+ partition2.setAttribute(NAME, "test2Updated");
init();
response = entityStore.createOrUpdate(new AtlasEntityStream(entitiesInfo), false);
@@ -340,7 +352,7 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase {
//Test map pointing to a class
- AtlasEntity col0 = new AtlasEntity(TestUtilsV2.COLUMN_TYPE, TestUtilsV2.NAME, "test1");
+ AtlasEntity col0 = new AtlasEntity(COLUMN_TYPE, NAME, "test1");
col0.setAttribute("type", "string");
col0.setAttribute("table", AtlasTypeUtil.getAtlasObjectId(tableEntity));
@@ -351,7 +363,7 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase {
init();
entityStore.createOrUpdate(new AtlasEntityStream(col0WithExtendedInfo), false);
- AtlasEntity col1 = new AtlasEntity(TestUtilsV2.COLUMN_TYPE, TestUtilsV2.NAME, "test2");
+ AtlasEntity col1 = new AtlasEntity(COLUMN_TYPE, NAME, "test2");
col1.setAttribute("type", "string");
col1.setAttribute("table", AtlasTypeUtil.getAtlasObjectId(tableEntity));
@@ -440,8 +452,8 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase {
AtlasEntitiesWithExtInfo entitiesInfo = new AtlasEntitiesWithExtInfo(tableEntity);
List<AtlasStruct> partitions = new ArrayList<AtlasStruct>(){{
- add(new AtlasStruct(TestUtilsV2.PARTITION_STRUCT_TYPE, TestUtilsV2.NAME, "part1"));
- add(new AtlasStruct(TestUtilsV2.PARTITION_STRUCT_TYPE, TestUtilsV2.NAME, "part2"));
+ add(new AtlasStruct(TestUtilsV2.PARTITION_STRUCT_TYPE, NAME, "part1"));
+ add(new AtlasStruct(TestUtilsV2.PARTITION_STRUCT_TYPE, NAME, "part2"));
}};
tableEntity.setAttribute("partitions", partitions);
@@ -451,7 +463,7 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase {
validateEntity(entitiesInfo, getEntityFromStore(updatedTable));
//add a new element to array of struct
- partitions.add(new AtlasStruct(TestUtilsV2.PARTITION_STRUCT_TYPE, TestUtilsV2.NAME, "part3"));
+ partitions.add(new AtlasStruct(TestUtilsV2.PARTITION_STRUCT_TYPE, NAME, "part3"));
tableEntity.setAttribute("partitions", partitions);
init();
response = entityStore.createOrUpdate(new AtlasEntityStream(entitiesInfo), false);
@@ -467,7 +479,7 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase {
validateEntity(entitiesInfo, getEntityFromStore(updatedTable));
//Update struct value within array of struct
- partitions.get(0).setAttribute(TestUtilsV2.NAME, "part4");
+ partitions.get(0).setAttribute(NAME, "part4");
tableEntity.setAttribute("partitions", partitions);
init();
response = entityStore.createOrUpdate(new AtlasEntityStream(entitiesInfo), false);
@@ -476,7 +488,7 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase {
//add a repeated element to array of struct
- partitions.add(new AtlasStruct(TestUtilsV2.PARTITION_STRUCT_TYPE, TestUtilsV2.NAME, "part4"));
+ partitions.add(new AtlasStruct(TestUtilsV2.PARTITION_STRUCT_TYPE, NAME, "part4"));
tableEntity.setAttribute("partitions", partitions);
init();
response = entityStore.createOrUpdate(new AtlasEntityStream(entitiesInfo), false);
@@ -499,11 +511,11 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase {
AtlasEntity tableEntity = new AtlasEntity(tblEntity.getEntity());
AtlasEntitiesWithExtInfo entitiesInfo = new AtlasEntitiesWithExtInfo(tableEntity);
- AtlasStruct serdeInstance = new AtlasStruct(TestUtilsV2.SERDE_TYPE, TestUtilsV2.NAME, "serde1Name");
+ AtlasStruct serdeInstance = new AtlasStruct(TestUtilsV2.SERDE_TYPE, NAME, "serde1Name");
serdeInstance.setAttribute("serde", "test");
serdeInstance.setAttribute("description", "testDesc");
tableEntity.setAttribute("serde1", serdeInstance);
- tableEntity.setAttribute("database", new AtlasObjectId(databaseEntity.getTypeName(), TestUtilsV2.NAME, databaseEntity.getAttribute(TestUtilsV2.NAME)));
+ tableEntity.setAttribute("database", new AtlasObjectId(databaseEntity.getTypeName(), NAME, databaseEntity.getAttribute(NAME)));
init();
EntityMutationResponse response = entityStore.createOrUpdate(new AtlasEntityStream(entitiesInfo), false);
@@ -547,7 +559,7 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase {
response = entityStore.createOrUpdate(new InMemoryMapEntityStream(tableCloneMap), false);
final AtlasEntityHeader tableDefinition = response.getFirstUpdatedEntityByTypeName(TABLE_TYPE);
AtlasEntity updatedTableDefinition = getEntityFromStore(tableDefinition);
- Assert.assertNotNull(updatedTableDefinition.getAttribute("database"));
+ assertNotNull(updatedTableDefinition.getAttribute("database"));
Assert.assertEquals(((AtlasObjectId) updatedTableDefinition.getAttribute("database")).getGuid(), dbCreated.getGuid());
}
@@ -563,7 +575,7 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase {
//The optional boolean attribute should have a non-null value
final String isReplicatedAttr = "isReplicated";
final String paramsAttr = "parameters";
- Assert.assertNotNull(firstEntityCreated.getAttribute(isReplicatedAttr));
+ assertNotNull(firstEntityCreated.getAttribute(isReplicatedAttr));
Assert.assertEquals(firstEntityCreated.getAttribute(isReplicatedAttr), Boolean.FALSE);
Assert.assertNull(firstEntityCreated.getAttribute(paramsAttr));
@@ -578,8 +590,8 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase {
response = entityStore.createOrUpdate(new AtlasEntityStream(dbEntity), false);
AtlasEntity firstEntityUpdated = getEntityFromStore(response.getFirstUpdatedEntityByTypeName(TestUtilsV2.DATABASE_TYPE));
- Assert.assertNotNull(firstEntityUpdated);
- Assert.assertNotNull(firstEntityUpdated.getAttribute(isReplicatedAttr));
+ assertNotNull(firstEntityUpdated);
+ assertNotNull(firstEntityUpdated.getAttribute(isReplicatedAttr));
Assert.assertEquals(firstEntityUpdated.getAttribute(isReplicatedAttr), Boolean.TRUE);
Assert.assertEquals(firstEntityUpdated.getAttribute(paramsAttr), params);
@@ -645,7 +657,7 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase {
//Update required attribute
Map<String, AtlasEntity> tableCloneMap = new HashMap<>();
AtlasEntity tableEntity = new AtlasEntity(TABLE_TYPE);
- tableEntity.setAttribute(TestUtilsV2.NAME, "table_" + TestUtilsV2.randomString());
+ tableEntity.setAttribute(NAME, "table_" + TestUtilsV2.randomString());
tableCloneMap.put(tableEntity.getGuid(), tableEntity);
entityStore.createOrUpdate(new InMemoryMapEntityStream(tableCloneMap), false);
@@ -713,14 +725,14 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase {
// create new column entity
AtlasEntity col1 = TestUtilsV2.createColumnEntity(tblEntity);
AtlasEntity col2 = TestUtilsV2.createColumnEntity(tblEntity);
- col1.setAttribute(TestUtilsV2.NAME, "col1");
- col2.setAttribute(TestUtilsV2.NAME, "col2");
+ col1.setAttribute(NAME, "col1");
+ col2.setAttribute(NAME, "col2");
List<AtlasObjectId> columns = new ArrayList<>();
columns.add(AtlasTypeUtil.getAtlasObjectId(col1));
columns.add(AtlasTypeUtil.getAtlasObjectId(col2));
- tblEntity.setAttribute(TestUtilsV2.COLUMNS_ATTR_NAME, columns);
+ tblEntity.setAttribute(COLUMNS_ATTR_NAME, columns);
AtlasEntitiesWithExtInfo tableEntityInfo = new AtlasEntitiesWithExtInfo(tblEntity);
tableEntityInfo.addReferredEntity(col1.getGuid(), col1);
@@ -731,16 +743,16 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase {
AtlasEntityHeader tblHeader = response.getFirstEntityCreated();
AtlasEntity createdTblEntity = getEntityFromStore(tblHeader);
- columns = (List<AtlasObjectId>) createdTblEntity.getAttribute(TestUtilsV2.COLUMNS_ATTR_NAME);
+ columns = (List<AtlasObjectId>) createdTblEntity.getAttribute(COLUMNS_ATTR_NAME);
assertEquals(columns.size(), 2);
// update - add 2 more columns to table
AtlasEntity col3 = TestUtilsV2.createColumnEntity(createdTblEntity);
- col3.setAttribute(TestUtilsV2.NAME, "col3");
+ col3.setAttribute(NAME, "col3");
col3.setAttribute("description", "description col3");
AtlasEntity col4 = TestUtilsV2.createColumnEntity(createdTblEntity);
- col4.setAttribute(TestUtilsV2.NAME, "col4");
+ col4.setAttribute(NAME, "col4");
col4.setAttribute("description", "description col4");
columns.clear();
@@ -749,7 +761,7 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase {
tblEntity = new AtlasEntity(TABLE_TYPE);
tblEntity.setGuid(createdTblEntity.getGuid());
- tblEntity.setAttribute(TestUtilsV2.COLUMNS_ATTR_NAME, columns);
+ tblEntity.setAttribute(COLUMNS_ATTR_NAME, columns);
tableEntityInfo = new AtlasEntitiesWithExtInfo(tblEntity);
tableEntityInfo.addReferredEntity(col3.getGuid(), col3);
@@ -760,7 +772,7 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase {
tblHeader = response.getFirstEntityPartialUpdated();
AtlasEntity updatedTblEntity = getEntityFromStore(tblHeader);
- columns = (List<AtlasObjectId>) updatedTblEntity.getAttribute(TestUtilsV2.COLUMNS_ATTR_NAME);
+ columns = (List<AtlasObjectId>) updatedTblEntity.getAttribute(COLUMNS_ATTR_NAME);
// deleted columns are included in the attribute; hence use >=
assertTrue(columns.size() >= 2);
}
@@ -1215,14 +1227,14 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase {
entityStore.removeLabels(tblEntityGuid, labels);
AtlasEntity tblEntity = getEntityFromStore(tblEntityGuid);
- Assert.assertNotNull(tblEntity.getLabels());
+ assertNotNull(tblEntity.getLabels());
Assert.assertEquals(tblEntity.getLabels().size(), 1);
labels.clear();
labels.add("label_4_add");
entityStore.removeLabels(tblEntityGuid, labels);
tblEntity = getEntityFromStore(tblEntityGuid);
- Assert.assertNotNull(tblEntity.getLabels());
+ assertNotNull(tblEntity.getLabels());
Assert.assertEquals(tblEntity.getLabels().size(), 1);
labels.clear();
@@ -1321,4 +1333,51 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase {
Assert.fail();
}
+ @Test
+ public void testGetTemplate() {
+ try {
+ String bMHeaderListAsString = FileUtils.getBusinessMetadataHeaders();
+
+ assertNotNull(bMHeaderListAsString);
+ assertEquals(bMHeaderListAsString, "EntityType,EntityUniqueAttributeValue,BusinessAttributeName,BusinessAttributeValue,EntityUniqueAttributeName[optional]");
+ } catch (Exception e) {
+ fail("The Template for BussinessMetadata Attributes should've been a success : ", e);
+ }
+ }
+
+ @Test
+ public void testEmptyFileException() {
+ InputStream inputStream = getFile(CSV_FILES, "empty.csv");
+
+ try {
+ entityStore.bulkCreateOrUpdateBusinessAttributes(inputStream, "empty.csv");
+ fail("Error occurred : Failed to recognize the empty file.");
+ } catch (AtlasBaseException e) {
+ assertEquals(e.getMessage(), "No data found in the uploaded file");
+ } finally {
+ if (inputStream != null) {
+ try {
+ inputStream.close();
+ } catch (Exception e) {
+ }
+ }
+ }
+ }
+
+ @Test(dependsOnMethods = "testCreate")
+ public void testBulkAddOrUpdateBusinessAttributes() {
+ try {
+ AtlasEntity hive_db_1 = getEntityFromStore(dbEntityGuid);
+ String dbName = (String) hive_db_1.getAttribute("name");
+ String data = TestUtilsV2.getFileData(CSV_FILES, "template_2.csv");
+ data = data.replaceAll("hive_db_1", dbName);
+ InputStream inputStream1 = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
+ BulkImportResponse bulkImportResponse = entityStore.bulkCreateOrUpdateBusinessAttributes(inputStream1, "template_2.csv");
+ assertEquals(CollectionUtils.isEmpty(bulkImportResponse.getSuccessImportInfoList()), false);
+ assertEquals(CollectionUtils.isEmpty(bulkImportResponse.getFailedImportInfoList()), true);
+
+ } catch (Exception e) {
+ fail("The BusinessMetadata Attribute should have been assigned " +e);
+ }
+ }
}
\ No newline at end of file
diff --git a/repository/src/test/resources/csvFiles/incorrectFile_2.csv b/repository/src/test/resources/csvFiles/incorrectFile_2.csv
new file mode 100644
index 0000000..3020396
--- /dev/null
+++ b/repository/src/test/resources/csvFiles/incorrectFile_2.csv
@@ -0,0 +1,2 @@
+TypeName,UniqueAttributeValue,BusinessAttributeName,BusinessAttributeValue,UniqueAttributeName[optional]
+incorrectEntityType,hive_db_1,bmWithAllTypes.attr8,"Awesome Attribute 1",name
diff --git a/repository/src/test/resources/csvFiles/template_1.csv b/repository/src/test/resources/csvFiles/template_1.csv
index c535e07..acb47b2 100644
--- a/repository/src/test/resources/csvFiles/template_1.csv
+++ b/repository/src/test/resources/csvFiles/template_1.csv
@@ -1,2 +1,2 @@
GlossaryName, TermName, ShortDescription, LongDescription, Examples, Abbreviation, Usage, AdditionalAttributes, TranslationTerms, ValidValuesFor, Synonyms, ReplacedBy, ValidValues, ReplacementTerms, SeeAlso, TranslatedTerms, IsA, Antonyms, Classifies, PreferredToTerms, PreferredTerms
-testBankingGlossary,BankBranch,SD4,LD4,"EXAMPLE","ABBREVIATION","USAGE",,,,,,,,,,,,,,
+testBankingGlossary,BankBranch,SD4,LD4,"EXAMPLE","ABBREVIATION","USAGE",,,,,,,,,,,,,,
\ No newline at end of file
diff --git a/repository/src/test/resources/csvFiles/template_2.csv b/repository/src/test/resources/csvFiles/template_2.csv
new file mode 100644
index 0000000..26609a9
--- /dev/null
+++ b/repository/src/test/resources/csvFiles/template_2.csv
@@ -0,0 +1,2 @@
+TypeName,UniqueAttributeValue,BusinessAttributeName,BusinessAttributeValue,UniqueAttributeName[optional]
+hive_database,hive_db_1,bmWithAllTypes.attr8,"Awesome Attribute 1",name
diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java b/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java
index 5dcacb2..402a323 100644
--- a/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java
+++ b/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java
@@ -17,8 +17,11 @@
*/
package org.apache.atlas.web.rest;
+import com.sun.jersey.core.header.FormDataContentDisposition;
+import com.sun.jersey.multipart.FormDataParam;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.EntityAuditEvent;
+import org.apache.atlas.bulkimport.BulkImportResponse;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.TypeCategory;
import org.apache.atlas.model.audit.EntityAuditEventV2;
@@ -31,14 +34,15 @@ import org.apache.atlas.model.instance.ClassificationAssociateRequest;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
import org.apache.atlas.repository.audit.EntityAuditRepository;
-import org.apache.atlas.repository.store.graph.v2.ClassificationAssociator;
import org.apache.atlas.repository.converters.AtlasInstanceConverter;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.v2.AtlasEntityStream;
+import org.apache.atlas.repository.store.graph.v2.ClassificationAssociator;
import org.apache.atlas.repository.store.graph.v2.EntityStream;
import org.apache.atlas.type.AtlasClassificationType;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.util.FileUtils;
import org.apache.atlas.utils.AtlasPerfTracer;
import org.apache.atlas.web.util.Servlets;
import org.apache.commons.collections.CollectionUtils;
@@ -61,8 +65,14 @@ import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.StreamingOutput;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -1173,4 +1183,43 @@ public class EntityREST {
}
}
}
+
+ /**
+ * Get the sample Template for uploading/creating bulk BusinessMetaData
+ *
+ * @return Template File
+ * @throws AtlasBaseException
+ * @HTTP 400 If the provided fileType is not supported
+ */
+ @GET
+ @Path("/businessmetadata/import/template")
+ @Produces(MediaType.APPLICATION_OCTET_STREAM)
+ public Response produceTemplate() {
+ return Response.ok(new StreamingOutput() {
+ @Override
+ public void write(OutputStream outputStream) throws IOException, WebApplicationException {
+ outputStream.write(FileUtils.getBusinessMetadataHeaders().getBytes());
+ }
+ }).header("Content-Disposition", "attachment; filename=\"template_business_metadata\"").build();
+ }
+
+ /**
+ * Upload the file for creating Business Metadata in BULK
+ *
+ * @param uploadedInputStream InputStream of file
+ * @param fileDetail FormDataContentDisposition metadata of file
+ * @return
+ * @throws AtlasBaseException
+ * @HTTP 200 If Business Metadata creation was successful
+ * @HTTP 400 If Business Metadata definition has invalid or missing information
+ * @HTTP 409 If Business Metadata already exists (duplicate qualifiedName)
+ */
+ @POST
+ @Path("/businessmetadata/import")
+ @Consumes(MediaType.MULTIPART_FORM_DATA)
+ public BulkImportResponse importBMAttributes(@FormDataParam("file") InputStream uploadedInputStream,
+ @FormDataParam("file") FormDataContentDisposition fileDetail) throws AtlasBaseException {
+
+ return entitiesStore.bulkCreateOrUpdateBusinessAttributes(uploadedInputStream, fileDetail.getFileName());
+ }
}