You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by am...@apache.org on 2018/11/01 23:01:58 UTC
[12/50] [abbrv] atlas git commit: ATLAS-2869: Hdfs_path if requested
are created and then proceeds with export.
ATLAS-2869: Hdfs_path if requested are created and then proceeds with export.
Signed-off-by: Ashutosh Mestry <am...@hortonworks.com>
Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/bf240459
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/bf240459
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/bf240459
Branch: refs/heads/branch-1.0
Commit: bf240459b2e39aa9471f42e77dd83b2f22a091db
Parents: e441415
Author: Ashutosh Mestry <am...@hortonworks.com>
Authored: Tue Sep 11 15:29:02 2018 -0700
Committer: Ashutosh Mestry <am...@hortonworks.com>
Committed: Thu Nov 1 15:42:55 2018 -0700
----------------------------------------------------------------------
.../atlas/repository/impexp/ExportService.java | 12 +-
.../impexp/HdfsPathEntityCreator.java | 131 +++++++++++++++++++
.../impexp/HdfsPathEntityCreatorTest.java | 81 ++++++++++++
3 files changed, 221 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/atlas/blob/bf240459/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
index d3cff78..f10d615 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
@@ -81,14 +81,16 @@ public class ExportService {
private final EntityGraphRetriever entityGraphRetriever;
private final AtlasGremlinQueryProvider gremlinQueryProvider;
private ExportTypeProcessor exportTypeProcessor;
-
+ private final HdfsPathEntityCreator hdfsPathEntityCreator;
@Inject
- public ExportService(final AtlasTypeRegistry typeRegistry, AtlasGraph atlasGraph, AuditsWriter auditsWriter) {
+ public ExportService(final AtlasTypeRegistry typeRegistry, AtlasGraph atlasGraph,
+ AuditsWriter auditsWriter, HdfsPathEntityCreator hdfsPathEntityCreator) {
this.typeRegistry = typeRegistry;
this.entityGraphRetriever = new EntityGraphRetriever(this.typeRegistry);
this.atlasGraph = atlasGraph;
this.gremlinQueryProvider = AtlasGremlinQueryProvider.INSTANCE;
- this.auditsWriter = auditsWriter;
+ this.auditsWriter = auditsWriter;
+ this.hdfsPathEntityCreator = hdfsPathEntityCreator;
}
public AtlasExportResult run(ZipSink exportSink, AtlasExportRequest request, String userName, String hostName,
@@ -244,6 +246,10 @@ public class ExportService {
private List<String> getStartingEntity(AtlasObjectId item, ExportContext context) throws AtlasBaseException {
List<String> ret = null;
+ if(item.getTypeName().equalsIgnoreCase(HdfsPathEntityCreator.HDFS_PATH_TYPE)) {
+ hdfsPathEntityCreator.getCreateEntity(item);
+ }
+
if (StringUtils.isNotEmpty(item.getGuid())) {
ret = Collections.singletonList(item.getGuid());
} else if (StringUtils.equalsIgnoreCase(context.matchType, MATCH_TYPE_FOR_TYPE) && StringUtils.isNotEmpty(item.getTypeName())) {
http://git-wip-us.apache.org/repos/asf/atlas/blob/bf240459/repository/src/main/java/org/apache/atlas/repository/impexp/HdfsPathEntityCreator.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/HdfsPathEntityCreator.java b/repository/src/main/java/org/apache/atlas/repository/impexp/HdfsPathEntityCreator.java
new file mode 100644
index 0000000..fddd60b
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/HdfsPathEntityCreator.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.impexp;
+
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.repository.store.graph.v1.AtlasEntityStoreV1;
+import org.apache.atlas.repository.store.graph.v1.AtlasEntityStream;
+import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import javax.inject.Inject;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.atlas.repository.impexp.AuditsWriter.getCurrentClusterName;
+
+@Component
+public class HdfsPathEntityCreator {
+ protected static final Logger LOG = LoggerFactory.getLogger(HdfsPathEntityCreator.class);
+
+ public static final String HDFS_PATH_TYPE = "hdfs_path";
+ public static final String HDFS_PATH_ATTRIBUTE_NAME_NAME = "name";
+ public static final String HDFS_PATH_ATTRIBUTE_NAME_CLUSTER_NAME = "clusterName";
+ public static final String HDFS_PATH_ATTRIBUTE_NAME_PATH = "path";
+ public static final String HDFS_PATH_ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
+
+ private static final String QUALIFIED_NAME_FORMAT = "%s@%s";
+ private final String PATH_SEPARATOR = "/";
+
+ private AtlasTypeRegistry typeRegistry;
+ private AtlasEntityStoreV1 entityStore;
+
+ @Inject
+ public HdfsPathEntityCreator(AtlasTypeRegistry typeRegistry, AtlasEntityStoreV1 entityStore) {
+ this.typeRegistry = typeRegistry;
+ this.entityStore = entityStore;
+ }
+
+ public AtlasEntity.AtlasEntityWithExtInfo getCreateEntity(AtlasObjectId item) throws AtlasBaseException {
+ if(!item.getUniqueAttributes().containsKey(HDFS_PATH_ATTRIBUTE_NAME_PATH)) {
+ return null;
+ }
+
+ return getCreateEntity((String) item.getUniqueAttributes().get(HDFS_PATH_ATTRIBUTE_NAME_PATH));
+ }
+
+ public AtlasEntity.AtlasEntityWithExtInfo getCreateEntity(String path) throws AtlasBaseException {
+ return getCreateEntity(path, getCurrentClusterName());
+ }
+
+ public AtlasEntity.AtlasEntityWithExtInfo getCreateEntity(String path, String clusterName) throws AtlasBaseException {
+ String pathWithTrailingSeparator = getPathWithTrailingSeparator(path);
+ AtlasEntityType hdfsPathEntityType = getHdfsPathEntityType();
+ AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = getHDFSPathEntity(hdfsPathEntityType, pathWithTrailingSeparator, clusterName);
+ if(entityWithExtInfo != null) {
+ return entityWithExtInfo;
+ }
+
+ AtlasEntity entity = createHDFSPathEntity(hdfsPathEntityType, pathWithTrailingSeparator, clusterName);
+ AtlasEntityStream entityStream = new AtlasEntityStream(entity);
+ EntityMutationResponse entityMutationResponse = entityStore.createOrUpdate(entityStream, false);
+ if(entityMutationResponse.getCreatedEntities().size() == 0) {
+ return null;
+ }
+
+ return getHDFSPathEntity(hdfsPathEntityType, pathWithTrailingSeparator, clusterName);
+ }
+
+ private AtlasEntity createHDFSPathEntity(AtlasEntityType hdfsPathEntityType, String path, String clusterName) {
+ AtlasEntity entity = hdfsPathEntityType.createDefaultValue();
+
+ entity.setAttribute(HDFS_PATH_ATTRIBUTE_QUALIFIED_NAME, getQualifiedName(path, clusterName));
+ entity.setAttribute(HDFS_PATH_ATTRIBUTE_NAME_PATH, path);
+ entity.setAttribute(HDFS_PATH_ATTRIBUTE_NAME_NAME, path);
+ entity.setAttribute(HDFS_PATH_ATTRIBUTE_NAME_CLUSTER_NAME, clusterName);
+
+ return entity;
+ }
+
+ private AtlasEntity.AtlasEntityWithExtInfo getHDFSPathEntity(AtlasEntityType hdfsPathEntityType, String path, String clusterName) {
+ try {
+ return entityStore.getByUniqueAttributes(hdfsPathEntityType, getUniqueAttributes(path, clusterName));
+ } catch (AtlasBaseException e) {
+ return null;
+ }
+ }
+
+ private AtlasEntityType getHdfsPathEntityType() throws AtlasBaseException {
+ return (AtlasEntityType) typeRegistry.getType(HDFS_PATH_TYPE);
+ }
+
+ private Map<String,Object> getUniqueAttributes(String path, String clusterName) {
+ Map<String,Object> ret = new HashMap<String, Object>();
+ ret.put(HDFS_PATH_ATTRIBUTE_QUALIFIED_NAME, getQualifiedName(path, clusterName));
+ return ret;
+ }
+
+ private String getPathWithTrailingSeparator(String path) {
+ if(path.endsWith(PATH_SEPARATOR)) {
+ return path;
+ }
+
+ return path + PATH_SEPARATOR;
+ }
+
+ public static String getQualifiedName(String path, String clusterName) {
+ return String.format(QUALIFIED_NAME_FORMAT, path, clusterName);
+ }
+}
http://git-wip-us.apache.org/repos/asf/atlas/blob/bf240459/repository/src/test/java/org/apache/atlas/repository/impexp/HdfsPathEntityCreatorTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/HdfsPathEntityCreatorTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/HdfsPathEntityCreatorTest.java
new file mode 100644
index 0000000..1863b8d
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/HdfsPathEntityCreatorTest.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.impexp;
+
+import org.apache.atlas.TestModules;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.store.AtlasTypeDefStore;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import javax.inject.Inject;
+import java.io.IOException;
+
+import static org.apache.atlas.repository.impexp.HdfsPathEntityCreator.HDFS_PATH_ATTRIBUTE_NAME_CLUSTER_NAME;
+import static org.apache.atlas.repository.impexp.HdfsPathEntityCreator.HDFS_PATH_ATTRIBUTE_NAME_NAME;
+import static org.apache.atlas.repository.impexp.HdfsPathEntityCreator.HDFS_PATH_ATTRIBUTE_QUALIFIED_NAME;
+import static org.apache.atlas.repository.impexp.HdfsPathEntityCreator.getQualifiedName;
+import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadFsModel;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+@Guice(modules = TestModules.TestOnlyModule.class)
+public class HdfsPathEntityCreatorTest extends ExportImportTestBase {
+
+ @Inject
+ AtlasTypeRegistry typeRegistry;
+
+ @Inject
+ private AtlasTypeDefStore typeDefStore;
+
+ @Inject
+ private HdfsPathEntityCreator hdfsPathEntityCreator;
+
+ private static final String expectedPath = "hdfs://server-name/warehouse/hr";
+ private static final String expectedClusterName = "cl1";
+
+ @BeforeClass
+ public void setup() throws IOException, AtlasBaseException {
+ basicSetup(typeDefStore, typeRegistry);
+ loadFsModel(typeDefStore, typeRegistry);
+ }
+
+ @Test
+ public void verifyCreate() throws AtlasBaseException {
+
+ String expectedQualifiedName = getQualifiedName(expectedPath + "/", expectedClusterName);
+ AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = hdfsPathEntityCreator.getCreateEntity(expectedPath, expectedClusterName);
+
+ assertNotNull(entityWithExtInfo);
+ AtlasEntity entity = entityWithExtInfo.getEntity();
+ assertEquals(entity.getAttribute(HdfsPathEntityCreator.HDFS_PATH_ATTRIBUTE_NAME_PATH), expectedPath + "/");
+ assertEquals(entity.getAttribute(HDFS_PATH_ATTRIBUTE_QUALIFIED_NAME),expectedQualifiedName);
+ assertEquals(entity.getAttribute(HDFS_PATH_ATTRIBUTE_NAME_NAME), expectedPath + "/");
+ assertEquals(entity.getAttribute(HDFS_PATH_ATTRIBUTE_NAME_CLUSTER_NAME), expectedClusterName);
+ }
+
+ @Test(dependsOnMethods = "verifyCreate")
+ public void verifyGet() throws AtlasBaseException {
+ AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = hdfsPathEntityCreator.getCreateEntity(expectedPath, expectedClusterName);
+
+ assertNotNull(entityWithExtInfo);
+ }
+}