You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by pi...@apache.org on 2023/11/03 09:28:43 UTC
(atlas) branch master updated: ATLAS-4804: Set name field with qualifiedName for impala_process and impala_process_execution
This is an automated email from the ASF dual-hosted git repository.
pinal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new 3574a60fd ATLAS-4804: Set name field with qualifiedName for impala_process and impala_process_execution
3574a60fd is described below
commit 3574a60fd2fad0e12454893d754f3c6ab009717a
Author: pareshD <pa...@cloudera.com>
AuthorDate: Fri Nov 3 12:18:05 2023 +0530
ATLAS-4804: Set name field with qualifiedName for impala_process and impala_process_execution
Signed-off-by: Pinal Shah <pi...@freestoneinfotech.com>
---
.../java/org/apache/atlas/AtlasConfiguration.java | 1 +
.../repository/patches/AtlasPatchManager.java | 1 +
.../repository/patches/ProcessImpalaNamePatch.java | 121 +++++++++++++++++++++
3 files changed, 123 insertions(+)
diff --git a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
index df886753f..31ec605f3 100644
--- a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
+++ b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
@@ -84,6 +84,7 @@ public enum AtlasConfiguration {
STORAGE_CONSISTENCY_LOCK_ENABLED("atlas.graph.storage.consistency-lock.enabled", true),
REBUILD_INDEX("atlas.rebuild.index", false),
PROCESS_NAME_UPDATE_PATCH("atlas.process.name.update.patch", false),
+ PROCESS_IMPALA_NAME_UPDATE_PATCH("atlas.process.impala.name.update.patch", false),
STORE_DIFFERENTIAL_AUDITS("atlas.entity.audit.differential", false),
DSL_EXECUTOR_TRAVERSAL("atlas.dsl.executor.traversal", true),
DSL_CACHED_TRANSLATOR("atlas.dsl.cached.translator", true),
diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchManager.java b/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchManager.java
index a9774ae58..e72a87713 100644
--- a/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchManager.java
+++ b/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchManager.java
@@ -97,6 +97,7 @@ public class AtlasPatchManager {
handlers.add(new ProcessNamePatch(context));
handlers.add(new UpdateCompositeIndexStatusPatch(context));
handlers.add(new RelationshipTypeNamePatch(context));
+ handlers.add(new ProcessImpalaNamePatch(context));
LOG.info("<== AtlasPatchManager.init()");
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/ProcessImpalaNamePatch.java b/repository/src/main/java/org/apache/atlas/repository/patches/ProcessImpalaNamePatch.java
new file mode 100644
index 000000000..7eb918481
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/patches/ProcessImpalaNamePatch.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.patches;
+
+import org.apache.atlas.AtlasConfiguration;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.pc.WorkItemManager;
+import org.apache.atlas.repository.Constants;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
+import org.apache.atlas.type.AtlasEntityType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+
+import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.APPLIED;
+
+public class ProcessImpalaNamePatch extends AtlasPatchHandler {
+ private static final Logger LOG = LoggerFactory.getLogger(ProcessImpalaNamePatch.class);
+
+ private static final String PATCH_ID = "JAVA_PATCH_0000_012";
+ private static final String PATCH_DESCRIPTION = "Set name to qualifiedName";
+
+ private final PatchContext context;
+
+ public ProcessImpalaNamePatch(PatchContext context) {
+ super(context.getPatchRegistry(), PATCH_ID, PATCH_DESCRIPTION);
+
+ this.context = context;
+ }
+
+ @Override
+ public void apply() throws AtlasBaseException {
+ if (AtlasConfiguration.PROCESS_IMPALA_NAME_UPDATE_PATCH.getBoolean() == false) {
+ LOG.info("ProcessImpalaNamePatch: Skipped, since not enabled!");
+ return;
+ }
+ ConcurrentPatchProcessor patchProcessor = new ProcessImpalaNamePatchProcessor(context);
+
+ patchProcessor.apply();
+
+ setStatus(APPLIED);
+
+ LOG.info("ProcessImpalaNamePatch.apply(): patchId={}, status={}", getPatchId(), getStatus());
+ }
+
+ public static class ProcessImpalaNamePatchProcessor extends ConcurrentPatchProcessor {
+ private static final String TYPE_NAME_IMPALA_PROCESS = "impala_process";
+ private static final String TYPE_NAME_IMPALA_PROCESS_EXECUTION = "impala_process_execution";
+ private static final String ATTR_NAME_QUALIFIED_NAME = "qualifiedName";
+ private static final String ATTR_NAME_NAME = "name";
+ private static final String[] processTypes = {TYPE_NAME_IMPALA_PROCESS, TYPE_NAME_IMPALA_PROCESS_EXECUTION};
+
+ public ProcessImpalaNamePatchProcessor(PatchContext context) {
+ super(context);
+ }
+
+ @Override
+ protected void prepareForExecution() {
+ }
+
+ @Override
+ public void submitVerticesToUpdate(WorkItemManager manager) {
+ AtlasGraph graph = getGraph();
+
+ for (String typeName : processTypes) {
+ LOG.info("finding entities of type {}", typeName);
+
+ Iterable<Object> iterable = graph.query().has(Constants.ENTITY_TYPE_PROPERTY_KEY, typeName).vertexIds();
+ int count = 0;
+
+ for (Iterator<Object> iter = iterable.iterator(); iter.hasNext(); ) {
+ Object vertexId = iter.next();
+
+ manager.checkProduce(vertexId);
+
+ count++;
+
+ }
+
+ LOG.info("found {} entities of type {}", count, typeName);
+ }
+ }
+
+ @Override
+ protected void processVertexItem(Long vertexId, AtlasVertex vertex, String typeName, AtlasEntityType entityType) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("processItem(typeName={}, vertexId={})", typeName, vertexId);
+ }
+
+ try {
+ String qualifiedName = AtlasGraphUtilsV2.getProperty(vertex, entityType.getVertexPropertyName(ATTR_NAME_QUALIFIED_NAME), String.class);
+ AtlasGraphUtilsV2.setEncodedProperty(vertex, entityType.getVertexPropertyName(ATTR_NAME_NAME), qualifiedName);
+ } catch (AtlasBaseException e) {
+ LOG.error("Error updating: {}", vertexId);
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("processItem(typeName={}, vertexId={}): Done!", typeName, vertexId);
+ }
+ }
+ }
+}