You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by am...@apache.org on 2019/01/22 04:38:35 UTC

[atlas] branch master updated: ATLAS-3020: Audit APIs for classification updates.

This is an automated email from the ASF dual-hosted git repository.

amestry pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/master by this push:
     new cb2421b  ATLAS-3020: Audit APIs for classification updates.
     new cd51cef  Merge branch 'master' of https://gitbox.apache.org/repos/asf/atlas
cb2421b is described below

commit cb2421bc0eb4f6a9c2d9f6dadf3b9ba0080e3e82
Author: Ashutosh Mestry <am...@hortonworks.com>
AuthorDate: Mon Jan 21 20:34:50 2019 -0800

    ATLAS-3020: Audit APIs for classification updates.
---
 .../main/java/org/apache/atlas/AtlasClientV2.java  |  21 +-
 .../atlas/model/instance/AtlasEntityHeaders.java   |  56 ++++
 .../audit/CassandraBasedAuditRepository.java       |   7 +
 .../repository/audit/EntityAuditRepository.java    |   9 +
 .../audit/HBaseBasedAuditRepository.java           |  52 +++-
 .../audit/InMemoryEntityAuditRepository.java       |  17 ++
 .../audit/NoopEntityAuditRepository.java           |   7 +
 .../repository/store/graph/AtlasEntityStore.java   |   3 +
 .../store/graph/v2/AtlasEntityStoreV2.java         |   7 +
 .../store/graph/v2/ClassificationAssociator.java   | 316 +++++++++++++++++++++
 .../store/graph/v2/EntityGraphRetriever.java       |   2 +-
 .../graph/v2/ClassificationAssociatorTest.java     | 235 +++++++++++++++
 .../col-entity-None.json                           |  10 +
 .../col-entity-PII-FIN_PII.json                    |  32 +++
 .../classification-association/col-entity-PII.json |  22 ++
 .../col-entity-T1-prop-Tn.json                     |  34 +++
 .../classification-association/header-FIN_PII.json |  32 +++
 .../classification-association/header-None.json    |  21 ++
 .../header-PII-VENDOR_PII.json                     |  42 +++
 .../classification-association/header-PII.json     |  32 +++
 .../header-Tx-prop-T1.json                         |  42 +++
 .../json/classification-association/header-Tx.json |  26 ++
 .../classification-association/header-empty.json   |   3 +
 .../java/org/apache/atlas/web/rest/EntityREST.java |  59 +++-
 24 files changed, 1081 insertions(+), 6 deletions(-)

diff --git a/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java b/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java
index 7c8caee..33466e5 100644
--- a/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java
+++ b/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java
@@ -27,7 +27,9 @@ import org.apache.atlas.model.instance.AtlasClassification;
 import org.apache.atlas.model.instance.AtlasClassification.AtlasClassifications;
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
 import org.apache.atlas.model.instance.AtlasRelationship;
+import org.apache.atlas.model.instance.AtlasEntityHeaders;
 import org.apache.atlas.model.instance.EntityMutationResponse;
 import org.apache.atlas.model.lineage.AtlasLineageInfo;
 import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection;
