You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2019/04/16 09:21:31 UTC
[atlas] branch master updated: ATLAS-3132: performance improvements
in UniqueAttributesPatch
This is an automated email from the ASF dual-hosted git repository.
madhan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new efc4beb ATLAS-3132: performance improvements in UniqueAttributesPatch
efc4beb is described below
commit efc4bebc1623c9d00fe4fdf0df424918654a73df
Author: Ashutosh Mestry <am...@hortonworks.com>
AuthorDate: Sun Apr 14 18:31:28 2019 -0700
ATLAS-3132: performance improvements in UniqueAttributesPatch
Signed-off-by: Madhan Neethiraj <ma...@apache.org>
---
.../java/org/apache/atlas/pc/WorkItemConsumer.java | 19 +-
.../java/org/apache/atlas/pc/WorkItemManager.java | 26 +-
.../apache/atlas/kafka/EmbeddedKafkaServer.java | 2 +-
.../org/apache/atlas/kafka/KafkaNotification.java | 2 +-
.../repository/patches/AtlasJavaPatchHandler.java | 138 --------
.../repository/patches/AtlasPatchHandler.java | 68 ++++
.../repository/patches/AtlasPatchManager.java | 72 +++++
.../repository/patches/AtlasPatchRegistry.java | 220 +++++++++++++
.../repository/patches/AtlasPatchService.java | 54 ++++
.../atlas/repository/patches/PatchContext.java | 34 +-
.../repository/patches/UniqueAttributePatch.java | 356 +++++++++++++++++++++
.../patches/UniqueAttributePatchHandler.java | 164 ----------
.../bootstrap/AtlasTypeDefStoreInitializer.java | 134 ++------
.../store/graph/v2/AtlasGraphUtilsV2.java | 133 ++------
.../store/graph/v2/EntityGraphRetriever.java | 2 +-
.../atlas/patches/AtlasPatchRegistryTest.java | 78 +++++
.../notification/NotificationHookConsumer.java | 2 +-
.../apache/atlas/web/resources/AdminResource.java | 15 +-
.../atlas/web/resources/AdminResourceTest.java | 4 +-
19 files changed, 944 insertions(+), 579 deletions(-)
diff --git a/intg/src/main/java/org/apache/atlas/pc/WorkItemConsumer.java b/intg/src/main/java/org/apache/atlas/pc/WorkItemConsumer.java
index b7eb4d8..8351b7c 100644
--- a/intg/src/main/java/org/apache/atlas/pc/WorkItemConsumer.java
+++ b/intg/src/main/java/org/apache/atlas/pc/WorkItemConsumer.java
@@ -30,31 +30,35 @@ import java.util.concurrent.atomic.AtomicLong;
public abstract class WorkItemConsumer<T> implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(WorkItemConsumer.class);
- private static final int POLLING_DURATION_SECONDS = 5;
+ private static final int POLLING_DURATION_SECONDS = 30;
private static final int DEFAULT_COMMIT_TIME_IN_MS = 15000;
private final BlockingQueue<T> queue;
- private AtomicBoolean isDirty = new AtomicBoolean(false);
- private AtomicLong maxCommitTimeInMs = new AtomicLong(0);
- private CountDownLatch countdownLatch;
- private BlockingQueue<Object> results;
+ private final AtomicBoolean isDirty = new AtomicBoolean(false);
+ private final AtomicLong maxCommitTimeInMs = new AtomicLong(DEFAULT_COMMIT_TIME_IN_MS);
+ private CountDownLatch countdownLatch;
+ private BlockingQueue<Object> results;
public WorkItemConsumer(BlockingQueue<T> queue) {
- this.queue = queue;
+ this.queue = queue;
+ this.countdownLatch = null;
}
public void run() {
try {
while (!Thread.currentThread().isInterrupted()) {
-
T item = queue.poll(POLLING_DURATION_SECONDS, TimeUnit.SECONDS);
if (item == null) {
+ LOG.warn("WorkItemConsumer.run(): no more items found in the queue. Will exit after committing");
+
commitDirty();
+
return;
}
isDirty.set(true);
+
processItem(item);
}
} catch (InterruptedException e) {
@@ -67,6 +71,7 @@ public abstract class WorkItemConsumer<T> implements Runnable {
public long getMaxCommitTimeInMs() {
long commitTime = this.maxCommitTimeInMs.get();
+
return ((commitTime > DEFAULT_COMMIT_TIME_IN_MS) ? commitTime : DEFAULT_COMMIT_TIME_IN_MS);
}
diff --git a/intg/src/main/java/org/apache/atlas/pc/WorkItemManager.java b/intg/src/main/java/org/apache/atlas/pc/WorkItemManager.java
index 0e7d3f2..a7ba67c 100644
--- a/intg/src/main/java/org/apache/atlas/pc/WorkItemManager.java
+++ b/intg/src/main/java/org/apache/atlas/pc/WorkItemManager.java
@@ -33,20 +33,20 @@ public class WorkItemManager<T, U extends WorkItemConsumer> {
private final ExecutorService service;
private final List<U> consumers = new ArrayList<>();
private CountDownLatch countdownLatch;
- private BlockingQueue<Object> resultsQueue;
+ private BlockingQueue<Object> resultsQueue;
public WorkItemManager(WorkItemBuilder builder, String namePrefix, int batchSize, int numWorkers, boolean collectResults) {
this.numWorkers = numWorkers;
- workQueue = new LinkedBlockingQueue<>(batchSize * numWorkers);
- service = Executors.newFixedThreadPool(numWorkers,
- new ThreadFactoryBuilder().setNameFormat(namePrefix + "-%d").build());
+ this.workQueue = new LinkedBlockingQueue<>(batchSize * numWorkers);
+ this.service = Executors.newFixedThreadPool(numWorkers, new ThreadFactoryBuilder().setNameFormat(namePrefix + "-%d").build());
createConsumers(builder, numWorkers, collectResults);
- execute();
+
+ start();
}
public WorkItemManager(WorkItemBuilder builder, int batchSize, int numWorkers) {
- this(builder, "workItem", batchSize, numWorkers, false);
+ this(builder, "workItemConsumer", batchSize, numWorkers, false);
}
public void setResultsCollection(BlockingQueue<Object> resultsQueue) {
@@ -60,6 +60,7 @@ public class WorkItemManager<T, U extends WorkItemConsumer> {
for (int i = 0; i < numWorkers; i++) {
U c = (U) builder.build(workQueue);
+
consumers.add(c);
if (collectResults) {
@@ -68,10 +69,12 @@ public class WorkItemManager<T, U extends WorkItemConsumer> {
}
}
- private void execute() {
+ public void start() {
this.countdownLatch = new CountDownLatch(numWorkers);
+
for (U c : consumers) {
c.setCountDownLatch(countdownLatch);
+
service.execute(c);
}
}
@@ -85,9 +88,14 @@ public class WorkItemManager<T, U extends WorkItemConsumer> {
}
public void checkProduce(T item) {
- if (countdownLatch.getCount() == 0) {
- execute();
+ if (countdownLatch.getCount() < numWorkers) {
+ LOG.info("Fewer workers detected: {}", countdownLatch.getCount());
+
+ drain();
+
+ start();
}
+
produce(item);
}
diff --git a/notification/src/main/java/org/apache/atlas/kafka/EmbeddedKafkaServer.java b/notification/src/main/java/org/apache/atlas/kafka/EmbeddedKafkaServer.java
index 32b597f..235b7ce 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/EmbeddedKafkaServer.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/EmbeddedKafkaServer.java
@@ -49,7 +49,7 @@ import java.util.concurrent.TimeUnit;
@Component
-@Order(2)
+@Order(3)
public class EmbeddedKafkaServer implements Service {
public static final Logger LOG = LoggerFactory.getLogger(EmbeddedKafkaServer.class);
diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
index 1d0a273..449eb6f 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
@@ -48,7 +48,7 @@ import java.util.concurrent.Future;
* Kafka specific access point to the Atlas notification framework.
*/
@Component
-@Order(3)
+@Order(4)
public class KafkaNotification extends AbstractNotification implements Service {
public static final Logger LOG = LoggerFactory.getLogger(KafkaNotification.class);
diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/AtlasJavaPatchHandler.java b/repository/src/main/java/org/apache/atlas/repository/patches/AtlasJavaPatchHandler.java
deleted file mode 100644
index 9153d49..0000000
--- a/repository/src/main/java/org/apache/atlas/repository/patches/AtlasJavaPatchHandler.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.atlas.repository.patches;
-
-import org.apache.atlas.RequestContext;
-import org.apache.atlas.model.patches.AtlasPatch.PatchStatus;
-import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
-import org.apache.atlas.repository.graphdb.AtlasGraph;
-import org.apache.atlas.repository.graphdb.AtlasVertex;
-import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
-import org.apache.atlas.type.AtlasTypeRegistry;
-import org.apache.commons.collections.MapUtils;
-
-import java.util.Map;
-
-import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.UNKNOWN;
-import static org.apache.atlas.repository.Constants.CREATED_BY_KEY;
-import static org.apache.atlas.repository.Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY;
-import static org.apache.atlas.repository.Constants.MODIFIED_BY_KEY;
-import static org.apache.atlas.repository.Constants.PATCH_DESCRIPTION_PROPERTY_KEY;
-import static org.apache.atlas.repository.Constants.PATCH_ID_PROPERTY_KEY;
-import static org.apache.atlas.repository.Constants.PATCH_STATE_PROPERTY_KEY;
-import static org.apache.atlas.repository.Constants.PATCH_TYPE_PROPERTY_KEY;
-import static org.apache.atlas.repository.Constants.TIMESTAMP_PROPERTY_KEY;
-import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.findByPatchId;
-import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.setEncodedProperty;
-import static org.apache.atlas.repository.store.graph.v2.AtlasTypeDefGraphStoreV2.getCurrentUser;
-
-public abstract class AtlasJavaPatchHandler {
- public final AtlasGraph graph;
- public final AtlasTypeRegistry typeRegistry;
- public final Map<String, PatchStatus> patchesRegistry;
- public final EntityGraphRetriever entityRetriever;
- public final GraphBackedSearchIndexer indexer;
- public final PatchContext context;
- public final String patchId;
- public final String patchDescription;
-
- private PatchStatus patchStatus;
-
- public static final String JAVA_PATCH_TYPE = "JAVA_PATCH";
-
- public AtlasJavaPatchHandler(PatchContext context, String patchId, String patchDescription) {
- this.context = context;
- this.graph = context.getGraph();
- this.typeRegistry = context.getTypeRegistry();
- this.indexer = context.getIndexer();
- this.patchesRegistry = context.getPatchesRegistry();
- this.patchId = patchId;
- this.patchDescription = patchDescription;
- this.patchStatus = getPatchStatus(patchesRegistry);
- this.entityRetriever = new EntityGraphRetriever(typeRegistry);
-
- init();
- }
-
- private void init() {
- PatchStatus patchStatus = getPatchStatus();
-
- if (patchStatus == UNKNOWN) {
- AtlasVertex patchVertex = graph.addVertex();
-
- setEncodedProperty(patchVertex, PATCH_ID_PROPERTY_KEY, patchId);
- setEncodedProperty(patchVertex, PATCH_DESCRIPTION_PROPERTY_KEY, patchDescription);
- setEncodedProperty(patchVertex, PATCH_TYPE_PROPERTY_KEY, JAVA_PATCH_TYPE);
- setEncodedProperty(patchVertex, PATCH_STATE_PROPERTY_KEY, getPatchStatus().toString());
- setEncodedProperty(patchVertex, TIMESTAMP_PROPERTY_KEY, RequestContext.get().getRequestTime());
- setEncodedProperty(patchVertex, MODIFICATION_TIMESTAMP_PROPERTY_KEY, RequestContext.get().getRequestTime());
- setEncodedProperty(patchVertex, CREATED_BY_KEY, getCurrentUser());
- setEncodedProperty(patchVertex, MODIFIED_BY_KEY, getCurrentUser());
-
- addToPatchesRegistry(patchId, getPatchStatus());
- }
-
- graph.commit();
- }
-
- private PatchStatus getPatchStatus(Map<String, PatchStatus> patchesRegistry) {
- PatchStatus ret = UNKNOWN;
-
- if (MapUtils.isNotEmpty(patchesRegistry) && patchesRegistry.containsKey(patchId)) {
- ret = patchesRegistry.get(patchId);
- }
-
- return ret;
- }
-
- public void updatePatchVertex(PatchStatus patchStatus) {
- AtlasVertex patchVertex = findByPatchId(patchId);
-
- if (patchVertex != null) {
- setEncodedProperty(patchVertex, PATCH_STATE_PROPERTY_KEY, patchStatus.toString());
- setEncodedProperty(patchVertex, MODIFICATION_TIMESTAMP_PROPERTY_KEY, RequestContext.get().getRequestTime());
- setEncodedProperty(patchVertex, MODIFIED_BY_KEY, getCurrentUser());
-
- addToPatchesRegistry(getPatchId(), getPatchStatus());
- }
-
- graph.commit();
- }
-
- public PatchStatus getPatchStatus() {
- return patchStatus;
- }
-
- public void addToPatchesRegistry(String patchId, PatchStatus status) {
- getPatchesRegistry().put(patchId, status);
- }
-
- public void setPatchStatus(PatchStatus patchStatus) {
- this.patchStatus = patchStatus;
- }
-
- public String getPatchId() {
- return patchId;
- }
-
- public Map<String, PatchStatus> getPatchesRegistry() {
- return patchesRegistry;
- }
-
- public abstract void applyPatch();
-}
\ No newline at end of file
diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchHandler.java b/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchHandler.java
new file mode 100644
index 0000000..d8dcfef
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchHandler.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.patches;
+
+import org.apache.atlas.model.patches.AtlasPatch.PatchStatus;
+
+import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.UNKNOWN;
+
+/**
+ * Base class for Java patches whose status is tracked in an {@link AtlasPatchRegistry}.
+ *
+ * <p>Construction reads the status the registry holds for the patch id and registers the
+ * patch when no usable status is recorded yet. Subclasses implement {@link #apply()};
+ * {@link #setStatus(PatchStatus)} records an outcome in the registry.
+ */
+public abstract class AtlasPatchHandler {
+    public static final String JAVA_PATCH_TYPE = "JAVA_PATCH";
+
+    private final String             patchId;
+    private final String             patchDescription;
+    private final AtlasPatchRegistry patchRegistry;
+    private       PatchStatus        status;
+
+    /**
+     * Creates the handler and registers the patch if the registry has no status for it.
+     *
+     * @param patchRegistry    registry used to look up, register and update the patch
+     * @param patchId          identifier under which the patch is tracked
+     * @param patchDescription description handed to the registry on registration
+     */
+    public AtlasPatchHandler(AtlasPatchRegistry patchRegistry, String patchId, String patchDescription) {
+        this.patchId          = patchId;
+        this.patchDescription = patchDescription;
+        this.patchRegistry    = patchRegistry;
+
+        // AtlasPatchRegistry.getStatus() is a plain map lookup and yields null for an id it
+        // has never seen. Normalise that to UNKNOWN; otherwise the UNKNOWN check in
+        // register() never matches and a first-time patch is never registered.
+        PatchStatus registeredStatus = getStatusFromRegistry();
+
+        this.status = (registeredStatus != null) ? registeredStatus : UNKNOWN;
+
+        register();
+    }
+
+    /**
+     * Registers the patch when its status is still {@code UNKNOWN}.
+     *
+     * NOTE(review): AtlasPatchRegistry.register() names its third parameter "action", so
+     * JAVA_PATCH_TYPE ends up stored as the patch action - confirm this is intended.
+     */
+    private void register() {
+        PatchStatus patchStatus = getStatus();
+
+        if (patchStatus == UNKNOWN) {
+            patchRegistry.register(patchId, patchDescription, JAVA_PATCH_TYPE, patchStatus);
+        }
+    }
+
+    /**
+     * @return the status the registry currently holds for this patch id; may be
+     *         {@code null} when the registry has no entry for it
+     */
+    public PatchStatus getStatusFromRegistry() {
+        return patchRegistry.getStatus(patchId);
+    }
+
+    /**
+     * @return the status cached by this handler
+     */
+    public PatchStatus getStatus() {
+        return status;
+    }
+
+    /**
+     * Caches the new status and forwards it to the registry.
+     *
+     * @param status status to record for this patch
+     */
+    public void setStatus(PatchStatus status) {
+        this.status = status;
+
+        patchRegistry.updateStatus(patchId, status);
+    }
+
+    /**
+     * @return the identifier under which this patch is tracked
+     */
+    public String getPatchId() {
+        return patchId;
+    }
+
+    /**
+     * Applies the patch.
+     */
+    public abstract void apply();
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchManager.java b/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchManager.java
new file mode 100644
index 0000000..629215d
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchManager.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.patches;
+
+import org.apache.atlas.model.patches.AtlasPatch;
+import org.apache.atlas.model.patches.AtlasPatch.PatchStatus;
+import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import javax.inject.Inject;
+
+import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.APPLIED;
+import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.SKIPPED;
+
+/**
+ * Spring component that owns the {@link PatchContext} and drives the Java patch handlers.
+ */
+@Component
+public class AtlasPatchManager {
+    private static final Logger LOG = LoggerFactory.getLogger(AtlasPatchManager.class);
+
+    private final PatchContext context;
+
+    /**
+     * Builds the patch context shared by all handlers from the injected collaborators.
+     */
+    @Inject
+    public AtlasPatchManager(AtlasGraph atlasGraph, AtlasTypeRegistry typeRegistry, GraphBackedSearchIndexer indexer) {
+        this.context = new PatchContext(atlasGraph, typeRegistry, indexer);
+    }
+
+    /**
+     * @return every patch known to the context's patch registry
+     */
+    public AtlasPatch.AtlasPatches getAllPatches() {
+        AtlasPatchRegistry registry = context.getPatchRegistry();
+
+        return registry.getAllPatches();
+    }
+
+    /**
+     * Runs each Java patch handler whose registry status is neither APPLIED nor SKIPPED.
+     * Any exception raised while applying is logged and ends the run; it is not rethrown.
+     */
+    public void applyAll() {
+        final AtlasPatchHandler[] handlers = { new UniqueAttributePatch(context) };
+
+        try {
+            for (AtlasPatchHandler handler : handlers) {
+                final PatchStatus registryStatus = handler.getStatusFromRegistry();
+                final boolean     alreadyHandled = registryStatus == APPLIED || registryStatus == SKIPPED;
+
+                if (alreadyHandled) {
+                    LOG.info("Ignoring java handler: {}; status: {}", handler.getPatchId(), registryStatus);
+
+                    continue;
+                }
+
+                LOG.info("Applying java handler: {}; status: {}", handler.getPatchId(), registryStatus);
+
+                handler.apply();
+            }
+        } catch (Exception ex) {
+            LOG.error("Error applying patches.", ex);
+        }
+    }
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchRegistry.java b/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchRegistry.java
new file mode 100644
index 0000000..df57959
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchRegistry.java
@@ -0,0 +1,282 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.patches;
+
+import org.apache.atlas.RequestContext;
+import org.apache.atlas.model.patches.AtlasPatch;
+import org.apache.atlas.model.patches.AtlasPatch.AtlasPatches;
+import org.apache.atlas.model.patches.AtlasPatch.PatchStatus;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.graphdb.AtlasIndexQuery;
+import org.apache.atlas.repository.graphdb.AtlasIndexQuery.Result;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
+import org.apache.atlas.repository.store.graph.v2.AtlasTypeDefGraphStoreV2;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.FAILED;
+import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.UNKNOWN;
+import static org.apache.atlas.repository.Constants.*;
+import static org.apache.atlas.repository.graph.AtlasGraphProvider.getGraphInstance;
+import static org.apache.atlas.repository.store.bootstrap.AtlasTypeDefStoreInitializer.TYPEDEF_PATCH_TYPE;
+import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getEncodedProperty;
+import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getIndexSearchPrefix;
+import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.setEncodedProperty;
+import static org.apache.atlas.repository.store.graph.v2.AtlasTypeDefGraphStoreV2.getCurrentUser;
+
+/**
+ * Keeps track of the patches recorded as vertices in the graph, together with their status.
+ *
+ * <p>The id-to-status map is loaded from the graph once, in the constructor, and is kept
+ * current afterwards by {@code register()} and {@code updateStatus()}.
+ */
+public class AtlasPatchRegistry {
+    private static final Logger LOG = LoggerFactory.getLogger(AtlasPatchRegistry.class);
+
+    private final Map<String, PatchStatus> patchNameStatusMap;
+    private final AtlasGraph               graph;
+
+    /**
+     * @param graph graph the patch vertices are read from and written to
+     */
+    public AtlasPatchRegistry(AtlasGraph graph) {
+        this.graph              = graph;
+        this.patchNameStatusMap = getPatchNameStatusForAllRegistered(graph);
+    }
+
+    /**
+     * Tells whether a patch still has to be applied.
+     *
+     * @param incomingId patch id; when empty, an id is derived from {@code patchFile} and {@code index}
+     * @param patchFile  patch file name used for the derived id
+     * @param index      index used for the derived id
+     * @return {@code true} when the patch is not registered, or is registered as FAILED or UNKNOWN
+     */
+    public boolean isApplicable(String incomingId, String patchFile, int index) {
+        String patchId = getId(incomingId, patchFile, index);
+
+        if (MapUtils.isEmpty(patchNameStatusMap) || !patchNameStatusMap.containsKey(patchId)) {
+            return true;
+        }
+
+        PatchStatus status = patchNameStatusMap.get(patchId);
+
+        return status == FAILED || status == UNKNOWN;
+    }
+
+    /**
+     * @param id patch id
+     * @return the status held for {@code id}, or {@code null} when the id is not registered
+     */
+    public PatchStatus getStatus(String id) {
+        return patchNameStatusMap.get(id);
+    }
+
+    /**
+     * Creates the vertex for a patch, or refreshes the existing one, and commits the change.
+     *
+     * @param patchId     patch id
+     * @param description patch description
+     * @param action      value stored as the patch action
+     * @param patchStatus status to store; must not be {@code null}
+     */
+    public void register(String patchId, String description, String action, PatchStatus patchStatus) {
+        createOrUpdatePatchVertex(graph, patchId, description, action, patchStatus);
+    }
+
+    /**
+     * Stores a new status on the patch's vertex and commits. Does nothing when no vertex is
+     * found for {@code patchId}.
+     *
+     * @param patchId     patch id
+     * @param patchStatus status to store; must not be {@code null}
+     */
+    public void updateStatus(String patchId, PatchStatus patchStatus) {
+        AtlasVertex patchVertex = findByPatchId(patchId);
+
+        if (patchVertex == null) {
+            return;
+        }
+
+        // the state property only needs to be written once per update
+        setEncodedProperty(patchVertex, PATCH_STATE_PROPERTY_KEY, patchStatus.toString());
+        setEncodedProperty(patchVertex, MODIFICATION_TIMESTAMP_PROPERTY_KEY, RequestContext.get().getRequestTime());
+        setEncodedProperty(patchVertex, MODIFIED_BY_KEY, getCurrentUser());
+
+        patchNameStatusMap.put(patchId, patchStatus);
+
+        graph.commit();
+    }
+
+    /** Returns {@code incomingId}, or {@code patchFile_index} when {@code incomingId} is empty. */
+    private static String getId(String incomingId, String patchFile, int index) {
+        if (StringUtils.isEmpty(incomingId)) {
+            return String.format("%s_%s", patchFile, index);
+        }
+
+        return incomingId;
+    }
+
+    /**
+     * @return all patches found in the graph, sorted by id
+     */
+    public AtlasPatches getAllPatches() {
+        return getAllPatches(graph);
+    }
+
+    /**
+     * Writes all patch properties onto the patch's vertex and commits.
+     */
+    private void createOrUpdatePatchVertex(AtlasGraph graph, String patchId,
+                                           String description, String action, PatchStatus patchStatus) {
+        boolean     isPatchRegistered = MapUtils.isNotEmpty(patchNameStatusMap) && patchNameStatusMap.containsKey(patchId);
+        AtlasVertex patchVertex       = isPatchRegistered ? findByPatchId(patchId) : null;
+
+        // findByPatchId() returns null when the index query yields nothing; create the vertex
+        // in that case too instead of failing on a null vertex below
+        if (patchVertex == null) {
+            patchVertex = graph.addVertex();
+        }
+
+        setEncodedProperty(patchVertex, PATCH_ID_PROPERTY_KEY, patchId);
+        setEncodedProperty(patchVertex, PATCH_DESCRIPTION_PROPERTY_KEY, description);
+        setEncodedProperty(patchVertex, PATCH_TYPE_PROPERTY_KEY, TYPEDEF_PATCH_TYPE);
+        setEncodedProperty(patchVertex, PATCH_ACTION_PROPERTY_KEY, action);
+        setEncodedProperty(patchVertex, PATCH_STATE_PROPERTY_KEY, patchStatus.toString());
+        setEncodedProperty(patchVertex, TIMESTAMP_PROPERTY_KEY, RequestContext.get().getRequestTime());
+        setEncodedProperty(patchVertex, MODIFICATION_TIMESTAMP_PROPERTY_KEY, RequestContext.get().getRequestTime());
+        setEncodedProperty(patchVertex, CREATED_BY_KEY, AtlasTypeDefGraphStoreV2.getCurrentUser());
+        setEncodedProperty(patchVertex, MODIFIED_BY_KEY, AtlasTypeDefGraphStoreV2.getCurrentUser());
+
+        graph.commit();
+
+        // mirror updateStatus(): keep the in-memory status in step with the graph so that a
+        // patch registered in this session is visible to getStatus() and isApplicable()
+        patchNameStatusMap.put(patchId, patchStatus);
+    }
+
+    /** Builds the id-to-status map from the patches currently stored in the graph. */
+    private static Map<String, PatchStatus> getPatchNameStatusForAllRegistered(AtlasGraph graph) {
+        Map<String, PatchStatus> ret     = new HashMap<>();
+        AtlasPatches             patches = getAllPatches(graph);
+
+        for (AtlasPatch patch : patches.getPatches()) {
+            String      patchId     = patch.getId();
+            PatchStatus patchStatus = patch.getStatus();
+
+            if (patchId != null && patchStatus != null) {
+                ret.put(patchId, patchStatus);
+            }
+        }
+
+        return ret;
+    }
+
+    /**
+     * Reads every vertex carrying a patch id through the vertex index. A failure while
+     * querying is logged and whatever was collected so far is returned.
+     */
+    private static AtlasPatches getAllPatches(AtlasGraph graph) {
+        List<AtlasPatch> ret            = new ArrayList<>();
+        String           idxQueryString = getIndexSearchPrefix() + "\"" + PATCH_ID_PROPERTY_KEY + "\" : (*)";
+        AtlasIndexQuery  idxQuery       = graph.indexQuery(VERTEX_INDEX, idxQueryString);
+
+        try {
+            Iterator<Result<Object, Object>> results = idxQuery.vertices();
+
+            while (results != null && results.hasNext()) {
+                AtlasVertex patchVertex = results.next().getVertex();
+                AtlasPatch  patch       = toAtlasPatch(patchVertex);
+
+                ret.add(patch);
+            }
+
+            if (CollectionUtils.isNotEmpty(ret)) {
+                Collections.sort(ret, Comparator.comparing(AtlasPatch::getId));
+            }
+        } catch (Throwable t) {
+            // keep the best-effort behaviour, but do not hide why the lookup failed
+            LOG.warn("getAllPatches(): Returned empty result!", t);
+        }
+
+        graph.commit();
+
+        return new AtlasPatches(ret);
+    }
+
+    /** Copies the patch properties of {@code vertex} into a new {@link AtlasPatch}. */
+    private static AtlasPatch toAtlasPatch(AtlasVertex vertex) {
+        AtlasPatch ret = new AtlasPatch();
+
+        ret.setId(getEncodedProperty(vertex, PATCH_ID_PROPERTY_KEY, String.class));
+        ret.setDescription(getEncodedProperty(vertex, PATCH_DESCRIPTION_PROPERTY_KEY, String.class));
+        ret.setType(getEncodedProperty(vertex, PATCH_TYPE_PROPERTY_KEY, String.class));
+        ret.setAction(getEncodedProperty(vertex, PATCH_ACTION_PROPERTY_KEY, String.class));
+        ret.setCreatedBy(getEncodedProperty(vertex, CREATED_BY_KEY, String.class));
+        ret.setUpdatedBy(getEncodedProperty(vertex, MODIFIED_BY_KEY, String.class));
+        ret.setCreatedTime(getEncodedProperty(vertex, TIMESTAMP_PROPERTY_KEY, Long.class));
+        ret.setUpdatedTime(getEncodedProperty(vertex, MODIFICATION_TIMESTAMP_PROPERTY_KEY, Long.class));
+        ret.setStatus(getPatchStatus(vertex));
+
+        return ret;
+    }
+
+    /**
+     * Looks up the vertex for {@code patchId} through the vertex index.
+     *
+     * NOTE(review): this queries the graph returned by getGraphInstance(), not the graph
+     * handed to the constructor - confirm both always refer to the same instance.
+     *
+     * @return the first non-null vertex in the result, or {@code null} when there is none
+     */
+    private static AtlasVertex findByPatchId(String patchId) {
+        AtlasVertex                      ret        = null;
+        String                           indexQuery = getIndexSearchPrefix() + "\"" + PATCH_ID_PROPERTY_KEY + "\" : (" + patchId + ")";
+        Iterator<Result<Object, Object>> results    = getGraphInstance().indexQuery(VERTEX_INDEX, indexQuery).vertices();
+
+        while (results != null && results.hasNext()) {
+            ret = results.next().getVertex();
+
+            if (ret != null) {
+                break;
+            }
+        }
+
+        return ret;
+    }
+
+    /** Reads the stored state of {@code vertex}; a missing state is reported as UNKNOWN. */
+    private static PatchStatus getPatchStatus(AtlasVertex vertex) {
+        String patchStatus = AtlasGraphUtilsV2.getEncodedProperty(vertex, PATCH_STATE_PROPERTY_KEY, String.class);
+
+        return patchStatus != null ? PatchStatus.valueOf(patchStatus) : UNKNOWN;
+    }
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchService.java b/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchService.java
new file mode 100644
index 0000000..fc21285
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchService.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.patches;
+
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.service.Service;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.core.annotation.Order;
+import org.springframework.stereotype.Component;
+
+import javax.inject.Inject;
+
+/**
+ * {@link Service} that asks the {@link AtlasPatchManager} to apply all patches on start.
+ */
+@Component
+@Order(2)
+public class AtlasPatchService implements Service {
+    private static final Logger LOG = LoggerFactory.getLogger(AtlasPatchService.class);
+
+    private final AtlasPatchManager patchManager;
+
+    /**
+     * @param patchManager manager whose patches are applied by {@link #start()}
+     */
+    @Inject
+    public AtlasPatchService(AtlasPatchManager patchManager) {
+        this.patchManager = patchManager;
+    }
+
+    /**
+     * Logs the start message, then applies all patches through the patch manager.
+     */
+    @Override
+    public void start() throws AtlasException {
+        LOG.info("PatchService: Started.");
+
+        patchManager.applyAll();
+    }
+
+    /**
+     * Only logs the stop message.
+     */
+    @Override
+    public void stop() throws AtlasException {
+        LOG.info("PatchService: Stopped.");
+    }
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/PatchContext.java b/repository/src/main/java/org/apache/atlas/repository/patches/PatchContext.java
index a60422b..3508c74 100644
--- a/repository/src/main/java/org/apache/atlas/repository/patches/PatchContext.java
+++ b/repository/src/main/java/org/apache/atlas/repository/patches/PatchContext.java
@@ -6,39 +6,33 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.atlas.repository.patches;
-import org.apache.atlas.model.patches.AtlasPatch.PatchStatus;
import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.type.AtlasTypeRegistry;
-import java.util.Map;
-
-/**
- * Patch context for typedef and java patches.
- */
public class PatchContext {
private final AtlasGraph graph;
private final AtlasTypeRegistry typeRegistry;
private final GraphBackedSearchIndexer indexer;
- private final Map<String, PatchStatus> patchesRegistry;
-
- public PatchContext(AtlasGraph graph, AtlasTypeRegistry typeRegistry, GraphBackedSearchIndexer indexer,
- Map<String, PatchStatus> patchesRegistry) {
- this.graph = graph;
- this.typeRegistry = typeRegistry;
- this.indexer = indexer;
- this.patchesRegistry = patchesRegistry;
+ private final AtlasPatchRegistry patchRegistry;
+
+ public PatchContext(AtlasGraph graph, AtlasTypeRegistry typeRegistry, GraphBackedSearchIndexer indexer) {
+ this.graph = graph;
+ this.typeRegistry = typeRegistry;
+ this.indexer = indexer;
+ this.patchRegistry = new AtlasPatchRegistry(this.graph);
}
public AtlasGraph getGraph() {
@@ -53,7 +47,7 @@ public class PatchContext {
return indexer;
}
- public Map<String, PatchStatus> getPatchesRegistry() {
- return patchesRegistry;
+ public AtlasPatchRegistry getPatchRegistry() {
+ return patchRegistry;
}
-}
\ No newline at end of file
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java b/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java
new file mode 100644
index 0000000..c5af500
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.patches;
+
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
+import org.apache.atlas.pc.WorkItemBuilder;
+import org.apache.atlas.pc.WorkItemConsumer;
+import org.apache.atlas.pc.WorkItemManager;
+import org.apache.atlas.repository.IndexException;
+import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
+import org.apache.atlas.repository.graph.GraphBackedSearchIndexer.UniqueKind;
+import org.apache.atlas.repository.graphdb.*;
+import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
+import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
+import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.APPLIED;
+import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getIdFromVertex;
+
+public class UniqueAttributePatch extends AtlasPatchHandler {
+ private static final Logger LOG = LoggerFactory.getLogger(UniqueAttributePatch.class);
+
+ private static final String PATCH_ID = "JAVA_PATCH_0000_001";
+ private static final String PATCH_DESCRIPTION = "Add __u_ property for each unique attribute of active entities";
+
+ private final AtlasGraph graph;
+ private final GraphBackedSearchIndexer indexer;
+ private final AtlasTypeRegistry typeRegistry;
+
+ /**
+  * Builds the patch handler from the shared patch context.
+  * The patch registry, patch id and description are handed to the base handler;
+  * the graph, search indexer and type registry are kept for use by apply().
+  *
+  * @param context source of the patch registry, graph, indexer and type registry
+  */
+ public UniqueAttributePatch(PatchContext context) {
+ super(context.getPatchRegistry(), PATCH_ID, PATCH_DESCRIPTION);
+
+ this.graph = context.getGraph();
+ this.indexer = context.getIndexer();
+ this.typeRegistry = context.getTypeRegistry();
+ }
+
+ /**
+  * Registers the unique-attribute indexes for all entity types, runs the vertex
+  * processor over the graph with the collected per-type unique attributes, and
+  * then sets this patch's status to APPLIED.
+  *
+  * NOTE(review): the status is set to APPLIED unconditionally. The boolean returned by
+  * createIndexForUniqueAttributes() is not checked, and the processor's apply() logs
+  * exceptions instead of propagating them. Confirm that a partially applied patch
+  * should not be reported differently.
+  */
+ @Override
+ public void apply() {
+ // index registration must complete before vertices are processed
+ TypeNameAttributeCache typeNameAttributeCache = registerUniqueAttributeForTypes();
+ UniqueAttributePatchProcessor patchProcessor = new UniqueAttributePatchProcessor(this.graph);
+
+ patchProcessor.apply(typeNameAttributeCache.getAll());
+
+ setStatus(APPLIED);
+
+ LOG.info("UniqueAttributePatch: {}; status: {}", getPatchId(), getStatus());
+ }
+
+    /**
+     * Creates the unique-property indexes for every entity type in the type registry
+     * and records each type's unique attributes, keyed by type name.
+     *
+     * @return cache mapping each entity type name to its unique attributes
+     */
+    private TypeNameAttributeCache registerUniqueAttributeForTypes() {
+        TypeNameAttributeCache cache = new TypeNameAttributeCache();
+
+        for (AtlasEntityType type : typeRegistry.getAllEntityTypes()) {
+            Collection<AtlasAttribute> uniqueAttributes = type.getUniqAttributes().values();
+
+            createIndexForUniqueAttributes(type.getTypeName(), uniqueAttributes);
+
+            cache.add(type, uniqueAttributes);
+        }
+
+        return cache;
+    }
+
+    /**
+     * Creates a PER_TYPE_UNIQUE vertex index for each attribute whose unique property
+     * key is not yet defined in the graph, then commits the management transaction
+     * and the graph.
+     *
+     * @param typeName   entity type name; used only in log messages
+     * @param attributes unique attributes of the type
+     * @return true when the indexes were committed, false when an IndexException was raised
+     */
+    private boolean createIndexForUniqueAttributes(String typeName, Collection<AtlasAttribute> attributes) {
+        try {
+            AtlasGraphManagement mgmt = graph.getManagementSystem();
+
+            for (AtlasAttribute attr : attributes) {
+                String uniqueKey = attr.getVertexUniquePropertyName();
+
+                // only create the index when the property key is not already known
+                if (mgmt.getPropertyKey(uniqueKey) == null) {
+                    AtlasAttributeDef def         = attr.getAttributeDef();
+                    boolean           indexable   = def.getIsIndexable();
+                    String            attrType    = def.getTypeName();
+                    Class             valueClass  = indexer.getPrimitiveClass(attrType);
+                    AtlasCardinality  cardinality = indexer.toAtlasCardinality(def.getCardinality());
+
+                    indexer.createVertexIndex(mgmt, uniqueKey, UniqueKind.PER_TYPE_UNIQUE, valueClass, cardinality, indexable, true);
+                }
+            }
+
+            indexer.commit(mgmt);
+            graph.commit();
+
+            LOG.info("Unique attributes: type: {}: Registered!", typeName);
+
+            return true;
+        } catch (IndexException e) {
+            LOG.error("Error creating index: type: {}", typeName, e);
+
+            return false;
+        }
+    }
+
+ public static class UniqueAttributePatchProcessor {
+ private static final String NUM_WORKERS_PROPERTY = "atlas.patch.unique_attribute_patch.numWorkers";
+ private static final String BATCH_SIZE_PROPERTY = "atlas.patch.unique_attribute_patch.batchSize";
+ private static final String ATLAS_SOLR_SHARDS = "ATLAS_SOLR_SHARDS";
+ private static final int NUM_WORKERS;
+ private static final int BATCH_SIZE;
+
+ private final AtlasGraph graph;
+
+        // Resolve the worker count and batch size once, when the class is loaded.
+        // Any value that could not be read from the configuration keeps its
+        // built-in default (3 workers, batch size 300).
+        static {
+            int workers = 3;
+            int batch   = 300;
+
+            try {
+                workers = ApplicationProperties.get().getInt(NUM_WORKERS_PROPERTY, getDefaultNumWorkers());
+                batch   = ApplicationProperties.get().getInt(BATCH_SIZE_PROPERTY, 300);
+
+                LOG.info("UniqueAttributePatch: {}={}, {}={}", NUM_WORKERS_PROPERTY, workers, BATCH_SIZE_PROPERTY, batch);
+            } catch (Exception e) {
+                LOG.error("Error retrieving configuration.", e);
+            }
+
+            NUM_WORKERS = workers;
+            BATCH_SIZE  = batch;
+        }
+
+ /**
+  * @param graph graph whose vertices are scanned by apply()
+  */
+ public UniqueAttributePatchProcessor(AtlasGraph graph) {
+ this.graph = graph;
+ }
+
+        /**
+         * Iterates over every vertex in the graph and submits each entity vertex to a
+         * pool of consumers, together with the unique attributes registered for the
+         * vertex's type. The work-item manager is created only when the graph has at
+         * least one vertex, is drained once all vertices have been submitted, and is
+         * always shut down.
+         *
+         * Failures are logged and not propagated to the caller.
+         *
+         * @param typeUniqueAttributeCache unique attributes keyed by entity type name;
+         *                                 types missing from the map are submitted with a
+         *                                 null attribute collection
+         */
+        public void apply(final Map<String, Collection<AtlasAttribute>> typeUniqueAttributeCache) {
+            WorkItemManager manager = null;
+
+            try {
+                Iterator<AtlasVertex> iterator = graph.getVertices().iterator();
+
+                if (iterator.hasNext()) {
+                    manager = new WorkItemManager<>(new ConsumerBuilder(graph), BATCH_SIZE, NUM_WORKERS);
+
+                    LOG.info("Processing: Started...");
+
+                    while (iterator.hasNext()) {
+                        AtlasVertex vertex = iterator.next();
+
+                        if (!AtlasGraphUtilsV2.isEntityVertex(vertex)) {
+                            continue;
+                        }
+
+                        String typeName = AtlasGraphUtilsV2.getTypeName(vertex);
+
+                        submitForProcessing(typeName, vertex, manager, typeUniqueAttributeCache.get(typeName));
+                    }
+
+                    manager.drain();
+                }
+            } catch (Exception ex) {
+                LOG.error("Error: ", ex);
+            } finally {
+                if (manager != null) {
+                    try {
+                        manager.shutdown();
+                    } catch (InterruptedException e) {
+                        LOG.error("Interrupted", e);
+
+                        // restore the interrupt flag so callers further up the stack can observe it
+                        Thread.currentThread().interrupt();
+                    }
+                }
+            }
+        }
+
+        /**
+         * Default worker count: three workers per configured value of ATLAS_SOLR_SHARDS
+         * (which itself defaults to 1).
+         *
+         * @throws AtlasException if the application properties cannot be loaded
+         */
+        private static int getDefaultNumWorkers() throws AtlasException {
+            int shardCount = ApplicationProperties.get().getInt(ATLAS_SOLR_SHARDS, 1);
+
+            return shardCount * 3;
+        }
+
+        /**
+         * Wraps the vertex id, its type name and the type's unique attributes in a
+         * work item and hands it to the manager.
+         *
+         * @param typeName       type name of the vertex
+         * @param vertex         vertex to process; its id is cast to Long
+         * @param manager        work-item manager that receives the item
+         * @param uniqAttributes unique attributes of the type; may be null
+         */
+        private void submitForProcessing(String typeName, AtlasVertex vertex, WorkItemManager manager, Collection<AtlasAttribute> uniqAttributes) {
+            long vertexId = (Long) vertex.getId();
+
+            manager.checkProduce(new WorkItem(typeName, vertexId, uniqAttributes));
+        }
+
+
+        /**
+         * Immutable unit of work for a consumer: one vertex (by id), its type name
+         * and the unique attributes defined for that type.
+         */
+        private static class WorkItem {
+            private final String                     typeName;
+            private final long                       id;
+            private final Collection<AtlasAttribute> uniqueAttributeValues; // null when the type has no cache entry
+
+            public WorkItem(String typeName, long id, Collection<AtlasAttribute> uniqueAttributeValues) {
+                this.uniqueAttributeValues = uniqueAttributeValues;
+                this.id                    = id;
+                this.typeName              = typeName;
+            }
+        }
+
+ private static class Consumer extends WorkItemConsumer<WorkItem> {
+ private static int MAX_COMMIT_RETRY_COUNT = 3;
+
+ private final AtlasGraph graph;
+ private final AtomicLong counter;
+
+ /**
+  * @param graph graph used to look up vertices and to commit changes
+  * @param queue queue of work items, passed to the base consumer
+  */
+ public Consumer(AtlasGraph graph, BlockingQueue<WorkItem> queue) {
+ super(queue);
+
+ this.graph = graph;
+ // number of work items seen by this consumer instance
+ this.counter = new AtomicLong(0);
+ }
+
+            /**
+             * Commits the graph only when the number of processed items is an exact
+             * multiple of the batch size; otherwise does nothing.
+             */
+            @Override
+            protected void doCommit() {
+                boolean atBatchBoundary = counter.get() % BATCH_SIZE == 0;
+
+                if (!atBatchBoundary) {
+                    return;
+                }
+
+                LOG.info("Processed: {}", counter.get());
+
+                attemptCommit();
+            }
+
+ /**
+  * Commits whatever is pending regardless of the batch boundary, logs the total
+  * number of items seen by this consumer, then delegates to the base implementation.
+  */
+ @Override
+ protected void commitDirty() {
+ attemptCommit();
+
+ LOG.info("Total: Commit: {}", counter.get());
+
+ super.commitDirty();
+ }
+
+            /**
+             * Commits the graph, retrying up to MAX_COMMIT_RETRY_COUNT times with a
+             * linearly increasing pause (300 ms x attempt number) between attempts.
+             * Failures are logged together with the attempt number; if every attempt
+             * fails the method returns without throwing.
+             */
+            private void attemptCommit() {
+                for (int retryCount = 1; retryCount <= MAX_COMMIT_RETRY_COUNT; retryCount++) {
+                    try {
+                        graph.commit();
+
+                        break;
+                    } catch (Exception ex) {
+                        // the placeholder is required, otherwise the attempt number is dropped from the log
+                        LOG.error("Commit exception: attempt: {}", retryCount, ex);
+
+                        // no point in pausing after the final attempt
+                        if (retryCount == MAX_COMMIT_RETRY_COUNT) {
+                            break;
+                        }
+
+                        try {
+                            Thread.sleep(300L * retryCount);
+                        } catch (InterruptedException e) {
+                            LOG.error("Commit exception: Pause: Interrupted!", e);
+
+                            // restore the interrupt flag; remaining attempts proceed without pausing
+                            Thread.currentThread().interrupt();
+                        }
+                    }
+                }
+            }
+
+ /**
+  * Processes one work item: for an active, non-type vertex, copies the value of each
+  * unique attribute into the attribute's unique vertex property unless that property
+  * is already present on the vertex.
+  *
+  * The item counter is incremented before any check, so skipped items (no unique
+  * attributes, missing vertex, type vertex, non-active entity) are still counted.
+  * Exceptions are logged and not propagated.
+  *
+  * @param wi work item carrying the vertex id, type name and unique attributes
+  */
+ @Override
+ protected void processItem(WorkItem wi) {
+ counter.incrementAndGet();
+
+ String typeName = wi.typeName;
+
+ // type has no entry in the unique-attribute cache: nothing to do
+ if(wi.uniqueAttributeValues == null) {
+ return;
+ }
+
+ // the vertex is looked up again by id rather than shared with the producer
+ AtlasVertex vertex = graph.getVertex(Long.toString(wi.id));
+
+ if (vertex == null) {
+ LOG.warn("processItem: AtlasVertex with id: ({}): not found!", wi.id);
+
+ return;
+ }
+
+ if (AtlasGraphUtilsV2.isTypeVertex(vertex)) {
+ return;
+ }
+
+ AtlasEntity.Status status = AtlasGraphUtilsV2.getState(vertex);
+
+ // only active entities receive the unique property
+ if (status != AtlasEntity.Status.ACTIVE) {
+ return;
+ }
+
+ try {
+ LOG.debug("processItem: {}", wi.id);
+
+ for (AtlasAttribute attribute : wi.uniqueAttributeValues) {
+ String uniquePropertyKey = attribute.getVertexUniquePropertyName();
+ Collection<? extends String> propertyKeys = vertex.getPropertyKeys();
+ Object uniqAttrValue = null;
+
+ if (propertyKeys != null && propertyKeys.contains(uniquePropertyKey)) {
+ // unique property already set on this vertex
+ LOG.debug("processItem: {}: Skipped!", wi.id);
+ } else {
+ try {
+ String propertyKey = attribute.getVertexPropertyName();
+
+ uniqAttrValue = EntityGraphRetriever.mapVertexToPrimitive(vertex, propertyKey, attribute.getAttributeDef());
+
+ AtlasGraphUtilsV2.setEncodedProperty(vertex, uniquePropertyKey, uniqAttrValue);
+ } catch(AtlasSchemaViolationException ex) {
+ // logged and skipped; processing continues with the next attribute
+ LOG.error("Duplicates detected: {}:{}:{}", typeName, uniqAttrValue, getIdFromVertex(vertex));
+ }
+ }
+
+ // NOTE(review): commit() is not defined in this class - presumably inherited from
+ // WorkItemConsumer and routed to doCommit(); confirm. It is invoked once per attribute.
+ commit();
+ }
+
+ LOG.debug("processItem: {}: Done!", wi.id);
+ } catch (Exception ex) {
+ LOG.error("Error found: {}: {}", typeName, wi.id, ex);
+ }
+ }
+ }
+
+        /**
+         * Factory handed to the work-item manager; creates one Consumer per queue,
+         * all sharing the same graph.
+         *
+         * Declared static because it uses no state of the enclosing processor, so it
+         * does not need a reference to the enclosing instance.
+         */
+        private static class ConsumerBuilder implements WorkItemBuilder<Consumer, WorkItem> {
+            private final AtlasGraph graph;
+
+            public ConsumerBuilder(AtlasGraph graph) {
+                this.graph = graph;
+            }
+
+            @Override
+            public Consumer build(BlockingQueue<WorkItem> queue) {
+                return new Consumer(graph, queue);
+            }
+        }
+ }
+
+    /**
+     * Maps an entity type name to the unique attributes defined for that type.
+     * Not synchronized.
+     */
+    public static class TypeNameAttributeCache {
+        // never reassigned, hence final
+        private final Map<String, Collection<AtlasAttribute>> typeUniqueAttributeCache = new HashMap<>();
+
+        /**
+         * Stores (or replaces) the unique attributes for the given entity type.
+         *
+         * @param entityType type whose name is used as the key
+         * @param values     unique attributes of the type
+         */
+        public void add(AtlasEntityType entityType, Collection<AtlasAttribute> values) {
+            typeUniqueAttributeCache.put(entityType.getTypeName(), values);
+        }
+
+        /**
+         * @param typeName entity type name
+         * @return the unique attributes stored for the type, or null if none were stored
+         */
+        public Collection<AtlasAttribute> get(String typeName) {
+            return typeUniqueAttributeCache.get(typeName);
+        }
+
+        /**
+         * @param typeName entity type name
+         * @return true if an entry exists for the type
+         */
+        public boolean has(String typeName) {
+            return typeUniqueAttributeCache.containsKey(typeName);
+        }
+
+        /**
+         * @return the backing map itself (not a copy); changes made by the caller are visible here
+         */
+        public Map<String, Collection<AtlasAttribute>> getAll() {
+            return typeUniqueAttributeCache;
+        }
+    }
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatchHandler.java b/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatchHandler.java
deleted file mode 100644
index f2238f1..0000000
--- a/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatchHandler.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.atlas.repository.patches;
-
-import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
-import org.apache.atlas.repository.IndexException;
-import org.apache.atlas.repository.graph.GraphBackedSearchIndexer.UniqueKind;
-import org.apache.atlas.repository.graphdb.AtlasCardinality;
-import org.apache.atlas.repository.graphdb.AtlasGraphManagement;
-import org.apache.atlas.repository.graphdb.AtlasIndexQuery.Result;
-import org.apache.atlas.repository.graphdb.AtlasVertex;
-import org.apache.atlas.type.AtlasEntityType;
-import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
-import org.apache.commons.collections.MapUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Map;
-
-import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.APPLIED;
-import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.FAILED;
-import static org.apache.atlas.repository.graph.GraphHelper.getGuid;
-import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.findActiveEntityVerticesByType;
-import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.setEncodedProperty;
-
-public class UniqueAttributePatchHandler extends AtlasJavaPatchHandler {
- private static final String PATCH_ID = "JAVA_PATCH_0000_001";
- private static final String PATCH_DESCRIPTION = "Add new vertex property for each unique attribute of active entities";
- private static final Logger LOG = LoggerFactory.getLogger(UniqueAttributePatchHandler.class);
-
- public UniqueAttributePatchHandler(PatchContext context) {
- super(context, PATCH_ID, PATCH_DESCRIPTION);
- }
-
- @Override
- public void applyPatch() {
- Collection<AtlasEntityType> allEntityTypes = typeRegistry.getAllEntityTypes();
- boolean patchFailed = false;
-
- for (AtlasEntityType entityType : allEntityTypes) {
- String typeName = entityType.getTypeName();
- Map<String, AtlasAttribute> uniqAttributes = entityType.getUniqAttributes();
- int patchAppliedCount = 0;
-
- LOG.info("Applying java patch: {} for type: {}", getPatchId(), typeName);
-
- if (MapUtils.isNotEmpty(uniqAttributes)) {
- Collection<AtlasAttribute> attributes = uniqAttributes.values();
-
- try {
- // register unique attribute property keys in graph
- registerUniqueAttrPropertyKeys(attributes);
-
- Iterator<Result<Object, Object>> iterator = findActiveEntityVerticesByType(typeName);
-
- int entityCount = 0;
-
- while (iterator != null && iterator.hasNext()) {
- AtlasVertex entityVertex = iterator.next().getVertex();
- boolean patchApplied = false;
-
- entityCount++;
-
- for (AtlasAttribute attribute : attributes) {
- String uniquePropertyKey = attribute.getVertexUniquePropertyName();
- Collection<? extends String> propertyKeys = entityVertex.getPropertyKeys();
-
- if (!propertyKeys.contains(uniquePropertyKey)) {
- String propertyKey = attribute.getVertexPropertyName();
- AtlasAttributeDef attributeDef = attribute.getAttributeDef();
- Object uniqAttrValue = entityRetriever.mapVertexToPrimitive(entityVertex, propertyKey, attributeDef);
-
- // add the unique attribute property to vertex
- setEncodedProperty(entityVertex, uniquePropertyKey, uniqAttrValue);
-
- try {
- graph.commit();
-
- patchApplied = true;
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("{}: Added unique attribute property: {} to entity: {} ({})",
- PATCH_ID, uniquePropertyKey, getGuid(entityVertex), typeName);
- }
- } catch (Throwable t) {
- LOG.warn("Java patch ({}): failed to update entity guid: {}; typeName: {}; attrName: {}; attrValue: {}",
- getPatchId(), getGuid(entityVertex), typeName, attribute.getName(), uniqAttrValue);
-
- continue;
- }
- }
- }
-
- if (patchApplied) {
- patchAppliedCount++;
- }
-
- if (entityCount % 1000 == 0) {
- LOG.info("Java patch: {} : applied {}; processed {} {} entities.", getPatchId(), patchAppliedCount, entityCount, typeName);
- }
- }
- } catch (IndexException e) {
- LOG.error("Java patch: {} failed! error: {}", getPatchId(), e);
-
- patchFailed = true;
-
- break;
- }
- }
-
- LOG.info("Applied java patch ({}) for type: {}; Total processed: {}", getPatchId(), typeName, patchAppliedCount);
- }
-
- if (patchFailed) {
- setPatchStatus(FAILED);
- } else {
- setPatchStatus(APPLIED);
- }
-
- LOG.info("Applied java patch: {}; status: {}", getPatchId(), getPatchStatus());
-
- updatePatchVertex(getPatchStatus());
- }
-
- private void registerUniqueAttrPropertyKeys(Collection<AtlasAttribute> attributes) throws IndexException {
- AtlasGraphManagement management = graph.getManagementSystem();
-
- for (AtlasAttribute attribute : attributes) {
- String uniquePropertyName = attribute.getVertexUniquePropertyName();
- boolean uniquePropertyNameExists = management.getPropertyKey(uniquePropertyName) != null;
-
- if (!uniquePropertyNameExists) {
- AtlasAttributeDef attributeDef = attribute.getAttributeDef();
- boolean isIndexable = attributeDef.getIsIndexable();
- String attribTypeName = attributeDef.getTypeName();
- Class propertyClass = indexer.getPrimitiveClass(attribTypeName);
- AtlasCardinality cardinality = indexer.toAtlasCardinality(attributeDef.getCardinality());
-
- indexer.createVertexIndex(management, uniquePropertyName, UniqueKind.NONE, propertyClass, cardinality, isIndexable, true);
- }
- }
-
- //Commit indexes
- indexer.commit(management);
- graph.commit();
- }
-}
\ No newline at end of file
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java b/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java
index 78f3faf..662edc9 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java
@@ -21,7 +21,6 @@ package org.apache.atlas.repository.store.bootstrap;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
-
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.AtlasException;
import org.apache.atlas.RequestContext;
@@ -41,14 +40,8 @@ import org.apache.atlas.model.typedef.AtlasRelationshipEndDef;
import org.apache.atlas.model.typedef.AtlasStructDef;
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
import org.apache.atlas.model.typedef.AtlasTypesDef;
-import org.apache.atlas.repository.graph.AtlasGraphProvider;
-import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
import org.apache.atlas.repository.graphdb.AtlasGraph;
-import org.apache.atlas.repository.graphdb.AtlasVertex;
-import org.apache.atlas.repository.patches.AtlasJavaPatchHandler;
-import org.apache.atlas.repository.patches.PatchContext;
-import org.apache.atlas.repository.patches.UniqueAttributePatchHandler;
-import org.apache.atlas.repository.store.graph.v2.AtlasTypeDefGraphStoreV2;
+import org.apache.atlas.repository.patches.AtlasPatchRegistry;
import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
@@ -83,19 +76,6 @@ import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_
import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.APPLIED;
import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.FAILED;
import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.SKIPPED;
-import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.UNKNOWN;
-import static org.apache.atlas.repository.Constants.CREATED_BY_KEY;
-import static org.apache.atlas.repository.Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY;
-import static org.apache.atlas.repository.Constants.MODIFIED_BY_KEY;
-import static org.apache.atlas.repository.Constants.PATCH_ACTION_PROPERTY_KEY;
-import static org.apache.atlas.repository.Constants.PATCH_ID_PROPERTY_KEY;
-import static org.apache.atlas.repository.Constants.PATCH_DESCRIPTION_PROPERTY_KEY;
-import static org.apache.atlas.repository.Constants.PATCH_STATE_PROPERTY_KEY;
-import static org.apache.atlas.repository.Constants.PATCH_TYPE_PROPERTY_KEY;
-import static org.apache.atlas.repository.Constants.TIMESTAMP_PROPERTY_KEY;
-import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.findByPatchId;
-import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getPatchesRegistry;
-import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.setEncodedProperty;
/**
* Class that handles initial loading of models and patches into typedef store
@@ -111,18 +91,16 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
private final AtlasTypeDefStore typeDefStore;
private final AtlasTypeRegistry typeRegistry;
- private final AtlasGraph graph;
private final Configuration conf;
- private final GraphBackedSearchIndexer indexer;
+ private final AtlasPatchRegistry patchRegistry;
@Inject
public AtlasTypeDefStoreInitializer(AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry,
- AtlasGraph graph, Configuration conf, GraphBackedSearchIndexer indexer) {
- this.typeDefStore = typeDefStore;
- this.typeRegistry = typeRegistry;
- this.graph = graph;
- this.conf = conf;
- this.indexer = indexer;
+ AtlasGraph graph, Configuration conf) {
+ this.typeDefStore = typeDefStore;
+ this.typeRegistry = typeRegistry;
+ this.conf = conf;
+ this.patchRegistry = new AtlasPatchRegistry(graph);
}
@PostConstruct
@@ -157,7 +135,6 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
String atlasHomeDir = System.getProperty("atlas.home");
String modelsDirName = (StringUtils.isEmpty(atlasHomeDir) ? "." : atlasHomeDir) + File.separator + "models";
- PatchContext patchContext = initPatchContext();
if (modelsDirName == null || modelsDirName.length() == 0) {
LOG.info("Types directory {} does not exist or not readable or has no typedef files", modelsDirName);
@@ -176,49 +153,23 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
continue;
} else if (!folder.getName().equals(PATCHES_FOLDER_NAME)){
// load the models alphabetically in the subfolders apart from patches
- loadModelsInFolder(folder, patchContext);
+ loadModelsInFolder(folder);
}
}
}
// load any files in the top models folder and any associated patches.
- loadModelsInFolder(topModeltypesDir, patchContext);
+ loadModelsInFolder(topModeltypesDir);
}
- // apply java patches
- applyJavaPatches(patchContext);
-
LOG.info("<== AtlasTypeDefStoreInitializer.loadBootstrapTypeDefs()");
}
- private void applyJavaPatches(PatchContext context) {
- // register java patches
- AtlasJavaPatchHandler[] patches = new AtlasJavaPatchHandler[] { new UniqueAttributePatchHandler(context) };
-
- // apply java patches
- for (AtlasJavaPatchHandler patch : patches) {
- PatchStatus patchStatus = patch.getPatchStatus();
-
- if (patchStatus == APPLIED || patchStatus == SKIPPED) {
- LOG.info("Ignoring java patch: {}; status: {}", patch.getPatchId(), patchStatus);
- } else {
- LOG.info("Applying java patch: {}; status: {}", patch.getPatchId(), patchStatus);
-
- patch.applyPatch();
- }
- }
- }
-
- public PatchContext initPatchContext() {
- return new PatchContext(graph, typeRegistry, indexer, getPatchesRegistry());
- }
-
/**
* Load all the model files in the supplied folder followed by the contents of the patches folder.
* @param typesDir
- * @param context
*/
- private void loadModelsInFolder(File typesDir, PatchContext context) {
+ private void loadModelsInFolder(File typesDir) {
LOG.info("==> AtlasTypeDefStoreInitializer({})", typesDir);
String typesDirName = typesDir.getName();
@@ -260,7 +211,7 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
}
}
- applyTypePatches(typesDir.getPath(), context);
+ applyTypePatches(typesDir.getPath());
}
LOG.info("<== AtlasTypeDefStoreInitializer({})", typesDir);
}
@@ -458,11 +409,10 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
return ret;
}
- private void applyTypePatches(String typesDirName, PatchContext context) {
- String typePatchesDirName = typesDirName + File.separator + PATCHES_FOLDER_NAME;
- File typePatchesDir = new File(typePatchesDirName);
- File[] typePatchFiles = typePatchesDir.exists() ? typePatchesDir.listFiles() : null;
- Map<String, PatchStatus> patchesRegistry = context.getPatchesRegistry();
+ private void applyTypePatches(String typesDirName) {
+ String typePatchesDirName = typesDirName + File.separator + PATCHES_FOLDER_NAME;
+ File typePatchesDir = new File(typePatchesDirName);
+ File[] typePatchFiles = typePatchesDir.exists() ? typePatchesDir.listFiles() : null;
if (typePatchFiles == null || typePatchFiles.length == 0) {
LOG.info("Type patches directory {} does not exist or not readable or has no patches", typePatchesDirName);
@@ -505,7 +455,6 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
}
int patchIndex = 0;
-
for (TypeDefPatch patch : patches.getPatches()) {
PatchHandler patchHandler = patchHandlerRegistry.get(patch.getAction());
@@ -514,15 +463,7 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
continue;
}
- String patchId = patch.getId();
-
- if (StringUtils.isEmpty(patchId)) {
- patchId = typePatchFile.getName() + "_" + patchIndex;
-
- patch.setId(patchId);
- }
-
- if (isPatchApplicable(patchId, patchesRegistry)) {
+ if (patchRegistry.isApplicable(patch.getId(), patchFile, patchIndex++)) {
PatchStatus status;
try {
@@ -531,17 +472,14 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
status = FAILED;
LOG.error("Failed to apply {} (status: {}; action: {}) in file: {}. Ignored.",
- patchId, status.toString(), patch.getAction(), patchFile);
+ patch.getId(), status.toString(), patch.getAction(), patchFile);
}
- createOrUpdatePatchVertex(patch, status, patchesRegistry);
-
- LOG.info("{} (status: {}; action: {}) in file: {}", patchId, status.toString(), patch.getAction(), patchFile);
+ patchRegistry.register(patch.id, patch.description, patch.action, status);
+ LOG.info("{} (status: {}; action: {}) in file: {}", patch.getId(), status.toString(), patch.getAction(), patchFile);
} else {
- LOG.info("{} in file: {} already {}. Ignoring.", patchId, patchFile, patchesRegistry.get(patchId).toString());
+ LOG.info("{} in file: {} already {}. Ignoring.", patch.getId(), patchFile, patchRegistry.getStatus(patch.getId()).toString());
}
-
- patchIndex++;
}
} catch (Throwable t) {
LOG.error("Failed to apply patches in file {}. Ignored", patchFile, t);
@@ -551,38 +489,6 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
}
}
- private boolean isPatchApplicable(String patchId, Map<String, PatchStatus> patchesRegistry) {
- if (MapUtils.isEmpty(patchesRegistry) || !patchesRegistry.containsKey(patchId)) {
- return true;
- }
-
- PatchStatus status = patchesRegistry.get(patchId);
-
- if (status == FAILED || status == UNKNOWN) {
- return true;
- }
-
- return false;
- }
-
- private void createOrUpdatePatchVertex(TypeDefPatch patch, PatchStatus patchStatus, Map<String, PatchStatus> patchesRegistry) {
- String patchId = patch.getId();
- boolean isPatchRegistered = MapUtils.isNotEmpty(patchesRegistry) && patchesRegistry.containsKey(patchId);
- AtlasVertex patchVertex = isPatchRegistered ? findByPatchId(patchId) : graph.addVertex();
-
- setEncodedProperty(patchVertex, PATCH_ID_PROPERTY_KEY, patchId);
- setEncodedProperty(patchVertex, PATCH_DESCRIPTION_PROPERTY_KEY, patch.getDescription());
- setEncodedProperty(patchVertex, PATCH_TYPE_PROPERTY_KEY, TYPEDEF_PATCH_TYPE);
- setEncodedProperty(patchVertex, PATCH_ACTION_PROPERTY_KEY, patch.getAction());
- setEncodedProperty(patchVertex, PATCH_STATE_PROPERTY_KEY, patchStatus.toString());
- setEncodedProperty(patchVertex, TIMESTAMP_PROPERTY_KEY, RequestContext.get().getRequestTime());
- setEncodedProperty(patchVertex, MODIFICATION_TIMESTAMP_PROPERTY_KEY, RequestContext.get().getRequestTime());
- setEncodedProperty(patchVertex, CREATED_BY_KEY, AtlasTypeDefGraphStoreV2.getCurrentUser());
- setEncodedProperty(patchVertex, MODIFIED_BY_KEY, AtlasTypeDefGraphStoreV2.getCurrentUser());
-
- AtlasGraphProvider.getGraphInstance().commit();
- }
-
/**
* typedef patch details
*/
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java
index 80141b4..70b01a5 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java
@@ -28,15 +28,12 @@ import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.TypeCategory;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.Status;
-import org.apache.atlas.model.patches.AtlasPatch;
-import org.apache.atlas.model.patches.AtlasPatch.AtlasPatches;
-import org.apache.atlas.model.patches.AtlasPatch.PatchStatus;
import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
import org.apache.atlas.repository.Constants;
-import org.apache.atlas.repository.graph.AtlasGraphProvider;
import org.apache.atlas.repository.graph.GraphHelper;
import org.apache.atlas.repository.graphdb.AtlasEdge;
import org.apache.atlas.repository.graphdb.AtlasElement;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasGraphQuery;
import org.apache.atlas.repository.graphdb.AtlasIndexQuery;
import org.apache.atlas.repository.graphdb.AtlasIndexQuery.Result;
@@ -46,7 +43,6 @@ import org.apache.atlas.type.AtlasStructType;
import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
-import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
@@ -57,30 +53,20 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.UNKNOWN;
-import static org.apache.atlas.repository.Constants.CREATED_BY_KEY;
import static org.apache.atlas.repository.Constants.ENTITY_TYPE_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.INDEX_SEARCH_VERTEX_PREFIX_DEFAULT;
import static org.apache.atlas.repository.Constants.INDEX_SEARCH_VERTEX_PREFIX_PROPERTY;
-import static org.apache.atlas.repository.Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY;
-import static org.apache.atlas.repository.Constants.MODIFIED_BY_KEY;
-import static org.apache.atlas.repository.Constants.PATCH_ACTION_PROPERTY_KEY;
-import static org.apache.atlas.repository.Constants.PATCH_ID_PROPERTY_KEY;
-import static org.apache.atlas.repository.Constants.PATCH_DESCRIPTION_PROPERTY_KEY;
-import static org.apache.atlas.repository.Constants.PATCH_STATE_PROPERTY_KEY;
-import static org.apache.atlas.repository.Constants.PATCH_TYPE_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.STATE_PROPERTY_KEY;
-import static org.apache.atlas.repository.Constants.TIMESTAMP_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.TYPE_NAME_PROPERTY_KEY;
-import static org.apache.atlas.repository.Constants.VERTEX_INDEX;
+import static org.apache.atlas.repository.Constants.TYPENAME_PROPERTY_KEY;
import static org.apache.atlas.repository.graph.AtlasGraphProvider.getGraphInstance;
-import static org.apache.atlas.repository.graphdb.AtlasGraphQuery.SortOrder.*;
+import static org.apache.atlas.repository.graphdb.AtlasGraphQuery.SortOrder.ASC;
+import static org.apache.atlas.repository.graphdb.AtlasGraphQuery.SortOrder.DESC;
/**
* Utility methods for Graph.
@@ -163,6 +149,10 @@ public class AtlasGraphUtilsV2 {
return StringUtils.isNotEmpty(getIdFromVertex(vertex)) && StringUtils.isNotEmpty(getTypeName(vertex));
}
+ /**
+  * Tells whether the given vertex has a value stored under {@code TYPENAME_PROPERTY_KEY}.
+  *
+  * @param vertex vertex to inspect; it is dereferenced without a null check
+  * @return {@code true} when the property, read as a {@code String}, is non-null
+  */
+ public static boolean isTypeVertex(AtlasVertex vertex) {
+     final String typeName = vertex.getProperty(TYPENAME_PROPERTY_KEY, String.class);
+
+     return typeName != null;
+ }
+
public static boolean isReference(AtlasType type) {
return isReference(type.getTypeCategory());
}
@@ -228,11 +218,12 @@ public class AtlasGraphUtilsV2 {
Object existingValue = element.getProperty(propertyName, Object.class);
- if (value == null) {
+ if (value == null || (value instanceof Collection && ((Collection)value).isEmpty())) {
if (existingValue != null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Removing property {} from {}", propertyName, toString(element));
}
+
element.removeProperty(propertyName);
}
} else {
@@ -241,7 +232,7 @@ public class AtlasGraphUtilsV2 {
LOG.debug("Setting property {} in {}", propertyName, toString(element));
}
- if (value instanceof Date) {
+ if ( value instanceof Date) {
Long encodedValue = ((Date) value).getTime();
element.setProperty(propertyName, encodedValue);
} else {
@@ -341,22 +332,6 @@ public class AtlasGraphUtilsV2 {
return vertex;
}
- public static AtlasVertex findByPatchId(String patchId) {
- AtlasVertex ret = null;
- String indexQuery = getIndexSearchPrefix() + "\"" + PATCH_ID_PROPERTY_KEY + "\" : ("+ patchId +")";
- Iterator<Result<Object, Object>> results = getGraphInstance().indexQuery(VERTEX_INDEX, indexQuery).vertices();
-
- while (results != null && results.hasNext()) {
- ret = results.next().getVertex();
-
- if (ret != null) {
- break;
- }
- }
-
- return ret;
- }
-
public static AtlasVertex findByGuid(String guid) {
AtlasVertex ret = GraphTransactionInterceptor.getVertexFromCache(guid);
@@ -470,80 +445,12 @@ public class AtlasGraphUtilsV2 {
return vertex;
}
- public static Map<String, PatchStatus> getPatchesRegistry() {
- Map<String, PatchStatus> ret = new HashMap<>();
- AtlasPatches patches = getAllPatches();
-
- for (AtlasPatch patch : patches.getPatches()) {
- String patchId = patch.getId();
- PatchStatus patchStatus = patch.getStatus();
-
- if (patchId != null && patchStatus != null) {
- ret.put(patchId, patchStatus);
- }
- }
-
- return ret;
- }
-
- public static AtlasPatches getAllPatches() {
- List<AtlasPatch> ret = new ArrayList<>();
- String idxQueryString = getIndexSearchPrefix() + "\"" + PATCH_ID_PROPERTY_KEY + "\" : (*)";
- AtlasIndexQuery idxQuery = AtlasGraphProvider.getGraphInstance().indexQuery(VERTEX_INDEX, idxQueryString);
- Iterator<Result<Object, Object>> results;
-
- try {
- results = idxQuery.vertices();
-
- while (results != null && results.hasNext()) {
- AtlasVertex patchVertex = results.next().getVertex();
- AtlasPatch patch = toAtlasPatch(patchVertex);
-
- ret.add(patch);
- }
-
- // Sort the patches based on patch id
- if (CollectionUtils.isNotEmpty(ret)) {
- Collections.sort(ret, (p1, p2) -> p1.getId().compareTo(p2.getId()));
- }
- } catch (Throwable t) {
- // first time idx query is fired, returns no field exists in solr exception
- LOG.warn("getPatches() returned empty result!");
- }
-
- getGraphInstance().commit();
-
- return new AtlasPatches(ret);
- }
-
public int getOpenTransactions() {
Set openTransactions = getGraphInstance().getOpenTransactions();
return (openTransactions != null) ? openTransactions.size() : 0;
}
- private static AtlasPatch toAtlasPatch(AtlasVertex vertex) {
- AtlasPatch ret = new AtlasPatch();
-
- ret.setId(getEncodedProperty(vertex, PATCH_ID_PROPERTY_KEY, String.class));
- ret.setDescription(getEncodedProperty(vertex, PATCH_DESCRIPTION_PROPERTY_KEY, String.class));
- ret.setType(getEncodedProperty(vertex, PATCH_TYPE_PROPERTY_KEY, String.class));
- ret.setAction(getEncodedProperty(vertex, PATCH_ACTION_PROPERTY_KEY, String.class));
- ret.setCreatedBy(getEncodedProperty(vertex, CREATED_BY_KEY, String.class));
- ret.setUpdatedBy(getEncodedProperty(vertex, MODIFIED_BY_KEY, String.class));
- ret.setCreatedTime(getEncodedProperty(vertex, TIMESTAMP_PROPERTY_KEY, Long.class));
- ret.setUpdatedTime(getEncodedProperty(vertex, MODIFICATION_TIMESTAMP_PROPERTY_KEY, Long.class));
- ret.setStatus(getPatchStatus(vertex));
-
- return ret;
- }
-
- private static PatchStatus getPatchStatus(AtlasVertex vertex) {
- String patchStatus = AtlasGraphUtilsV2.getEncodedProperty(vertex, PATCH_STATE_PROPERTY_KEY, String.class);
-
- return patchStatus != null ? PatchStatus.valueOf(patchStatus) : UNKNOWN;
- }
-
public static List<String> findEntityGUIDsByType(String typename, SortOrder sortOrder) {
AtlasGraphQuery query = getGraphInstance().query()
.has(ENTITY_TYPE_PROPERTY_KEY, typename);
@@ -566,20 +473,16 @@ public class AtlasGraphUtilsV2 {
return ret;
}
- public static Iterator<Result<Object, Object>> findActiveEntityVerticesByType(String typename) {
- AtlasIndexQuery indexQuery = getActiveEntityIndexQuery(typename);
-
- return indexQuery.vertices();
+ public static Iterator<AtlasVertex> findActiveEntityVerticesByType(String typename) {
+ return findActiveEntityVerticesByType(getGraphInstance(), typename);
}
- private static AtlasIndexQuery getActiveEntityIndexQuery(String typename) {
- StringBuilder sb = new StringBuilder();
-
- sb.append(INDEX_SEARCH_PREFIX + "\"").append(TYPE_NAME_PROPERTY_KEY).append("\":").append(typename)
- .append(" AND ")
- .append(INDEX_SEARCH_PREFIX + "\"").append(STATE_PROPERTY_KEY).append("\":").append(Status.ACTIVE.name());
+ public static Iterator<AtlasVertex> findActiveEntityVerticesByType(AtlasGraph graph, String typename) {
+ AtlasGraphQuery query = graph.query()
+ .has(ENTITY_TYPE_PROPERTY_KEY, typename)
+ .has(STATE_PROPERTY_KEY, Status.ACTIVE.name());
- return getGraphInstance().indexQuery(VERTEX_INDEX, sb.toString());
+ return query.vertices().iterator();
}
public static List<String> findEntityGUIDsByType(String typename) {
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java
index 03d2c06..3e1e023 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java
@@ -947,7 +947,7 @@ public class EntityGraphRetriever {
return ret;
}
- public Object mapVertexToPrimitive(AtlasElement entityVertex, final String vertexPropertyName, AtlasAttributeDef attrDef) {
+ public static Object mapVertexToPrimitive(AtlasElement entityVertex, final String vertexPropertyName, AtlasAttributeDef attrDef) {
Object ret = null;
if (AtlasGraphUtilsV2.getEncodedProperty(entityVertex, vertexPropertyName, Object.class) == null) {
diff --git a/repository/src/test/java/org/apache/atlas/patches/AtlasPatchRegistryTest.java b/repository/src/test/java/org/apache/atlas/patches/AtlasPatchRegistryTest.java
new file mode 100644
index 0000000..58396e5
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/patches/AtlasPatchRegistryTest.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.patches;
+
+import org.apache.atlas.TestModules;
+import org.apache.atlas.model.patches.AtlasPatch;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.patches.AtlasPatchRegistry;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import javax.inject.Inject;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+/**
+ * Tests {@link AtlasPatchRegistry} against the {@link AtlasGraph} injected from
+ * {@link TestModules.TestOnlyModule}.
+ *
+ * The test methods are chained through {@code dependsOnMethods}: the registry is first
+ * expected to report no patches, then one patch after a registration, and finally that
+ * same patch with an updated status. Every method builds a fresh registry over the
+ * injected graph.
+ */
+@Guice(modules = TestModules.TestOnlyModule.class)
+public class AtlasPatchRegistryTest {
+    /** Id of the single patch that the chained tests register and then update. */
+    private static final String PATCH_ID = "1";
+
+    @Inject
+    private AtlasGraph graph;
+
+    /** Before anything is registered, the registry reports zero patches. */
+    @Test
+    public void noPatchesRegistered() {
+        assertPatches(new AtlasPatchRegistry(graph), 0);
+    }
+
+    /** Registering one patch makes the registry report exactly one patch. */
+    @Test(dependsOnMethods = "noPatchesRegistered")
+    public void registerPatch() {
+        AtlasPatchRegistry patchRegistry = new AtlasPatchRegistry(graph);
+
+        patchRegistry.register(PATCH_ID, "test patch", "apply", AtlasPatch.PatchStatus.UNKNOWN);
+
+        assertPatches(patchRegistry, 1);
+    }
+
+    /** Updating the status of the registered patch is visible when all patches are listed. */
+    @Test(dependsOnMethods = "registerPatch")
+    public void updateStatusForPatch() {
+        AtlasPatchRegistry     patchRegistry = new AtlasPatchRegistry(graph);
+        AtlasPatch.PatchStatus appliedStatus = AtlasPatch.PatchStatus.APPLIED;
+
+        patchRegistry.updateStatus(PATCH_ID, appliedStatus);
+
+        AtlasPatch onlyPatch = assertPatches(patchRegistry, 1).getPatches().get(0);
+
+        assertEquals(onlyPatch.getId(), PATCH_ID);
+        assertEquals(onlyPatch.getStatus(), appliedStatus);
+    }
+
+    /**
+     * Fetches all patches from the registry and checks that the result is non-null and
+     * holds the expected number of entries.
+     *
+     * @param registry      registry to query
+     * @param expectedCount number of patches the registry should report
+     * @return the fetched patches, so that callers can make further assertions
+     */
+    private AtlasPatch.AtlasPatches assertPatches(AtlasPatchRegistry registry, int expectedCount) {
+        AtlasPatch.AtlasPatches allPatches = registry.getAllPatches();
+
+        assertNotNull(allPatches);
+        assertEquals(allPatches.getPatches().size(), expectedCount);
+
+        return allPatches;
+    }
+}
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index ce2d76f..fcfbd21 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -100,7 +100,7 @@ import static org.apache.atlas.notification.preprocessor.EntityPreprocessor.TYPE
* Consumer of notifications from hooks e.g., hive hook etc.
*/
@Component
-@Order(4)
+@Order(5)
@DependsOn(value = {"atlasTypeDefStoreInitializer", "atlasTypeDefGraphStoreV2"})
public class NotificationHookConsumer implements Service, ActiveStateChangeHandler {
private static final Logger LOG = LoggerFactory.getLogger(NotificationHookConsumer.class);
diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
index c5ceb9d..2716873 100755
--- a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
@@ -23,16 +23,16 @@ import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.authorize.AtlasAdminAccessRequest;
+import org.apache.atlas.authorize.AtlasAuthorizationUtils;
import org.apache.atlas.authorize.AtlasEntityAccessRequest;
import org.apache.atlas.authorize.AtlasPrivilege;
-import org.apache.atlas.authorize.AtlasAuthorizationUtils;
import org.apache.atlas.discovery.SearchContext;
import org.apache.atlas.exception.AtlasBaseException;
-import org.apache.atlas.model.impexp.AtlasServer;
import org.apache.atlas.model.impexp.AtlasExportRequest;
import org.apache.atlas.model.impexp.AtlasExportResult;
import org.apache.atlas.model.impexp.AtlasImportRequest;
import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.atlas.model.impexp.AtlasServer;
import org.apache.atlas.model.impexp.ExportImportAuditEntry;
import org.apache.atlas.model.impexp.MigrationStatus;
import org.apache.atlas.model.instance.AtlasCheckStateRequest;
@@ -46,14 +46,14 @@ import org.apache.atlas.repository.impexp.ImportService;
import org.apache.atlas.repository.impexp.MigrationProgressService;
import org.apache.atlas.repository.impexp.ZipSink;
import org.apache.atlas.repository.impexp.ZipSource;
+import org.apache.atlas.repository.patches.AtlasPatchManager;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
-import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
import org.apache.atlas.services.MetricsService;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.util.SearchTracker;
-import org.apache.atlas.utils.AtlasPerfTracer;
import org.apache.atlas.utils.AtlasJson;
+import org.apache.atlas.utils.AtlasPerfTracer;
import org.apache.atlas.web.filters.AtlasCSRFPreventionFilter;
import org.apache.atlas.web.service.ServiceState;
import org.apache.atlas.web.util.Servlets;
@@ -138,6 +138,7 @@ public class AdminResource {
private final ExportImportAuditService exportImportAuditService;
private final AtlasServerService atlasServerService;
private final AtlasEntityStore entityStore;
+ private final AtlasPatchManager patchManager;
static {
try {
@@ -152,7 +153,8 @@ public class AdminResource {
ExportService exportService, ImportService importService, SearchTracker activeSearches,
MigrationProgressService migrationProgressService,
AtlasServerService serverService,
- ExportImportAuditService exportImportAuditService, AtlasEntityStore entityStore) {
+ ExportImportAuditService exportImportAuditService, AtlasEntityStore entityStore,
+ AtlasPatchManager patchManager) {
this.serviceState = serviceState;
this.metricsService = metricsService;
this.exportService = exportService;
@@ -164,6 +166,7 @@ public class AdminResource {
this.entityStore = entityStore;
this.exportImportAuditService = exportImportAuditService;
this.importExportOperationLock = new ReentrantLock();
+ this.patchManager = patchManager;
}
/**
@@ -564,7 +567,7 @@ public class AdminResource {
LOG.debug("==> AdminResource.getAtlasPatches()");
}
- AtlasPatches ret = AtlasGraphUtilsV2.getAllPatches();
+ AtlasPatches ret = patchManager.getAllPatches();
if (LOG.isDebugEnabled()) {
LOG.debug("<== AdminResource.getAtlasPatches()");
diff --git a/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java b/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java
index 223a90a..563a16f 100644
--- a/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java
+++ b/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java
@@ -51,7 +51,7 @@ public class AdminResourceTest {
when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.ACTIVE);
- AdminResource adminResource = new AdminResource(serviceState, null, null, null, null, null, null, null, null, null);
+ AdminResource adminResource = new AdminResource(serviceState, null, null, null, null, null, null, null, null, null, null);
Response response = adminResource.getStatus();
assertEquals(response.getStatus(), HttpServletResponse.SC_OK);
JsonNode entity = AtlasJson.parseToV1JsonNode((String) response.getEntity());
@@ -62,7 +62,7 @@ public class AdminResourceTest {
public void testResourceGetsValueFromServiceState() throws IOException {
when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.PASSIVE);
- AdminResource adminResource = new AdminResource(serviceState, null, null, null, null, null, null, null, null, null);
+ AdminResource adminResource = new AdminResource(serviceState, null, null, null, null, null, null, null, null, null, null);
Response response = adminResource.getStatus();
verify(serviceState).getState();