You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by am...@apache.org on 2018/09/12 06:01:08 UTC

[1/2] atlas git commit: ATLAS-2870: Improvement to AddClassification transform to use filters.

Repository: atlas
Updated Branches:
  refs/heads/branch-0.8 833fb20a1 -> b888ade51


ATLAS-2870: Improvement to AddClassification transform to use filters.

Signed-off-by: Ashutosh Mestry <am...@hortonworks.com>


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/56b36f6f
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/56b36f6f
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/56b36f6f

Branch: refs/heads/branch-0.8
Commit: 56b36f6f06df6d86cfd18d7b58be647675b6718a
Parents: 833fb20
Author: Ashutosh Mestry <am...@hortonworks.com>
Authored: Tue Sep 11 17:06:43 2018 -0700
Committer: Ashutosh Mestry <am...@hortonworks.com>
Committed: Tue Sep 11 22:25:46 2018 -0700

----------------------------------------------------------------------
 .../atlas/repository/impexp/ImportService.java  |  2 +-
 .../repository/impexp/ImportTransformer.java    | 58 +++++++++++++++++++-
 .../impexp/ImportTransformsShaper.java          | 16 +++++-
 .../repository/impexp/ImportTransformsTest.java | 29 +++++++++-
 4 files changed, 97 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/atlas/blob/56b36f6f/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
index 095f60f..a88ba2b 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
@@ -112,7 +112,7 @@ public class ImportService {
             return;
         }
 