@@ -72,6 +74,8 @@ public class AtlasClientV2 extends AtlasBaseClient {
 
     // Relationships APIs
     private static final String RELATIONSHIPS_URI  = BASE_URI + "v2/relationship/";
+    private static final String BULK_HEADERS = "bulk/headers";
+    private static final String BULK_SET_CLASSIFICATIONS = "bulk/setClassifications";
 
     public AtlasClientV2(String[] baseUrl, String[] basicAuthUserNamePassword) {
         super(baseUrl, basicAuthUserNamePassword);
@@ -326,13 +330,26 @@ public class AtlasClientV2 extends AtlasBaseClient {
     }
 
     public void deleteClassifications(String guid, List<AtlasClassification> classifications) throws AtlasServiceException {
-        callAPI(formatPathParameters(API_V2.GET_CLASSIFICATIONS, guid), AtlasClassifications.class, classifications);
+        for (AtlasClassification c : classifications) {
+            callAPI(formatPathParameters(API_V2.DELETE_CLASSIFICATION, guid, c.getTypeName()), AtlasClassifications.class, classifications);
+        }
     }
 
     public void deleteClassification(String guid, String classificationName) throws AtlasServiceException {
         callAPI(formatPathParameters(API_V2.DELETE_CLASSIFICATION, guid, classificationName), null, null);
     }
 
+    public AtlasEntityHeaders getEntityHeaders(long tagUpdateStartTime) throws AtlasServiceException {
+        MultivaluedMap<String, String> queryParams = new MultivaluedMapImpl();
+        queryParams.add("tagUpdateStartTime", Long.toString(tagUpdateStartTime));
+
+        return callAPI(API_V2.GET_BULK_HEADERS, AtlasEntityHeaders.class, queryParams);
+    }
+
+    public String setClassifications(AtlasEntityHeaders entityHeaders) throws AtlasServiceException {
+        return callAPI(API_V2.UPDATE_BULK_SET_CLASSIFICATIONS, String.class, entityHeaders);
+    }
+
     /* Discovery calls */
     public AtlasSearchResult dslSearch(final String query) throws AtlasServiceException {
         MultivaluedMap<String, String> queryParams = new MultivaluedMapImpl();
@@ -480,6 +497,8 @@ public class AtlasClientV2 extends AtlasBaseClient {
         public static final API_V2 DELETE_RELATIONSHIP_BY_GUID = new API_V2(RELATIONSHIPS_URI + "guid/", HttpMethod.DELETE, Response.Status.NO_CONTENT);
         public static final API_V2 CREATE_RELATIONSHIP         = new API_V2(RELATIONSHIPS_URI , HttpMethod.POST, Response.Status.OK);
         public static final API_V2 UPDATE_RELATIONSHIP         = new API_V2(RELATIONSHIPS_URI , HttpMethod.PUT, Response.Status.OK);
+        public static final API_V2 GET_BULK_HEADERS = new API_V2(ENTITY_API + BULK_HEADERS, HttpMethod.GET, Response.Status.OK);
+        public static final API_V2 UPDATE_BULK_SET_CLASSIFICATIONS = new API_V2(ENTITY_API + AtlasClientV2.BULK_SET_CLASSIFICATIONS, HttpMethod.POST, Response.Status.OK);
 
         private API_V2(String path, String method, Response.Status status) {
             super(path, method, status);
diff --git a/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntityHeaders.java b/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntityHeaders.java
new file mode 100644
index 0000000..11c6fc7
--- /dev/null
+++ b/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntityHeaders.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.model.instance;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+
+import java.util.Map;
+
+import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
+import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
+
+@JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE)
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown=true)
+@XmlRootElement
+@XmlAccessorType(XmlAccessType.PROPERTY)
+public class AtlasEntityHeaders {
+    Map<String, AtlasEntityHeader> guidHeaderMap;
+
+    public AtlasEntityHeaders() {
+    }
+
+    public AtlasEntityHeaders(Map<String, AtlasEntityHeader> guidEntityHeaderMap) {
+        guidHeaderMap = guidEntityHeaderMap;
+    }
+
+    public void setGuidHeaderMap(Map<String, AtlasEntityHeader> guidHeaderMap) {
+        this.guidHeaderMap = guidHeaderMap;
+    }
+
+    public Map<String, AtlasEntityHeader> getGuidHeaderMap() {
+        return guidHeaderMap;
+    }
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/CassandraBasedAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/CassandraBasedAuditRepository.java
index eb78f8f..b8131bd 100644
--- a/repository/src/main/java/org/apache/atlas/repository/audit/CassandraBasedAuditRepository.java
+++ b/repository/src/main/java/org/apache/atlas/repository/audit/CassandraBasedAuditRepository.java
@@ -30,6 +30,7 @@ import org.apache.atlas.EntityAuditEvent;
 import org.apache.atlas.annotation.ConditionalOnAtlasProperty;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.audit.EntityAuditEventV2;
+import org.apache.commons.lang.NotImplementedException;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,6 +40,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import javax.inject.Singleton;
 
@@ -188,6 +190,11 @@ public class CassandraBasedAuditRepository extends AbstractStorageBasedAuditRepo
   }
 
   @Override
+  public Set<String> getEntitiesWithTagChanges(long fromTimestamp, long toTimestamp) throws AtlasBaseException {
+    throw new NotImplementedException();
+  }
+
+  @Override
   public void start() throws AtlasException {
     initApplicationProperties();
     initializeSettings();
diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditRepository.java
index aab2d5b..2a47e39 100644
--- a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditRepository.java
+++ b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditRepository.java
@@ -24,6 +24,7 @@ import org.apache.atlas.model.audit.EntityAuditEventV2;
 import org.apache.atlas.exception.AtlasBaseException;
 
 import java.util.List;
+import java.util.Set;
 
 /**
  * Interface for repository for storing entity audit events
@@ -77,6 +78,14 @@ public interface EntityAuditRepository {
      */
     List<EntityAuditEventV2> listEventsV2(String entityId, String startKey, short n) throws AtlasBaseException;
 
+    /***
+     * List events for given time range where classifications have been added, deleted or updated.
+     * @param fromTimestamp from timestamp
+     * @param toTimestamp to timestamp
+     * @return events that match the range
+     * @throws AtlasBaseException
+     */
+    Set<String> getEntitiesWithTagChanges(long fromTimestamp, long toTimestamp) throws AtlasBaseException;
 
     /**
      * List events for the given entity id in decreasing order of timestamp, from the given timestamp. Returns n results
diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java
index 6f4415f..5f01293 100644
--- a/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java
+++ b/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java
@@ -50,7 +50,10 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
+import org.apache.hadoop.hbase.filter.CompareFilter;
 import org.apache.hadoop.hbase.filter.PageFilter;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.regionserver.BloomType;
@@ -64,9 +67,10 @@ import javax.inject.Singleton;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Iterator;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Properties;
+import java.util.Set;
 
 import static org.apache.atlas.EntityAuditEvent.EntityAuditAction.TAG_ADD;
 import static org.apache.atlas.EntityAuditEvent.EntityAuditAction.TAG_DELETE;
@@ -546,6 +550,52 @@ public class HBaseBasedAuditRepository extends AbstractStorageBasedAuditReposito
     }
 
     @Override
+    public Set<String> getEntitiesWithTagChanges(long fromTimestamp, long toTimestamp) throws AtlasBaseException {
+        final String classificationUpdatesAction = "CLASSIFICATION_";
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Listing events for fromTimestamp {}, toTimestamp {}, action {}", fromTimestamp, toTimestamp);
+        }
+
+        Table table = null;
+        ResultScanner scanner = null;
+
+        try {
+            Set<String> guids = new HashSet<>();
+
+            table = connection.getTable(tableName);
+
+            byte[] filterValue = Bytes.toBytes(classificationUpdatesAction);
+            BinaryPrefixComparator binaryPrefixComparator = new BinaryPrefixComparator(filterValue);
+            SingleColumnValueFilter filter = new SingleColumnValueFilter(COLUMN_FAMILY, COLUMN_ACTION, CompareFilter.CompareOp.EQUAL, binaryPrefixComparator);
+            Scan scan = new Scan().setFilter(filter).setTimeRange(fromTimestamp, toTimestamp);
+
+            Result result;
+            scanner = table.getScanner(scan);
+            while ((result = scanner.next()) != null) {
+                EntityAuditEvent event = fromKey(result.getRow());
+
+                if (event == null) {
+                    continue;
+                }
+
+                guids.add(event.getEntityId());
+            }
+
+            return guids;
+        } catch (IOException e) {
+            throw new AtlasBaseException(e);
+        } finally {
+            try {
+                close(scanner);
+                close(table);
+            } catch (AtlasException e) {
+                throw new AtlasBaseException(e);
+            }
+        }
+    }
+
+    @Override
     public void start() throws AtlasException {
         Configuration configuration = ApplicationProperties.get();
         startInternal(configuration, getHBaseConfiguration(configuration));
diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/InMemoryEntityAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/InMemoryEntityAuditRepository.java
index dca3b85..ad6ec94 100644
--- a/repository/src/main/java/org/apache/atlas/repository/audit/InMemoryEntityAuditRepository.java
+++ b/repository/src/main/java/org/apache/atlas/repository/audit/InMemoryEntityAuditRepository.java
@@ -21,6 +21,7 @@ package org.apache.atlas.repository.audit;
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.EntityAuditEvent;
 import org.apache.atlas.annotation.ConditionalOnAtlasProperty;
+import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.audit.EntityAuditEventV2;
 import org.apache.commons.collections.CollectionUtils;
 import org.springframework.stereotype.Component;
@@ -28,7 +29,9 @@ import org.springframework.stereotype.Component;
 import javax.inject.Singleton;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
 
@@ -119,6 +122,20 @@ public class InMemoryEntityAuditRepository implements EntityAuditRepository {
     }
 
     @Override
+    public Set<String> getEntitiesWithTagChanges(long fromTimestamp, long toTimestamp) throws AtlasBaseException {
+        Set<String> events = new HashSet<>();
+
+        for (EntityAuditEventV2 event : auditEventsV2.values()) {
+            long timestamp = event.getTimestamp();
+            if (timestamp > fromTimestamp && timestamp <= toTimestamp) {
+                events.add(event.getEntityId());
+            }
+        }
+
+        return events;
+    }
+
+    @Override
     public List<Object> listEvents(String entityId, String startKey, short maxResults) {
         List events = listEventsV2(entityId, startKey, maxResults);
 
diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/NoopEntityAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/NoopEntityAuditRepository.java
index e3a6078..4bb68d5 100644
--- a/repository/src/main/java/org/apache/atlas/repository/audit/NoopEntityAuditRepository.java
+++ b/repository/src/main/java/org/apache/atlas/repository/audit/NoopEntityAuditRepository.java
@@ -20,12 +20,14 @@ package org.apache.atlas.repository.audit;
 
 import org.apache.atlas.EntityAuditEvent;
 import org.apache.atlas.annotation.ConditionalOnAtlasProperty;
+import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.audit.EntityAuditEventV2;
 import org.springframework.stereotype.Component;
 
 import javax.inject.Singleton;
 import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 
 /**
  * Implementation that completely disables the audit repository.
@@ -66,6 +68,11 @@ public class NoopEntityAuditRepository implements EntityAuditRepository {
     }
 
     @Override
+    public Set<String> getEntitiesWithTagChanges(long fromTimestamp, long toTimestamp) throws AtlasBaseException {
+        return Collections.emptySet();
+    }
+
+    @Override
     public List<Object> listEvents(String entityId, String startKey, short n) {
         return Collections.emptyList();
     }
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java
index 750fa17..1da1138 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java
@@ -24,6 +24,7 @@ import org.apache.atlas.model.instance.AtlasClassification;
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
 import org.apache.atlas.model.instance.AtlasEntityHeader;
+import org.apache.atlas.model.instance.AtlasEntityHeaders;
 import org.apache.atlas.model.instance.AtlasObjectId;
 import org.apache.atlas.model.instance.EntityMutationResponse;
 import org.apache.atlas.repository.store.graph.v2.EntityStream;
@@ -216,4 +217,6 @@ public interface AtlasEntityStore {
     List<AtlasClassification> getClassifications(String guid) throws AtlasBaseException;
 
     AtlasClassification getClassification(String guid, String classificationName) throws AtlasBaseException;
+
+    String setClassifications(AtlasEntityHeaders entityHeaders);
 }
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
index 35aa3af..e8e5400 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
@@ -657,6 +657,13 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
         return ret;
     }
 
+    @Override
+    @GraphTransaction
+    public String setClassifications(AtlasEntityHeaders entityHeaders) {
+        ClassificationAssociator.Updater associator = new ClassificationAssociator.Updater(typeRegistry, this);
+        return associator.setClassifications(entityHeaders.getGuidHeaderMap());
+    }
+
     private EntityMutationResponse createOrUpdate(EntityStream entityStream, boolean isPartialUpdate, boolean replaceClassifications) throws AtlasBaseException {
         if (LOG.isDebugEnabled()) {
             LOG.debug("==> createOrUpdate()");
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/ClassificationAssociator.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/ClassificationAssociator.java
new file mode 100644
index 0000000..11d008f
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/ClassificationAssociator.java
@@ -0,0 +1,316 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.store.graph.v2;
+
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.instance.AtlasClassification;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
+import org.apache.atlas.model.instance.AtlasEntityHeaders;
+import org.apache.atlas.repository.audit.EntityAuditRepository;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.repository.store.graph.AtlasEntityStore;
+import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+@Component
+public class ClassificationAssociator {
+    private static final Logger LOG = LoggerFactory.getLogger(ClassificationAssociator.class);
+
+    public static class Retriever {
+        private final EntityAuditRepository auditRepository;
+        private final EntityGraphRetriever entityRetriever;
+
+        public Retriever(AtlasTypeRegistry typeRegistry, EntityAuditRepository auditRepository) {
+            this.entityRetriever = new EntityGraphRetriever(typeRegistry);
+            this.auditRepository = auditRepository;
+        }
+
+        Retriever(EntityGraphRetriever entityGraphRetriever, EntityAuditRepository auditRepository) {
+            this.entityRetriever = entityGraphRetriever;
+            this.auditRepository = auditRepository;
+        }
+
+        public AtlasEntityHeaders get(long fromTimestamp, long toTimestamp) throws AtlasBaseException {
+            toTimestamp = incrementTimestamp(toTimestamp);
+            Set<String> guids = auditRepository.getEntitiesWithTagChanges(fromTimestamp, toTimestamp);
+
+            Map<String, AtlasEntityHeader> guidEntityHeaderMap = new HashMap<>();
+            for (String guid : guids) {
+                AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(guid);
+
+                guidEntityHeaderMap.put(guid, entityHeader);
+            }
+
+            guids.clear();
+            return new AtlasEntityHeaders(guidEntityHeaderMap);
+        }
+
+        private long incrementTimestamp(long t) {
+            return t + 1;
+        }
+    }
+
+    public static class Updater {
+        static final String ATTR_NAME_QUALIFIED_NAME = "qualifiedName";
+        static final String STATUS_DONE = "(Done)";
+        static final String STATUS_SKIPPED = "(Skipped)";
+        static final String STATUS_PARTIAL = "(Partial)";
+
+        private static final String PROCESS_FORMAT = "%s:%s:%s:%s -> %s:%s";
+        static final String PROCESS_ADD = "Add";
+        static final String PROCESS_UPDATE = "Update";
+        static final String PROCESS_DELETE = "Delete";
+        static final String JSONIFY_STRING_FORMAT = "\"%s\",";
+
+        private final AtlasTypeRegistry typeRegistry;
+        private final AtlasEntityStore entitiesStore;
+        private final EntityGraphRetriever entityRetriever;
+        private StringBuilder actionSummary = new StringBuilder();
+
+        public Updater(AtlasTypeRegistry typeRegistry, AtlasEntityStore entitiesStore) {
+            this.typeRegistry = typeRegistry;
+            this.entitiesStore = entitiesStore;
+            entityRetriever = new EntityGraphRetriever(typeRegistry);
+        }
+
+        public String setClassifications(Map<String, AtlasEntityHeader> map) {
+            for (AtlasEntityHeader incomingEntityHeader : map.values()) {
+                String typeName = incomingEntityHeader.getTypeName();
+                AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName);
+                if (entityType == null) {
+                    LOG.warn("Entity type: {}: Not found: {}!", typeName, STATUS_SKIPPED);
+                    summarizeFormat("%s: %s", typeName, STATUS_SKIPPED);
+                    continue;
+                }
+
+                String qualifiedName = getQualifiedName(incomingEntityHeader);
+                AtlasEntityHeader entityToBeChanged = getByUniqueAttributes(entityType, qualifiedName, incomingEntityHeader.getAttributes());
+                if (entityToBeChanged == null) {
+                    summarizeFormat("Entity:%s:%s:[Not found]:%s", entityType.getTypeName(), qualifiedName, STATUS_SKIPPED);
+                    continue;
+                }
+
+
+                String guid = entityToBeChanged.getGuid();
+                Map<String, List<AtlasClassification>> operationListMap = computeChanges(incomingEntityHeader, entityToBeChanged);
+                commitChanges(guid, typeName, qualifiedName, operationListMap);
+            }
+
+            return getJsonArray(actionSummary);
+        }
+
+        private void commitChanges(String entityGuid, String typeName, String qualifiedName,
+                                                                     Map<String, List<AtlasClassification>> operationListMap) {
+            if (MapUtils.isEmpty(operationListMap)) {
+                return;
+            }
+
+            deleteClassifications(entityGuid, typeName, qualifiedName, operationListMap.get(PROCESS_DELETE));
+            updateClassifications(entityGuid, typeName, qualifiedName, operationListMap.get(PROCESS_UPDATE));
+            addClassifications(entityGuid, typeName, qualifiedName, operationListMap.get(PROCESS_ADD));
+
+            operationListMap.clear();
+        }
+
+        private Map<String, List<AtlasClassification>> computeChanges(AtlasEntityHeader incomingEntityHeader, AtlasEntityHeader entityToBeUpdated) {
+            if (incomingEntityHeader == null || entityToBeUpdated == null) {
+                return null;
+            }
+
+            ListOps<AtlasClassification> listOps = new ListOps<>();
+            List<AtlasClassification> incomingClassifications = listOps.filter(incomingEntityHeader.getGuid(), incomingEntityHeader.getClassifications());
+            List<AtlasClassification> entityClassifications = listOps.filter(entityToBeUpdated.getGuid(), entityToBeUpdated.getClassifications());
+
+            if (CollectionUtils.isEmpty(incomingClassifications) && CollectionUtils.isEmpty(entityClassifications)) {
+                return null;
+            }
+
+            Map<String, List<AtlasClassification>> operationListMap = new HashMap<>();
+
+            bucket(PROCESS_DELETE, operationListMap, listOps.subtract(entityClassifications, incomingClassifications));
+            bucket(PROCESS_UPDATE, operationListMap, listOps.intersect(incomingClassifications, entityClassifications));
+            bucket(PROCESS_ADD, operationListMap, listOps.subtract(incomingClassifications, entityClassifications));
+
+            return operationListMap;
+        }
+
+        private void bucket(String op, Map<String, List<AtlasClassification>> operationListMap, List<AtlasClassification> results) {
+            if (CollectionUtils.isEmpty(results)) {
+                return;
+            }
+
+            operationListMap.put(op, results);
+        }
+
+        private void addClassifications(String entityToBeChangedGuid, String typeName, String qualifiedName, List<AtlasClassification> list) {
+            if (CollectionUtils.isEmpty(list)) {
+                return;
+            }
+
+            String status = STATUS_DONE;
+            String classificationNames = getClassificationNames(list);
+            try {
+                entitiesStore.addClassifications(entityToBeChangedGuid, list);
+            } catch (AtlasBaseException e) {
+                status = STATUS_PARTIAL;
+                LOG.warn("{}:{}:{} -> {}: {}.", PROCESS_UPDATE, typeName, qualifiedName, classificationNames, status);
+            }
+
+            summarize(PROCESS_ADD, entityToBeChangedGuid, typeName, qualifiedName, classificationNames, status);
+        }
+
+        private void updateClassifications(String entityToBeChangedGuid, String typeName, String qualifiedName, List<AtlasClassification> list) {
+            if (CollectionUtils.isEmpty(list)) {
+                return;
+            }
+
+            String status = STATUS_DONE;
+            String classificationNames = getClassificationNames(list);
+
+            try {
+                entitiesStore.updateClassifications(entityToBeChangedGuid, list);
+            } catch (AtlasBaseException e) {
+                status = STATUS_PARTIAL;
+                LOG.warn("{}:{}:{} -> {}: {}.", PROCESS_UPDATE, typeName, qualifiedName, classificationNames, status);
+            }
+
+            summarize(PROCESS_UPDATE, entityToBeChangedGuid, typeName, qualifiedName, classificationNames, status);
+        }
+
+        private void deleteClassifications(String typeName, String entityGuid, String qualifiedName, List<AtlasClassification> list) {
+            if (CollectionUtils.isEmpty(list)) {
+                return;
+            }
+
+            String status = STATUS_DONE;
+            String classificationTypeName = getClassificationNames(list);
+            for (AtlasClassification c : list) {
+                try {
+                    entitiesStore.deleteClassification(entityGuid, c.getTypeName());
+                } catch (AtlasBaseException e) {
+                    status = STATUS_PARTIAL;
+                    LOG.warn("{}:{}:{} -> {}: Skipped!", entityGuid, typeName, qualifiedName, c.getTypeName());
+                }
+            }
+
+            summarize(PROCESS_DELETE, entityGuid, typeName, qualifiedName, classificationTypeName, status);
+        }
+
+        AtlasEntityHeader getByUniqueAttributes(AtlasEntityType entityType, String qualifiedName, Map<String, Object> attrValues) {
+            try {
+                AtlasVertex vertex = AtlasGraphUtilsV2.findByUniqueAttributes(entityType, attrValues);
+                if (vertex == null) {
+                    return null;
+                }
+
+                return entityRetriever.toAtlasEntityHeaderWithClassifications(vertex);
+            } catch (AtlasBaseException e) {
+                LOG.warn("{}:{} could not be processed!", entityType, qualifiedName);
+                return null;
+            } catch (Exception ex) {
+                LOG.error("{}:{} could not be processed!", entityType, qualifiedName, ex);
+                return null;
+            }
+        }
+
+        private String getClassificationNames(List<AtlasClassification> list) {
+            return list.stream().map(AtlasClassification::getTypeName).collect(Collectors.joining(", "));
+        }
+
+        private String getQualifiedName(AtlasEntityHeader entityHeader) {
+            return (String) entityHeader.getAttribute(ATTR_NAME_QUALIFIED_NAME);
+        }
+
+        private void summarize(String... s) {
+            summarizeFormat(PROCESS_FORMAT, s);
+        }
+
+        private void summarizeFormat(String format, String... s) {
+            summarize(String.format(format, s));
+        }
+
+        private void summarize(String s) {
+            actionSummary.append(String.format(JSONIFY_STRING_FORMAT, s));
+        }
+
+        private String getJsonArray(StringBuilder actionSummary) {
+            return "[" + StringUtils.removeEnd(actionSummary.toString(), ",") + "]";
+        }
+    }
+
+    private static class ListOps<V extends AtlasClassification> {
+        public List<V> intersect(List<V> lhs, List<V> rhs) {
+            if (CollectionUtils.isEmpty(rhs)) {
+                return null;
+            }
+
+            List<V> result = new ArrayList<>();
+            for (V c : rhs) {
+                V found = findFrom(lhs, c);
+                if (found != null) {
+                    result.add(found);
+                }
+            }
+
+            return result;
+        }
+
+        public List<V> subtract(List<V> lhs, List<V> rhs) {
+            if (CollectionUtils.isEmpty(lhs)) {
+                return null;
+            }
+
+            List<V> result = new ArrayList<>();
+            for (V c : lhs) {
+                V found = findFrom(rhs, c);
+                if (found == null) {
+                    result.add(c);
+                }
+            }
+
+            return result;
+        }
+
+        private V findFrom(List<V> reference, V check) {
+            return (V) CollectionUtils.find(reference, ox ->
+                    ((V) ox).getTypeName().equals(check.getTypeName()));
+        }
+
+        public List<V> filter(String guid, List<V> list) {
+            if (CollectionUtils.isEmpty(list)) {
+                return list;
+            }
+
+            return list.stream().filter(x -> x.getEntityGuid().equals(guid)).collect(Collectors.toList());
+        }
+    }
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java
index 066abc1..79216d5 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java
@@ -95,7 +95,7 @@ import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelation
 import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.OUT;
 
 @Component
-public final class EntityGraphRetriever {
+public class EntityGraphRetriever {
     private static final Logger LOG = LoggerFactory.getLogger(EntityGraphRetriever.class);
 
     private static final String TERM_RELATION_NAME = "AtlasGlossarySemanticAssignment";
diff --git a/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/ClassificationAssociatorTest.java b/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/ClassificationAssociatorTest.java
new file mode 100644
index 0000000..ab5bb2b
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/ClassificationAssociatorTest.java
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.store.graph.v2;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
+import org.apache.atlas.model.instance.AtlasEntityHeaders;
+import org.apache.atlas.repository.audit.EntityAuditRepository;
+import org.apache.atlas.repository.store.graph.AtlasEntityStore;
+import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.utils.AtlasJson;
+import org.apache.atlas.utils.TestResourceFileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.atlas.repository.store.graph.v2.ClassificationAssociator.Updater.PROCESS_ADD;
+import static org.apache.atlas.repository.store.graph.v2.ClassificationAssociator.Updater.PROCESS_DELETE;
+import static org.apache.atlas.repository.store.graph.v2.ClassificationAssociator.Updater.PROCESS_UPDATE;
+import static org.apache.atlas.repository.store.graph.v2.ClassificationAssociator.Updater.STATUS_DONE;
+import static org.apache.atlas.repository.store.graph.v2.ClassificationAssociator.Updater.STATUS_SKIPPED;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.FileAssert.fail;
+
+public class ClassificationAssociatorTest {
+    private static final String TABLE_GUID = "df122fc3-5555-40f8-a30f-3090b8a622f8";
+    private static String TEST_FILES_SUBDIR = "classification-association";
+    private static String MESSAGE_SEPARATOR = ":";
+    private static String ENTITY_NAME_SEPARATOR = "->";
+
+    private class ClassificationAssociatorUpdaterForSpy extends ClassificationAssociator.Updater {
+        private final String entityFileName;
+
+        public ClassificationAssociatorUpdaterForSpy(AtlasTypeRegistry typeRegistry, AtlasEntityStore entitiesStore) {
+            super(typeRegistry, entitiesStore);
+            this.entityFileName = StringUtils.EMPTY;
+        }
+
+        public ClassificationAssociatorUpdaterForSpy(AtlasTypeRegistry typeRegistry, AtlasEntityStore entitiesStore, String entityFileName) {
+            super(typeRegistry, entitiesStore);
+            this.entityFileName = entityFileName;
+        }
+
+        @Override
+        AtlasEntityHeader getByUniqueAttributes(AtlasEntityType entityType, String qualifiedName, Map<String, Object> attrValues) {
+            try {
+                if (StringUtils.isEmpty(entityFileName)) {
+                    return null;
+                }
+
+                return getEntityHeaderFromFile(entityFileName);
+            } catch (IOException e) {
+                fail(entityFileName + " could not be loaded.");
+                return null;
+            }
+        }
+    }
+
+    @Test
+    public void auditScanYieldsNothing_EmptyHeadersReturned() {
+        AtlasEntityHeaders actualEntityHeaders = setupRetriever("header-empty", 0, 0, null);
+
+        assertNotNull(actualEntityHeaders);
+        assertEquals(actualEntityHeaders.getGuidHeaderMap().size(),0);
+    }
+
+    @Test
+    public void auditScanYieldsOneEntity_EntityHeadersHasOneElementWithClassification() {
+        AtlasEntityHeaders actualEntityHeaders = setupRetriever("header-Tx", 0, 0, TABLE_GUID);
+
+        assertNotNull(actualEntityHeaders);
+        assertEquals(actualEntityHeaders.getGuidHeaderMap().size(), 1);
+        assertTrue(actualEntityHeaders.getGuidHeaderMap().containsKey(TABLE_GUID));
+        assertEquals(actualEntityHeaders.getGuidHeaderMap().get(TABLE_GUID).getGuid(), TABLE_GUID);
+        assertNotNull(actualEntityHeaders.getGuidHeaderMap().get(TABLE_GUID).getClassifications());
+        assertEquals(actualEntityHeaders.getGuidHeaderMap().get(TABLE_GUID).getClassifications().size(), 1);
+    }
+
+    private AtlasEntityHeaders setupRetriever(String headersFile, int fromTimestamp, int toTimestamp, final String tableGuid) {
+        AtlasEntityHeader entityHeaderWithClassification = null;
+        try {
+            Set<String> guids = new HashSet<String>();
+            entityHeaderWithClassification = TestResourceFileUtils.readObjectFromJson(TEST_FILES_SUBDIR, headersFile, AtlasEntityHeader.class);
+            if (!StringUtils.isEmpty(tableGuid)) {
+                guids.add(tableGuid);
+            }
+
+            EntityAuditRepository auditRepository = mock(EntityAuditRepository.class);
+            when(auditRepository.getEntitiesWithTagChanges(anyLong(), anyLong())).thenReturn(guids);
+
+            EntityGraphRetriever entityGraphRetriever = mock(EntityGraphRetriever.class);
+            when(entityGraphRetriever.toAtlasEntityHeaderWithClassifications(TABLE_GUID)).thenReturn(entityHeaderWithClassification);
+
+            ClassificationAssociator.Retriever retriever = new ClassificationAssociator.Retriever(entityGraphRetriever, auditRepository);
+            return retriever.get(fromTimestamp, toTimestamp);
+        }
+        catch (Exception ex) {
+            fail("Exception!");
+            return null;
+        }
+    }
+
+    @Test
+    public void updaterIncorrectType_ReturnsError() throws IOException {
+        AtlasEntityHeaders entityHeaderMap = getEntityHeaderMapFromFile("header-PII");
+        AtlasEntityStore entitiesStore = mock(AtlasEntityStore.class);
+
+        AtlasTypeRegistry typeRegistry = mock(AtlasTypeRegistry.class);
+        when(typeRegistry.getEntityTypeByName(anyString())).thenReturn(null);
+
+        ClassificationAssociator.Updater updater = new ClassificationAssociator.Updater(typeRegistry, entitiesStore);
+        String summary = updater.setClassifications(entityHeaderMap.getGuidHeaderMap());
+
+        assertTrue(summary.contains("hive_"));
+        assertTrue(summary.contains(STATUS_SKIPPED));
+    }
+
+    @Test
+    public void updaterCorrectTypeEntityNotFound_Skipped() throws IOException {
+        AtlasEntityHeaders entityHeaderMap = getEntityHeaderMapFromFile("header-PII");
+        AtlasEntityType hiveTable = mock(AtlasEntityType.class);
+        AtlasEntityStore entitiesStore = mock(AtlasEntityStore.class);
+        AtlasTypeRegistry typeRegistry = mock(AtlasTypeRegistry.class);
+
+        when(typeRegistry.getEntityTypeByName(anyString())).thenReturn(hiveTable);
+        when(hiveTable.getTypeName()).thenReturn("hive_column");
+
+        ClassificationAssociatorUpdaterForSpy updater = new ClassificationAssociatorUpdaterForSpy(typeRegistry, entitiesStore);
+        String summary = updater.setClassifications(entityHeaderMap.getGuidHeaderMap());
+
+        TypeReference<String[]> typeReference = new TypeReference<String[]>() {};
+        String[] summaryArray = AtlasJson.fromJson(summary, typeReference);
+        assertEquals(summaryArray.length, 1);
+        assertSummaryElement(summaryArray[0], "Entity", STATUS_SKIPPED, "");
+    }
+
+    @Test
+    public void updaterTests() throws IOException {
+        updaterAssert("header-PII", "col-entity-None", PROCESS_ADD + ":PII");
+        updaterAssert("header-PII", "col-entity-PII", new String[]{PROCESS_UPDATE + ":PII"});
+        updaterAssert("header-None", "col-entity-PII", new String[]{PROCESS_DELETE + ":PII"});
+        updaterAssert("header-PII-VENDOR_PII", "col-entity-PII-FIN_PII",
+                PROCESS_DELETE + ":FIN_PII",
+                            PROCESS_UPDATE + ":PII",
+                            PROCESS_ADD + ":VENDOR_PII");
+        updaterAssert("header-None", "col-entity-None", new String[]{});
+        updaterAssert("header-FIN_PII", "col-entity-PII",
+                        PROCESS_DELETE + ":PII",
+                                    PROCESS_ADD + ":FIN_PII");
+    }
+
+    @Test
+    public void updater_filterPropagatedClassifications() throws IOException {
+        updaterAssert("header-Tx-prop-T1", "col-entity-T1-prop-Tn",
+                PROCESS_DELETE + ":T1",
+                            PROCESS_ADD + ":Tx");
+    }
+
+
+    private void assertSummaryElement(String summaryElement, String operation, String status, String classificationName) {
+        String[] splits = StringUtils.split(summaryElement, MESSAGE_SEPARATOR);
+        String[] nameSplits = StringUtils.split(splits[3], ENTITY_NAME_SEPARATOR);
+        if (nameSplits.length > 1) {
+            assertEquals(nameSplits[1].trim(), classificationName);
+        }
+
+        assertEquals(splits[0], operation);
+        assertEquals(splits[4], status);
+    }
+
+    private String[] setupUpdater(String entityHeaderFileName, String entityFileName, int expectedSummaryLength) throws IOException {
+        AtlasEntityHeaders entityHeaderMap = getEntityHeaderMapFromFile(entityHeaderFileName);
+
+        AtlasEntityType hiveTable = mock(AtlasEntityType.class);
+        AtlasEntityStore entitiesStore = mock(AtlasEntityStore.class);
+        AtlasTypeRegistry typeRegistry = mock(AtlasTypeRegistry.class);
+
+        when(typeRegistry.getEntityTypeByName(anyString())).thenReturn(hiveTable);
+        when(hiveTable.getTypeName()).thenReturn("hive_column");
+
+        ClassificationAssociatorUpdaterForSpy updater = new ClassificationAssociatorUpdaterForSpy(typeRegistry, entitiesStore, entityFileName);
+        String summary = updater.setClassifications(entityHeaderMap.getGuidHeaderMap());
+
+        TypeReference<String[]> typeReference = new TypeReference<String[]>() {};
+        String[] summaryArray = AtlasJson.fromJson(summary, typeReference);
+        assertEquals(summaryArray.length, expectedSummaryLength);
+
+        return summaryArray;
+    }
+
+    private AtlasEntityHeader getEntityHeaderFromFile(String entityJson) throws IOException {
+        return TestResourceFileUtils.readObjectFromJson(TEST_FILES_SUBDIR, entityJson, AtlasEntityHeader.class);
+    }
+
+    private AtlasEntityHeaders getEntityHeaderMapFromFile(String filename) throws IOException {
+        return TestResourceFileUtils.readObjectFromJson(TEST_FILES_SUBDIR, filename, AtlasEntityHeaders.class);
+    }
+
+    private void updaterAssert(String incoming, String entity, String... opNamePair) throws IOException {
+        String[] summary = setupUpdater(incoming, entity, opNamePair.length);
+
+        for (int i = 0; i < opNamePair.length; i++) {
+            String s = opNamePair[i];
+            String[] splits = StringUtils.split(s, ":");
+            assertSummaryElement(summary[i], splits[0], STATUS_DONE, splits[1]);
+        }
+    }
+}
diff --git a/repository/src/test/resources/json/classification-association/col-entity-None.json b/repository/src/test/resources/json/classification-association/col-entity-None.json
new file mode 100644
index 0000000..6c04a3d
--- /dev/null
+++ b/repository/src/test/resources/json/classification-association/col-entity-None.json
@@ -0,0 +1,10 @@
+{
+  "typeName": "hive_column",
+  "attributes": {
+    "owner": "hive",
+    "qualifiedName": "hortoniabank.us_customers.nationalid@cl1",
+    "name": "nationalid"
+  },
+  "guid": "0ce68113-77fe-4ed1-9585-69371202bd74",
+  "status": "ACTIVE"
+}
diff --git a/repository/src/test/resources/json/classification-association/col-entity-PII-FIN_PII.json b/repository/src/test/resources/json/classification-association/col-entity-PII-FIN_PII.json
new file mode 100644
index 0000000..283d863
--- /dev/null
+++ b/repository/src/test/resources/json/classification-association/col-entity-PII-FIN_PII.json
@@ -0,0 +1,32 @@
+{
+  "typeName": "hive_column",
+  "attributes": {
+    "owner": "hive",
+    "qualifiedName": "hortoniabank.us_customers.nationalid@cl1",
+    "name": "nationalid"
+  },
+  "guid": "0ce68113-77fe-4ed1-9585-69371202bd74",
+  "status": "ACTIVE",
+  "classifications": [
+    {
+      "typeName": "PII",
+      "attributes": {
+        "type": "ssn"
+      },
+      "entityGuid": "0ce68113-77fe-4ed1-9585-69371202bd74",
+      "entityStatus": "ACTIVE",
+      "propagate": true,
+      "removePropagationsOnEntityDelete": false
+    },
+    {
+      "typeName": "FIN_PII",
+      "attributes": {
+        "type": "ssn"
+      },
+      "entityGuid": "0ce68113-77fe-4ed1-9585-69371202bd74",
+      "entityStatus": "ACTIVE",
+      "propagate": true,
+      "removePropagationsOnEntityDelete": false
+    }
+  ]
+}
diff --git a/repository/src/test/resources/json/classification-association/col-entity-PII.json b/repository/src/test/resources/json/classification-association/col-entity-PII.json
new file mode 100644
index 0000000..af34e7d
--- /dev/null
+++ b/repository/src/test/resources/json/classification-association/col-entity-PII.json
@@ -0,0 +1,22 @@
+{
+  "typeName": "hive_column",
+  "attributes": {
+    "owner": "hive",
+    "qualifiedName": "hortoniabank.us_customers.nationalid@cl1",
+    "name": "nationalid"
+  },
+  "guid": "0ce68113-77fe-4ed1-9585-69371202bd74",
+  "status": "ACTIVE",
+  "classifications": [
+    {
+      "typeName": "PII",
+      "attributes": {
+        "type": "ssn"
+      },
+      "entityGuid": "0ce68113-77fe-4ed1-9585-69371202bd74",
+      "entityStatus": "ACTIVE",
+      "propagate": true,
+      "removePropagationsOnEntityDelete": false
+    }
+  ]
+}
diff --git a/repository/src/test/resources/json/classification-association/col-entity-T1-prop-Tn.json b/repository/src/test/resources/json/classification-association/col-entity-T1-prop-Tn.json
new file mode 100644
index 0000000..4f9cbd2
--- /dev/null
+++ b/repository/src/test/resources/json/classification-association/col-entity-T1-prop-Tn.json
@@ -0,0 +1,34 @@
+{
+  "typeName": "hive_column",
+  "attributes": {
+    "owner": "hive",
+    "createTime": 1547071410000,
+    "qualifiedName": "stocks.daily@cl1",
+    "name": "daily"
+  },
+  "guid": "df122fc3-5555-40f8-a30f-3090b8a622f8",
+  "status": "ACTIVE",
+  "displayText": "daily",
+  "classifications": [
+    {
+      "typeName": "T1",
+      "attributes": {},
+      "entityGuid": "df122fc3-5555-40f8-a30f-3090b8a622f8",
+      "entityStatus": "ACTIVE",
+      "propagate": false,
+      "validityPeriods": [],
+      "removePropagationsOnEntityDelete": false
+    },
+    {
+      "typeName": "Tn",
+      "attributes": {},
+      "entityGuid": "22222222-5555-40f8-a30f-3090b8a622f8",
+      "entityStatus": "ACTIVE",
+      "propagate": false,
+      "validityPeriods": [],
+      "removePropagationsOnEntityDelete": false
+    }
+  ],
+  "meaningNames": [],
+  "meanings": []
+}
diff --git a/repository/src/test/resources/json/classification-association/header-FIN_PII.json b/repository/src/test/resources/json/classification-association/header-FIN_PII.json
new file mode 100644
index 0000000..35bb6db
--- /dev/null
+++ b/repository/src/test/resources/json/classification-association/header-FIN_PII.json
@@ -0,0 +1,32 @@
+{
+  "guidHeaderMap": {
+    "0ce68113-77fe-4ed1-9585-69371202bd74": {
+      "typeName": "hive_column",
+      "attributes": {
+        "owner": "hive",
+        "qualifiedName": "hortoniabank.us_customers.nationalid@cl1",
+        "name": "nationalid"
+      },
+      "guid": "0ce68113-77fe-4ed1-9585-69371202bd74",
+      "status": "ACTIVE",
+      "displayText": "nationalid",
+      "classificationNames": [
+        "FIN_PII"
+      ],
+      "classifications": [
+        {
+          "typeName": "FIN_PII",
+          "attributes": {
+            "type": "ssn"
+          },
+          "entityGuid": "0ce68113-77fe-4ed1-9585-69371202bd74",
+          "entityStatus": "ACTIVE",
+          "propagate": true,
+          "removePropagationsOnEntityDelete": false
+        }
+      ],
+      "meaningNames": [],
+      "meanings": []
+    }
+  }
+}
diff --git a/repository/src/test/resources/json/classification-association/header-None.json b/repository/src/test/resources/json/classification-association/header-None.json
new file mode 100644
index 0000000..a858990
--- /dev/null
+++ b/repository/src/test/resources/json/classification-association/header-None.json
@@ -0,0 +1,21 @@
+{
+  "guidHeaderMap": {
+    "0ce68113-77fe-4ed1-9585-69371202bd74": {
+      "typeName": "hive_column",
+      "attributes": {
+        "owner": "hive",
+        "qualifiedName": "hortoniabank.us_customers.nationalid@cl1",
+        "name": "nationalid"
+      },
+      "guid": "0ce68113-77fe-4ed1-9585-69371202bd74",
+      "status": "ACTIVE",
+      "displayText": "nationalid",
+      "classificationNames": [
+      ],
+      "classifications": [
+      ],
+      "meaningNames": [],
+      "meanings": []
+    }
+  }
+}
diff --git a/repository/src/test/resources/json/classification-association/header-PII-VENDOR_PII.json b/repository/src/test/resources/json/classification-association/header-PII-VENDOR_PII.json
new file mode 100644
index 0000000..58638f7
--- /dev/null
+++ b/repository/src/test/resources/json/classification-association/header-PII-VENDOR_PII.json
@@ -0,0 +1,42 @@
+{
+  "guidHeaderMap": {
+    "0ce68113-77fe-4ed1-9585-69371202bd74": {
+      "typeName": "hive_column",
+      "attributes": {
+        "owner": "hive",
+        "qualifiedName": "hortoniabank.us_customers.nationalid@cl1",
+        "name": "nationalid"
+      },
+      "guid": "0ce68113-77fe-4ed1-9585-69371202bd74",
+      "status": "ACTIVE",
+      "displayText": "nationalid",
+      "classificationNames": [
+        "PII"
+      ],
+      "classifications": [
+        {
+          "typeName": "PII",
+          "attributes": {
+            "type": "ssn"
+          },
+          "entityGuid": "0ce68113-77fe-4ed1-9585-69371202bd74",
+          "entityStatus": "ACTIVE",
+          "propagate": true,
+          "removePropagationsOnEntityDelete": false
+        },
+        {
+          "typeName": "VENDOR_PII",
+          "attributes": {
+            "type": "ssn"
+          },
+          "entityGuid": "0ce68113-77fe-4ed1-9585-69371202bd74",
+          "entityStatus": "ACTIVE",
+          "propagate": true,
+          "removePropagationsOnEntityDelete": false
+        }
+      ],
+      "meaningNames": [],
+      "meanings": []
+    }
+  }
+}
diff --git a/repository/src/test/resources/json/classification-association/header-PII.json b/repository/src/test/resources/json/classification-association/header-PII.json
new file mode 100644
index 0000000..bfc6d2e
--- /dev/null
+++ b/repository/src/test/resources/json/classification-association/header-PII.json
@@ -0,0 +1,32 @@
+{
+  "guidHeaderMap": {
+    "0ce68113-77fe-4ed1-9585-69371202bd74": {
+      "typeName": "hive_column",
+      "attributes": {
+        "owner": "hive",
+        "qualifiedName": "hortoniabank.us_customers.nationalid@cl1",
+        "name": "nationalid"
+      },
+      "guid": "0ce68113-77fe-4ed1-9585-69371202bd74",
+      "status": "ACTIVE",
+      "displayText": "nationalid",
+      "classificationNames": [
+        "PII"
+      ],
+      "classifications": [
+        {
+          "typeName": "PII",
+          "attributes": {
+            "type": "ssn"
+          },
+          "entityGuid": "0ce68113-77fe-4ed1-9585-69371202bd74",
+          "entityStatus": "ACTIVE",
+          "propagate": true,
+          "removePropagationsOnEntityDelete": false
+        }
+      ],
+      "meaningNames": [],
+      "meanings": []
+    }
+  }
+}
diff --git a/repository/src/test/resources/json/classification-association/header-Tx-prop-T1.json b/repository/src/test/resources/json/classification-association/header-Tx-prop-T1.json
new file mode 100644
index 0000000..8f2f26f
--- /dev/null
+++ b/repository/src/test/resources/json/classification-association/header-Tx-prop-T1.json
@@ -0,0 +1,42 @@
+{
+  "guidHeaderMap": {
+    "0ce68113-77fe-4ed1-9585-69371202bd74": {
+      "typeName": "hive_column",
+      "attributes": {
+        "owner": "hive",
+        "qualifiedName": "hortoniabank.us_customers.nationalid@cl1",
+        "name": "nationalid"
+      },
+      "guid": "0ce68113-77fe-4ed1-9585-69371202bd74",
+      "status": "ACTIVE",
+      "displayText": "nationalid",
+      "classificationNames": [
+        "T1", "Tx"
+      ],
+      "classifications": [
+        {
+          "typeName": "Tx",
+          "attributes": {
+            "type": "ssn"
+          },
+          "entityGuid": "0ce68113-77fe-4ed1-9585-69371202bd74",
+          "entityStatus": "ACTIVE",
+          "propagate": true,
+          "removePropagationsOnEntityDelete": false
+        },
+        {
+          "typeName": "T1",
+          "attributes": {
+            "type": "ssn"
+          },
+          "entityGuid": "22222222-77fe-4ed1-9585-69371202bd74",
+          "entityStatus": "ACTIVE",
+          "propagate": true,
+          "removePropagationsOnEntityDelete": false
+        }
+      ],
+      "meaningNames": [],
+      "meanings": []
+    }
+  }
+}
diff --git a/repository/src/test/resources/json/classification-association/header-Tx.json b/repository/src/test/resources/json/classification-association/header-Tx.json
new file mode 100644
index 0000000..bab1eaa
--- /dev/null
+++ b/repository/src/test/resources/json/classification-association/header-Tx.json
@@ -0,0 +1,26 @@
+{
+  "typeName": "hive_table",
+  "attributes": {
+    "owner": "hive",
+    "createTime": 1547071410000,
+    "qualifiedName": "stocks.daily@cl1",
+    "name": "daily"
+  },
+  "guid": "df122fc3-5555-40f8-a30f-3090b8a622f8",
+  "status": "ACTIVE",
+  "displayText": "daily",
+  "classificationNames": [],
+  "classifications": [
+    {
+      "typeName": "Tx",
+      "attributes": {},
+      "entityGuid": "df122fc3-5555-40f8-a30f-3090b8a622f8",
+      "entityStatus": "ACTIVE",
+      "propagate": false,
+      "validityPeriods": [],
+      "removePropagationsOnEntityDelete": false
+    }
+  ],
+  "meaningNames": [],
+  "meanings": []
+}
\ No newline at end of file
diff --git a/repository/src/test/resources/json/classification-association/header-empty.json b/repository/src/test/resources/json/classification-association/header-empty.json
new file mode 100644
index 0000000..9c8f417
--- /dev/null
+++ b/repository/src/test/resources/json/classification-association/header-empty.json
@@ -0,0 +1,3 @@
+{
+  "guidHeaderMap": {}
+}
\ No newline at end of file
diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java b/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java
index 68c132c..713338e 100644
--- a/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java
+++ b/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java
@@ -26,10 +26,12 @@ import org.apache.atlas.model.instance.AtlasClassification;
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
 import org.apache.atlas.model.instance.AtlasEntityHeader;
+import org.apache.atlas.model.instance.AtlasEntityHeaders;
 import org.apache.atlas.model.instance.ClassificationAssociateRequest;
 import org.apache.atlas.model.instance.EntityMutationResponse;
 import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
 import org.apache.atlas.repository.audit.EntityAuditRepository;
+import org.apache.atlas.repository.store.graph.v2.ClassificationAssociator;
 import org.apache.atlas.repository.converters.AtlasInstanceConverter;
 import org.apache.atlas.repository.store.graph.AtlasEntityStore;
 import org.apache.atlas.repository.store.graph.v2.AtlasEntityStream;
@@ -49,7 +51,16 @@ import org.springframework.stereotype.Service;
 import javax.inject.Inject;
 import javax.inject.Singleton;
 import javax.servlet.http.HttpServletRequest;
-import javax.ws.rs.*;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import java.util.ArrayList;
@@ -77,7 +88,6 @@ public class EntityREST {
     private final EntityAuditRepository  auditRepository;
     private final AtlasInstanceConverter instanceConverter;
 
-
     @Inject
     public EntityREST(AtlasTypeRegistry typeRegistry, AtlasEntityStore entitiesStore,
                       EntityAuditRepository auditRepository, AtlasInstanceConverter instanceConverter) {
@@ -601,7 +611,7 @@ public class EntityREST {
     }
 
     /**
-     * Bulk API to create new entities or update existing entities in Atlas.
+     * Bulk API to create new entities or updates existing entities in Atlas.
      * Existing entity is matched using its unique guid if supplied or by its unique attributes eg: qualifiedName
      */
     @POST
@@ -708,6 +718,49 @@ public class EntityREST {
         }
     }
 
+    @GET
+    @Path("bulk/headers")
+    @Produces(Servlets.JSON_MEDIA_TYPE)
+    public AtlasEntityHeaders getEntityHeaders(@QueryParam("tagUpdateStartTime") long tagUpdateStartTime) throws AtlasBaseException {
+        AtlasPerfTracer perf = null;
+
+        try {
+            long  tagUpdateEndTime = System.currentTimeMillis();
+
+            if (tagUpdateStartTime > tagUpdateEndTime) {
+                throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "fromTimestamp should be less than toTimestamp");
+            }
+
+            if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
+                perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "EntityREST.getEntityHeaders(" + tagUpdateStartTime + ", " + tagUpdateEndTime + ")");
+            }
+
+            ClassificationAssociator.Retriever associator = new ClassificationAssociator.Retriever(typeRegistry, auditRepository);
+            return associator.get(tagUpdateStartTime, tagUpdateEndTime);
+        } finally {
+            AtlasPerfTracer.log(perf);
+        }
+    }
+
+    @POST
+    @Path("bulk/setClassifications")
+    @Produces(Servlets.JSON_MEDIA_TYPE)
+    @Consumes(Servlets.JSON_MEDIA_TYPE)
+    public String setClassifications(AtlasEntityHeaders entityHeaders) throws AtlasBaseException {
+        AtlasPerfTracer perf = null;
+
+        try {
+            if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
+                perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "EntityREST.setClassifications()");
+            }
+
+            ClassificationAssociator.Updater associator = new ClassificationAssociator.Updater(typeRegistry, entitiesStore);
+            return associator.setClassifications(entityHeaders.getGuidHeaderMap());
+        } finally {
+            AtlasPerfTracer.log(perf);
+        }
+    }
+
     private AtlasEntityType ensureEntityType(String typeName) throws AtlasBaseException {
         AtlasEntityType ret = typeRegistry.getEntityTypeByName(typeName);