You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by sa...@apache.org on 2019/03/22 06:59:54 UTC
[atlas] branch master updated: ATLAS-3077: Handle java patches in
patch framework
This is an automated email from the ASF dual-hosted git repository.
sarath pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new cd15f76 ATLAS-3077: Handle java patches in patch framework
cd15f76 is described below
commit cd15f768d6c0378925457f4447386a341c84f171
Author: Sarath Subramanian <ss...@hortonworks.com>
AuthorDate: Thu Mar 21 23:39:23 2019 -0700
ATLAS-3077: Handle java patches in patch framework
---
.../org/apache/atlas/model/patches/AtlasPatch.java | 6 +-
.../repository/graph/GraphBackedSearchIndexer.java | 14 +-
.../repository/patches/AtlasJavaPatchHandler.java | 138 +++++++++++++++++
.../atlas/repository/patches/PatchContext.java | 59 ++++++++
.../patches/UniqueAttributePatchHandler.java | 163 +++++++++++++++++++++
.../bootstrap/AtlasTypeDefStoreInitializer.java | 96 ++++++++----
.../store/graph/v2/AtlasGraphUtilsV2.java | 66 +++++----
.../apache/atlas/web/resources/AdminResource.java | 2 +-
8 files changed, 475 insertions(+), 69 deletions(-)
diff --git a/intg/src/main/java/org/apache/atlas/model/patches/AtlasPatch.java b/intg/src/main/java/org/apache/atlas/model/patches/AtlasPatch.java
index cdf2441..ae0255f 100644
--- a/intg/src/main/java/org/apache/atlas/model/patches/AtlasPatch.java
+++ b/intg/src/main/java/org/apache/atlas/model/patches/AtlasPatch.java
@@ -50,12 +50,12 @@ public class AtlasPatch implements Serializable {
private long updatedTime;
private PatchStatus status;
- public enum PatchStatus { APPLIED, SKIPPED, FAILED, UNKNOWN }
+ public enum PatchStatus { UNKNOWN, APPLIED, SKIPPED, FAILED }
public AtlasPatch() { }
- public AtlasPatch(String id, String patchName, String type, String action, PatchStatus status, String updatedBy,
- String createdBy, long createdTime, long updatedTime) {
+ public AtlasPatch(String id, String patchName, String type, String action, PatchStatus status,
+ String updatedBy, String createdBy, long createdTime, long updatedTime) {
this.id = id;
this.description = patchName;
this.type = type;
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
index 4805435..c57f8e3 100755
--- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
@@ -108,7 +108,7 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang
private boolean recomputeIndexedKeys = true;
private Set<String> vertexIndexKeys = new HashSet<>();
- private enum UniqueKind { NONE, GLOBAL_UNIQUE, PER_TYPE_UNIQUE }
+ public enum UniqueKind { NONE, GLOBAL_UNIQUE, PER_TYPE_UNIQUE }
@Inject
public GraphBackedSearchIndexer(AtlasTypeRegistry typeRegistry) throws AtlasException {
@@ -431,7 +431,7 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang
return type instanceof AtlasRelationshipType;
}
- private Class getPrimitiveClass(String attribTypeName) {
+ public Class getPrimitiveClass(String attribTypeName) {
String attributeTypeName = attribTypeName.toLowerCase();
switch (attributeTypeName) {
@@ -461,7 +461,7 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang
throw new IllegalArgumentException(String.format("Unknown primitive typename %s", attribTypeName));
}
- private AtlasCardinality toAtlasCardinality(AtlasAttributeDef.Cardinality cardinality) {
+ public AtlasCardinality toAtlasCardinality(AtlasAttributeDef.Cardinality cardinality) {
switch (cardinality) {
case SINGLE:
return SINGLE;
@@ -500,8 +500,8 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang
return propertyKey;
}
- private void createVertexIndex(AtlasGraphManagement management, String propertyName, UniqueKind uniqueKind, Class propertyClass,
- AtlasCardinality cardinality, boolean createCompositeIndex, boolean createCompositeIndexWithTypeAndSuperTypes) {
+ public void createVertexIndex(AtlasGraphManagement management, String propertyName, UniqueKind uniqueKind, Class propertyClass,
+ AtlasCardinality cardinality, boolean createCompositeIndex, boolean createCompositeIndexWithTypeAndSuperTypes) {
if (propertyName != null) {
AtlasPropertyKey propertyKey = management.getPropertyKey(propertyName);
@@ -704,7 +704,7 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang
return !(INDEX_EXCLUSION_CLASSES.contains(propertyClass) || cardinality.isMany());
}
- private void commit(AtlasGraphManagement management) throws IndexException {
+ public void commit(AtlasGraphManagement management) throws IndexException {
try {
management.commit();
@@ -715,7 +715,7 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang
}
}
- private void rollback(AtlasGraphManagement management) throws IndexException {
+ public void rollback(AtlasGraphManagement management) throws IndexException {
try {
management.rollback();
diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/AtlasJavaPatchHandler.java b/repository/src/main/java/org/apache/atlas/repository/patches/AtlasJavaPatchHandler.java
new file mode 100644
index 0000000..470ff10
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/patches/AtlasJavaPatchHandler.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.patches;
+
+import org.apache.atlas.RequestContext;
+import org.apache.atlas.model.patches.AtlasPatch.PatchStatus;
+import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.commons.collections.MapUtils;
+
+import java.util.Map;
+
+import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.UNKNOWN;
+import static org.apache.atlas.repository.Constants.CREATED_BY_KEY;
+import static org.apache.atlas.repository.Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY;
+import static org.apache.atlas.repository.Constants.MODIFIED_BY_KEY;
+import static org.apache.atlas.repository.Constants.PATCH_DESCRIPTION_PROPERTY_KEY;
+import static org.apache.atlas.repository.Constants.PATCH_ID_PROPERTY_KEY;
+import static org.apache.atlas.repository.Constants.PATCH_STATE_PROPERTY_KEY;
+import static org.apache.atlas.repository.Constants.PATCH_TYPE_PROPERTY_KEY;
+import static org.apache.atlas.repository.Constants.TIMESTAMP_PROPERTY_KEY;
+import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.findByPatchId;
+import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.setEncodedProperty;
+import static org.apache.atlas.repository.store.graph.v2.AtlasTypeDefGraphStoreV2.getCurrentUser;
+
+public abstract class AtlasJavaPatchHandler {
+ public final AtlasGraph graph;
+ public final AtlasTypeRegistry typeRegistry;
+ public final Map<String, PatchStatus> patchesRegistry;
+ public final EntityGraphRetriever entityRetriever;
+ public final GraphBackedSearchIndexer indexer;
+ public final PatchContext context;
+ public final String patchId;
+ public final String patchDescription;
+
+ private PatchStatus patchStatus;
+
+ public static final String JAVA_PATCH_TYPE = "JAVA_PATCH";
+
+ public AtlasJavaPatchHandler(PatchContext context, String patchId, String patchDescription) {
+ this.context = context;
+ this.graph = context.getGraph();
+ this.typeRegistry = context.getTypeRegistry();
+ this.indexer = context.getIndexer();
+ this.patchesRegistry = context.getPatchesRegistry();
+ this.patchId = patchId;
+ this.patchDescription = patchDescription;
+ this.patchStatus = getPatchStatus(patchesRegistry);
+ this.entityRetriever = new EntityGraphRetriever(typeRegistry);
+
+ init();
+ }
+
+ private void init() {
+ PatchStatus patchStatus = getPatchStatus();
+
+ if (patchStatus == UNKNOWN) {
+ AtlasVertex patchVertex = graph.addVertex();
+
+ setEncodedProperty(patchVertex, PATCH_ID_PROPERTY_KEY, patchId);
+ setEncodedProperty(patchVertex, PATCH_DESCRIPTION_PROPERTY_KEY, patchDescription);
+ setEncodedProperty(patchVertex, PATCH_TYPE_PROPERTY_KEY, JAVA_PATCH_TYPE);
+ setEncodedProperty(patchVertex, PATCH_STATE_PROPERTY_KEY, getPatchStatus().toString());
+ setEncodedProperty(patchVertex, TIMESTAMP_PROPERTY_KEY, RequestContext.get().getRequestTime());
+ setEncodedProperty(patchVertex, MODIFICATION_TIMESTAMP_PROPERTY_KEY, RequestContext.get().getRequestTime());
+ setEncodedProperty(patchVertex, CREATED_BY_KEY, getCurrentUser());
+ setEncodedProperty(patchVertex, MODIFIED_BY_KEY, getCurrentUser());
+
+ graph.commit();
+
+ addToPatchesRegistry(patchId, getPatchStatus());
+ }
+ }
+
+ private PatchStatus getPatchStatus(Map<String, PatchStatus> patchesRegistry) {
+ PatchStatus ret = UNKNOWN;
+
+ if (MapUtils.isNotEmpty(patchesRegistry) && patchesRegistry.containsKey(patchId)) {
+ ret = patchesRegistry.get(patchId);
+ }
+
+ return ret;
+ }
+
+ public void updatePatchVertex(PatchStatus patchStatus) {
+ AtlasVertex patchVertex = findByPatchId(patchId);
+
+ if (patchVertex != null) {
+ setEncodedProperty(patchVertex, PATCH_STATE_PROPERTY_KEY, patchStatus.toString());
+ setEncodedProperty(patchVertex, MODIFICATION_TIMESTAMP_PROPERTY_KEY, RequestContext.get().getRequestTime());
+ setEncodedProperty(patchVertex, MODIFIED_BY_KEY, getCurrentUser());
+
+ graph.commit();
+
+ addToPatchesRegistry(getPatchId(), getPatchStatus());
+ }
+ }
+
+ public PatchStatus getPatchStatus() {
+ return patchStatus;
+ }
+
+ public void addToPatchesRegistry(String patchId, PatchStatus status) {
+ getPatchesRegistry().put(patchId, status);
+ }
+
+ public void setPatchStatus(PatchStatus patchStatus) {
+ this.patchStatus = patchStatus;
+ }
+
+ public String getPatchId() {
+ return patchId;
+ }
+
+ public Map<String, PatchStatus> getPatchesRegistry() {
+ return patchesRegistry;
+ }
+
+ public abstract void applyPatch();
+}
\ No newline at end of file
diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/PatchContext.java b/repository/src/main/java/org/apache/atlas/repository/patches/PatchContext.java
new file mode 100644
index 0000000..a60422b
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/patches/PatchContext.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.patches;
+
+import org.apache.atlas.model.patches.AtlasPatch.PatchStatus;
+import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.type.AtlasTypeRegistry;
+
+import java.util.Map;
+
+/**
+ * Patch context for typedef and java patches.
+ */
+public class PatchContext {
+ private final AtlasGraph graph;
+ private final AtlasTypeRegistry typeRegistry;
+ private final GraphBackedSearchIndexer indexer;
+ private final Map<String, PatchStatus> patchesRegistry;
+
+ public PatchContext(AtlasGraph graph, AtlasTypeRegistry typeRegistry, GraphBackedSearchIndexer indexer,
+ Map<String, PatchStatus> patchesRegistry) {
+ this.graph = graph;
+ this.typeRegistry = typeRegistry;
+ this.indexer = indexer;
+ this.patchesRegistry = patchesRegistry;
+ }
+
+ public AtlasGraph getGraph() {
+ return graph;
+ }
+
+ public AtlasTypeRegistry getTypeRegistry() {
+ return typeRegistry;
+ }
+
+ public GraphBackedSearchIndexer getIndexer() {
+ return indexer;
+ }
+
+ public Map<String, PatchStatus> getPatchesRegistry() {
+ return patchesRegistry;
+ }
+}
\ No newline at end of file
diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatchHandler.java b/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatchHandler.java
new file mode 100644
index 0000000..0c65ef1
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatchHandler.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.patches;
+
+import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
+import org.apache.atlas.repository.IndexException;
+import org.apache.atlas.repository.graph.GraphBackedSearchIndexer.UniqueKind;
+import org.apache.atlas.repository.graphdb.AtlasCardinality;
+import org.apache.atlas.repository.graphdb.AtlasGraphManagement;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
+import org.apache.commons.collections.MapUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+
+import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.APPLIED;
+import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.FAILED;
+import static org.apache.atlas.repository.graph.GraphHelper.getGuid;
+import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.findActiveEntityVerticesByType;
+import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.setEncodedProperty;
+
+public class UniqueAttributePatchHandler extends AtlasJavaPatchHandler {
+ private static final String PATCH_ID = "JAVA_PATCH_0000_001";
+ private static final String PATCH_DESCRIPTION = "Add new vertex property for each unique attribute of active entities";
+ private static final Logger LOG = LoggerFactory.getLogger(UniqueAttributePatchHandler.class);
+
+ public UniqueAttributePatchHandler(PatchContext context) {
+ super(context, PATCH_ID, PATCH_DESCRIPTION);
+ }
+
+ @Override
+ public void applyPatch() {
+ Collection<AtlasEntityType> allEntityTypes = typeRegistry.getAllEntityTypes();
+ boolean patchFailed = false;
+
+ for (AtlasEntityType entityType : allEntityTypes) {
+ String typeName = entityType.getTypeName();
+ Map<String, AtlasAttribute> uniqAttributes = entityType.getUniqAttributes();
+ int entitiesProcessed = 0;
+
+ LOG.info("Applying java patch: {} for type: {}", getPatchId(), typeName);
+
+ if (MapUtils.isNotEmpty(uniqAttributes)) {
+ Collection<AtlasAttribute> attributes = uniqAttributes.values();
+
+ try {
+ // register unique attribute property keys in graph
+ registerUniqueAttrPropertyKeys(attributes);
+
+ Iterator<AtlasVertex> iterator = findActiveEntityVerticesByType(typeName);
+
+ while (iterator.hasNext()) {
+ AtlasVertex entityVertex = iterator.next();
+ boolean patchApplied = false;
+
+ for (AtlasAttribute attribute : attributes) {
+ String uniquePropertyKey = attribute.getVertexUniquePropertyName();
+ Collection<? extends String> propertyKeys = entityVertex.getPropertyKeys();
+
+ if (!propertyKeys.contains(uniquePropertyKey)) {
+ String propertyKey = attribute.getVertexPropertyName();
+ AtlasAttributeDef attributeDef = attribute.getAttributeDef();
+ Object uniqAttrValue = entityRetriever.mapVertexToPrimitive(entityVertex, propertyKey, attributeDef);
+
+ // add the unique attribute property to vertex
+ setEncodedProperty(entityVertex, uniquePropertyKey, uniqAttrValue);
+
+ try {
+ graph.commit();
+
+ patchApplied = true;
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{}: Added unique attribute property: {} to entity: {} ({})",
+ PATCH_ID, uniquePropertyKey, getGuid(entityVertex), typeName);
+ }
+ } catch (Throwable t) {
+ LOG.warn("Java patch ({}): failed to update entity guid: {}; typeName: {}; attrName: {}; attrValue: {}",
+ getPatchId(), getGuid(entityVertex), typeName, attribute.getName(), uniqAttrValue);
+
+ continue;
+ }
+ }
+ }
+
+ if (patchApplied) {
+ entitiesProcessed++;
+ }
+
+ if (entitiesProcessed % 1000 == 0) {
+ LOG.info("Java patch: {} : processed {} {} entities.", getPatchId(), entitiesProcessed, typeName);
+ }
+ }
+ } catch (IndexException e) {
+ LOG.error("Java patch: {} failed! error: {}", getPatchId(), e);
+
+ patchFailed = true;
+
+ break;
+ }
+ }
+
+ LOG.info("Applied java patch ({}) for type: {}; Total processed: {}", getPatchId(), typeName, entitiesProcessed);
+ }
+
+ if (patchFailed) {
+ setPatchStatus(FAILED);
+ } else {
+ setPatchStatus(APPLIED);
+ }
+
+ LOG.info("Applied java patch: {}; status: {}", getPatchId(), getPatchStatus());
+
+ updatePatchVertex(getPatchStatus());
+ }
+
+ private void registerUniqueAttrPropertyKeys(Collection<AtlasAttribute> attributes) throws IndexException {
+ AtlasGraphManagement management = graph.getManagementSystem();
+ boolean idxCreated = false;
+
+ for (AtlasAttribute attribute : attributes) {
+ String uniquePropertyName = attribute.getVertexUniquePropertyName();
+ boolean uniquePropertyNameExists = management.getPropertyKey(uniquePropertyName) != null;
+
+ if (!uniquePropertyNameExists) {
+ AtlasAttributeDef attributeDef = attribute.getAttributeDef();
+ boolean isIndexable = attributeDef.getIsIndexable();
+ String attribTypeName = attributeDef.getTypeName();
+ Class propertyClass = indexer.getPrimitiveClass(attribTypeName);
+ AtlasCardinality cardinality = indexer.toAtlasCardinality(attributeDef.getCardinality());
+
+ indexer.createVertexIndex(management, uniquePropertyName, UniqueKind.NONE, propertyClass, cardinality, isIndexable, true);
+
+ idxCreated = true;
+ }
+ }
+
+ //Commit indexes
+ if (idxCreated) {
+ indexer.commit(management);
+ }
+ }
+}
\ No newline at end of file
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java b/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java
index 337a6db..78f3faf 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java
@@ -42,8 +42,12 @@ import org.apache.atlas.model.typedef.AtlasStructDef;
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.repository.graph.AtlasGraphProvider;
+import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.repository.patches.AtlasJavaPatchHandler;
+import org.apache.atlas.repository.patches.PatchContext;
+import org.apache.atlas.repository.patches.UniqueAttributePatchHandler;
import org.apache.atlas.repository.store.graph.v2.AtlasTypeDefGraphStoreV2;
import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasEntityType;
@@ -90,7 +94,7 @@ import static org.apache.atlas.repository.Constants.PATCH_STATE_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.PATCH_TYPE_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.TIMESTAMP_PROPERTY_KEY;
import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.findByPatchId;
-import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.initPatchesRegistry;
+import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getPatchesRegistry;
import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.setEncodedProperty;
/**
@@ -105,18 +109,20 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
public static final String RELATIONSHIP_SWAP_ENDS = "swapEnds";
public static final String TYPEDEF_PATCH_TYPE = "TYPEDEF_PATCH";
- private final AtlasTypeDefStore atlasTypeDefStore;
- private final AtlasTypeRegistry atlasTypeRegistry;
- private final AtlasGraph atlasGraph;
- private final Configuration conf;
+ private final AtlasTypeDefStore typeDefStore;
+ private final AtlasTypeRegistry typeRegistry;
+ private final AtlasGraph graph;
+ private final Configuration conf;
+ private final GraphBackedSearchIndexer indexer;
@Inject
- public AtlasTypeDefStoreInitializer(AtlasTypeDefStore atlasTypeDefStore, AtlasTypeRegistry atlasTypeRegistry,
- AtlasGraph atlasGraph, Configuration conf) {
- this.atlasTypeDefStore = atlasTypeDefStore;
- this.atlasTypeRegistry = atlasTypeRegistry;
- this.atlasGraph = atlasGraph;
- this.conf = conf;
+ public AtlasTypeDefStoreInitializer(AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry,
+ AtlasGraph graph, Configuration conf, GraphBackedSearchIndexer indexer) {
+ this.typeDefStore = typeDefStore;
+ this.typeRegistry = typeRegistry;
+ this.graph = graph;
+ this.conf = conf;
+ this.indexer = indexer;
}
@PostConstruct
@@ -124,7 +130,7 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
LOG.info("==> AtlasTypeDefStoreInitializer.init()");
if (!HAConfiguration.isHAEnabled(conf)) {
- atlasTypeDefStore.init();
+ typeDefStore.init();
loadBootstrapTypeDefs();
try {
@@ -149,8 +155,9 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
private void loadBootstrapTypeDefs() {
LOG.info("==> AtlasTypeDefStoreInitializer.loadBootstrapTypeDefs()");
- String atlasHomeDir = System.getProperty("atlas.home");
- String modelsDirName = (StringUtils.isEmpty(atlasHomeDir) ? "." : atlasHomeDir) + File.separator + "models";
+ String atlasHomeDir = System.getProperty("atlas.home");
+ String modelsDirName = (StringUtils.isEmpty(atlasHomeDir) ? "." : atlasHomeDir) + File.separator + "models";
+ PatchContext patchContext = initPatchContext();
if (modelsDirName == null || modelsDirName.length() == 0) {
LOG.info("Types directory {} does not exist or not readable or has no typedef files", modelsDirName);
@@ -158,7 +165,7 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
// look for folders we need to load models from
File topModeltypesDir = new File(modelsDirName);
File[] modelsDirContents = topModeltypesDir.exists() ? topModeltypesDir.listFiles() : null;
- Map<String, PatchStatus> patchesRegistry = initPatchesRegistry();
+
if (modelsDirContents != null && modelsDirContents.length > 0) {
Arrays.sort(modelsDirContents);
@@ -169,23 +176,49 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
continue;
} else if (!folder.getName().equals(PATCHES_FOLDER_NAME)){
// load the models alphabetically in the subfolders apart from patches
- loadModelsInFolder(folder, patchesRegistry);
+ loadModelsInFolder(folder, patchContext);
}
}
}
// load any files in the top models folder and any associated patches.
- loadModelsInFolder(topModeltypesDir, patchesRegistry);
+ loadModelsInFolder(topModeltypesDir, patchContext);
}
+
+ // apply java patches
+ applyJavaPatches(patchContext);
+
LOG.info("<== AtlasTypeDefStoreInitializer.loadBootstrapTypeDefs()");
}
+ private void applyJavaPatches(PatchContext context) {
+ // register java patches
+ AtlasJavaPatchHandler[] patches = new AtlasJavaPatchHandler[] { new UniqueAttributePatchHandler(context) };
+
+ // apply java patches
+ for (AtlasJavaPatchHandler patch : patches) {
+ PatchStatus patchStatus = patch.getPatchStatus();
+
+ if (patchStatus == APPLIED || patchStatus == SKIPPED) {
+ LOG.info("Ignoring java patch: {}; status: {}", patch.getPatchId(), patchStatus);
+ } else {
+ LOG.info("Applying java patch: {}; status: {}", patch.getPatchId(), patchStatus);
+
+ patch.applyPatch();
+ }
+ }
+ }
+
+ public PatchContext initPatchContext() {
+ return new PatchContext(graph, typeRegistry, indexer, getPatchesRegistry());
+ }
+
/**
* Load all the model files in the supplied folder followed by the contents of the patches folder.
* @param typesDir
- * @param patchesRegistry
+ * @param context
*/
- private void loadModelsInFolder(File typesDir, Map<String, PatchStatus> patchesRegistry) {
+ private void loadModelsInFolder(File typesDir, PatchContext context) {
LOG.info("==> AtlasTypeDefStoreInitializer({})", typesDir);
String typesDirName = typesDir.getName();
@@ -210,11 +243,11 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
continue;
}
- AtlasTypesDef typesToCreate = getTypesToCreate(typesDef, atlasTypeRegistry);
- AtlasTypesDef typesToUpdate = getTypesToUpdate(typesDef, atlasTypeRegistry, true);
+ AtlasTypesDef typesToCreate = getTypesToCreate(typesDef, typeRegistry);
+ AtlasTypesDef typesToUpdate = getTypesToUpdate(typesDef, typeRegistry, true);
if (!typesToCreate.isEmpty() || !typesToUpdate.isEmpty()) {
- atlasTypeDefStore.createUpdateTypesDef(typesToCreate, typesToUpdate);
+ typeDefStore.createUpdateTypesDef(typesToCreate, typesToUpdate);
LOG.info("Created/Updated types defined in file {}", typeDefFile.getAbsolutePath());
} else {
@@ -227,7 +260,7 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
}
}
- applyTypePatches(typesDir.getPath(), patchesRegistry);
+ applyTypePatches(typesDir.getPath(), context);
}
LOG.info("<== AtlasTypeDefStoreInitializer({})", typesDir);
}
@@ -367,7 +400,7 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
LOG.info("==> AtlasTypeDefStoreInitializer.instanceIsActive()");
try {
- atlasTypeDefStore.init();
+ typeDefStore.init();
loadBootstrapTypeDefs();
@@ -425,10 +458,11 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
return ret;
}
- private void applyTypePatches(String typesDirName, Map<String, PatchStatus> patchesRegistry) {
+ private void applyTypePatches(String typesDirName, PatchContext context) {
String typePatchesDirName = typesDirName + File.separator + PATCHES_FOLDER_NAME;
File typePatchesDir = new File(typePatchesDirName);
File[] typePatchFiles = typePatchesDir.exists() ? typePatchesDir.listFiles() : null;
+ Map<String, PatchStatus> patchesRegistry = context.getPatchesRegistry();
if (typePatchFiles == null || typePatchFiles.length == 0) {
LOG.info("Type patches directory {} does not exist or not readable or has no patches", typePatchesDirName);
@@ -439,11 +473,11 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
Arrays.sort(typePatchFiles);
PatchHandler[] patchHandlers = new PatchHandler[] {
- new AddAttributePatchHandler(atlasTypeDefStore, atlasTypeRegistry),
- new UpdateAttributePatchHandler(atlasTypeDefStore, atlasTypeRegistry),
- new RemoveLegacyRefAttributesPatchHandler(atlasTypeDefStore, atlasTypeRegistry),
- new UpdateTypeDefOptionsPatchHandler(atlasTypeDefStore, atlasTypeRegistry),
- new SetServiceTypePatchHandler(atlasTypeDefStore, atlasTypeRegistry)
+ new AddAttributePatchHandler(typeDefStore, typeRegistry),
+ new UpdateAttributePatchHandler(typeDefStore, typeRegistry),
+ new RemoveLegacyRefAttributesPatchHandler(typeDefStore, typeRegistry),
+ new UpdateTypeDefOptionsPatchHandler(typeDefStore, typeRegistry),
+ new SetServiceTypePatchHandler(typeDefStore, typeRegistry)
};
Map<String, PatchHandler> patchHandlerRegistry = new HashMap<>();
@@ -534,7 +568,7 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
private void createOrUpdatePatchVertex(TypeDefPatch patch, PatchStatus patchStatus, Map<String, PatchStatus> patchesRegistry) {
String patchId = patch.getId();
boolean isPatchRegistered = MapUtils.isNotEmpty(patchesRegistry) && patchesRegistry.containsKey(patchId);
- AtlasVertex patchVertex = isPatchRegistered ? findByPatchId(patchId) : atlasGraph.addVertex();
+ AtlasVertex patchVertex = isPatchRegistered ? findByPatchId(patchId) : graph.addVertex();
setEncodedProperty(patchVertex, PATCH_ID_PROPERTY_KEY, patchId);
setEncodedProperty(patchVertex, PATCH_DESCRIPTION_PROPERTY_KEY, patch.getDescription());
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java
index 2d5fd97..dda324b 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java
@@ -23,11 +23,11 @@ import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.GraphTransactionInterceptor;
import org.apache.atlas.RequestContext;
import org.apache.atlas.SortOrder;
-import org.apache.atlas.annotation.GraphTransaction;
import org.apache.atlas.discovery.SearchProcessor;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.TypeCategory;
import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.Status;
import org.apache.atlas.model.patches.AtlasPatch;
import org.apache.atlas.model.patches.AtlasPatch.AtlasPatches;
import org.apache.atlas.model.patches.AtlasPatch.PatchStatus;
@@ -65,6 +65,7 @@ import java.util.Set;
import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.UNKNOWN;
import static org.apache.atlas.repository.Constants.CREATED_BY_KEY;
+import static org.apache.atlas.repository.Constants.ENTITY_TYPE_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.INDEX_SEARCH_VERTEX_PREFIX_DEFAULT;
import static org.apache.atlas.repository.Constants.INDEX_SEARCH_VERTEX_PREFIX_PROPERTY;
import static org.apache.atlas.repository.Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY;
@@ -74,8 +75,11 @@ import static org.apache.atlas.repository.Constants.PATCH_ID_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.PATCH_DESCRIPTION_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.PATCH_STATE_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.PATCH_TYPE_PROPERTY_KEY;
+import static org.apache.atlas.repository.Constants.STATE_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.TIMESTAMP_PROPERTY_KEY;
+import static org.apache.atlas.repository.Constants.TYPE_NAME_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.VERTEX_INDEX;
+import static org.apache.atlas.repository.graph.AtlasGraphProvider.getGraphInstance;
import static org.apache.atlas.repository.graphdb.AtlasGraphQuery.SortOrder.*;
/**
@@ -133,7 +137,7 @@ public class AtlasGraphUtilsV2 {
}
public static String getTypeName(AtlasElement element) {
- return element.getProperty(Constants.ENTITY_TYPE_PROPERTY_KEY, String.class);
+ return element.getProperty(ENTITY_TYPE_PROPERTY_KEY, String.class);
}
public static String getEdgeLabel(String fromNode, String toNode) {
@@ -341,7 +345,7 @@ public class AtlasGraphUtilsV2 {
public static AtlasVertex findByPatchId(String patchId) {
AtlasVertex ret = null;
String indexQuery = getIndexSearchPrefix() + "\"" + PATCH_ID_PROPERTY_KEY + "\" : ("+ patchId +")";
- Iterator<Result<Object, Object>> results = AtlasGraphProvider.getGraphInstance().indexQuery(VERTEX_INDEX, indexQuery).vertices();
+ Iterator<Result<Object, Object>> results = getGraphInstance().indexQuery(VERTEX_INDEX, indexQuery).vertices();
while (results != null && results.hasNext()) {
ret = results.next().getVertex();
@@ -358,7 +362,7 @@ public class AtlasGraphUtilsV2 {
AtlasVertex ret = GraphTransactionInterceptor.getVertexFromCache(guid);
if (ret == null) {
- AtlasGraphQuery query = AtlasGraphProvider.getGraphInstance().query()
+ AtlasGraphQuery query = getGraphInstance().query()
.has(Constants.GUID_PROPERTY_KEY, guid);
Iterator<AtlasVertex> results = query.vertices().iterator();
@@ -386,9 +390,9 @@ public class AtlasGraphUtilsV2 {
}
public static boolean typeHasInstanceVertex(String typeName) throws AtlasBaseException {
- AtlasGraphQuery query = AtlasGraphProvider.getGraphInstance()
+ AtlasGraphQuery query = getGraphInstance()
.query()
- .has(Constants.TYPE_NAME_PROPERTY_KEY, AtlasGraphQuery.ComparisionOperator.EQUAL, typeName);
+ .has(TYPE_NAME_PROPERTY_KEY, AtlasGraphQuery.ComparisionOperator.EQUAL, typeName);
Iterator<AtlasVertex> results = query.vertices().iterator();
@@ -404,8 +408,8 @@ public class AtlasGraphUtilsV2 {
public static AtlasVertex findByTypeAndUniquePropertyName(String typeName, String propertyName, Object attrVal) {
MetricRecorder metric = RequestContext.get().startMetricRecord("findByTypeAndUniquePropertyName");
- AtlasGraphQuery query = AtlasGraphProvider.getGraphInstance().query()
- .has(Constants.ENTITY_TYPE_PROPERTY_KEY, typeName)
+ AtlasGraphQuery query = getGraphInstance().query()
+ .has(ENTITY_TYPE_PROPERTY_KEY, typeName)
.has(propertyName, attrVal);
Iterator<AtlasVertex> results = query.vertices().iterator();
@@ -420,7 +424,7 @@ public class AtlasGraphUtilsV2 {
public static AtlasVertex findBySuperTypeAndUniquePropertyName(String typeName, String propertyName, Object attrVal) {
MetricRecorder metric = RequestContext.get().startMetricRecord("findBySuperTypeAndUniquePropertyName");
- AtlasGraphQuery query = AtlasGraphProvider.getGraphInstance().query()
+ AtlasGraphQuery query = getGraphInstance().query()
.has(Constants.SUPER_TYPES_PROPERTY_KEY, typeName)
.has(propertyName, attrVal);
@@ -436,10 +440,10 @@ public class AtlasGraphUtilsV2 {
public static AtlasVertex findByTypeAndPropertyName(String typeName, String propertyName, Object attrVal) {
MetricRecorder metric = RequestContext.get().startMetricRecord("findByTypeAndPropertyName");
- AtlasGraphQuery query = AtlasGraphProvider.getGraphInstance().query()
- .has(Constants.ENTITY_TYPE_PROPERTY_KEY, typeName)
+ AtlasGraphQuery query = getGraphInstance().query()
+ .has(ENTITY_TYPE_PROPERTY_KEY, typeName)
.has(propertyName, attrVal)
- .has(Constants.STATE_PROPERTY_KEY, AtlasEntity.Status.ACTIVE.name());
+ .has(STATE_PROPERTY_KEY, AtlasEntity.Status.ACTIVE.name());
Iterator<AtlasVertex> results = query.vertices().iterator();
@@ -453,10 +457,10 @@ public class AtlasGraphUtilsV2 {
public static AtlasVertex findBySuperTypeAndPropertyName(String typeName, String propertyName, Object attrVal) {
MetricRecorder metric = RequestContext.get().startMetricRecord("findBySuperTypeAndPropertyName");
- AtlasGraphQuery query = AtlasGraphProvider.getGraphInstance().query()
+ AtlasGraphQuery query = getGraphInstance().query()
.has(Constants.SUPER_TYPES_PROPERTY_KEY, typeName)
.has(propertyName, attrVal)
- .has(Constants.STATE_PROPERTY_KEY, AtlasEntity.Status.ACTIVE.name());
+ .has(STATE_PROPERTY_KEY, AtlasEntity.Status.ACTIVE.name());
Iterator<AtlasVertex> results = query.vertices().iterator();
@@ -467,9 +471,9 @@ public class AtlasGraphUtilsV2 {
return vertex;
}
- public static Map<String, PatchStatus> initPatchesRegistry() {
- Map<String, PatchStatus> ret = new HashMap<>();
- AtlasPatches patches = getPatches();
+ public static Map<String, PatchStatus> getPatchesRegistry() {
+ Map<String, PatchStatus> ret = new HashMap<>();
+ AtlasPatches patches = getAllPatches();
for (AtlasPatch patch : patches.getPatches()) {
String patchId = patch.getId();
@@ -483,7 +487,7 @@ public class AtlasGraphUtilsV2 {
return ret;
}
- public static AtlasPatches getPatches() {
+ public static AtlasPatches getAllPatches() {
List<AtlasPatch> ret = new ArrayList<>();
String idxQueryString = getIndexSearchPrefix() + "\"" + PATCH_ID_PROPERTY_KEY + "\" : (*)";
AtlasIndexQuery idxQuery = AtlasGraphProvider.getGraphInstance().indexQuery(VERTEX_INDEX, idxQueryString);
@@ -534,8 +538,8 @@ public class AtlasGraphUtilsV2 {
}
public static List<String> findEntityGUIDsByType(String typename, SortOrder sortOrder) {
- AtlasGraphQuery query = AtlasGraphProvider.getGraphInstance().query()
- .has(Constants.ENTITY_TYPE_PROPERTY_KEY, typename);
+ AtlasGraphQuery query = getGraphInstance().query()
+ .has(ENTITY_TYPE_PROPERTY_KEY, typename);
if (sortOrder != null) {
AtlasGraphQuery.SortOrder qrySortOrder = sortOrder == SortOrder.ASCENDING ? ASC : DESC;
query.orderBy(Constants.QUALIFIED_NAME, qrySortOrder);
@@ -555,14 +559,22 @@ public class AtlasGraphUtilsV2 {
return ret;
}
+ public static Iterator<AtlasVertex> findActiveEntityVerticesByType(String typename) {
+ AtlasGraphQuery query = getGraphInstance().query()
+ .has(ENTITY_TYPE_PROPERTY_KEY, typename)
+ .has(STATE_PROPERTY_KEY, Status.ACTIVE.name());
+
+ return query.vertices().iterator();
+ }
+
public static List<String> findEntityGUIDsByType(String typename) {
return findEntityGUIDsByType(typename, null);
}
public static boolean relationshipTypeHasInstanceEdges(String typeName) throws AtlasBaseException {
- AtlasGraphQuery query = AtlasGraphProvider.getGraphInstance()
+ AtlasGraphQuery query = getGraphInstance()
.query()
- .has(Constants.TYPE_NAME_PROPERTY_KEY, AtlasGraphQuery.ComparisionOperator.EQUAL, typeName);
+ .has(TYPE_NAME_PROPERTY_KEY, AtlasGraphQuery.ComparisionOperator.EQUAL, typeName);
Iterator<AtlasEdge> results = query.edges().iterator();
@@ -626,7 +638,7 @@ public class AtlasGraphUtilsV2 {
}
public static String getStateAsString(AtlasElement element) {
- return element.getProperty(Constants.STATE_PROPERTY_KEY, String.class);
+ return element.getProperty(STATE_PROPERTY_KEY, String.class);
}
private static boolean canUseIndexQuery(AtlasEntityType entityType, String attributeName) {
@@ -638,7 +650,7 @@ public class AtlasGraphUtilsV2 {
ret = typeAndSubTypesQryStr.length() <= SearchProcessor.MAX_QUERY_STR_LENGTH_TYPES;
if (ret) {
- Set<String> indexSet = AtlasGraphProvider.getGraphInstance().getVertexIndexKeys();
+ Set<String> indexSet = getGraphInstance().getVertexIndexKeys();
try {
ret = indexSet.contains(entityType.getQualifiedAttributeName(attributeName));
}
@@ -693,13 +705,13 @@ public class AtlasGraphUtilsV2 {
private static AtlasIndexQuery getIndexQuery(AtlasEntityType entityType, String propertyName, String value) {
StringBuilder sb = new StringBuilder();
- sb.append(INDEX_SEARCH_PREFIX + "\"").append(Constants.TYPE_NAME_PROPERTY_KEY).append("\":").append(entityType.getTypeAndAllSubTypesQryStr())
+ sb.append(INDEX_SEARCH_PREFIX + "\"").append(TYPE_NAME_PROPERTY_KEY).append("\":").append(entityType.getTypeAndAllSubTypesQryStr())
.append(" AND ")
.append(INDEX_SEARCH_PREFIX + "\"").append(propertyName).append("\":").append(AtlasAttribute.escapeIndexQueryValue(value))
.append(" AND ")
- .append(INDEX_SEARCH_PREFIX + "\"").append(Constants.STATE_PROPERTY_KEY).append("\":ACTIVE");
+ .append(INDEX_SEARCH_PREFIX + "\"").append(STATE_PROPERTY_KEY).append("\":ACTIVE");
- return AtlasGraphProvider.getGraphInstance().indexQuery(Constants.VERTEX_INDEX, sb.toString());
+ return getGraphInstance().indexQuery(Constants.VERTEX_INDEX, sb.toString());
}
public static String getIndexSearchPrefix() {
diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
index 01bdcf7..c5ceb9d 100755
--- a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
@@ -564,7 +564,7 @@ public class AdminResource {
LOG.debug("==> AdminResource.getAtlasPatches()");
}
- AtlasPatches ret = AtlasGraphUtilsV2.getPatches();
+ AtlasPatches ret = AtlasGraphUtilsV2.getAllPatches();
if (LOG.isDebugEnabled()) {
LOG.debug("<== AdminResource.getAtlasPatches()");