-        importTransformsShaper.shape(importTransform);
+        importTransformsShaper.shape(importTransform, source.getExportResult().getRequest());
 
         source.setImportTransform(importTransform);
         if(LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/atlas/blob/56b36f6f/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransformer.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransformer.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransformer.java
index 213539d..ca5e7bf 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransformer.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransformer.java
@@ -21,10 +21,14 @@ import org.apache.atlas.AtlasErrorCode;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.instance.AtlasClassification;
 import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasObjectId;
 import org.apache.commons.lang.StringUtils;
+import scala.Tuple3;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import java.util.Objects;
 
 
 public abstract class ImportTransformer {
@@ -70,8 +74,8 @@ public abstract class ImportTransformer {
         } else if (key.equals(TRANSFORMER_SET_DELETED)) {
             ret = new SetDeleted();
         } else if (key.equals(TRANSFORMER_NAME_ADD_CLASSIFICATION)) {
-            String name = (params == null || params.length < 1) ? "" : StringUtils.join(params, ":", 1, params.length);
-            ret = new AddClassification(name);
+            String name = (params == null || params.length < 1) ? "" : params[1];
+            ret = new AddClassification(name, (params != null && params.length == 3) ? params[2] : "");
         } else {
             throw new AtlasBaseException(AtlasErrorCode.INVALID_VALUE, "Error creating ImportTransformer. Unknown transformer: {}.", transformerSpec);
         }
@@ -150,12 +154,22 @@ public abstract class ImportTransformer {
     }
 
     static class AddClassification extends ImportTransformer {
+        private static final String FILTER_SCOPE_TOP_LEVEL = "topLevel";
+
+        private final String scope;
         private final String classificationName;
+        private List<AtlasObjectId> filters;
 
-        public AddClassification(String name) {
+        public AddClassification(String name, String scope) {
             super(TRANSFORMER_NAME_REMOVE_CLASSIFICATION);
 
             this.classificationName = name;
+            this.scope = scope;
+            filters = new ArrayList<>();
+        }
+
+        public void addFilter(AtlasObjectId objectId) {
+            filters.add(objectId);
         }
 
         @Override
@@ -165,6 +179,10 @@ public abstract class ImportTransformer {
             }
 
             AtlasEntity entity = (AtlasEntity) o;
+            if(!passThruFilters(entity)) {
+                return o;
+            }
+
             if(entity.getClassifications() == null) {
                 entity.setClassifications(new ArrayList<AtlasClassification>());
             }
@@ -179,6 +197,40 @@ public abstract class ImportTransformer {
             return entity;
         }
 
+        private boolean passThruFilters(AtlasEntity entity) {
+            if(StringUtils.isEmpty(scope) || !scope.equals(FILTER_SCOPE_TOP_LEVEL)) {
+                return true;
+            }
+
+            for (AtlasObjectId filter : filters) {
+                if(isMatch(filter, entity)) {
+                    return true;
+                }
+            }
+
+            return false;
+        }
+
+        private boolean isMatch(AtlasObjectId objectId, AtlasEntity entity) {
+            boolean ret = true;
+            if (StringUtils.isEmpty(objectId.getGuid())) {
+                ret = Objects.equals(objectId.getTypeName(), entity.getTypeName());
+                if (ret) {
+                    for (Map.Entry<String, Object> entry : objectId.getUniqueAttributes().entrySet()) {
+                        ret = ret && Objects.equals(entity.getAttribute(entry.getKey()), entry.getValue());
+                        if (!ret) {
+                            break;
+                        }
+                    }
+                }
+
+                return ret;
+
+            } else {
+                return Objects.equals(objectId.getGuid(), entity.getGuid());
+            }
+        }
+
         @Override
         public String toString() {
             return String.format("%s=%s", "AddClassification", classificationName);

http://git-wip-us.apache.org/repos/asf/atlas/blob/56b36f6f/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransformsShaper.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransformsShaper.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransformsShaper.java
index 62eba45..ce0c8f1 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransformsShaper.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransformsShaper.java
@@ -19,6 +19,8 @@
 package org.apache.atlas.repository.impexp;
 
 import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasExportRequest;
+import org.apache.atlas.model.instance.AtlasObjectId;
 import org.apache.atlas.model.typedef.AtlasClassificationDef;
 import org.apache.atlas.model.typedef.AtlasTypesDef;
 import org.apache.atlas.store.AtlasTypeDefStore;
@@ -32,6 +34,7 @@ import javax.inject.Inject;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 @Component
 public class ImportTransformsShaper {
@@ -46,12 +49,12 @@ public class ImportTransformsShaper {
         this.typeDefStore = typeDefStore;
     }
 
-    public void shape(ImportTransforms importTransform) throws AtlasBaseException {
-        getCreateClassifications(importTransform);
+    public void shape(ImportTransforms importTransform, AtlasExportRequest request) throws AtlasBaseException {
+        getCreateClassifications(importTransform, request);
         updateTransformsWithSubTypes(importTransform);
     }
 
-    private void getCreateClassifications(ImportTransforms importTransform) throws AtlasBaseException {
+    private void getCreateClassifications(ImportTransforms importTransform, AtlasExportRequest request) throws AtlasBaseException {
         Map<String, Map<String, List<ImportTransformer>>> mapMapList = importTransform.getTransforms();
         for (Map<String, List<ImportTransformer>> mapList : mapMapList.values()) {
             for (List<ImportTransformer> list : mapList.values()) {
@@ -59,6 +62,7 @@ public class ImportTransformsShaper {
                     if((importTransformer instanceof ImportTransformer.AddClassification)) {
 
                         ImportTransformer.AddClassification addClassification = (ImportTransformer.AddClassification) importTransformer;
+                        addFilters(request, addClassification);
                         getCreateTag(addClassification.getClassificationName());
                     }
                 }
@@ -66,6 +70,12 @@ public class ImportTransformsShaper {
         }
     }
 
+    private void addFilters(AtlasExportRequest request, ImportTransformer.AddClassification transformer) {
+        for(AtlasObjectId objectId : request.getItemsToExport()) {
+            transformer.addFilter(objectId);
+        }
+    }
+
     private void updateTransformsWithSubTypes(ImportTransforms importTransforms) {
         String[] transformTypes = importTransforms.getTypes().toArray(new String[importTransforms.getTypes().size()]);
         for (int i = 0; i < transformTypes.length; i++) {

http://git-wip-us.apache.org/repos/asf/atlas/blob/56b36f6f/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsTest.java
index cd623d0..1959576 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsTest.java
@@ -21,6 +21,7 @@ import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.instance.AtlasClassification;
 import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+import org.apache.atlas.model.instance.AtlasObjectId;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.BeforeTest;
 import org.testng.annotations.Test;
@@ -37,6 +38,8 @@ import static org.testng.Assert.assertTrue;
 
 public class ImportTransformsTest {
     private final String ATTR_NAME_QUALIFIED_NAME =  "qualifiedName";
+    private final String COLUMN_QUALIFIED_NAME_FORMAT = "col%s.TABLE1.default@cl1";
+
     private final String lowerCaseCL1   = "@cl1";
     private final String lowerCaseCL2   = "@cl2";
     private final String jsonLowerCaseReplace = "{ \"hive_table\": { \"qualifiedName\":[ \"lowercase\", \"replace:@cl1:@cl2\" ] } }";
@@ -48,6 +51,7 @@ public class ImportTransformsTest {
     private final String jsonSetDeleted = "{ \"hive_table\": { \"*\":[ \"setDeleted\" ] } }";
     private final String jsonAddClasification = "{ \"hive_table\": { \"*\":[ \"addClassification:REPLICATED\" ] } }";
     private final String jsonAddClasification2 = "{ \"hive_table\": { \"*\":[ \"addClassification:REPLICATED_2\" ] } }";
+    private final String jsonAddClasificationScoped = "{ \"hive_column\": { \"*\":[ \"addClassification:REPLICATED_2:topLevel\" ] } }";
 
     private ImportTransforms transform;
     private String HIVE_TABLE_ATTR_SYNC_INFO = "hive_table.syncInfo";
@@ -210,6 +214,29 @@ public class ImportTransformsTest {
         addClassification_MultipleClassificationsAreAdded(entity);
     }
 
+    @Test
+    public void addScopedClassification() throws AtlasBaseException {
+        AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = getAtlasEntityWithExtInfo();
+        AtlasEntity entity = entityWithExtInfo.getReferredEntities().get("2");
+
+        int existingClassificationsCount =  entityWithExtInfo.getEntity().getClassifications() != null ? entity.getClassifications().size() : 0;
+        ImportTransforms t = ImportTransforms.fromJson(jsonAddClasificationScoped);
+
+        assertTrue(t.getTransforms().size() > 0);
+
+        ImportTransformer.AddClassification classification = (ImportTransformer.AddClassification) t.getTransforms().get("hive_column").get("*").get(0);
+        AtlasObjectId objectId = new AtlasObjectId("hive_column", ATTR_NAME_QUALIFIED_NAME, String.format(COLUMN_QUALIFIED_NAME_FORMAT, 2));
+        classification.addFilter(objectId);
+        t.apply(entityWithExtInfo);
+
+        assertNotNull(t);
+
+        assertNull(entityWithExtInfo.getEntity().getClassifications());
+        assertNull(entityWithExtInfo.getReferredEntities().get("0").getClassifications());
+        assertEquals(entityWithExtInfo.getReferredEntities().get("1").getClassifications().size(), existingClassificationsCount + 1);
+        assertNull(entityWithExtInfo.getReferredEntities().get("2").getClassifications());
+    }
+
     private void addClassification_ExistingClassificationsAreHandled(AtlasEntity entity) throws AtlasBaseException {
         int existingClassificationsCount =  entity.getClassifications() != null ? entity.getClassifications().size() : 0;
         assertTrue(existingClassificationsCount > 0);
@@ -270,7 +297,7 @@ public class ImportTransformsTest {
         AtlasEntity entity = new AtlasEntity("hive_column");
 
         Map<String, Object> attributes = new HashMap<>();
-        attributes.put(ATTR_NAME_QUALIFIED_NAME, String.format("col%s.TABLE1.default@cl1", index));
+        attributes.put(ATTR_NAME_QUALIFIED_NAME, String.format(COLUMN_QUALIFIED_NAME_FORMAT, index));
         attributes.put("name", "col" + index);
 
         entity.setAttributes(attributes);


[2/2] atlas git commit: ATLAS-2869: Hdfs_path if requested are created and then proceeds with export.

Posted by am...@apache.org.
ATLAS-2869: Hdfs_path if requested are created and then proceeds with export.

Signed-off-by: Ashutosh Mestry <am...@hortonworks.com>


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/b888ade5
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/b888ade5
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/b888ade5

Branch: refs/heads/branch-0.8
Commit: b888ade513ce6b72d9361b5969c8cf55262e5e84
Parents: 56b36f6
Author: Ashutosh Mestry <am...@hortonworks.com>
Authored: Tue Sep 11 15:29:02 2018 -0700
Committer: Ashutosh Mestry <am...@hortonworks.com>
Committed: Tue Sep 11 22:26:04 2018 -0700

----------------------------------------------------------------------
 .../atlas/repository/impexp/ExportService.java  |  12 +-
 .../impexp/HdfsPathEntityCreator.java           | 131 +++++++++++++++++++
 .../impexp/HdfsPathEntityCreatorTest.java       |  81 ++++++++++++
 3 files changed, 221 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/atlas/blob/b888ade5/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
index aded67c..612549d 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
@@ -74,14 +74,16 @@ public class ExportService {
     private final EntityGraphRetriever      entityGraphRetriever;
     private final AtlasGremlinQueryProvider gremlinQueryProvider;
     private       ExportTypeProcessor       exportTypeProcessor;
-
+    private final HdfsPathEntityCreator     hdfsPathEntityCreator;
     @Inject
-    public ExportService(final AtlasTypeRegistry typeRegistry, AtlasGraph atlasGraph, AuditsWriter auditsWriter) {
+    public ExportService(final AtlasTypeRegistry typeRegistry, AtlasGraph atlasGraph,
+                         AuditsWriter auditsWriter, HdfsPathEntityCreator hdfsPathEntityCreator) {
         this.typeRegistry         = typeRegistry;
         this.entityGraphRetriever = new EntityGraphRetriever(this.typeRegistry);
         this.atlasGraph           = atlasGraph;
         this.gremlinQueryProvider = AtlasGremlinQueryProvider.INSTANCE;
-        this.auditsWriter = auditsWriter;
+        this.auditsWriter         = auditsWriter;
+        this.hdfsPathEntityCreator = hdfsPathEntityCreator;
     }
 
     public AtlasExportResult run(ZipSink exportSink, AtlasExportRequest request, String userName, String hostName,
@@ -237,6 +239,10 @@ public class ExportService {
     private List<String> getStartingEntity(AtlasObjectId item, ExportContext context) throws AtlasBaseException {
         List<String> ret = null;
 
+        if(item.getTypeName().equalsIgnoreCase(HdfsPathEntityCreator.HDFS_PATH_TYPE)) {
+            hdfsPathEntityCreator.getCreateEntity(item);
+        }
+
         if (StringUtils.isNotEmpty(item.getGuid())) {
             ret = Collections.singletonList(item.getGuid());
         } else if (StringUtils.equalsIgnoreCase(context.matchType, MATCH_TYPE_FOR_TYPE) && StringUtils.isNotEmpty(item.getTypeName())) {

http://git-wip-us.apache.org/repos/asf/atlas/blob/b888ade5/repository/src/main/java/org/apache/atlas/repository/impexp/HdfsPathEntityCreator.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/HdfsPathEntityCreator.java b/repository/src/main/java/org/apache/atlas/repository/impexp/HdfsPathEntityCreator.java
new file mode 100644
index 0000000..fddd60b
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/HdfsPathEntityCreator.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.impexp;
+
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.repository.store.graph.v1.AtlasEntityStoreV1;
+import org.apache.atlas.repository.store.graph.v1.AtlasEntityStream;
+import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import javax.inject.Inject;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.atlas.repository.impexp.AuditsWriter.getCurrentClusterName;
+
+@Component
+public class HdfsPathEntityCreator {
+    protected static final Logger LOG = LoggerFactory.getLogger(HdfsPathEntityCreator.class);
+
+    public static final String HDFS_PATH_TYPE = "hdfs_path";
+    public static final String HDFS_PATH_ATTRIBUTE_NAME_NAME = "name";
+    public static final String HDFS_PATH_ATTRIBUTE_NAME_CLUSTER_NAME = "clusterName";
+    public static final String HDFS_PATH_ATTRIBUTE_NAME_PATH = "path";
+    public static final String HDFS_PATH_ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
+
+    private static final String QUALIFIED_NAME_FORMAT = "%s@%s";
+    private final String PATH_SEPARATOR = "/";
+
+    private AtlasTypeRegistry typeRegistry;
+    private AtlasEntityStoreV1 entityStore;
+
+    @Inject
+    public HdfsPathEntityCreator(AtlasTypeRegistry typeRegistry, AtlasEntityStoreV1 entityStore) {
+        this.typeRegistry = typeRegistry;
+        this.entityStore = entityStore;
+    }
+
+    public AtlasEntity.AtlasEntityWithExtInfo getCreateEntity(AtlasObjectId item) throws AtlasBaseException {
+        if(!item.getUniqueAttributes().containsKey(HDFS_PATH_ATTRIBUTE_NAME_PATH)) {
+            return null;
+        }
+
+        return getCreateEntity((String) item.getUniqueAttributes().get(HDFS_PATH_ATTRIBUTE_NAME_PATH));
+    }
+
+    public AtlasEntity.AtlasEntityWithExtInfo getCreateEntity(String path) throws AtlasBaseException {
+        return getCreateEntity(path, getCurrentClusterName());
+    }
+
+    public AtlasEntity.AtlasEntityWithExtInfo getCreateEntity(String path, String clusterName) throws AtlasBaseException {
+        String pathWithTrailingSeparator = getPathWithTrailingSeparator(path);
+        AtlasEntityType hdfsPathEntityType = getHdfsPathEntityType();
+        AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = getHDFSPathEntity(hdfsPathEntityType, pathWithTrailingSeparator, clusterName);
+        if(entityWithExtInfo != null) {
+            return entityWithExtInfo;
+        }
+
+        AtlasEntity entity = createHDFSPathEntity(hdfsPathEntityType, pathWithTrailingSeparator, clusterName);
+        AtlasEntityStream entityStream = new AtlasEntityStream(entity);
+        EntityMutationResponse entityMutationResponse = entityStore.createOrUpdate(entityStream, false);
+        if(entityMutationResponse.getCreatedEntities().size() == 0) {
+            return null;
+        }
+
+        return getHDFSPathEntity(hdfsPathEntityType, pathWithTrailingSeparator, clusterName);
+    }
+
+    private AtlasEntity createHDFSPathEntity(AtlasEntityType hdfsPathEntityType, String path, String clusterName) {
+        AtlasEntity entity = hdfsPathEntityType.createDefaultValue();
+
+        entity.setAttribute(HDFS_PATH_ATTRIBUTE_QUALIFIED_NAME, getQualifiedName(path, clusterName));
+        entity.setAttribute(HDFS_PATH_ATTRIBUTE_NAME_PATH, path);
+        entity.setAttribute(HDFS_PATH_ATTRIBUTE_NAME_NAME, path);
+        entity.setAttribute(HDFS_PATH_ATTRIBUTE_NAME_CLUSTER_NAME, clusterName);
+
+        return entity;
+    }
+
+    private AtlasEntity.AtlasEntityWithExtInfo getHDFSPathEntity(AtlasEntityType hdfsPathEntityType, String path, String clusterName) {
+        try {
+            return entityStore.getByUniqueAttributes(hdfsPathEntityType, getUniqueAttributes(path, clusterName));
+        } catch (AtlasBaseException e) {
+            return null;
+        }
+    }
+
+    private AtlasEntityType getHdfsPathEntityType() throws AtlasBaseException {
+        return (AtlasEntityType) typeRegistry.getType(HDFS_PATH_TYPE);
+    }
+
+    private Map<String,Object> getUniqueAttributes(String path, String clusterName) {
+        Map<String,Object>  ret = new HashMap<String, Object>();
+        ret.put(HDFS_PATH_ATTRIBUTE_QUALIFIED_NAME, getQualifiedName(path, clusterName));
+        return ret;
+    }
+
+    private String getPathWithTrailingSeparator(String path) {
+        if(path.endsWith(PATH_SEPARATOR)) {
+            return path;
+        }
+
+        return path + PATH_SEPARATOR;
+    }
+
+    public static String getQualifiedName(String path, String clusterName) {
+        return String.format(QUALIFIED_NAME_FORMAT, path, clusterName);
+    }
+}

http://git-wip-us.apache.org/repos/asf/atlas/blob/b888ade5/repository/src/test/java/org/apache/atlas/repository/impexp/HdfsPathEntityCreatorTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/HdfsPathEntityCreatorTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/HdfsPathEntityCreatorTest.java
new file mode 100644
index 0000000..1863b8d
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/HdfsPathEntityCreatorTest.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.impexp;
+
+import org.apache.atlas.TestModules;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.store.AtlasTypeDefStore;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import javax.inject.Inject;
+import java.io.IOException;
+
+import static org.apache.atlas.repository.impexp.HdfsPathEntityCreator.HDFS_PATH_ATTRIBUTE_NAME_CLUSTER_NAME;
+import static org.apache.atlas.repository.impexp.HdfsPathEntityCreator.HDFS_PATH_ATTRIBUTE_NAME_NAME;
+import static org.apache.atlas.repository.impexp.HdfsPathEntityCreator.HDFS_PATH_ATTRIBUTE_QUALIFIED_NAME;
+import static org.apache.atlas.repository.impexp.HdfsPathEntityCreator.getQualifiedName;
+import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadFsModel;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+@Guice(modules = TestModules.TestOnlyModule.class)
+public class HdfsPathEntityCreatorTest extends ExportImportTestBase {
+
+    @Inject
+    AtlasTypeRegistry typeRegistry;
+
+    @Inject
+    private AtlasTypeDefStore typeDefStore;
+
+    @Inject
+    private HdfsPathEntityCreator hdfsPathEntityCreator;
+
+    private static final String expectedPath = "hdfs://server-name/warehouse/hr";
+    private static final String expectedClusterName = "cl1";
+
+    @BeforeClass
+    public void setup() throws IOException, AtlasBaseException {
+        basicSetup(typeDefStore, typeRegistry);
+        loadFsModel(typeDefStore, typeRegistry);
+    }
+
+    @Test
+    public void verifyCreate() throws AtlasBaseException {
+
+        String expectedQualifiedName = getQualifiedName(expectedPath + "/", expectedClusterName);
+        AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = hdfsPathEntityCreator.getCreateEntity(expectedPath, expectedClusterName);
+
+        assertNotNull(entityWithExtInfo);
+        AtlasEntity entity = entityWithExtInfo.getEntity();
+        assertEquals(entity.getAttribute(HdfsPathEntityCreator.HDFS_PATH_ATTRIBUTE_NAME_PATH), expectedPath + "/");
+        assertEquals(entity.getAttribute(HDFS_PATH_ATTRIBUTE_QUALIFIED_NAME),expectedQualifiedName);
+        assertEquals(entity.getAttribute(HDFS_PATH_ATTRIBUTE_NAME_NAME), expectedPath + "/");
+        assertEquals(entity.getAttribute(HDFS_PATH_ATTRIBUTE_NAME_CLUSTER_NAME), expectedClusterName);
+    }
+
+    @Test(dependsOnMethods = "verifyCreate")
+    public void verifyGet() throws AtlasBaseException {
+        AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = hdfsPathEntityCreator.getCreateEntity(expectedPath, expectedClusterName);
+
+        assertNotNull(entityWithExtInfo);
+    }
+}