You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2017/11/07 22:39:23 UTC

[1/4] atlas git commit: ATLAS-2251: notification module updates (#4)

Repository: atlas
Updated Branches:
  refs/heads/ATLAS-2251 84f1349df -> f01e46d73


http://git-wip-us.apache.org/repos/asf/atlas/blob/f01e46d7/webapp/src/test/java/org/apache/atlas/web/integration/BaseResourceIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/web/integration/BaseResourceIT.java b/webapp/src/test/java/org/apache/atlas/web/integration/BaseResourceIT.java
index e63099d..ab27612 100755
--- a/webapp/src/test/java/org/apache/atlas/web/integration/BaseResourceIT.java
+++ b/webapp/src/test/java/org/apache/atlas/web/integration/BaseResourceIT.java
@@ -19,8 +19,6 @@
 package org.apache.atlas.web.integration;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasClient;
 import org.apache.atlas.AtlasClientV2;
@@ -43,7 +41,7 @@ import org.apache.atlas.v1.model.typedef.*;
 import org.apache.atlas.v1.model.typedef.EnumTypeDefinition.EnumValue;
 import org.apache.atlas.notification.NotificationConsumer;
 import org.apache.atlas.kafka.*;
-import org.apache.atlas.v1.model.notification.EntityNotification;
+import org.apache.atlas.v1.model.notification.EntityNotificationV1;
 import org.apache.atlas.type.AtlasType;
 import org.apache.atlas.type.AtlasTypeUtil;
 import org.apache.atlas.v1.typesystem.types.utils.TypesUtil;
@@ -75,31 +73,30 @@ import static org.testng.Assert.assertTrue;
  * Sets up the web resource and has helper methods to created type and entity.
  */
 public abstract class BaseResourceIT {
+    public static final Logger LOG = LoggerFactory.getLogger(BaseResourceIT.class);
 
     public static final String ATLAS_REST_ADDRESS = "atlas.rest.address";
-    public static final String NAME = "name";
-    public static final String QUALIFIED_NAME = "qualifiedName";
-    public static final String CLUSTER_NAME = "clusterName";
-    public static final String DESCRIPTION = "description";
-    public static final String PII_TAG = "pii_Tag";
-    public static final String PHI_TAG = "phi_Tag";
-    public static final String PCI_TAG = "pci_Tag";
-    public static final String SOX_TAG = "sox_Tag";
-    public static final String SEC_TAG = "sec_Tag";
-    public static final String FINANCE_TAG = "finance_Tag";
-    public static final String CLASSIFICATION = "classification";
+    public static final String NAME               = "name";
+    public static final String QUALIFIED_NAME     = "qualifiedName";
+    public static final String CLUSTER_NAME       = "clusterName";
+    public static final String DESCRIPTION        = "description";
+    public static final String PII_TAG            = "pii_Tag";
+    public static final String PHI_TAG            = "phi_Tag";
+    public static final String PCI_TAG            = "pci_Tag";
+    public static final String SOX_TAG            = "sox_Tag";
+    public static final String SEC_TAG            = "sec_Tag";
+    public static final String FINANCE_TAG        = "finance_Tag";
+    public static final String CLASSIFICATION     = "classification";
+
+    protected static final int MAX_WAIT_TIME = 60000;
 
     // All service clients
-    protected AtlasClient atlasClientV1;
+    protected AtlasClient   atlasClientV1;
     protected AtlasClientV2 atlasClientV2;
-
-    public static final Logger LOG = LoggerFactory.getLogger(BaseResourceIT.class);
-    protected static final int MAX_WAIT_TIME = 60000;
-    protected String[] atlasUrls;
+    protected String[]      atlasUrls;
 
     @BeforeClass
     public void setUp() throws Exception {
-
         //set high timeouts so that tests do not fail due to read timeouts while you
         //are stepping through the code in a debugger
         ApplicationProperties.get().setProperty("atlas.client.readTimeoutMSecs", "100000000");
@@ -107,6 +104,7 @@ public abstract class BaseResourceIT {
 
 
         Configuration configuration = ApplicationProperties.get();
+
         atlasUrls = configuration.getStringArray(ATLAS_REST_ADDRESS);
 
         if (atlasUrls == null || atlasUrls.length == 0) {
@@ -124,6 +122,7 @@ public abstract class BaseResourceIT {
 
     protected void batchCreateTypes(AtlasTypesDef typesDef) throws AtlasServiceException {
         AtlasTypesDef toCreate = new AtlasTypesDef();
+
         for (AtlasEnumDef enumDef : typesDef.getEnumDefs()) {
             if (atlasClientV2.typeWithNameExists(enumDef.getName())) {
                 LOG.warn("Type with name {} already exists. Skipping", enumDef.getName());
@@ -165,10 +164,10 @@ public abstract class BaseResourceIT {
     }
 
     protected List<String> createType(TypesDef typesDef) throws Exception {
-        List<EnumTypeDefinition> enumTypes = new ArrayList<>();
+        List<EnumTypeDefinition>   enumTypes   = new ArrayList<>();
         List<StructTypeDefinition> structTypes = new ArrayList<>();
-        List<TraitTypeDefinition> traitTypes = new ArrayList<>();
-        List<ClassTypeDefinition> classTypes = new ArrayList<>();
+        List<TraitTypeDefinition>  traitTypes  = new ArrayList<>();
+        List<ClassTypeDefinition>  classTypes  = new ArrayList<>();
 
         for (EnumTypeDefinition enumTypeDefinition : typesDef.getEnumTypes()) {
             if (atlasClientV2.typeWithNameExists(enumTypeDefinition.getName())) {
@@ -177,6 +176,7 @@ public abstract class BaseResourceIT {
                 enumTypes.add(enumTypeDefinition);
             }
         }
+
         for (StructTypeDefinition structTypeDefinition : typesDef.getStructTypes()) {
             if (atlasClientV2.typeWithNameExists(structTypeDefinition.getTypeName())) {
                 LOG.warn("Type with name {} already exists. Skipping", structTypeDefinition.getTypeName());
@@ -184,6 +184,7 @@ public abstract class BaseResourceIT {
                 structTypes.add(structTypeDefinition);
             }
         }
+
         for (TraitTypeDefinition hierarchicalTypeDefinition : typesDef.getTraitTypes()) {
             if (atlasClientV2.typeWithNameExists(hierarchicalTypeDefinition.getTypeName())) {
                 LOG.warn("Type with name {} already exists. Skipping", hierarchicalTypeDefinition.getTypeName());
@@ -191,6 +192,7 @@ public abstract class BaseResourceIT {
                 traitTypes.add(hierarchicalTypeDefinition);
             }
         }
+
         for (ClassTypeDefinition hierarchicalTypeDefinition : typesDef.getClassTypes()) {
             if (atlasClientV2.typeWithNameExists(hierarchicalTypeDefinition.getTypeName())) {
                 LOG.warn("Type with name {} already exists. Skipping", hierarchicalTypeDefinition.getTypeName());
@@ -200,6 +202,7 @@ public abstract class BaseResourceIT {
         }
 
         TypesDef toCreate = new TypesDef(enumTypes, structTypes, traitTypes, classTypes);
+
         return atlasClientV1.createType(toCreate);
     }
 
@@ -209,56 +212,59 @@ public abstract class BaseResourceIT {
 
     protected Id createInstance(Referenceable referenceable) throws Exception {
         String typeName = referenceable.getTypeName();
+
         System.out.println("creating instance of type " + typeName);
 
         List<String> guids = atlasClientV1.createEntity(referenceable);
+
         System.out.println("created instance for type " + typeName + ", guid: " + guids);
 
         // return the reference to created instance with guid
         if (guids.size() > 0) {
             return new Id(guids.get(guids.size() - 1), 0, referenceable.getTypeName());
         }
+
         return null;
     }
 
-    protected TypesDef getTypesDef(ImmutableList<EnumTypeDefinition> enums,
-                                   ImmutableList<StructTypeDefinition> structs,
-                                   ImmutableList<TraitTypeDefinition> traits,
-                                   ImmutableList<ClassTypeDefinition> classes){
-        enums = (enums != null) ? enums : ImmutableList
-                .<EnumTypeDefinition>of();
-        structs =
-                (structs != null) ? structs : ImmutableList.<StructTypeDefinition>of();
-
-        traits = (traits != null) ? traits : ImmutableList
-                .<TraitTypeDefinition>of();
+    protected TypesDef getTypesDef(List<EnumTypeDefinition>   enums,
+                                   List<StructTypeDefinition> structs,
+                                   List<TraitTypeDefinition>  traits,
+                                   List<ClassTypeDefinition>  classes){
+        enums   = (enums != null) ? enums : Collections.<EnumTypeDefinition>emptyList();
+        structs = (structs != null) ? structs : Collections.<StructTypeDefinition>emptyList();
+        traits  = (traits != null) ? traits : Collections.<TraitTypeDefinition>emptyList();
+        classes = (classes != null) ? classes : Collections.<ClassTypeDefinition>emptyList();
 
-        classes = (classes != null) ? classes : ImmutableList
-                .<ClassTypeDefinition>of();
         return new TypesDef(enums, structs, traits, classes);
-
     }
 
     protected AtlasEntityHeader modifyEntity(AtlasEntity atlasEntity, boolean update) {
         EntityMutationResponse entity = null;
+
         try {
             if (!update) {
                 entity = atlasClientV2.createEntity(new AtlasEntityWithExtInfo(atlasEntity));
+
                 assertNotNull(entity);
                 assertNotNull(entity.getEntitiesByOperation(EntityMutations.EntityOperation.CREATE));
                 assertTrue(entity.getEntitiesByOperation(EntityMutations.EntityOperation.CREATE).size() > 0);
+
                 return entity.getEntitiesByOperation(EntityMutations.EntityOperation.CREATE).get(0);
             } else {
                 entity = atlasClientV2.updateEntity(new AtlasEntityWithExtInfo(atlasEntity));
+
                 assertNotNull(entity);
                 assertNotNull(entity.getEntitiesByOperation(EntityMutations.EntityOperation.UPDATE));
                 assertTrue(entity.getEntitiesByOperation(EntityMutations.EntityOperation.UPDATE).size() > 0);
+
                 return entity.getEntitiesByOperation(EntityMutations.EntityOperation.UPDATE).get(0);
             }
 
         } catch (AtlasServiceException e) {
             LOG.error("Entity {} failed", update ? "update" : "creation", entity);
         }
+
         return null;
     }
 
@@ -270,19 +276,19 @@ public abstract class BaseResourceIT {
         return modifyEntity(atlasEntity, true);
     }
 
-    protected static final String DATABASE_TYPE_V2 = "hive_db_v2";
-    protected static final String HIVE_TABLE_TYPE_V2 = "hive_table_v2";
-    protected static final String COLUMN_TYPE_V2 = "hive_column_v2";
+    protected static final String DATABASE_TYPE_V2     = "hive_db_v2";
+    protected static final String HIVE_TABLE_TYPE_V2   = "hive_table_v2";
+    protected static final String COLUMN_TYPE_V2       = "hive_column_v2";
     protected static final String HIVE_PROCESS_TYPE_V2 = "hive_process_v2";
 
-    protected static final String DATABASE_TYPE = "hive_db_v1";
-    protected static final String HIVE_TABLE_TYPE = "hive_table_v1";
-    protected static final String COLUMN_TYPE = "hive_column_v1";
+    protected static final String DATABASE_TYPE     = "hive_db_v1";
+    protected static final String HIVE_TABLE_TYPE   = "hive_table_v1";
+    protected static final String COLUMN_TYPE       = "hive_column_v1";
     protected static final String HIVE_PROCESS_TYPE = "hive_process_v1";
 
-    protected static final String DATABASE_TYPE_BUILTIN = "hive_db";
-    protected static final String HIVE_TABLE_TYPE_BUILTIN = "hive_table";
-    protected static final String COLUMN_TYPE_BUILTIN = "hive_column";
+    protected static final String DATABASE_TYPE_BUILTIN     = "hive_db";
+    protected static final String HIVE_TABLE_TYPE_BUILTIN   = "hive_table";
+    protected static final String COLUMN_TYPE_BUILTIN       = "hive_column";
     protected static final String HIVE_PROCESS_TYPE_BUILTIN = "hive_process";
 
     protected void createTypeDefinitionsV1() throws Exception {
@@ -309,7 +315,7 @@ public abstract class BaseResourceIT {
         EnumTypeDefinition enumTypeDefinition = new EnumTypeDefinition("tableType", null, null, Arrays.asList(values));
 
         ClassTypeDefinition tblClsDef = TypesUtil
-                .createClassTypeDef(HIVE_TABLE_TYPE, null, ImmutableSet.of("DataSet"),
+                .createClassTypeDef(HIVE_TABLE_TYPE, null, Collections.singleton("DataSet"),
                         attrDef("owner", AtlasBaseTypeDef.ATLAS_TYPE_STRING), attrDef("createTime", AtlasBaseTypeDef.ATLAS_TYPE_LONG),
                         attrDef("lastAccessTime", AtlasBaseTypeDef.ATLAS_TYPE_DATE),
                         attrDef("temporary", AtlasBaseTypeDef.ATLAS_TYPE_BOOLEAN),
@@ -321,7 +327,7 @@ public abstract class BaseResourceIT {
                         new AttributeDefinition("serde2", "serdeType", Multiplicity.OPTIONAL, false, null));
 
         ClassTypeDefinition loadProcessClsDef = TypesUtil
-                .createClassTypeDef(HIVE_PROCESS_TYPE, null, ImmutableSet.of("Process"),
+                .createClassTypeDef(HIVE_PROCESS_TYPE, null, Collections.singleton("Process"),
                         attrDef("userName", AtlasBaseTypeDef.ATLAS_TYPE_STRING), attrDef("startTime", AtlasBaseTypeDef.ATLAS_TYPE_INT),
                         attrDef("endTime", AtlasBaseTypeDef.ATLAS_TYPE_LONG),
                         attrDef("queryText", AtlasBaseTypeDef.ATLAS_TYPE_STRING, Multiplicity.REQUIRED),
@@ -330,41 +336,29 @@ public abstract class BaseResourceIT {
                         attrDef("queryGraph", AtlasBaseTypeDef.ATLAS_TYPE_STRING, Multiplicity.REQUIRED));
 
         TraitTypeDefinition classificationTrait = TypesUtil
-                .createTraitTypeDef("classification", null, ImmutableSet.<String>of(),
+                .createTraitTypeDef("classification", null, Collections.<String>emptySet(),
                         TypesUtil.createRequiredAttrDef("tag", AtlasBaseTypeDef.ATLAS_TYPE_STRING));
-        TraitTypeDefinition piiTrait =
-                TypesUtil.createTraitTypeDef(PII_TAG, null, ImmutableSet.<String>of());
-        TraitTypeDefinition phiTrait =
-                TypesUtil.createTraitTypeDef(PHI_TAG, null, ImmutableSet.<String>of());
-        TraitTypeDefinition pciTrait =
-                TypesUtil.createTraitTypeDef(PCI_TAG, null, ImmutableSet.<String>of());
-        TraitTypeDefinition soxTrait =
-                TypesUtil.createTraitTypeDef(SOX_TAG, null, ImmutableSet.<String>of());
-        TraitTypeDefinition secTrait =
-                TypesUtil.createTraitTypeDef(SEC_TAG, null, ImmutableSet.<String>of());
-        TraitTypeDefinition financeTrait =
-                TypesUtil.createTraitTypeDef(FINANCE_TAG, null, ImmutableSet.<String>of());
-        TraitTypeDefinition factTrait =
-                TypesUtil.createTraitTypeDef("Fact" + randomString(), null, ImmutableSet.<String>of());
-        TraitTypeDefinition etlTrait =
-                TypesUtil.createTraitTypeDef("ETL" + randomString(), null, ImmutableSet.<String>of());
-        TraitTypeDefinition dimensionTrait =
-                TypesUtil.createTraitTypeDef("Dimension" + randomString(), null, ImmutableSet.<String>of());
-        TraitTypeDefinition metricTrait =
-                TypesUtil.createTraitTypeDef("Metric" + randomString(), null, ImmutableSet.<String>of());
-
-        createType(getTypesDef(ImmutableList.of(enumTypeDefinition), ImmutableList.of(structTypeDefinition),
-                ImmutableList.of(classificationTrait, piiTrait, phiTrait, pciTrait,
-                        soxTrait, secTrait, financeTrait, factTrait, etlTrait, dimensionTrait, metricTrait),
-                ImmutableList.of(dbClsDef, columnClsDef, tblClsDef, loadProcessClsDef)));
+
+        TraitTypeDefinition piiTrait       = TypesUtil.createTraitTypeDef(PII_TAG, null, Collections.<String>emptySet());
+        TraitTypeDefinition phiTrait       = TypesUtil.createTraitTypeDef(PHI_TAG, null, Collections.<String>emptySet());
+        TraitTypeDefinition pciTrait       = TypesUtil.createTraitTypeDef(PCI_TAG, null, Collections.<String>emptySet());
+        TraitTypeDefinition soxTrait       = TypesUtil.createTraitTypeDef(SOX_TAG, null, Collections.<String>emptySet());
+        TraitTypeDefinition secTrait       = TypesUtil.createTraitTypeDef(SEC_TAG, null, Collections.<String>emptySet());
+        TraitTypeDefinition financeTrait   = TypesUtil.createTraitTypeDef(FINANCE_TAG, null, Collections.<String>emptySet());
+        TraitTypeDefinition factTrait      = TypesUtil.createTraitTypeDef("Fact" + randomString(), null, Collections.<String>emptySet());
+        TraitTypeDefinition etlTrait       = TypesUtil.createTraitTypeDef("ETL" + randomString(), null, Collections.<String>emptySet());
+        TraitTypeDefinition dimensionTrait = TypesUtil.createTraitTypeDef("Dimension" + randomString(), null, Collections.<String>emptySet());
+        TraitTypeDefinition metricTrait    = TypesUtil.createTraitTypeDef("Metric" + randomString(), null, Collections.<String>emptySet());
+
+        createType(getTypesDef(Collections.singletonList(enumTypeDefinition),
+                               Collections.singletonList(structTypeDefinition),
+                               Arrays.asList(classificationTrait, piiTrait, phiTrait, pciTrait, soxTrait, secTrait, financeTrait, factTrait, etlTrait, dimensionTrait, metricTrait),
+                               Arrays.asList(dbClsDef, columnClsDef, tblClsDef, loadProcessClsDef)));
     }
 
     protected void createTypeDefinitionsV2() throws Exception {
-
         AtlasConstraintDef isCompositeSourceConstraint = new AtlasConstraintDef(CONSTRAINT_TYPE_OWNED_REF);
-
-        AtlasConstraintDef isCompositeTargetConstraint = new AtlasConstraintDef(CONSTRAINT_TYPE_INVERSE_REF,
-                Collections.<String, Object>singletonMap(CONSTRAINT_PARAM_ATTRIBUTE, "randomTable"));
+        AtlasConstraintDef isCompositeTargetConstraint = new AtlasConstraintDef(CONSTRAINT_TYPE_INVERSE_REF, Collections.<String, Object>singletonMap(CONSTRAINT_PARAM_ATTRIBUTE, "randomTable"));
 
         AtlasEntityDef dbClsTypeDef = AtlasTypeUtil.createClassTypeDef(
                 DATABASE_TYPE_V2,
@@ -401,7 +395,7 @@ public abstract class BaseResourceIT {
 
         AtlasEntityDef tblClsDef = AtlasTypeUtil
                 .createClassTypeDef(HIVE_TABLE_TYPE_V2,
-                        ImmutableSet.of("DataSet"),
+                        Collections.singleton("DataSet"),
                         AtlasTypeUtil.createOptionalAttrDef("owner", "string"),
                         AtlasTypeUtil.createOptionalAttrDef("createTime", "long"),
                         AtlasTypeUtil.createOptionalAttrDef("lastAccessTime", "date"),
@@ -420,7 +414,7 @@ public abstract class BaseResourceIT {
 
         AtlasEntityDef loadProcessClsDef = AtlasTypeUtil
                 .createClassTypeDef(HIVE_PROCESS_TYPE_V2,
-                        ImmutableSet.of("Process"),
+                        Collections.singleton("Process"),
                         AtlasTypeUtil.createOptionalAttrDef("userName", "string"),
                         AtlasTypeUtil.createOptionalAttrDef("startTime", "int"),
                         AtlasTypeUtil.createOptionalAttrDef("endTime", "long"),
@@ -430,25 +424,19 @@ public abstract class BaseResourceIT {
                         AtlasTypeUtil.createRequiredAttrDef("queryGraph", "string"));
 
         AtlasClassificationDef classificationTrait = AtlasTypeUtil
-                .createTraitTypeDef("classification",ImmutableSet.<String>of(),
+                .createTraitTypeDef("classification", Collections.<String>emptySet(),
                         AtlasTypeUtil.createRequiredAttrDef("tag", "string"));
-        AtlasClassificationDef piiTrait =
-                AtlasTypeUtil.createTraitTypeDef(PII_TAG, ImmutableSet.<String>of());
-        AtlasClassificationDef phiTrait =
-                AtlasTypeUtil.createTraitTypeDef(PHI_TAG, ImmutableSet.<String>of());
-        AtlasClassificationDef pciTrait =
-                AtlasTypeUtil.createTraitTypeDef(PCI_TAG, ImmutableSet.<String>of());
-        AtlasClassificationDef soxTrait =
-                AtlasTypeUtil.createTraitTypeDef(SOX_TAG, ImmutableSet.<String>of());
-        AtlasClassificationDef secTrait =
-                AtlasTypeUtil.createTraitTypeDef(SEC_TAG, ImmutableSet.<String>of());
-        AtlasClassificationDef financeTrait =
-                AtlasTypeUtil.createTraitTypeDef(FINANCE_TAG, ImmutableSet.<String>of());
-
-        AtlasTypesDef typesDef = new AtlasTypesDef(ImmutableList.of(enumDef),
-                ImmutableList.of(structTypeDef),
-                ImmutableList.of(classificationTrait, piiTrait, phiTrait, pciTrait, soxTrait, secTrait, financeTrait),
-                ImmutableList.of(dbClsTypeDef, columnClsDef, tblClsDef, loadProcessClsDef));
+        AtlasClassificationDef piiTrait     = AtlasTypeUtil.createTraitTypeDef(PII_TAG, Collections.<String>emptySet());
+        AtlasClassificationDef phiTrait     = AtlasTypeUtil.createTraitTypeDef(PHI_TAG, Collections.<String>emptySet());
+        AtlasClassificationDef pciTrait     = AtlasTypeUtil.createTraitTypeDef(PCI_TAG, Collections.<String>emptySet());
+        AtlasClassificationDef soxTrait     = AtlasTypeUtil.createTraitTypeDef(SOX_TAG, Collections.<String>emptySet());
+        AtlasClassificationDef secTrait     = AtlasTypeUtil.createTraitTypeDef(SEC_TAG, Collections.<String>emptySet());
+        AtlasClassificationDef financeTrait = AtlasTypeUtil.createTraitTypeDef(FINANCE_TAG, Collections.<String>emptySet());
+
+        AtlasTypesDef typesDef = new AtlasTypesDef(Collections.singletonList(enumDef),
+                Collections.singletonList(structTypeDef),
+                Arrays.asList(classificationTrait, piiTrait, phiTrait, pciTrait, soxTrait, secTrait, financeTrait),
+                Arrays.asList(dbClsTypeDef, columnClsDef, tblClsDef, loadProcessClsDef));
 
         batchCreateTypes(typesDef);
     }
@@ -461,10 +449,10 @@ public abstract class BaseResourceIT {
         return attrDef(name, dT, m, false, null);
     }
 
-    AttributeDefinition attrDef(String name, String dT, Multiplicity m, boolean isComposite,
-                                String reverseAttributeName) {
+    AttributeDefinition attrDef(String name, String dT, Multiplicity m, boolean isComposite, String reverseAttributeName) {
         Preconditions.checkNotNull(name);
         Preconditions.checkNotNull(dT);
+
         return new AttributeDefinition(name, dT, m, isComposite, reverseAttributeName);
     }
 
@@ -482,9 +470,9 @@ public abstract class BaseResourceIT {
         values.put(CLUSTER_NAME, "cl1");
         values.put("parameters", Collections.EMPTY_MAP);
         values.put("location", "/tmp");
+
         Referenceable databaseInstance = new Referenceable(dbId._getId(), dbId.getTypeName(), values);
-        Referenceable tableInstance =
-                new Referenceable(HIVE_TABLE_TYPE_BUILTIN, CLASSIFICATION, PII_TAG, PHI_TAG, PCI_TAG, SOX_TAG, SEC_TAG, FINANCE_TAG);
+        Referenceable tableInstance    = new Referenceable(HIVE_TABLE_TYPE_BUILTIN, CLASSIFICATION, PII_TAG, PHI_TAG, PCI_TAG, SOX_TAG, SEC_TAG, FINANCE_TAG);
         tableInstance.set(NAME, tableName);
         tableInstance.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, tableName);
         tableInstance.set("db", databaseInstance);
@@ -556,30 +544,36 @@ public abstract class BaseResourceIT {
     }
     protected Referenceable createHiveDBInstanceBuiltIn(String dbName) {
         Referenceable databaseInstance = new Referenceable(DATABASE_TYPE_BUILTIN);
+
         databaseInstance.set(NAME, dbName);
         databaseInstance.set(QUALIFIED_NAME, dbName);
         databaseInstance.set(CLUSTER_NAME, randomString());
         databaseInstance.set(DESCRIPTION, "foo database");
+
         return databaseInstance;
     }
 
 
     protected Referenceable createHiveDBInstanceV1(String dbName) {
         Referenceable databaseInstance = new Referenceable(DATABASE_TYPE);
+
         databaseInstance.set(NAME, dbName);
         databaseInstance.set(DESCRIPTION, "foo database");
         databaseInstance.set(CLUSTER_NAME, "fooCluster");
+
         return databaseInstance;
     }
 
     protected AtlasEntity createHiveDBInstanceV2(String dbName) {
         AtlasEntity atlasEntity = new AtlasEntity(DATABASE_TYPE_V2);
+
         atlasEntity.setAttribute(NAME, dbName);
         atlasEntity.setAttribute(DESCRIPTION, "foo database");
         atlasEntity.setAttribute(CLUSTER_NAME, "fooCluster");
         atlasEntity.setAttribute("owner", "user1");
         atlasEntity.setAttribute("locationUri", "/tmp");
         atlasEntity.setAttribute("createTime",1000);
+
         return atlasEntity;
     }
 
@@ -603,7 +597,7 @@ public abstract class BaseResourceIT {
          * @return the boolean result of the evaluation.
          * @throws Exception thrown if the predicate evaluation could not evaluate.
          */
-        boolean evaluate(EntityNotification notification) throws Exception;
+        boolean evaluate(EntityNotificationV1 notification) throws Exception;
     }
 
     /**
@@ -614,54 +608,62 @@ public abstract class BaseResourceIT {
      */
     protected void waitFor(int timeout, Predicate predicate) throws Exception {
         ParamChecker.notNull(predicate, "predicate");
-        long mustEnd = System.currentTimeMillis() + timeout;
 
+        long    mustEnd = System.currentTimeMillis() + timeout;
         boolean eval;
+
         while (!(eval = predicate.evaluate()) && System.currentTimeMillis() < mustEnd) {
             LOG.info("Waiting up to {} msec", mustEnd - System.currentTimeMillis());
+
             Thread.sleep(100);
         }
+
         if (!eval) {
             throw new Exception("Waiting timed out after " + timeout + " msec");
         }
     }
 
-    protected EntityNotification waitForNotification(final NotificationConsumer<EntityNotification> consumer, int maxWait,
-                                                     final NotificationPredicate predicate) throws Exception {
-        final TypesUtil.Pair<EntityNotification, String> pair = TypesUtil.Pair.of(null, null);
-        final long maxCurrentTime = System.currentTimeMillis() + maxWait;
+    protected EntityNotificationV1 waitForNotification(final NotificationConsumer<EntityNotificationV1> consumer, int maxWait,
+                                                       final NotificationPredicate predicate) throws Exception {
+        final TypesUtil.Pair<EntityNotificationV1, String> pair           = TypesUtil.Pair.of(null, null);
+        final long                                         maxCurrentTime = System.currentTimeMillis() + maxWait;
+
         waitFor(maxWait, new Predicate() {
             @Override
             public boolean evaluate() throws Exception {
                 try {
-
                     while (System.currentTimeMillis() < maxCurrentTime) {
-                        List<AtlasKafkaMessage<EntityNotification>> messageList = consumer.receive();
-                            if(messageList.size() > 0) {
-                                EntityNotification notification = messageList.get(0).getMessage();
-                                if (predicate.evaluate(notification)) {
-                                    pair.left = notification;
-                                    return true;
-                                }
-                            }else{
-                                LOG.info( System.currentTimeMillis()+ " messageList no records" +maxCurrentTime );
+                        List<AtlasKafkaMessage<EntityNotificationV1>> messageList = consumer.receive();
+
+                        if(messageList.size() > 0) {
+                            EntityNotificationV1 notification = messageList.get(0).getMessage();
+
+                            if (predicate.evaluate(notification)) {
+                                pair.left = notification;
+
+                                return true;
                             }
+                        } else {
+                            LOG.info( System.currentTimeMillis()+ " messageList no records" +maxCurrentTime );
+                        }
                     }
                 } catch(Exception e) {
                     LOG.error(" waitForNotification", e);
                     //ignore
                 }
+
                 return false;
             }
         });
+
         return pair.left;
     }
 
-    protected NotificationPredicate newNotificationPredicate(final EntityNotification.OperationType operationType,
+    protected NotificationPredicate newNotificationPredicate(final EntityNotificationV1.OperationType operationType,
                                                              final String typeName, final String guid) {
         return new NotificationPredicate() {
             @Override
-            public boolean evaluate(EntityNotification notification) throws Exception {
+            public boolean evaluate(EntityNotificationV1 notification) throws Exception {
                 return notification != null &&
                         notification.getOperationType() == operationType &&
                         notification.getEntity().getTypeName().equals(typeName) &&


[4/4] atlas git commit: ATLAS-2251: notification module updates (#4)

Posted by ma...@apache.org.
ATLAS-2251: notification module updates (#4)


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/f01e46d7
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/f01e46d7
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/f01e46d7

Branch: refs/heads/ATLAS-2251
Commit: f01e46d7399cdf382a5b4dc9a5102b3a4f3f9d6a
Parents: 84f1349
Author: Madhan Neethiraj <ma...@apache.org>
Authored: Tue Nov 7 14:24:25 2017 -0800
Committer: Madhan Neethiraj <ma...@apache.org>
Committed: Tue Nov 7 14:24:25 2017 -0800

----------------------------------------------------------------------
 .../apache/atlas/falcon/hook/FalconHook.java    |   7 +-
 .../atlas/hbase/bridge/HBaseAtlasHook.java      |  45 +-
 .../hbase/model/HBaseOperationContext.java      |   8 +-
 .../org/apache/atlas/hive/hook/HiveHook.java    |  27 +-
 .../org/apache/atlas/sqoop/hook/SqoopHook.java  |   7 +-
 .../model/notification/EntityNotification.java  |  86 ++++
 .../model/notification/HookNotification.java    | 103 +++++
 .../java/org/apache/atlas/type/AtlasType.java   |  58 ++-
 .../org/apache/atlas/type/AtlasTypeUtil.java    |  18 +-
 .../model/instance/AtlasSystemAttributes.java   |   2 +-
 .../org/apache/atlas/v1/model/instance/Id.java  |   2 +-
 .../atlas/v1/model/instance/Referenceable.java  |   2 +-
 .../apache/atlas/v1/model/instance/Struct.java  |   2 +-
 .../model/notification/EntityNotification.java  | 231 ----------
 .../notification/EntityNotificationV1.java      | 231 ++++++++++
 .../v1/model/notification/HookNotification.java | 423 -------------------
 .../model/notification/HookNotificationV1.java  | 357 ++++++++++++++++
 .../apache/atlas/v1/model/typedef/TypesDef.java |   2 +-
 .../java/org/apache/atlas/hook/AtlasHook.java   |  18 +-
 .../notification/NotificationInterface.java     |  20 -
 .../entity/EntityMessageDeserializer.java       |   2 +-
 .../hook/HookMessageDeserializer.java           |  12 +-
 .../org/apache/atlas/hook/AtlasHookTest.java    |  53 +--
 .../apache/atlas/kafka/KafkaConsumerTest.java   | 119 ++----
 .../atlas/kafka/KafkaNotificationTest.java      |  34 +-
 .../notification/AbstractNotificationTest.java  |  59 ++-
 .../entity/EntityMessageDeserializerTest.java   |  72 ----
 .../EntityNotificationDeserializerTest.java     |  71 ++++
 .../entity/EntityNotificationTest.java          |  82 ++--
 .../hook/HookMessageDeserializerTest.java       | 171 --------
 .../hook/HookNotificationDeserializerTest.java  | 167 ++++++++
 .../notification/hook/HookNotificationTest.java |  25 +-
 .../NotificationEntityChangeListener.java       |  24 +-
 .../notification/NotificationHookConsumer.java  | 205 +++++----
 .../notification/EntityNotificationIT.java      |  96 +++--
 .../NotificationHookConsumerIT.java             |  80 ++--
 .../NotificationHookConsumerKafkaTest.java      |  66 +--
 .../NotificationHookConsumerTest.java           | 153 ++++---
 .../atlas/web/integration/BaseResourceIT.java   | 242 +++++------
 39 files changed, 1772 insertions(+), 1610 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/atlas/blob/f01e46d7/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java
----------------------------------------------------------------------
diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java
index 5912cb0..77177b4 100644
--- a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java
+++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java
@@ -25,8 +25,9 @@ import org.apache.atlas.falcon.event.FalconEvent;
 import org.apache.atlas.falcon.publisher.FalconEventPublisher;
 import org.apache.atlas.hook.AtlasHook;
 import org.apache.atlas.kafka.NotificationProvider;
+import org.apache.atlas.model.notification.HookNotification;
 import org.apache.atlas.v1.model.instance.Referenceable;
-import org.apache.atlas.v1.model.notification.HookNotification;
+import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.feed.Feed;
@@ -151,14 +152,14 @@ public class FalconHook extends AtlasHook implements FalconEventPublisher {
 
     private void fireAndForget(FalconEvent event) throws FalconException, URISyntaxException {
         LOG.info("Entered Atlas hook for Falcon hook operation {}", event.getOperation());
-        List<HookNotification.HookNotificationMessage> messages = new ArrayList<>();
+        List<HookNotification> messages = new ArrayList<>();
 
         Operation op = getOperation(event.getOperation());
         String user = getUser(event.getUser());
         LOG.info("fireAndForget user:{}", user);
         switch (op) {
         case ADD:
-            messages.add(new HookNotification.EntityCreateRequest(user, createEntities(event, user)));
+            messages.add(new EntityCreateRequest(user, createEntities(event, user)));
             break;
 
         }

http://git-wip-us.apache.org/repos/asf/atlas/blob/f01e46d7/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseAtlasHook.java
----------------------------------------------------------------------
diff --git a/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseAtlasHook.java b/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseAtlasHook.java
index 6fcaf1b..03e340c 100644
--- a/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseAtlasHook.java
+++ b/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseAtlasHook.java
@@ -23,8 +23,11 @@ import org.apache.atlas.AtlasConstants;
 import org.apache.atlas.hbase.model.HBaseOperationContext;
 import org.apache.atlas.hbase.model.HBaseDataTypes;
 import org.apache.atlas.hook.AtlasHook;
+import org.apache.atlas.model.notification.HookNotification;
 import org.apache.atlas.v1.model.instance.Referenceable;
-import org.apache.atlas.v1.model.notification.HookNotification;
+import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest;
+import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityDeleteRequest;
+import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityUpdateRequest;
 import org.apache.commons.configuration.Configuration;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -219,13 +222,13 @@ public class HBaseAtlasHook extends AtlasHook {
             case CREATE_NAMESPACE:
                 LOG.info("Create NameSpace {}", nameSpaceRef.get(REFERENCEABLE_ATTRIBUTE_NAME));
 
-                hbaseOperationContext.addMessage(new HookNotification.EntityCreateRequest(hbaseOperationContext.getUser(), nameSpaceRef));
+                hbaseOperationContext.addMessage(new EntityCreateRequest(hbaseOperationContext.getUser(), nameSpaceRef));
                 break;
 
             case ALTER_NAMESPACE:
                 LOG.info("Modify NameSpace {}", nameSpaceRef.get(REFERENCEABLE_ATTRIBUTE_NAME));
 
-                hbaseOperationContext.addMessage(new HookNotification.EntityUpdateRequest(hbaseOperationContext.getUser(), nameSpaceRef));
+                hbaseOperationContext.addMessage(new EntityUpdateRequest(hbaseOperationContext.getUser(), nameSpaceRef));
                 break;
         }
     }
@@ -235,10 +238,10 @@ public class HBaseAtlasHook extends AtlasHook {
 
         LOG.info("Delete NameSpace {}", nameSpaceQualifiedName);
 
-        hbaseOperationContext.addMessage(new HookNotification.EntityDeleteRequest(hbaseOperationContext.getUser(),
-                                                                                  HBaseDataTypes.HBASE_NAMESPACE.getName(),
-                                                                                  REFERENCEABLE_ATTRIBUTE_NAME,
-                                                                                  nameSpaceQualifiedName));
+        hbaseOperationContext.addMessage(new EntityDeleteRequest(hbaseOperationContext.getUser(),
+                                                                 HBaseDataTypes.HBASE_NAMESPACE.getName(),
+                                                                 REFERENCEABLE_ATTRIBUTE_NAME,
+                                                                 nameSpaceQualifiedName));
     }
 
     private void createOrUpdateTableInstance(HBaseOperationContext hbaseOperationContext) {
@@ -252,13 +255,13 @@ public class HBaseAtlasHook extends AtlasHook {
             case CREATE_TABLE:
                 LOG.info("Create Table {}", tableRef.get(REFERENCEABLE_ATTRIBUTE_NAME));
 
-                hbaseOperationContext.addMessage(new HookNotification.EntityCreateRequest(hbaseOperationContext.getUser(), nameSpaceRef, tableRef));
+                hbaseOperationContext.addMessage(new EntityCreateRequest(hbaseOperationContext.getUser(), nameSpaceRef, tableRef));
                 break;
 
             case ALTER_TABLE:
                 LOG.info("Modify Table {}", tableRef.get(REFERENCEABLE_ATTRIBUTE_NAME));
 
-                hbaseOperationContext.addMessage(new HookNotification.EntityUpdateRequest(hbaseOperationContext.getUser(), nameSpaceRef, tableRef));
+                hbaseOperationContext.addMessage(new EntityUpdateRequest(hbaseOperationContext.getUser(), nameSpaceRef, tableRef));
                 break;
         }
     }
@@ -276,10 +279,10 @@ public class HBaseAtlasHook extends AtlasHook {
 
         LOG.info("Delete Table {}", tableQualifiedName);
 
-        hbaseOperationContext.addMessage(new HookNotification.EntityDeleteRequest(hbaseOperationContext.getUser(),
-                                                                                  HBaseDataTypes.HBASE_TABLE.getName(),
-                                                                                  REFERENCEABLE_ATTRIBUTE_NAME,
-                                                                                  tableQualifiedName));
+        hbaseOperationContext.addMessage(new EntityDeleteRequest(hbaseOperationContext.getUser(),
+                                                                 HBaseDataTypes.HBASE_TABLE.getName(),
+                                                                 REFERENCEABLE_ATTRIBUTE_NAME,
+                                                                 tableQualifiedName));
     }
 
     private void createOrUpdateColumnFamilyInstance(HBaseOperationContext hbaseOperationContext) {
@@ -291,13 +294,13 @@ public class HBaseAtlasHook extends AtlasHook {
             case CREATE_COLUMN_FAMILY:
                 LOG.info("Create ColumnFamily {}", columnFamilyRef.get(REFERENCEABLE_ATTRIBUTE_NAME));
 
-                hbaseOperationContext.addMessage(new HookNotification.EntityCreateRequest(hbaseOperationContext.getUser(), nameSpaceRef, tableRef, columnFamilyRef));
+                hbaseOperationContext.addMessage(new EntityCreateRequest(hbaseOperationContext.getUser(), nameSpaceRef, tableRef, columnFamilyRef));
                 break;
 
             case ALTER_COLUMN_FAMILY:
                 LOG.info("Alter ColumnFamily {}", columnFamilyRef.get(REFERENCEABLE_ATTRIBUTE_NAME));
 
-                hbaseOperationContext.addMessage(new HookNotification.EntityUpdateRequest(hbaseOperationContext.getUser(), nameSpaceRef, tableRef, columnFamilyRef));
+                hbaseOperationContext.addMessage(new EntityUpdateRequest(hbaseOperationContext.getUser(), nameSpaceRef, tableRef, columnFamilyRef));
                 break;
         }
     }
@@ -316,10 +319,10 @@ public class HBaseAtlasHook extends AtlasHook {
 
         LOG.info("Delete ColumnFamily {}", columnFamilyQualifiedName);
 
-        hbaseOperationContext.addMessage(new HookNotification.EntityDeleteRequest(hbaseOperationContext.getUser(),
-                                                                                  HBaseDataTypes.HBASE_COLUMN_FAMILY.getName(),
-                                                                                  REFERENCEABLE_ATTRIBUTE_NAME,
-                                                                                  columnFamilyQualifiedName));
+        hbaseOperationContext.addMessage(new EntityDeleteRequest(hbaseOperationContext.getUser(),
+                                                                 HBaseDataTypes.HBASE_COLUMN_FAMILY.getName(),
+                                                                 REFERENCEABLE_ATTRIBUTE_NAME,
+                                                                 columnFamilyQualifiedName));
     }
 
 
@@ -491,7 +494,7 @@ public class HBaseAtlasHook extends AtlasHook {
             LOG.debug("==> HBaseAtlasHook.notifyAsPrivilegedAction({})", hbaseOperationContext);
         }
 
-        final List<HookNotification.HookNotificationMessage> messages = hbaseOperationContext.getMessages();
+        final List<HookNotification> messages = hbaseOperationContext.getMessages();
 
 
         try {
@@ -534,7 +537,7 @@ public class HBaseAtlasHook extends AtlasHook {
      *
      * @param messages hook notification messages
      */
-    protected void notifyEntities(List<HookNotification.HookNotificationMessage> messages) {
+    protected void notifyEntities(List<HookNotification> messages) {
         final int maxRetries = atlasProperties.getInt(HOOK_NUM_RETRIES, 3);
         notifyEntities(messages, maxRetries);
     }

http://git-wip-us.apache.org/repos/asf/atlas/blob/f01e46d7/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/model/HBaseOperationContext.java
----------------------------------------------------------------------
diff --git a/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/model/HBaseOperationContext.java b/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/model/HBaseOperationContext.java
index 33858d4..bc8485b 100644
--- a/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/model/HBaseOperationContext.java
+++ b/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/model/HBaseOperationContext.java
@@ -19,7 +19,7 @@
 package org.apache.atlas.hbase.model;
 
 import org.apache.atlas.hbase.bridge.HBaseAtlasHook;
-import org.apache.atlas.v1.model.notification.HookNotification;
+import org.apache.atlas.model.notification.HookNotification;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -74,7 +74,7 @@ public class HBaseOperationContext {
         this(null, nameSpace, null, tableName, null, hColumnDescriptor, columnFamily, operation, ugi, user, owner, hbaseConf);
     }
 
-    private List<HookNotification.HookNotificationMessage> messages = new ArrayList<>();
+    private List<HookNotification> messages = new ArrayList<>();
 
     public UserGroupInformation getUgi() {
         return ugi;
@@ -120,7 +120,7 @@ public class HBaseOperationContext {
         return columnFamily;
     }
 
-    public void addMessage(HookNotification.HookNotificationMessage message) {
+    public void addMessage(HookNotification message) {
         messages.add(message);
     }
 
@@ -128,7 +128,7 @@ public class HBaseOperationContext {
         return owner;
     }
 
-    public List<HookNotification.HookNotificationMessage> getMessages() {
+    public List<HookNotification> getMessages() {
         return messages;
     }
 

http://git-wip-us.apache.org/repos/asf/atlas/blob/f01e46d7/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
index 5f8dcdb..57f5efb 100755
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
@@ -28,8 +28,11 @@ import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
 import org.apache.atlas.hive.model.HiveDataTypes;
 import org.apache.atlas.hook.AtlasHook;
 import org.apache.atlas.hook.AtlasHookException;
+import org.apache.atlas.model.notification.HookNotification;
 import org.apache.atlas.v1.model.instance.Referenceable;
-import org.apache.atlas.v1.model.notification.HookNotification;
+import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityDeleteRequest;
+import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityPartialUpdateRequest;
+import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityUpdateRequest;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.fs.Path;
@@ -331,7 +334,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
         final String tblQualifiedName = HiveMetaStoreBridge.getTableQualifiedName(dgiBridge.getClusterName(), output.getTable());
         LOG.info("Deleting table {} ", tblQualifiedName);
         event.addMessage(
-            new HookNotification.EntityDeleteRequest(event.getUser(),
+            new EntityDeleteRequest(event.getUser(),
                 HiveDataTypes.HIVE_TABLE.getName(),
                 AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
                 tblQualifiedName));
@@ -350,7 +353,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
             } else if (Type.DATABASE.equals(output.getType())) {
                 final String dbQualifiedName = HiveMetaStoreBridge.getDBQualifiedName(dgiBridge.getClusterName(), output.getDatabase().getName());
                 event.addMessage(
-                    new HookNotification.EntityDeleteRequest(event.getUser(),
+                    new EntityDeleteRequest(event.getUser(),
                         HiveDataTypes.HIVE_DB.getName(),
                         AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
                         dbQualifiedName));
@@ -412,7 +415,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
                     Referenceable newColEntity = new Referenceable(HiveDataTypes.HIVE_COLUMN.getName());
                     newColEntity.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, newColumnQFName);
 
-                    event.addMessage(new HookNotification.EntityPartialUpdateRequest(event.getUser(),
+                    event.addMessage(new EntityPartialUpdateRequest(event.getUser(),
                             HiveDataTypes.HIVE_COLUMN.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
                             oldColumnQFName, newColEntity));
                 }
@@ -481,7 +484,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
         ArrayList<String> alias_list = new ArrayList<>();
         alias_list.add(oldTable.getTableName().toLowerCase());
         newEntity.set(HiveMetaStoreBridge.TABLE_ALIAS_LIST, alias_list);
-        event.addMessage(new HookNotification.EntityPartialUpdateRequest(event.getUser(),
+        event.addMessage(new EntityPartialUpdateRequest(event.getUser(),
             HiveDataTypes.HIVE_TABLE.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
             oldTableQFName, newEntity));
 
@@ -499,7 +502,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
             Referenceable newColEntity = new Referenceable(HiveDataTypes.HIVE_COLUMN.getName());
             ///Only QF Name changes
             newColEntity.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, newColumnQFName);
-            event.addMessage(new HookNotification.EntityPartialUpdateRequest(event.getUser(),
+            event.addMessage(new EntityPartialUpdateRequest(event.getUser(),
                 HiveDataTypes.HIVE_COLUMN.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
                 oldColumnQFName, newColEntity));
             newColEntities.add(newColEntity);
@@ -518,7 +521,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
 
         final Referenceable newSDEntity = new Referenceable(HiveDataTypes.HIVE_STORAGEDESC.getName());
         newSDEntity.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, newSDQFName);
-        event.addMessage(new HookNotification.EntityPartialUpdateRequest(event.getUser(),
+        event.addMessage(new EntityPartialUpdateRequest(event.getUser(),
             HiveDataTypes.HIVE_STORAGEDESC.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
             oldSDQFName, newSDEntity));
 
@@ -593,7 +596,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
             }
 
             if (!entities.isEmpty()) {
-                event.addMessage(new HookNotification.EntityUpdateRequest(event.getUser(), entities));
+                event.addMessage(new EntityUpdateRequest(event.getUser(), entities));
             }
 
             return result;
@@ -719,7 +722,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
     private void addEntityUpdateNotificationMessagess(final HiveEventContext event, final Collection<Referenceable> entities) {
         // process each entity as separate message to avoid running into OOM errors
         for (Referenceable entity : entities) {
-            event.addMessage(new HookNotification.EntityUpdateRequest(event.getUser(), entity));
+            event.addMessage(new EntityUpdateRequest(event.getUser(), entity));
         }
     }
 
@@ -1089,7 +1092,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
 
         public Map<String, List<ColumnLineageUtils.HiveColumnLineageInfo>> lineageInfo;
 
-        private List<HookNotification.HookNotificationMessage> messages = new ArrayList<>();
+        private List<HookNotification> messages = new ArrayList<>();
 
         public void setInputs(Set<ReadEntity> inputs) {
             this.inputs = inputs;
@@ -1172,11 +1175,11 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
             return queryStartTime;
         }
 
-        public void addMessage(HookNotification.HookNotificationMessage message) {
+        public void addMessage(HookNotification message) {
             messages.add(message);
         }
 
-        public List<HookNotification.HookNotificationMessage> getMessages() {
+        public List<HookNotification> getMessages() {
             return messages;
         }
     }

http://git-wip-us.apache.org/repos/asf/atlas/blob/f01e46d7/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java
----------------------------------------------------------------------
diff --git a/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java b/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java
index aee24ab..5ded92c 100644
--- a/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java
+++ b/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java
@@ -26,8 +26,9 @@ import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
 import org.apache.atlas.hive.model.HiveDataTypes;
 import org.apache.atlas.hook.AtlasHook;
 import org.apache.atlas.hook.AtlasHookException;
+import org.apache.atlas.model.notification.HookNotification;
 import org.apache.atlas.v1.model.instance.Referenceable;
-import org.apache.atlas.v1.model.notification.HookNotification;
+import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest;
 import org.apache.atlas.sqoop.model.SqoopDataTypes;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang3.StringUtils;
@@ -185,8 +186,8 @@ public class SqoopHook extends SqoopJobDataPublisher {
             Referenceable procRef = createSqoopProcessInstance(dbStoreRef, hiveTableRef, data, clusterName);
 
             int maxRetries = atlasProperties.getInt(HOOK_NUM_RETRIES, 3);
-            HookNotification.HookNotificationMessage message =
-                    new HookNotification.EntityCreateRequest(AtlasHook.getUser(), dbStoreRef, dbRef, hiveTableRef, procRef);
+            HookNotification message =
+                    new EntityCreateRequest(AtlasHook.getUser(), dbStoreRef, dbRef, hiveTableRef, procRef);
             AtlasHook.notifyEntities(Arrays.asList(message), maxRetries);
         }
         catch(Exception e) {

http://git-wip-us.apache.org/repos/asf/atlas/blob/f01e46d7/intg/src/main/java/org/apache/atlas/model/notification/EntityNotification.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/model/notification/EntityNotification.java b/intg/src/main/java/org/apache/atlas/model/notification/EntityNotification.java
new file mode 100644
index 0000000..3d03457
--- /dev/null
+++ b/intg/src/main/java/org/apache/atlas/model/notification/EntityNotification.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.model.notification;
+
+import org.codehaus.jackson.annotate.JsonAutoDetect;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.io.Serializable;
+
+import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.NONE;
+import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.PUBLIC_ONLY;
+
+/**
+ * Base type of hook message.
+ */
+@JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE)
+@JsonSerialize(include=JsonSerialize.Inclusion.ALWAYS)
+@JsonIgnoreProperties(ignoreUnknown=true)
+@XmlRootElement
+@XmlAccessorType(XmlAccessType.PROPERTY)
+public class EntityNotification implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Type of the hook message.
+     */
+    public enum EntityNotificationType {
+        ENTITY_NOTIFICATION_V1
+    }
+
+    protected EntityNotificationType type;
+
+    public EntityNotification() {
+        this.type = EntityNotificationType.ENTITY_NOTIFICATION_V1;
+    }
+
+    public EntityNotification(EntityNotificationType type) {
+        this.type = type;
+    }
+
+    public EntityNotificationType getType() {
+        return type;
+    }
+
+    public void setType(EntityNotificationType type) {
+        this.type = type;
+    }
+
+    public void normalize() { }
+
+    @Override
+    public String toString() {
+        return toString(new StringBuilder()).toString();
+    }
+
+    public StringBuilder toString(StringBuilder sb) {
+        if (sb == null) {
+            sb = new StringBuilder();
+        }
+
+        sb.append("EntityNotification{");
+        sb.append("type=").append(type);
+        sb.append("}");
+
+        return sb;
+    }
+}

http://git-wip-us.apache.org/repos/asf/atlas/blob/f01e46d7/intg/src/main/java/org/apache/atlas/model/notification/HookNotification.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/model/notification/HookNotification.java b/intg/src/main/java/org/apache/atlas/model/notification/HookNotification.java
new file mode 100644
index 0000000..ea77a20
--- /dev/null
+++ b/intg/src/main/java/org/apache/atlas/model/notification/HookNotification.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.model.notification;
+
+import org.apache.commons.lang.StringUtils;
+import org.codehaus.jackson.annotate.JsonAutoDetect;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.io.Serializable;
+
+import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.NONE;
+import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.PUBLIC_ONLY;
+
+/**
+ * Base type of hook message.
+ */
+@JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE)
+@JsonSerialize(include=JsonSerialize.Inclusion.ALWAYS)
+@JsonIgnoreProperties(ignoreUnknown=true)
+@XmlRootElement
+@XmlAccessorType(XmlAccessType.PROPERTY)
+public class HookNotification implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    public static final String UNKNOW_USER = "UNKNOWN";
+
+    /**
+     * Type of the hook message.
+     */
+    public enum HookNotificationType {
+        TYPE_CREATE, TYPE_UPDATE, ENTITY_CREATE, ENTITY_PARTIAL_UPDATE, ENTITY_FULL_UPDATE, ENTITY_DELETE
+    }
+
+    protected HookNotificationType type;
+    protected String               user;
+
+    public HookNotification() {
+    }
+
+    public HookNotification(HookNotificationType type, String user) {
+        this.type = type;
+        this.user = user;
+    }
+
+    public HookNotificationType getType() {
+        return type;
+    }
+
+    public void setType(HookNotificationType type) {
+        this.type = type;
+    }
+
+    public String getUser() {
+        if (StringUtils.isEmpty(user)) {
+            return UNKNOW_USER;
+        }
+
+        return user;
+    }
+
+    public void setUser(String user) {
+        this.user = user;
+    }
+
+    public void normalize() { }
+
+    @Override
+    public String toString() {
+        return toString(new StringBuilder()).toString();
+    }
+
+    public StringBuilder toString(StringBuilder sb) {
+        if (sb == null) {
+            sb = new StringBuilder();
+        }
+
+        sb.append("HookNotification{");
+        sb.append("type=").append(type);
+        sb.append(", user=").append(user);
+        sb.append("}");
+
+        return sb;
+    }
+}

http://git-wip-us.apache.org/repos/asf/atlas/blob/f01e46d7/intg/src/main/java/org/apache/atlas/type/AtlasType.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasType.java b/intg/src/main/java/org/apache/atlas/type/AtlasType.java
index 63d2a9d..d2a8e1f 100644
--- a/intg/src/main/java/org/apache/atlas/type/AtlasType.java
+++ b/intg/src/main/java/org/apache/atlas/type/AtlasType.java
@@ -20,14 +20,17 @@ package org.apache.atlas.type;
 
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.TypeCategory;
+import org.apache.atlas.model.notification.EntityNotification;
+import org.apache.atlas.model.notification.EntityNotification.EntityNotificationType;
+import org.apache.atlas.model.notification.HookNotification;
+import org.apache.atlas.model.notification.HookNotification.HookNotificationType;
 import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
-import org.apache.atlas.v1.model.notification.HookNotification.EntityCreateRequest;
-import org.apache.atlas.v1.model.notification.HookNotification.EntityDeleteRequest;
-import org.apache.atlas.v1.model.notification.HookNotification.EntityPartialUpdateRequest;
-import org.apache.atlas.v1.model.notification.HookNotification.EntityUpdateRequest;
-import org.apache.atlas.v1.model.notification.HookNotification.HookNotificationMessage;
-import org.apache.atlas.v1.model.notification.HookNotification.HookNotificationType;
-import org.apache.atlas.v1.model.notification.HookNotification.TypeRequest;
+import org.apache.atlas.v1.model.notification.EntityNotificationV1;
+import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest;
+import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityDeleteRequest;
+import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityPartialUpdateRequest;
+import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityUpdateRequest;
+import org.apache.atlas.v1.model.notification.HookNotificationV1.TypeRequest;
 import org.codehaus.jackson.*;
 import org.codehaus.jackson.map.*;
 import org.codehaus.jackson.map.module.SimpleModule;
@@ -57,7 +60,8 @@ public abstract class AtlasType {
 
         atlasSerDeModule.addSerializer(Date.class, new DateSerializer());
         atlasSerDeModule.addDeserializer(Date.class, new DateDeserializer());
-        atlasSerDeModule.addDeserializer(HookNotificationMessage.class, new HookMessageDeserializer());
+        atlasSerDeModule.addDeserializer(HookNotification.class, new HookNotificationDeserializer());
+        atlasSerDeModule.addDeserializer(EntityNotification.class, new EntityNotificationDeserializer());
 
         mapperV1.registerModule(atlasSerDeModule);
     }
@@ -206,15 +210,15 @@ public abstract class AtlasType {
         }
     }
 
-    static class HookMessageDeserializer extends JsonDeserializer<HookNotificationMessage> {
+    static class HookNotificationDeserializer extends JsonDeserializer<HookNotification> {
         @Override
-        public HookNotificationMessage deserialize(JsonParser parser, DeserializationContext context) throws IOException {
-            HookNotificationMessage ret              = null;
-            ObjectMapper            mapper           = (ObjectMapper) parser.getCodec();
-            ObjectNode              root             = (ObjectNode) mapper.readTree(parser);
-            JsonNode                typeNode         = root != null ? root.get("type") : null;
-            String                  strType          = typeNode != null ? typeNode.asText() : null;
-            HookNotificationType    notificationType = strType != null ? HookNotificationType.valueOf(strType) : null;
+        public HookNotification deserialize(JsonParser parser, DeserializationContext context) throws IOException {
+            HookNotification     ret              = null;
+            ObjectMapper         mapper           = (ObjectMapper) parser.getCodec();
+            ObjectNode           root             = (ObjectNode) mapper.readTree(parser);
+            JsonNode             typeNode         = root != null ? root.get("type") : null;
+            String               strType          = typeNode != null ? typeNode.asText() : null;
+            HookNotificationType notificationType = strType != null ? HookNotificationType.valueOf(strType) : null;
 
             if (notificationType != null) {
                 switch (notificationType) {
@@ -244,4 +248,26 @@ public abstract class AtlasType {
             return ret;
         }
     }
+
+    static class EntityNotificationDeserializer extends JsonDeserializer<EntityNotification> {
+        @Override
+        public EntityNotification deserialize(JsonParser parser, DeserializationContext context) throws IOException {
+            EntityNotification     ret              = null;
+            ObjectMapper           mapper           = (ObjectMapper) parser.getCodec();
+            ObjectNode             root             = (ObjectNode) mapper.readTree(parser);
+            JsonNode               typeNode         = root != null ? root.get("type") : null;
+            String                 strType          = typeNode != null ? typeNode.asText() : null;
+            EntityNotificationType notificationType = strType != null ? EntityNotificationType.valueOf(strType) : EntityNotificationType.ENTITY_NOTIFICATION_V1;
+
+            if (root != null) {
+                switch (notificationType) {
+                    case ENTITY_NOTIFICATION_V1:
+                        ret = mapper.readValue(root, EntityNotificationV1.class);
+                        break;
+                }
+            }
+
+            return ret;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/atlas/blob/f01e46d7/intg/src/main/java/org/apache/atlas/type/AtlasTypeUtil.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasTypeUtil.java b/intg/src/main/java/org/apache/atlas/type/AtlasTypeUtil.java
index 5f3cefd..0652855 100644
--- a/intg/src/main/java/org/apache/atlas/type/AtlasTypeUtil.java
+++ b/intg/src/main/java/org/apache/atlas/type/AtlasTypeUtil.java
@@ -17,7 +17,6 @@
  */
 package org.apache.atlas.type;
 
-import com.google.common.collect.ImmutableSet;
 import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.instance.AtlasEntityHeader;
 import org.apache.atlas.model.instance.AtlasObjectId;
@@ -208,19 +207,19 @@ public class AtlasTypeUtil {
         return new AtlasEnumDef(name, description, "1.0", Arrays.asList(enumValues));
     }
 
-    public static AtlasClassificationDef createTraitTypeDef(String name, ImmutableSet<String> superTypes, AtlasAttributeDef... attrDefs) {
+    public static AtlasClassificationDef createTraitTypeDef(String name, Set<String> superTypes, AtlasAttributeDef... attrDefs) {
         return createTraitTypeDef(name, null, superTypes, attrDefs);
     }
 
-    public static AtlasClassificationDef createTraitTypeDef(String name, String description, ImmutableSet<String> superTypes, AtlasAttributeDef... attrDefs) {
+    public static AtlasClassificationDef createTraitTypeDef(String name, String description, Set<String> superTypes, AtlasAttributeDef... attrDefs) {
         return createTraitTypeDef(name, description, "1.0", superTypes, attrDefs);
     }
 
-    public static AtlasClassificationDef createTraitTypeDef(String name, String description, String version, ImmutableSet<String> superTypes, AtlasAttributeDef... attrDefs) {
+    public static AtlasClassificationDef createTraitTypeDef(String name, String description, String version, Set<String> superTypes, AtlasAttributeDef... attrDefs) {
         return new AtlasClassificationDef(name, description, version, Arrays.asList(attrDefs), superTypes);
     }
 
-    public static AtlasClassificationDef createAtlasClassificationDef(String name, String description, String version, ImmutableSet<String> superTypes, ImmutableSet<String> entityTypes, AtlasAttributeDef... attrDefs) {
+    public static AtlasClassificationDef createAtlasClassificationDef(String name, String description, String version, Set<String> superTypes, Set<String> entityTypes, AtlasAttributeDef... attrDefs) {
         return new AtlasClassificationDef(name, description, version, Arrays.asList(attrDefs), superTypes, entityTypes, null);
     }
 
@@ -232,18 +231,15 @@ public class AtlasTypeUtil {
         return new AtlasStructDef(name, description, "1.0", Arrays.asList(attrDefs));
     }
 
-    public static AtlasEntityDef createClassTypeDef(String name,
-        ImmutableSet<String> superTypes, AtlasAttributeDef... attrDefs) {
+    public static AtlasEntityDef createClassTypeDef(String name, Set<String> superTypes, AtlasAttributeDef... attrDefs) {
         return createClassTypeDef(name, null, "1.0", superTypes, attrDefs);
     }
 
-    public static AtlasEntityDef createClassTypeDef(String name, String description,
-        ImmutableSet<String> superTypes, AtlasAttributeDef... attrDefs) {
+    public static AtlasEntityDef createClassTypeDef(String name, String description, Set<String> superTypes, AtlasAttributeDef... attrDefs) {
         return createClassTypeDef(name, description, "1.0", superTypes, attrDefs);
     }
 
-    public static AtlasEntityDef createClassTypeDef(String name, String description, String version,
-        ImmutableSet<String> superTypes, AtlasAttributeDef... attrDefs) {
+    public static AtlasEntityDef createClassTypeDef(String name, String description, String version, Set<String> superTypes, AtlasAttributeDef... attrDefs) {
         return new AtlasEntityDef(name, description, version, Arrays.asList(attrDefs), superTypes);
     }
 

http://git-wip-us.apache.org/repos/asf/atlas/blob/f01e46d7/intg/src/main/java/org/apache/atlas/v1/model/instance/AtlasSystemAttributes.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/v1/model/instance/AtlasSystemAttributes.java b/intg/src/main/java/org/apache/atlas/v1/model/instance/AtlasSystemAttributes.java
index 30d500d..0b74365 100644
--- a/intg/src/main/java/org/apache/atlas/v1/model/instance/AtlasSystemAttributes.java
+++ b/intg/src/main/java/org/apache/atlas/v1/model/instance/AtlasSystemAttributes.java
@@ -35,7 +35,7 @@ import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.PUBLIC_ONL
 
 
 @JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE)
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonSerialize(include=JsonSerialize.Inclusion.ALWAYS)
 @JsonIgnoreProperties(ignoreUnknown=true)
 @XmlRootElement
 @XmlAccessorType(XmlAccessType.PROPERTY)

http://git-wip-us.apache.org/repos/asf/atlas/blob/f01e46d7/intg/src/main/java/org/apache/atlas/v1/model/instance/Id.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/v1/model/instance/Id.java b/intg/src/main/java/org/apache/atlas/v1/model/instance/Id.java
index 67e647d..f3087d1 100644
--- a/intg/src/main/java/org/apache/atlas/v1/model/instance/Id.java
+++ b/intg/src/main/java/org/apache/atlas/v1/model/instance/Id.java
@@ -41,7 +41,7 @@ import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.PUBLIC_ONL
 
 
 @JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE)
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonSerialize(include=JsonSerialize.Inclusion.ALWAYS)
 @JsonIgnoreProperties(ignoreUnknown=true)
 @XmlRootElement
 @XmlAccessorType(XmlAccessType.PROPERTY)

http://git-wip-us.apache.org/repos/asf/atlas/blob/f01e46d7/intg/src/main/java/org/apache/atlas/v1/model/instance/Referenceable.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/v1/model/instance/Referenceable.java b/intg/src/main/java/org/apache/atlas/v1/model/instance/Referenceable.java
index 8fc0acb..f1d28d1 100644
--- a/intg/src/main/java/org/apache/atlas/v1/model/instance/Referenceable.java
+++ b/intg/src/main/java/org/apache/atlas/v1/model/instance/Referenceable.java
@@ -43,7 +43,7 @@ import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.PUBLIC_ONL
 
 
 @JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE)
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonSerialize(include=JsonSerialize.Inclusion.ALWAYS)
 @JsonIgnoreProperties(ignoreUnknown=true)
 @XmlRootElement
 @XmlAccessorType(XmlAccessType.PROPERTY)

http://git-wip-us.apache.org/repos/asf/atlas/blob/f01e46d7/intg/src/main/java/org/apache/atlas/v1/model/instance/Struct.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/v1/model/instance/Struct.java b/intg/src/main/java/org/apache/atlas/v1/model/instance/Struct.java
index 5f61f6c..5aebd4b 100644
--- a/intg/src/main/java/org/apache/atlas/v1/model/instance/Struct.java
+++ b/intg/src/main/java/org/apache/atlas/v1/model/instance/Struct.java
@@ -37,7 +37,7 @@ import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.PUBLIC_ONL
 
 
 @JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE)
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonSerialize(include=JsonSerialize.Inclusion.ALWAYS)
 @JsonIgnoreProperties(ignoreUnknown=true)
 @XmlRootElement
 @XmlAccessorType(XmlAccessType.PROPERTY)

http://git-wip-us.apache.org/repos/asf/atlas/blob/f01e46d7/intg/src/main/java/org/apache/atlas/v1/model/notification/EntityNotification.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/v1/model/notification/EntityNotification.java b/intg/src/main/java/org/apache/atlas/v1/model/notification/EntityNotification.java
deleted file mode 100644
index cb224af..0000000
--- a/intg/src/main/java/org/apache/atlas/v1/model/notification/EntityNotification.java
+++ /dev/null
@@ -1,231 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.atlas.v1.model.notification;
-
-import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
-import org.apache.atlas.v1.model.instance.Referenceable;
-import org.apache.atlas.v1.model.instance.Struct;
-import org.apache.atlas.type.AtlasClassificationType;
-import org.apache.atlas.type.AtlasTypeRegistry;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.collections.MapUtils;
-import org.codehaus.jackson.annotate.JsonAutoDetect;
-import org.codehaus.jackson.annotate.JsonIgnore;
-import org.codehaus.jackson.annotate.JsonIgnoreProperties;
-import org.codehaus.jackson.map.annotate.JsonSerialize;
-
-import javax.xml.bind.annotation.XmlAccessType;
-import javax.xml.bind.annotation.XmlAccessorType;
-import javax.xml.bind.annotation.XmlRootElement;
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-
-import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.NONE;
-import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.PUBLIC_ONLY;
-
-/**
- * Entity notification
- */
-@JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE)
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
-@JsonIgnoreProperties(ignoreUnknown=true)
-@XmlRootElement
-@XmlAccessorType(XmlAccessType.PROPERTY)
-public class EntityNotification implements Serializable {
-    private static final long serialVersionUID = 1L;
-
-    public enum OperationType {
-        ENTITY_CREATE,
-        ENTITY_UPDATE,
-        ENTITY_DELETE,
-        TRAIT_ADD,
-        TRAIT_DELETE,
-        TRAIT_UPDATE
-    }
-
-    private Referenceable entity;
-    private OperationType operationType;
-    private List<Struct>  traits;
-
-
-    // ----- Constructors ------------------------------------------------------
-
-    /**
-     * No-arg constructor for serialization.
-     */
-    public EntityNotification() {
-    }
-
-    /**
-     * Construct an EntityNotification.
-     *
-     * @param entity            the entity subject of the notification
-     * @param operationType     the type of operation that caused the notification
-     * @param traits            the traits for the given entity
-     */
-    public EntityNotification(Referenceable entity, OperationType operationType, List<Struct> traits) {
-        this.entity        = entity;
-        this.operationType = operationType;
-        this.traits        = traits;
-    }
-
-    /**
-     * Construct an EntityNotification.
-     *
-     * @param entity         the entity subject of the notification
-     * @param operationType  the type of operation that caused the notification
-     * @param typeRegistry     the Atlas type system
-     */
-    public EntityNotification(Referenceable entity, OperationType operationType, AtlasTypeRegistry typeRegistry) {
-        this(entity, operationType, getAllTraits(entity, typeRegistry));
-    }
-
-    public Referenceable getEntity() {
-        return entity;
-    }
-
-    public void setEntity(Referenceable entity) {
-        this.entity = entity;
-    }
-
-    public OperationType getOperationType() {
-        return operationType;
-    }
-
-    public void setOperationType(OperationType operationType) {
-        this.operationType = operationType;
-    }
-
-    public List<Struct> getTraits() {
-        return traits;
-    }
-
-    public void setTraits(List<Struct> traits) {
-        this.traits = traits;
-    }
-
-    @JsonIgnore
-    public List<Struct> getAllTraits() {
-        return traits;
-    }
-
-    public void normalize() {
-        if (entity != null) {
-            entity.normailze();
-        }
-
-        if (traits != null) {
-            for (Struct trait : traits) {
-                if (trait != null) {
-                    trait.normailze();
-                }
-            }
-        }
-    }
-
-    // ----- Object overrides --------------------------------------------------
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        EntityNotification that = (EntityNotification) o;
-        return Objects.equals(entity, that.entity) &&
-                operationType == that.operationType &&
-                Objects.equals(traits, that.traits);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(entity, operationType, traits);
-    }
-
-    @Override
-    public String toString() {
-        return toString(new StringBuilder()).toString();
-    }
-
-    public StringBuilder toString(StringBuilder sb) {
-        if (sb == null) {
-            sb = new StringBuilder();
-        }
-
-        sb.append("EntityNotification{");
-        sb.append("entity=");
-        if (entity != null) {
-            entity.toString(sb);
-        } else {
-            sb.append(entity);
-        }
-        sb.append(", operationType=").append(operationType);
-        sb.append(", traits=[");
-        AtlasBaseTypeDef.dumpObjects(traits, sb);
-        sb.append("]");
-        sb.append("}");
-
-        return sb;
-    }
-
-
-    // ----- helper methods ----------------------------------------------------
-
-    private static List<Struct> getAllTraits(Referenceable entityDefinition, AtlasTypeRegistry typeRegistry) {
-        List<Struct> ret = new LinkedList<>();
-
-        for (String traitName : entityDefinition.getTraitNames()) {
-            Struct                  trait          = entityDefinition.getTrait(traitName);
-            AtlasClassificationType traitType      = typeRegistry.getClassificationTypeByName(traitName);
-            Set<String>             superTypeNames = traitType != null ? traitType.getAllSuperTypes() : null;
-
-            ret.add(trait);
-
-            if (CollectionUtils.isNotEmpty(superTypeNames)) {
-                for (String superTypeName : superTypeNames) {
-                    Struct superTypeTrait = new Struct(superTypeName);
-
-                    if (MapUtils.isNotEmpty(trait.getValues())) {
-                        AtlasClassificationType superType = typeRegistry.getClassificationTypeByName(superTypeName);
-
-                        if (superType != null && MapUtils.isNotEmpty(superType.getAllAttributes())) {
-                            Map<String, Object> superTypeTraitAttributes = new HashMap<>();
-
-                            for (Map.Entry<String, Object> attrEntry : trait.getValues().entrySet()) {
-                                String attrName = attrEntry.getKey();
-
-                                if (superType.getAllAttributes().containsKey(attrName)) {
-                                    superTypeTraitAttributes.put(attrName, attrEntry.getValue());
-                                }
-                            }
-
-                            superTypeTrait.setValues(superTypeTraitAttributes);
-                        }
-                    }
-
-                    ret.add(superTypeTrait);
-                }
-            }
-        }
-
-        return ret;
-    }
-}

http://git-wip-us.apache.org/repos/asf/atlas/blob/f01e46d7/intg/src/main/java/org/apache/atlas/v1/model/notification/EntityNotificationV1.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/v1/model/notification/EntityNotificationV1.java b/intg/src/main/java/org/apache/atlas/v1/model/notification/EntityNotificationV1.java
new file mode 100644
index 0000000..549dbe3
--- /dev/null
+++ b/intg/src/main/java/org/apache/atlas/v1/model/notification/EntityNotificationV1.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.v1.model.notification;
+
+import org.apache.atlas.model.notification.EntityNotification;
+import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
+import org.apache.atlas.v1.model.instance.Referenceable;
+import org.apache.atlas.v1.model.instance.Struct;
+import org.apache.atlas.type.AtlasClassificationType;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.codehaus.jackson.annotate.JsonAutoDetect;
+import org.codehaus.jackson.annotate.JsonIgnore;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.NONE;
+import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.PUBLIC_ONLY;
+
+/**
+ * Entity notification
+ */
+@JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE)
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown=true)
+@XmlRootElement
+@XmlAccessorType(XmlAccessType.PROPERTY)
+public class EntityNotificationV1 extends EntityNotification implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    public enum OperationType {
+        ENTITY_CREATE,
+        ENTITY_UPDATE,
+        ENTITY_DELETE,
+        TRAIT_ADD,
+        TRAIT_DELETE,
+        TRAIT_UPDATE
+    }
+
+    private Referenceable entity;
+    private OperationType operationType;
+    private List<Struct>  traits;
+
+
+    // ----- Constructors ------------------------------------------------------
+
+    /**
+     * No-arg constructor for serialization.
+     */
+    public EntityNotificationV1() {
+    }
+
+    /**
+     * Construct an EntityNotificationV1.
+     *
+     * @param entity            the entity subject of the notification
+     * @param operationType     the type of operation that caused the notification
+     * @param traits            the traits for the given entity
+     */
+    public EntityNotificationV1(Referenceable entity, OperationType operationType, List<Struct> traits) {
+        this.entity        = entity;
+        this.operationType = operationType;
+        this.traits        = traits;
+    }
+
+    /**
+     * Construct an EntityNotificationV1.
+     *
+     * @param entity         the entity subject of the notification
+     * @param operationType  the type of operation that caused the notification
+     * @param typeRegistry     the Atlas type system
+     */
+    public EntityNotificationV1(Referenceable entity, OperationType operationType, AtlasTypeRegistry typeRegistry) {
+        this(entity, operationType, getAllTraits(entity, typeRegistry));
+    }
+
+    public Referenceable getEntity() {
+        return entity;
+    }
+
+    public void setEntity(Referenceable entity) {
+        this.entity = entity;
+    }
+
+    public OperationType getOperationType() {
+        return operationType;
+    }
+
+    public void setOperationType(OperationType operationType) {
+        this.operationType = operationType;
+    }
+
+    public List<Struct> getTraits() {
+        return traits;
+    }
+
+    public void setTraits(List<Struct> traits) {
+        this.traits = traits;
+    }
+
+    @JsonIgnore
+    public List<Struct> getAllTraits() {
+        return traits;
+    }
+
+    public void normalize() {
+        super.normalize();
+
+        if (entity != null) {
+            entity.normailze();
+        }
+
+        if (traits != null) {
+            for (Struct trait : traits) {
+                if (trait != null) {
+                    trait.normailze();
+                }
+            }
+        }
+    }
+
+    // ----- Object overrides --------------------------------------------------
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        EntityNotificationV1 that = (EntityNotificationV1) o;
+        return Objects.equals(entity, that.entity) &&
+                operationType == that.operationType &&
+                Objects.equals(traits, that.traits);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(entity, operationType, traits);
+    }
+
+    @Override
+    public StringBuilder toString(StringBuilder sb) {
+        if (sb == null) {
+            sb = new StringBuilder();
+        }
+
+        sb.append("EntityNotificationV1{");
+        super.toString(sb);
+        sb.append(", entity=");
+        if (entity != null) {
+            entity.toString(sb);
+        } else {
+            sb.append(entity);
+        }
+        sb.append(", operationType=").append(operationType);
+        sb.append(", traits=[");
+        AtlasBaseTypeDef.dumpObjects(traits, sb);
+        sb.append("]");
+        sb.append("}");
+
+        return sb;
+    }
+
+
+    // ----- helper methods ----------------------------------------------------
+
+    private static List<Struct> getAllTraits(Referenceable entityDefinition, AtlasTypeRegistry typeRegistry) {
+        List<Struct> ret = new LinkedList<>();
+
+        for (String traitName : entityDefinition.getTraitNames()) {
+            Struct                  trait          = entityDefinition.getTrait(traitName);
+            AtlasClassificationType traitType      = typeRegistry.getClassificationTypeByName(traitName);
+            Set<String>             superTypeNames = traitType != null ? traitType.getAllSuperTypes() : null;
+
+            ret.add(trait);
+
+            if (CollectionUtils.isNotEmpty(superTypeNames)) {
+                for (String superTypeName : superTypeNames) {
+                    Struct superTypeTrait = new Struct(superTypeName);
+
+                    if (MapUtils.isNotEmpty(trait.getValues())) {
+                        AtlasClassificationType superType = typeRegistry.getClassificationTypeByName(superTypeName);
+
+                        if (superType != null && MapUtils.isNotEmpty(superType.getAllAttributes())) {
+                            Map<String, Object> superTypeTraitAttributes = new HashMap<>();
+
+                            for (Map.Entry<String, Object> attrEntry : trait.getValues().entrySet()) {
+                                String attrName = attrEntry.getKey();
+
+                                if (superType.getAllAttributes().containsKey(attrName)) {
+                                    superTypeTraitAttributes.put(attrName, attrEntry.getValue());
+                                }
+                            }
+
+                            superTypeTrait.setValues(superTypeTraitAttributes);
+                        }
+                    }
+
+                    ret.add(superTypeTrait);
+                }
+            }
+        }
+
+        return ret;
+    }
+}

http://git-wip-us.apache.org/repos/asf/atlas/blob/f01e46d7/intg/src/main/java/org/apache/atlas/v1/model/notification/HookNotification.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/v1/model/notification/HookNotification.java b/intg/src/main/java/org/apache/atlas/v1/model/notification/HookNotification.java
deleted file mode 100644
index ae0ec15..0000000
--- a/intg/src/main/java/org/apache/atlas/v1/model/notification/HookNotification.java
+++ /dev/null
@@ -1,423 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.atlas.v1.model.notification;
-
-import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
-import org.apache.atlas.v1.model.instance.Referenceable;
-import org.apache.atlas.v1.model.typedef.TypesDef;
-import org.apache.commons.lang.StringUtils;
-import org.codehaus.jackson.annotate.JsonAutoDetect;
-import org.codehaus.jackson.annotate.JsonIgnoreProperties;
-import org.codehaus.jackson.map.annotate.JsonSerialize;
-
-import javax.xml.bind.annotation.XmlAccessType;
-import javax.xml.bind.annotation.XmlAccessorType;
-import javax.xml.bind.annotation.XmlRootElement;
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.List;
-
-import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.NONE;
-import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.PUBLIC_ONLY;
-
-/**
- * Contains the structure of messages transferred from hooks to atlas.
- */
-public class HookNotification {
-    /**
-     * Type of the hook message.
-     */
-    public enum HookNotificationType {
-        TYPE_CREATE, TYPE_UPDATE, ENTITY_CREATE, ENTITY_PARTIAL_UPDATE, ENTITY_FULL_UPDATE, ENTITY_DELETE
-    }
-
-    /**
-     * Base type of hook message.
-     */
-    @JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE)
-    @JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
-    @JsonIgnoreProperties(ignoreUnknown=true)
-    @XmlRootElement
-    @XmlAccessorType(XmlAccessType.PROPERTY)
-    public static class HookNotificationMessage implements Serializable {
-        private static final long serialVersionUID = 1L;
-
-        public static final String UNKNOW_USER = "UNKNOWN";
-
-        protected HookNotificationType type;
-        protected String               user;
-
-        public HookNotificationMessage() {
-        }
-
-        public HookNotificationMessage(HookNotificationType type, String user) {
-            this.type = type;
-            this.user = user;
-        }
-
-        public HookNotificationType getType() {
-            return type;
-        }
-
-        public void setType(HookNotificationType type) {
-            this.type = type;
-        }
-
-        public String getUser() {
-            if (StringUtils.isEmpty(user)) {
-                return UNKNOW_USER;
-            }
-
-            return user;
-        }
-
-        public void setUser(String user) {
-            this.user = user;
-        }
-
-        public void normalize() { }
-
-        @Override
-        public String toString() {
-            return toString(new StringBuilder()).toString();
-        }
-
-        public StringBuilder toString(StringBuilder sb) {
-            if (sb == null) {
-                sb = new StringBuilder();
-            }
-
-            sb.append("HookNotificationMessage{");
-            sb.append("type=").append(type);
-            sb.append(", user=").append(user);
-            sb.append("}");
-
-            return sb;
-        }
-    }
-
-    /**
-     * Hook message for create type definitions.
-     */
-    @JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE)
-    @JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
-    @JsonIgnoreProperties(ignoreUnknown=true)
-    @XmlRootElement
-    @XmlAccessorType(XmlAccessType.PROPERTY)
-    public static class TypeRequest extends HookNotificationMessage implements Serializable {
-        private static final long serialVersionUID = 1L;
-
-        private TypesDef typesDef;
-
-        public TypeRequest() {
-        }
-
-        public TypeRequest(HookNotificationType type, TypesDef typesDef, String user) {
-            super(type, user);
-            this.typesDef = typesDef;
-        }
-
-        public TypesDef getTypesDef() {
-            return typesDef;
-        }
-
-        public void setTypesDef(TypesDef typesDef) {
-            this.typesDef = typesDef;
-        }
-
-        @Override
-        public StringBuilder toString(StringBuilder sb) {
-            if (sb == null) {
-                sb = new StringBuilder();
-            }
-
-            sb.append("TypeRequest{");
-            super.toString(sb);
-            sb.append("typesDef=");
-            if (typesDef != null) {
-                typesDef.toString(sb);
-            }
-            sb.append("}");
-
-            return sb;
-        }
-    }
-
-    /**
-     * Hook message for creating new entities.
-     */
-    @JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE)
-    @JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
-    @JsonIgnoreProperties(ignoreUnknown=true)
-    @XmlRootElement
-    @XmlAccessorType(XmlAccessType.PROPERTY)
-    public static class EntityCreateRequest extends HookNotificationMessage implements Serializable {
-        private static final long serialVersionUID = 1L;
-
-        private List<Referenceable> entities;
-
-        public EntityCreateRequest() {
-        }
-
-        public EntityCreateRequest(String user, Referenceable... entities) {
-            this(HookNotificationType.ENTITY_CREATE, Arrays.asList(entities), user);
-        }
-
-        public EntityCreateRequest(String user, List<Referenceable> entities) {
-            this(HookNotificationType.ENTITY_CREATE, entities, user);
-        }
-
-        protected EntityCreateRequest(HookNotificationType type, List<Referenceable> entities, String user) {
-            super(type, user);
-
-            this.entities = entities;
-        }
-
-        public List<Referenceable> getEntities() {
-            return entities;
-        }
-
-        public void setEntities(List<Referenceable> entities) {
-            this.entities = entities;
-        }
-
-        @Override
-        public void normalize() {
-            super.normalize();
-
-            if (entities != null) {
-                for (Referenceable entity : entities) {
-                    if (entity != null) {
-                        entity.normailze();
-                    }
-                }
-            }
-        }
-
-        @Override
-        public StringBuilder toString(StringBuilder sb) {
-            if (sb == null) {
-                sb = new StringBuilder();
-            }
-
-            sb.append("EntityCreateRequest{");
-            super.toString(sb);
-            sb.append("entities=[");
-            AtlasBaseTypeDef.dumpObjects(getEntities(), sb);
-            sb.append("]");
-            sb.append("}");
-
-            return sb;
-        }
-    }
-
-    /**
-     * Hook message for updating entities(full update).
-     */
-    @JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE)
-    @JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
-    @JsonIgnoreProperties(ignoreUnknown=true)
-    @XmlRootElement
-    @XmlAccessorType(XmlAccessType.PROPERTY)
-    public static class EntityUpdateRequest extends EntityCreateRequest implements Serializable {
-        private static final long serialVersionUID = 1L;
-
-        public EntityUpdateRequest() {
-        }
-
-        public EntityUpdateRequest(String user, Referenceable... entities) {
-            this(user, Arrays.asList(entities));
-        }
-
-        public EntityUpdateRequest(String user, List<Referenceable> entities) {
-            super(HookNotificationType.ENTITY_FULL_UPDATE, entities, user);
-        }
-
-        @Override
-        public StringBuilder toString(StringBuilder sb) {
-            if (sb == null) {
-                sb = new StringBuilder();
-            }
-
-            sb.append("EntityUpdateRequest{");
-            super.toString(sb);
-            sb.append("entities=[");
-            AtlasBaseTypeDef.dumpObjects(getEntities(), sb);
-            sb.append("]");
-            sb.append("}");
-
-            return sb;
-        }
-    }
-
-    /**
-     * Hook message for updating entities(partial update).
-     */
-    public static class EntityPartialUpdateRequest extends HookNotificationMessage {
-        private static final long serialVersionUID = 1L;
-
-        private String        typeName;
-        private String        attribute;
-        private String        attributeValue;
-        private Referenceable entity;
-
-        public EntityPartialUpdateRequest() {
-        }
-
-        public EntityPartialUpdateRequest(String user, String typeName, String attribute, String attributeValue, Referenceable entity) {
-            super(HookNotificationType.ENTITY_PARTIAL_UPDATE, user);
-
-            this.typeName       = typeName;
-            this.attribute      = attribute;
-            this.attributeValue = attributeValue;
-            this.entity         = entity;
-        }
-
-        public String getTypeName() {
-            return typeName;
-        }
-
-        public void setTypeName(String typeName) {
-            this.typeName = typeName;
-        }
-
-        public String getAttribute() {
-            return attribute;
-        }
-
-        public void setAttribute(String attribute) {
-            this.attribute = attribute;
-        }
-
-        public String getAttributeValue() {
-            return attributeValue;
-        }
-
-        public void setAttributeValue(String attributeValue) {
-            this.attributeValue = attributeValue;
-        }
-
-        public Referenceable getEntity() {
-            return entity;
-        }
-
-        public void setEntity(Referenceable entity) {
-            this.entity = entity;
-        }
-
-        @Override
-        public void normalize() {
-            super.normalize();
-
-            if (entity != null) {
-                entity.normailze();
-            }
-        }
-
-        @Override
-        public StringBuilder toString(StringBuilder sb) {
-            if (sb == null) {
-                sb = new StringBuilder();
-            }
-
-            sb.append("EntityPartialUpdateRequest{");
-            super.toString(sb);
-            sb.append("typeName=").append(typeName);
-            sb.append("attribute=").append(attribute);
-            sb.append("attributeValue=").append(attributeValue);
-            sb.append("entity=");
-            if (entity != null) {
-                entity.toString(sb);
-            }
-            sb.append("}");
-
-            return sb;
-        }
-    }
-
-    /**
-     * Hook message for creating new entities.
-     */
-    @JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE)
-    @JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
-    @JsonIgnoreProperties(ignoreUnknown=true)
-    @XmlRootElement
-    @XmlAccessorType(XmlAccessType.PROPERTY)
-    public static class EntityDeleteRequest extends HookNotificationMessage implements Serializable {
-        private static final long serialVersionUID = 1L;
-
-        private String typeName;
-        private String attribute;
-        private String attributeValue;
-
-        public EntityDeleteRequest() {
-        }
-
-        public EntityDeleteRequest(String user, String typeName, String attribute, String attributeValue) {
-            this(HookNotificationType.ENTITY_DELETE, user, typeName, attribute, attributeValue);
-        }
-
-        protected EntityDeleteRequest(HookNotificationType type, String user, String typeName, String attribute, String attributeValue) {
-            super(type, user);
-
-            this.typeName       = typeName;
-            this.attribute      = attribute;
-            this.attributeValue = attributeValue;
-        }
-
-        public String getTypeName() {
-            return typeName;
-        }
-
-        public void setTypeName(String typeName) {
-            this.typeName = typeName;
-        }
-
-        public String getAttribute() {
-            return attribute;
-        }
-
-        public void setAttribute(String attribute) {
-            this.attribute = attribute;
-        }
-
-        public String getAttributeValue() {
-            return attributeValue;
-        }
-
-        public void setAttributeValue(String attributeValue) {
-            this.attributeValue = attributeValue;
-        }
-
-        @Override
-        public StringBuilder toString(StringBuilder sb) {
-            if (sb == null) {
-                sb = new StringBuilder();
-            }
-
-            sb.append("EntityDeleteRequest{");
-            super.toString(sb);
-            sb.append("typeName=").append(typeName);
-            sb.append("attribute=").append(attribute);
-            sb.append("attributeValue=").append(attributeValue);
-            sb.append("}");
-
-            return sb;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/atlas/blob/f01e46d7/intg/src/main/java/org/apache/atlas/v1/model/notification/HookNotificationV1.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/v1/model/notification/HookNotificationV1.java b/intg/src/main/java/org/apache/atlas/v1/model/notification/HookNotificationV1.java
new file mode 100644
index 0000000..c70e7d0
--- /dev/null
+++ b/intg/src/main/java/org/apache/atlas/v1/model/notification/HookNotificationV1.java
@@ -0,0 +1,357 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.v1.model.notification;
+
+import org.apache.atlas.model.notification.HookNotification;
+import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
+import org.apache.atlas.v1.model.instance.Referenceable;
+import org.apache.atlas.v1.model.typedef.TypesDef;
+import org.codehaus.jackson.annotate.JsonAutoDetect;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.NONE;
+import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.PUBLIC_ONLY;
+
+/**
+ * Contains the structure of messages transferred from hooks to atlas.
+ */
+public class HookNotificationV1 {
+
+    /**
+     * Hook message for create type definitions.
+     */
+    @JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE)
+    @JsonSerialize(include=JsonSerialize.Inclusion.ALWAYS)
+    @JsonIgnoreProperties(ignoreUnknown=true)
+    @XmlRootElement
+    @XmlAccessorType(XmlAccessType.PROPERTY)
+    public static class TypeRequest extends HookNotification implements Serializable {
+        private static final long serialVersionUID = 1L;
+
+        private TypesDef typesDef;
+
+        public TypeRequest() {
+        }
+
+        public TypeRequest(HookNotificationType type, TypesDef typesDef, String user) {
+            super(type, user);
+            this.typesDef = typesDef;
+        }
+
+        public TypesDef getTypesDef() {
+            return typesDef;
+        }
+
+        public void setTypesDef(TypesDef typesDef) {
+            this.typesDef = typesDef;
+        }
+
+        @Override
+        public StringBuilder toString(StringBuilder sb) {
+            if (sb == null) {
+                sb = new StringBuilder();
+            }
+
+            sb.append("TypeRequest{");
+            super.toString(sb);
+            sb.append("typesDef=");
+            if (typesDef != null) {
+                typesDef.toString(sb);
+            }
+            sb.append("}");
+
+            return sb;
+        }
+    }
+
+    /**
+     * Hook message for creating new entities.
+     */
+    @JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE)
+    @JsonSerialize(include=JsonSerialize.Inclusion.ALWAYS)
+    @JsonIgnoreProperties(ignoreUnknown=true)
+    @XmlRootElement
+    @XmlAccessorType(XmlAccessType.PROPERTY)
+    public static class EntityCreateRequest extends HookNotification implements Serializable {
+        private static final long serialVersionUID = 1L;
+
+        private List<Referenceable> entities;
+
+        public EntityCreateRequest() {
+        }
+
+        public EntityCreateRequest(String user, Referenceable... entities) {
+            this(HookNotificationType.ENTITY_CREATE, Arrays.asList(entities), user);
+        }
+
+        public EntityCreateRequest(String user, List<Referenceable> entities) {
+            this(HookNotificationType.ENTITY_CREATE, entities, user);
+        }
+
+        protected EntityCreateRequest(HookNotificationType type, List<Referenceable> entities, String user) {
+            super(type, user);
+
+            this.entities = entities;
+        }
+
+        public List<Referenceable> getEntities() {
+            return entities;
+        }
+
+        public void setEntities(List<Referenceable> entities) {
+            this.entities = entities;
+        }
+
+        @Override
+        public void normalize() {
+            super.normalize();
+
+            if (entities != null) {
+                for (Referenceable entity : entities) {
+                    if (entity != null) {
+                        entity.normailze();
+                    }
+                }
+            }
+        }
+
+        @Override
+        public StringBuilder toString(StringBuilder sb) {
+            if (sb == null) {
+                sb = new StringBuilder();
+            }
+
+            sb.append("EntityCreateRequest{");
+            super.toString(sb);
+            sb.append("entities=[");
+            AtlasBaseTypeDef.dumpObjects(getEntities(), sb);
+            sb.append("]");
+            sb.append("}");
+
+            return sb;
+        }
+    }
+
+    /**
+     * Hook message for updating entities(full update).
+     */
+    @JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE)
+    @JsonSerialize(include=JsonSerialize.Inclusion.ALWAYS)
+    @JsonIgnoreProperties(ignoreUnknown=true)
+    @XmlRootElement
+    @XmlAccessorType(XmlAccessType.PROPERTY)
+    public static class EntityUpdateRequest extends EntityCreateRequest implements Serializable {
+        private static final long serialVersionUID = 1L;
+
+        public EntityUpdateRequest() {
+        }
+
+        public EntityUpdateRequest(String user, Referenceable... entities) {
+            this(user, Arrays.asList(entities));
+        }
+
+        public EntityUpdateRequest(String user, List<Referenceable> entities) {
+            super(HookNotificationType.ENTITY_FULL_UPDATE, entities, user);
+        }
+
+        @Override
+        public StringBuilder toString(StringBuilder sb) {
+            if (sb == null) {
+                sb = new StringBuilder();
+            }
+
+            sb.append("EntityUpdateRequest{");
+            super.toString(sb);
+            sb.append("entities=[");
+            AtlasBaseTypeDef.dumpObjects(getEntities(), sb);
+            sb.append("]");
+            sb.append("}");
+
+            return sb;
+        }
+    }
+
+    /**
+     * Hook message for updating entities(partial update).
+     */
+    @JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE)
+    @JsonSerialize(include=JsonSerialize.Inclusion.ALWAYS)
+    @JsonIgnoreProperties(ignoreUnknown=true)
+    @XmlRootElement
+    @XmlAccessorType(XmlAccessType.PROPERTY)
+    public static class EntityPartialUpdateRequest extends HookNotification {
+        private static final long serialVersionUID = 1L;
+
+        private String        typeName;
+        private String        attribute;
+        private String        attributeValue;
+        private Referenceable entity;
+
+        public EntityPartialUpdateRequest() {
+        }
+
+        public EntityPartialUpdateRequest(String user, String typeName, String attribute, String attributeValue, Referenceable entity) {
+            super(HookNotificationType.ENTITY_PARTIAL_UPDATE, user);
+
+            this.typeName       = typeName;
+            this.attribute      = attribute;
+            this.attributeValue = attributeValue;
+            this.entity         = entity;
+        }
+
+        public String getTypeName() {
+            return typeName;
+        }
+
+        public void setTypeName(String typeName) {
+            this.typeName = typeName;
+        }
+
+        public String getAttribute() {
+            return attribute;
+        }
+
+        public void setAttribute(String attribute) {
+            this.attribute = attribute;
+        }
+
+        public String getAttributeValue() {
+            return attributeValue;
+        }
+
+        public void setAttributeValue(String attributeValue) {
+            this.attributeValue = attributeValue;
+        }
+
+        public Referenceable getEntity() {
+            return entity;
+        }
+
+        public void setEntity(Referenceable entity) {
+            this.entity = entity;
+        }
+
+        @Override
+        public void normalize() {
+            super.normalize();
+
+            if (entity != null) {
+                entity.normailze();
+            }
+        }
+
+        @Override
+        public StringBuilder toString(StringBuilder sb) {
+            if (sb == null) {
+                sb = new StringBuilder();
+            }
+
+            sb.append("EntityPartialUpdateRequest{");
+            super.toString(sb);
+            sb.append("typeName=").append(typeName);
+            sb.append("attribute=").append(attribute);
+            sb.append("attributeValue=").append(attributeValue);
+            sb.append("entity=");
+            if (entity != null) {
+                entity.toString(sb);
+            }
+            sb.append("}");
+
+            return sb;
+        }
+    }
+
+    /**
+     * Hook message for entity delete.
+     */
+    @JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE)
+    @JsonSerialize(include=JsonSerialize.Inclusion.ALWAYS)
+    @JsonIgnoreProperties(ignoreUnknown=true)
+    @XmlRootElement
+    @XmlAccessorType(XmlAccessType.PROPERTY)
+    public static class EntityDeleteRequest extends HookNotification implements Serializable {
+        private static final long serialVersionUID = 1L;
+
+        private String typeName;
+        private String attribute;
+        private String attributeValue;
+
+        public EntityDeleteRequest() {
+        }
+
+        public EntityDeleteRequest(String user, String typeName, String attribute, String attributeValue) {
+            this(HookNotificationType.ENTITY_DELETE, user, typeName, attribute, attributeValue);
+        }
+
+        protected EntityDeleteRequest(HookNotificationType type, String user, String typeName, String attribute, String attributeValue) {
+            super(type, user);
+
+            this.typeName       = typeName;
+            this.attribute      = attribute;
+            this.attributeValue = attributeValue;
+        }
+
+        public String getTypeName() {
+            return typeName;
+        }
+
+        public void setTypeName(String typeName) {
+            this.typeName = typeName;
+        }
+
+        public String getAttribute() {
+            return attribute;
+        }
+
+        public void setAttribute(String attribute) {
+            this.attribute = attribute;
+        }
+
+        public String getAttributeValue() {
+            return attributeValue;
+        }
+
+        public void setAttributeValue(String attributeValue) {
+            this.attributeValue = attributeValue;
+        }
+
+        @Override
+        public StringBuilder toString(StringBuilder sb) {
+            if (sb == null) {
+                sb = new StringBuilder();
+            }
+
+            sb.append("EntityDeleteRequest{");
+            super.toString(sb);
+            sb.append("typeName=").append(typeName);
+            sb.append("attribute=").append(attribute);
+            sb.append("attributeValue=").append(attributeValue);
+            sb.append("}");
+
+            return sb;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/atlas/blob/f01e46d7/intg/src/main/java/org/apache/atlas/v1/model/typedef/TypesDef.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/v1/model/typedef/TypesDef.java b/intg/src/main/java/org/apache/atlas/v1/model/typedef/TypesDef.java
index f8bcfa3..6a8bcb4 100644
--- a/intg/src/main/java/org/apache/atlas/v1/model/typedef/TypesDef.java
+++ b/intg/src/main/java/org/apache/atlas/v1/model/typedef/TypesDef.java
@@ -34,7 +34,7 @@ import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.PUBLIC_ONL
 
 
 @JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE)
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonSerialize(include=JsonSerialize.Inclusion.ALWAYS)
 @JsonIgnoreProperties(ignoreUnknown=true)
 @XmlRootElement
 @XmlAccessorType(XmlAccessType.PROPERTY)


[2/4] atlas git commit: ATLAS-2251: notification module updates (#4)

Posted by ma...@apache.org.
http://git-wip-us.apache.org/repos/asf/atlas/blob/f01e46d7/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 779298a..456a778 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -29,13 +29,14 @@ import org.apache.atlas.RequestContextV1;
 import org.apache.atlas.ha.HAConfiguration;
 import org.apache.atlas.kafka.AtlasKafkaMessage;
 import org.apache.atlas.listener.ActiveStateChangeHandler;
-import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
+import org.apache.atlas.model.notification.HookNotification;
+import org.apache.atlas.notification.NotificationInterface.NotificationType;
 import org.apache.atlas.v1.model.instance.Referenceable;
-import org.apache.atlas.v1.model.notification.HookNotification.EntityCreateRequest;
-import org.apache.atlas.v1.model.notification.HookNotification.EntityDeleteRequest;
-import org.apache.atlas.v1.model.notification.HookNotification.EntityPartialUpdateRequest;
-import org.apache.atlas.v1.model.notification.HookNotification.EntityUpdateRequest;
-import org.apache.atlas.v1.model.notification.HookNotification.HookNotificationMessage;
+import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest;
+import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityDeleteRequest;
+import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityPartialUpdateRequest;
+import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityUpdateRequest;
 import org.apache.atlas.repository.converters.AtlasInstanceConverter;
 import org.apache.atlas.repository.store.graph.AtlasEntityStore;
 import org.apache.atlas.repository.store.graph.v1.AtlasEntityStream;
@@ -56,10 +57,7 @@ import org.springframework.core.annotation.Order;
 import org.springframework.stereotype.Component;
 
 import javax.inject.Inject;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
+import java.util.*;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -76,37 +74,37 @@ import static org.apache.atlas.AtlasClientV2.API_V2.UPDATE_ENTITY_BY_ATTRIBUTE;
 @Order(4)
 @DependsOn(value = {"atlasTypeDefStoreInitializer", "atlasTypeDefGraphStoreV1"})
 public class NotificationHookConsumer implements Service, ActiveStateChangeHandler {
-    private static final Logger LOG = LoggerFactory.getLogger(NotificationHookConsumer.class);
-    private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger(NotificationHookConsumer.class);
-    private static final String LOCALHOST = "localhost";
-    private static Logger FAILED_LOG = LoggerFactory.getLogger("FAILED");
+    private static final Logger LOG        = LoggerFactory.getLogger(NotificationHookConsumer.class);
+    private static final Logger PERF_LOG   = AtlasPerfTracer.getPerfLogger(NotificationHookConsumer.class);
+    private static final Logger FAILED_LOG = LoggerFactory.getLogger("FAILED");
 
+    private static final String LOCALHOST         = "localhost";
     private static final String THREADNAME_PREFIX = NotificationHookConsumer.class.getSimpleName();
 
-    public static final String CONSUMER_THREADS_PROPERTY = "atlas.notification.hook.numthreads";
-    public static final String CONSUMER_RETRIES_PROPERTY = "atlas.notification.hook.maxretries";
+    public static final String CONSUMER_THREADS_PROPERTY         = "atlas.notification.hook.numthreads";
+    public static final String CONSUMER_RETRIES_PROPERTY         = "atlas.notification.hook.maxretries";
     public static final String CONSUMER_FAILEDCACHESIZE_PROPERTY = "atlas.notification.hook.failedcachesize";
-    public static final String CONSUMER_RETRY_INTERVAL = "atlas.notification.consumer.retry.interval";
-    public static final String CONSUMER_MIN_RETRY_INTERVAL = "atlas.notification.consumer.min.retry.interval";
-    public static final String CONSUMER_MAX_RETRY_INTERVAL = "atlas.notification.consumer.max.retry.interval";
-
+    public static final String CONSUMER_RETRY_INTERVAL           = "atlas.notification.consumer.retry.interval";
+    public static final String CONSUMER_MIN_RETRY_INTERVAL       = "atlas.notification.consumer.min.retry.interval";
+    public static final String CONSUMER_MAX_RETRY_INTERVAL       = "atlas.notification.consumer.max.retry.interval";
 
     public static final int SERVER_READY_WAIT_TIME_MS = 1000;
-    private final AtlasEntityStore atlasEntityStore;
-    private final ServiceState serviceState;
+
+    private final AtlasEntityStore       atlasEntityStore;
+    private final ServiceState           serviceState;
     private final AtlasInstanceConverter instanceConverter;
-    private final AtlasTypeRegistry typeRegistry;
-    private final int maxRetries;
-    private final int failedMsgCacheSize;
+    private final AtlasTypeRegistry      typeRegistry;
+    private final int                    maxRetries;
+    private final int                    failedMsgCacheSize;
+    private final int                    minWaitDuration;
+    private final int                    maxWaitDuration;
+
+    private NotificationInterface notificationInterface;
+    private ExecutorService       executors;
+    private Configuration         applicationProperties;
 
     @VisibleForTesting
     final int consumerRetryInterval;
-    private final int minWaitDuration;
-    private final int maxWaitDuration;
-
-    private NotificationInterface notificationInterface;
-    private ExecutorService executors;
-    private Configuration applicationProperties;
 
     @VisibleForTesting
     List<HookConsumer> consumers;
@@ -116,18 +114,17 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
                                     ServiceState serviceState, AtlasInstanceConverter instanceConverter,
                                     AtlasTypeRegistry typeRegistry) throws AtlasException {
         this.notificationInterface = notificationInterface;
-        this.atlasEntityStore = atlasEntityStore;
-        this.serviceState = serviceState;
-        this.instanceConverter = instanceConverter;
-        this.typeRegistry = typeRegistry;
-
+        this.atlasEntityStore      = atlasEntityStore;
+        this.serviceState          = serviceState;
+        this.instanceConverter     = instanceConverter;
+        this.typeRegistry          = typeRegistry;
         this.applicationProperties = ApplicationProperties.get();
 
-        maxRetries = applicationProperties.getInt(CONSUMER_RETRIES_PROPERTY, 3);
-        failedMsgCacheSize = applicationProperties.getInt(CONSUMER_FAILEDCACHESIZE_PROPERTY, 20);
+        maxRetries            = applicationProperties.getInt(CONSUMER_RETRIES_PROPERTY, 3);
+        failedMsgCacheSize    = applicationProperties.getInt(CONSUMER_FAILEDCACHESIZE_PROPERTY, 20);
         consumerRetryInterval = applicationProperties.getInt(CONSUMER_RETRY_INTERVAL, 500);
-        minWaitDuration = applicationProperties.getInt(CONSUMER_MIN_RETRY_INTERVAL, consumerRetryInterval); // 500 ms  by default
-        maxWaitDuration = applicationProperties.getInt(CONSUMER_MAX_RETRY_INTERVAL, minWaitDuration * 60);  //  30 sec by default
+        minWaitDuration       = applicationProperties.getInt(CONSUMER_MIN_RETRY_INTERVAL, consumerRetryInterval); // 500 ms  by default
+        maxWaitDuration       = applicationProperties.getInt(CONSUMER_MAX_RETRY_INTERVAL, minWaitDuration * 60);  //  30 sec by default
     }
 
     @Override
@@ -144,21 +141,24 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
         }
         if (!HAConfiguration.isHAEnabled(configuration)) {
             LOG.info("HA is disabled, starting consumers inline.");
+
             startConsumers(executorService);
         }
     }
 
     private void startConsumers(ExecutorService executorService) {
-        int numThreads = applicationProperties.getInt(CONSUMER_THREADS_PROPERTY, 1);
-        List<NotificationConsumer<HookNotificationMessage>> notificationConsumers =
-                notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, numThreads);
+        int                                          numThreads            = applicationProperties.getInt(CONSUMER_THREADS_PROPERTY, 1);
+        List<NotificationConsumer<HookNotification>> notificationConsumers = notificationInterface.createConsumers(NotificationType.HOOK, numThreads);
+
         if (executorService == null) {
-            executorService = Executors.newFixedThreadPool(notificationConsumers.size(),
-                    new ThreadFactoryBuilder().setNameFormat(THREADNAME_PREFIX + " thread-%d").build());
+            executorService = Executors.newFixedThreadPool(notificationConsumers.size(), new ThreadFactoryBuilder().setNameFormat(THREADNAME_PREFIX + " thread-%d").build());
         }
+
         executors = executorService;
-        for (final NotificationConsumer<HookNotificationMessage> consumer : notificationConsumers) {
+
+        for (final NotificationConsumer<HookNotification> consumer : notificationConsumers) {
             HookConsumer hookConsumer = new HookConsumer(consumer);
+
             consumers.add(hookConsumer);
             executors.submit(hookConsumer);
         }
@@ -171,11 +171,14 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
             stopConsumerThreads();
             if (executors != null) {
                 executors.shutdown();
+
                 if (!executors.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                     LOG.error("Timed out waiting for consumer threads to shut down, exiting uncleanly");
                 }
+
                 executors = null;
             }
+
             notificationInterface.close();
         } catch (InterruptedException e) {
             LOG.error("Failure in shutting down consumers");
@@ -189,6 +192,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
             for (HookConsumer consumer : consumers) {
                 consumer.shutdown();
             }
+
             consumers.clear();
         }
 
@@ -204,6 +208,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
     @Override
     public void instanceIsActive() {
         LOG.info("Reacting to active state: initializing Kafka consumers");
+
         startConsumers(executors);
     }
 
@@ -216,6 +221,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
     @Override
     public void instanceIsPassive() {
         LOG.info("Reacting to passive state: shutting down Kafka consumers.");
+
         stop();
     }
 
@@ -235,18 +241,17 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
         private final long maxDuration;
         private final long minDuration;
         private final long resetInterval;
+        private       long lastWaitAt;
 
-        private long lastWaitAt;
         @VisibleForTesting
         long waitDuration;
 
         public AdaptiveWaiter(long minDuration, long maxDuration, long increment) {
-            this.minDuration = minDuration;
-            this.maxDuration = maxDuration;
-            this.increment = increment;
-
-            this.waitDuration = minDuration;
-            this.lastWaitAt = 0;
+            this.minDuration   = minDuration;
+            this.maxDuration   = maxDuration;
+            this.increment     = increment;
+            this.waitDuration  = minDuration;
+            this.lastWaitAt    = 0;
             this.resetInterval = maxDuration * 2;
         }
 
@@ -268,7 +273,9 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
 
         private void setWaitDurations() {
             long timeSinceLastWait = (lastWaitAt == 0) ? 0 : System.currentTimeMillis() - lastWaitAt;
+
             lastWaitAt = System.currentTimeMillis();
+
             if (timeSinceLastWait > resetInterval) {
                 waitDuration = minDuration;
             } else {
@@ -282,14 +289,14 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
 
     @VisibleForTesting
     class HookConsumer extends ShutdownableThread {
-        private final NotificationConsumer<HookNotificationMessage> consumer;
-        private final AtomicBoolean shouldRun = new AtomicBoolean(false);
-        private List<HookNotificationMessage> failedMessages = new ArrayList<>();
-
-        private final AdaptiveWaiter adaptiveWaiter = new AdaptiveWaiter(minWaitDuration, maxWaitDuration, minWaitDuration);
+        private final NotificationConsumer<HookNotification> consumer;
+        private final AtomicBoolean                          shouldRun      = new AtomicBoolean(false);
+        private final List<HookNotification>                 failedMessages = new ArrayList<>();
+        private final AdaptiveWaiter                         adaptiveWaiter = new AdaptiveWaiter(minWaitDuration, maxWaitDuration, minWaitDuration);
 
-        public HookConsumer(NotificationConsumer<HookNotificationMessage> consumer) {
+        public HookConsumer(NotificationConsumer<HookNotification> consumer) {
             super("atlas-hook-consumer-thread", false);
+
             this.consumer = consumer;
         }
 
@@ -306,8 +313,9 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
             try {
                 while (shouldRun.get()) {
                     try {
-                        List<AtlasKafkaMessage<HookNotificationMessage>> messages = consumer.receive();
-                        for (AtlasKafkaMessage<HookNotificationMessage> msg : messages) {
+                        List<AtlasKafkaMessage<HookNotification>> messages = consumer.receive();
+
+                        for (AtlasKafkaMessage<HookNotification> msg : messages) {
                             handleMessage(msg);
                         }
                     } catch (IllegalStateException ex) {
@@ -315,6 +323,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
                     } catch (Exception e) {
                         if (shouldRun.get()) {
                             LOG.warn("Exception in NotificationHookConsumer", e);
+
                             adaptiveWaiter.pause(e);
                         } else {
                             break;
@@ -324,6 +333,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
             } finally {
                 if (consumer != null) {
                     LOG.info("closing NotificationConsumer");
+
                     consumer.close();
                 }
 
@@ -332,11 +342,10 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
         }
 
         @VisibleForTesting
-        void handleMessage(AtlasKafkaMessage<HookNotificationMessage> kafkaMsg) throws AtlasServiceException, AtlasException {
-            AtlasPerfTracer perf = null;
-
-            HookNotificationMessage message = kafkaMsg.getMessage();
-            String messageUser = message.getUser();
+        void handleMessage(AtlasKafkaMessage<HookNotification> kafkaMsg) throws AtlasServiceException, AtlasException {
+            AtlasPerfTracer  perf        = null;
+            HookNotification message     = kafkaMsg.getMessage();
+            String           messageUser = message.getUser();
 
             if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
                 perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, message.getType().name());
@@ -344,21 +353,25 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
 
             try {
                 // Used for intermediate conversions during create and update
-                AtlasEntity.AtlasEntitiesWithExtInfo entities = null;
+                AtlasEntitiesWithExtInfo entities = null;
+
                 for (int numRetries = 0; numRetries < maxRetries; numRetries++) {
                     if (LOG.isDebugEnabled()) {
                         LOG.debug("handleMessage({}): attempt {}", message.getType().name(), numRetries);
                     }
+
                     try {
                         RequestContextV1 requestContext = RequestContextV1.get();
+
                         requestContext.setUser(messageUser);
 
                         switch (message.getType()) {
                             case ENTITY_CREATE:
-                                EntityCreateRequest createRequest = (EntityCreateRequest) message;
+                                final EntityCreateRequest createRequest = (EntityCreateRequest) message;
 
                                 if (numRetries == 0) { // audit only on the first attempt
                                     AtlasBaseClient.API api = AtlasClient.API_V1.CREATE_ENTITY;
+
                                     audit(messageUser, api.getMethod(), api.getNormalizedPath());
                                 }
 
@@ -372,19 +385,16 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
 
                                 if (numRetries == 0) { // audit only on the first attempt
                                     AtlasBaseClient.API api = UPDATE_ENTITY_BY_ATTRIBUTE;
-                                    audit(messageUser, api.getMethod(),
-                                          String.format(api.getNormalizedPath(), partialUpdateRequest.getTypeName()));
+
+                                    audit(messageUser, api.getMethod(), String.format(api.getNormalizedPath(), partialUpdateRequest.getTypeName()));
                                 }
 
                                 Referenceable referenceable = partialUpdateRequest.getEntity();
+
                                 entities = instanceConverter.toAtlasEntity(referenceable);
 
                                 AtlasEntityType entityType = typeRegistry.getEntityTypeByName(partialUpdateRequest.getTypeName());
-                                String guid = AtlasGraphUtilsV1.getGuidByUniqueAttributes(entityType, new HashMap<String, Object>() {
-                                    {
-                                        put(partialUpdateRequest.getAttribute(), partialUpdateRequest.getAttributeValue());
-                                    }
-                                });
+                                String          guid       = AtlasGraphUtilsV1.getGuidByUniqueAttributes(entityType, Collections.singletonMap(partialUpdateRequest.getAttribute(), (Object)partialUpdateRequest.getAttributeValue()));
 
                                 // There should only be one root entity
                                 entities.getEntities().get(0).setGuid(guid);
@@ -397,30 +407,30 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
 
                                 if (numRetries == 0) { // audit only on the first attempt
                                     AtlasBaseClient.API api = DELETE_ENTITY_BY_ATTRIBUTE;
-                                    audit(messageUser, api.getMethod(),
-                                          String.format(api.getNormalizedPath(), deleteRequest.getTypeName()));
+
+                                    audit(messageUser, api.getMethod(), String.format(api.getNormalizedPath(), deleteRequest.getTypeName()));
                                 }
 
                                 try {
                                     AtlasEntityType type = (AtlasEntityType) typeRegistry.getType(deleteRequest.getTypeName());
-                                    atlasEntityStore.deleteByUniqueAttributes(type,
-                                            new HashMap<String, Object>() {{
-                                                put(deleteRequest.getAttribute(), deleteRequest.getAttributeValue());
-                                            }});
+
+                                    atlasEntityStore.deleteByUniqueAttributes(type, Collections.singletonMap(deleteRequest.getAttribute(), (Object) deleteRequest.getAttributeValue()));
                                 } catch (ClassCastException cle) {
                                     LOG.error("Failed to do a partial update on Entity");
                                 }
                                 break;
 
                             case ENTITY_FULL_UPDATE:
-                                EntityUpdateRequest updateRequest = (EntityUpdateRequest) message;
+                                final EntityUpdateRequest updateRequest = (EntityUpdateRequest) message;
 
                                 if (numRetries == 0) { // audit only on the first attempt
                                     AtlasBaseClient.API api = UPDATE_ENTITY;
+
                                     audit(messageUser, api.getMethod(), api.getNormalizedPath());
                                 }
 
                                 entities = instanceConverter.toAtlasEntities(updateRequest.getEntities());
+
                                 atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), false);
                                 break;
 
@@ -433,6 +443,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
                         LOG.warn("Error handling message", e);
                         try {
                             LOG.info("Sleeping for {} ms before retry", consumerRetryInterval);
+
                             Thread.sleep(consumerRetryInterval);
                         } catch (InterruptedException ie) {
                             LOG.error("Notification consumer thread sleep interrupted");
@@ -440,7 +451,9 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
 
                         if (numRetries == (maxRetries - 1)) {
                             LOG.warn("Max retries exceeded for message {}", message, e);
+
                             failedMessages.add(message);
+
                             if (failedMessages.size() >= failedMsgCacheSize) {
                                 recordFailedMessages();
                             }
@@ -458,15 +471,18 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
 
         private void recordFailedMessages() {
             //logging failed messages
-            for (HookNotificationMessage message : failedMessages) {
+            for (HookNotification message : failedMessages) {
                 FAILED_LOG.error("[DROPPED_NOTIFICATION] {}", AbstractNotification.getMessageJson(message));
             }
+
             failedMessages.clear();
         }
 
-        private void commit(AtlasKafkaMessage<HookNotificationMessage> kafkaMessage) {
+        private void commit(AtlasKafkaMessage<HookNotification> kafkaMessage) {
             recordFailedMessages();
+
             TopicPartition partition = new TopicPartition("ATLAS_HOOK", kafkaMessage.getPartition());
+
             consumer.commit(partition, kafkaMessage.getOffset() + 1);
         }
 
@@ -474,22 +490,23 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
             try {
                 while (serviceState.getState() != ServiceState.ServiceStateValue.ACTIVE) {
                     try {
-                        LOG.info("Atlas Server is not ready. Waiting for {} milliseconds to retry...",
-                                SERVER_READY_WAIT_TIME_MS);
+                        LOG.info("Atlas Server is not ready. Waiting for {} milliseconds to retry...", SERVER_READY_WAIT_TIME_MS);
+
                         timer.sleep(SERVER_READY_WAIT_TIME_MS);
                     } catch (InterruptedException e) {
-                        LOG.info("Interrupted while waiting for Atlas Server to become ready, "
-                                + "exiting consumer thread.", e);
+                        LOG.info("Interrupted while waiting for Atlas Server to become ready, " + "exiting consumer thread.", e);
+
                         return false;
                     }
                 }
             } catch (Throwable e) {
-                LOG.info(
-                        "Handled AtlasServiceException while waiting for Atlas Server to become ready, "
-                                + "exiting consumer thread.", e);
+                LOG.info("Handled AtlasServiceException while waiting for Atlas Server to become ready, exiting consumer thread.", e);
+
                 return false;
             }
+
             LOG.info("Atlas Server is ready, can start reading Kafka events.");
+
             return true;
         }
 
@@ -504,12 +521,15 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
             }
 
             super.initiateShutdown();
+
             shouldRun.set(false);
+
             if (consumer != null) {
                 consumer.wakeup();
             }
 
             super.awaitShutdown();
+
             LOG.info("<== HookConsumer shutdown()");
         }
     }
@@ -519,7 +539,6 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
             LOG.debug("==> audit({},{}, {})", messageUser, method, path);
         }
 
-        AuditFilter.audit(messageUser, THREADNAME_PREFIX, method, LOCALHOST, path, LOCALHOST,
-                DateTimeHelper.formatDateUTC(new Date()));
+        AuditFilter.audit(messageUser, THREADNAME_PREFIX, method, LOCALHOST, path, LOCALHOST, DateTimeHelper.formatDateUTC(new Date()));
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/atlas/blob/f01e46d7/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java b/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java
index 517d25f..5baafeb 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java
@@ -21,11 +21,13 @@ package org.apache.atlas.notification;
 import com.google.common.collect.ImmutableSet;
 import org.apache.atlas.AtlasClient;
 import org.apache.atlas.kafka.NotificationProvider;
+import org.apache.atlas.notification.NotificationInterface.NotificationType;
 import org.apache.atlas.v1.model.instance.Id;
 import org.apache.atlas.v1.model.instance.Referenceable;
 import org.apache.atlas.v1.model.instance.Struct;
-import org.apache.atlas.v1.model.typedef.TraitTypeDefinition;
-import org.apache.atlas.v1.model.notification.EntityNotification;
+import org.apache.atlas.v1.model.notification.EntityNotificationV1;
+import org.apache.atlas.v1.model.notification.EntityNotificationV1.OperationType;
+import org.apache.atlas.v1.model.typedef.*;
 import org.apache.atlas.type.AtlasType;
 import org.apache.atlas.v1.typesystem.types.utils.TypesUtil;
 import org.apache.atlas.web.integration.BaseResourceIT;
@@ -42,33 +44,35 @@ import static org.testng.Assert.assertTrue;
  * Entity Notification Integration Tests.
  */
 public class EntityNotificationIT extends BaseResourceIT {
-
-    private final String DATABASE_NAME = "db" + randomString();
-    private final String TABLE_NAME = "table" + randomString();
-    private NotificationInterface notificationInterface = NotificationProvider.get();
-    private Id tableId;
-    private Id dbId;
-    private String traitName;
-    private NotificationConsumer notificationConsumer;
+    private final String                DATABASE_NAME         = "db" + randomString();
+    private final String                TABLE_NAME            = "table" + randomString();
+    private final NotificationInterface notificationInterface = NotificationProvider.get();
+    private       Id                    tableId;
+    private       Id                    dbId;
+    private       String                traitName;
+    private       NotificationConsumer  notificationConsumer;
 
     @BeforeClass
     public void setUp() throws Exception {
         super.setUp();
+
         createTypeDefinitionsV1();
+
         Referenceable HiveDBInstance = createHiveDBInstanceBuiltIn(DATABASE_NAME);
+
         dbId = createInstance(HiveDBInstance);
 
-        notificationConsumer = notificationInterface.createConsumers(NotificationInterface.NotificationType.ENTITIES, 1).get(0);
+        notificationConsumer = notificationInterface.createConsumers(NotificationType.ENTITIES, 1).get(0);
     }
 
     public void testCreateEntity() throws Exception {
         Referenceable tableInstance = createHiveTableInstanceBuiltIn(DATABASE_NAME, TABLE_NAME, dbId);
+
         tableId = createInstance(tableInstance);
 
         final String guid = tableId._getId();
 
-        waitForNotification(notificationConsumer, MAX_WAIT_TIME,
-                newNotificationPredicate(EntityNotification.OperationType.ENTITY_CREATE, HIVE_TABLE_TYPE_BUILTIN, guid));
+        waitForNotification(notificationConsumer, MAX_WAIT_TIME, newNotificationPredicate(OperationType.ENTITY_CREATE, HIVE_TABLE_TYPE_BUILTIN, guid));
     }
 
     public void testUpdateEntity() throws Exception {
@@ -79,83 +83,83 @@ public class EntityNotificationIT extends BaseResourceIT {
 
         atlasClientV1.updateEntityAttribute(guid, property, newValue);
 
-        waitForNotification(notificationConsumer, MAX_WAIT_TIME,
-                newNotificationPredicate(EntityNotification.OperationType.ENTITY_UPDATE, HIVE_TABLE_TYPE_BUILTIN, guid));
+        waitForNotification(notificationConsumer, MAX_WAIT_TIME, newNotificationPredicate(OperationType.ENTITY_UPDATE, HIVE_TABLE_TYPE_BUILTIN, guid));
     }
 
     public void testDeleteEntity() throws Exception {
-        final String tableName = "table-" + randomString();
-        final String dbName = "db-" + randomString();
-        Referenceable HiveDBInstance = createHiveDBInstanceBuiltIn(dbName);
-        Id dbId = createInstance(HiveDBInstance);
+        final String        tableName      = "table-" + randomString();
+        final String        dbName         = "db-" + randomString();
+        final Referenceable HiveDBInstance = createHiveDBInstanceBuiltIn(dbName);
+        final Id            dbId           = createInstance(HiveDBInstance);
+        final Referenceable tableInstance  = createHiveTableInstanceBuiltIn(dbName, tableName, dbId);
+        final Id            tableId        = createInstance(tableInstance);
+        final String        guid           = tableId._getId();
 
-        Referenceable tableInstance = createHiveTableInstanceBuiltIn(dbName, tableName, dbId);
-        final Id tableId = createInstance(tableInstance);
-        final String guid = tableId._getId();
-
-        waitForNotification(notificationConsumer, MAX_WAIT_TIME,
-            newNotificationPredicate(EntityNotification.OperationType.ENTITY_CREATE, HIVE_TABLE_TYPE_BUILTIN, guid));
+        waitForNotification(notificationConsumer, MAX_WAIT_TIME, newNotificationPredicate(OperationType.ENTITY_CREATE, HIVE_TABLE_TYPE_BUILTIN, guid));
 
         final String name = (String) tableInstance.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME);
 
         atlasClientV1.deleteEntity(HIVE_TABLE_TYPE_BUILTIN, AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name);
 
-        waitForNotification(notificationConsumer, MAX_WAIT_TIME,
-            newNotificationPredicate(EntityNotification.OperationType.ENTITY_DELETE, HIVE_TABLE_TYPE_BUILTIN, guid));
+        waitForNotification(notificationConsumer, MAX_WAIT_TIME, newNotificationPredicate(OperationType.ENTITY_DELETE, HIVE_TABLE_TYPE_BUILTIN, guid));
     }
 
     public void testAddTrait() throws Exception {
         String superSuperTraitName = "SuperTrait" + randomString();
-        createTrait(superSuperTraitName);
-
-        String superTraitName = "SuperTrait" + randomString();
-        createTrait(superTraitName, superSuperTraitName);
+        String superTraitName      = "SuperTrait" + randomString();
 
         traitName = "Trait" + randomString();
+
+        createTrait(superSuperTraitName);
+        createTrait(superTraitName, superSuperTraitName);
         createTrait(traitName, superTraitName);
 
-        Struct traitInstance = new Struct(traitName);
+        Struct traitInstance     = new Struct(traitName);
         String traitInstanceJSON = AtlasType.toV1Json(traitInstance);
+
         LOG.debug("Trait instance = {}", traitInstanceJSON);
 
         final String guid = tableId._getId();
 
         atlasClientV1.addTrait(guid, traitInstance);
 
-        EntityNotification entityNotification = waitForNotification(notificationConsumer, MAX_WAIT_TIME,
-                newNotificationPredicate(EntityNotification.OperationType.TRAIT_ADD, HIVE_TABLE_TYPE_BUILTIN, guid));
+        EntityNotificationV1 entityNotification = waitForNotification(notificationConsumer, MAX_WAIT_TIME, newNotificationPredicate(OperationType.TRAIT_ADD, HIVE_TABLE_TYPE_BUILTIN, guid));
 
         Referenceable entity = entityNotification.getEntity();
+
         assertTrue(entity.getTraitNames().contains(traitName));
 
-        List<Struct> allTraits = entityNotification.getAllTraits();
+        List<Struct> allTraits     = entityNotification.getAllTraits();
         List<String> allTraitNames = new LinkedList<>();
 
         for (Struct struct : allTraits) {
             allTraitNames.add(struct.getTypeName());
         }
+
         assertTrue(allTraitNames.contains(traitName));
         assertTrue(allTraitNames.contains(superTraitName));
         assertTrue(allTraitNames.contains(superSuperTraitName));
 
         String anotherTraitName = "Trait" + randomString();
+
         createTrait(anotherTraitName, superTraitName);
 
-        traitInstance = new Struct(anotherTraitName);
+        traitInstance     = new Struct(anotherTraitName);
         traitInstanceJSON = AtlasType.toV1Json(traitInstance);
+
         LOG.debug("Trait instance = {}", traitInstanceJSON);
 
         atlasClientV1.addTrait(guid, traitInstance);
 
-        entityNotification = waitForNotification(notificationConsumer, MAX_WAIT_TIME,
-                newNotificationPredicate(EntityNotification.OperationType.TRAIT_ADD, HIVE_TABLE_TYPE_BUILTIN, guid));
+        entityNotification = waitForNotification(notificationConsumer, MAX_WAIT_TIME, newNotificationPredicate(OperationType.TRAIT_ADD, HIVE_TABLE_TYPE_BUILTIN, guid));
 
-        allTraits = entityNotification.getAllTraits();
+        allTraits     = entityNotification.getAllTraits();
         allTraitNames = new LinkedList<>();
 
         for (Struct struct : allTraits) {
             allTraitNames.add(struct.getTypeName());
         }
+
         assertTrue(allTraitNames.contains(traitName));
         assertTrue(allTraitNames.contains(anotherTraitName));
         // verify that the super type shows up twice in all traits
@@ -167,8 +171,8 @@ public class EntityNotificationIT extends BaseResourceIT {
 
         atlasClientV1.deleteTrait(guid, traitName);
 
-        EntityNotification entityNotification = waitForNotification(notificationConsumer, MAX_WAIT_TIME,
-                newNotificationPredicate(EntityNotification.OperationType.TRAIT_DELETE, HIVE_TABLE_TYPE_BUILTIN, guid));
+        EntityNotificationV1 entityNotification = waitForNotification(notificationConsumer, MAX_WAIT_TIME,
+                newNotificationPredicate(EntityNotificationV1.OperationType.TRAIT_DELETE, HIVE_TABLE_TYPE_BUILTIN, guid));
 
         assertFalse(entityNotification.getEntity().getTraitNames().contains(traitName));
     }
@@ -177,11 +181,15 @@ public class EntityNotificationIT extends BaseResourceIT {
     // ----- helper methods ---------------------------------------------------
 
     private void createTrait(String traitName, String ... superTraitNames) throws Exception {
-        TraitTypeDefinition trait =
-            TypesUtil.createTraitTypeDef(traitName, null, ImmutableSet.copyOf(superTraitNames));
+        TraitTypeDefinition traitDef = TypesUtil.createTraitTypeDef(traitName, null, ImmutableSet.copyOf(superTraitNames));
+        TypesDef            typesDef = new TypesDef(Collections.<EnumTypeDefinition>emptyList(),
+                                                    Collections.<StructTypeDefinition>emptyList(),
+                                                    Collections.singletonList(traitDef),
+                                                    Collections.<ClassTypeDefinition>emptyList());
+        String traitDefinitionJSON = AtlasType.toV1Json(typesDef);
 
-        String traitDefinitionJSON = AtlasType.toV1Json(trait);
         LOG.debug("Trait definition = {}", traitDefinitionJSON);
+
         createType(traitDefinitionJSON);
     }
 

http://git-wip-us.apache.org/repos/asf/atlas/blob/f01e46d7/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java
index 1f045e4..f248593 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java
@@ -20,14 +20,13 @@ package org.apache.atlas.notification;
 
 import org.apache.atlas.EntityAuditEvent;
 import org.apache.atlas.kafka.NotificationProvider;
+import org.apache.atlas.model.notification.HookNotification;
 import org.apache.atlas.v1.model.instance.Id;
 import org.apache.atlas.v1.model.instance.Referenceable;
-import org.apache.atlas.v1.model.notification.HookNotification;
-import org.apache.atlas.v1.model.notification.HookNotification.HookNotificationMessage;
-import org.apache.atlas.v1.model.notification.HookNotification.EntityDeleteRequest;
-import org.apache.atlas.v1.model.notification.HookNotification.EntityPartialUpdateRequest;
-import org.apache.atlas.v1.model.notification.HookNotification.EntityCreateRequest;
-import org.apache.atlas.v1.model.notification.HookNotification.EntityUpdateRequest;
+import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityDeleteRequest;
+import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityPartialUpdateRequest;
+import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest;
+import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityUpdateRequest;
 import org.apache.atlas.web.integration.BaseResourceIT;
 import org.codehaus.jettison.json.JSONArray;
 import org.testng.annotations.AfterClass;
@@ -40,18 +39,19 @@ import static java.lang.Thread.sleep;
 import static org.testng.Assert.assertEquals;
 
 public class NotificationHookConsumerIT extends BaseResourceIT {
-
     private static final String TEST_USER = "testuser";
-    public static final String NAME = "name";
-    public static final String DESCRIPTION = "description";
+
+    public static final String NAME           = "name";
+    public static final String DESCRIPTION    = "description";
     public static final String QUALIFIED_NAME = "qualifiedName";
-    public static final String CLUSTER_NAME = "clusterName";
+    public static final String CLUSTER_NAME   = "clusterName";
 
-    private NotificationInterface notificationInterface = NotificationProvider.get();
+    private final NotificationInterface notificationInterface = NotificationProvider.get();
 
     @BeforeClass
     public void setUp() throws Exception {
         super.setUp();
+
         createTypeDefinitionsV1();
     }
 
@@ -60,29 +60,33 @@ public class NotificationHookConsumerIT extends BaseResourceIT {
         notificationInterface.close();
     }
 
-    private void sendHookMessage(HookNotificationMessage message) throws NotificationException, InterruptedException {
+    private void sendHookMessage(HookNotification message) throws NotificationException, InterruptedException {
         notificationInterface.send(NotificationInterface.NotificationType.HOOK, message);
+
         sleep(1000);
     }
 
     @Test
     public void testMessageHandleFailureConsumerContinues() throws Exception {
         //send invalid message - update with invalid type
-        sendHookMessage(new HookNotification.EntityPartialUpdateRequest(TEST_USER, randomString(), null, null,
-                new Referenceable(randomString())));
+        sendHookMessage(new EntityPartialUpdateRequest(TEST_USER, randomString(), null, null, new Referenceable(randomString())));
 
         //send valid message
         final Referenceable entity = new Referenceable(DATABASE_TYPE_BUILTIN);
-        String dbName = "db" + randomString();
+        final String        dbName = "db" + randomString();
+
         entity.set(NAME, dbName);
         entity.set(DESCRIPTION, randomString());
         entity.set(QUALIFIED_NAME, dbName);
         entity.set(CLUSTER_NAME, randomString());
+
         sendHookMessage(new EntityCreateRequest(TEST_USER, entity));
+
         waitFor(MAX_WAIT_TIME, new Predicate() {
             @Override
             public boolean evaluate() throws Exception {
                 JSONArray results = searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE_BUILTIN, entity.get(NAME)));
+
                 return results.length() == 1;
             }
         });
@@ -91,24 +95,28 @@ public class NotificationHookConsumerIT extends BaseResourceIT {
     @Test
     public void testCreateEntity() throws Exception {
         final Referenceable entity = new Referenceable(DATABASE_TYPE_BUILTIN);
-        String dbName = "db" + randomString();
+        final String        dbName = "db" + randomString();
+
         entity.set(NAME, dbName);
         entity.set(DESCRIPTION, randomString());
         entity.set(QUALIFIED_NAME, dbName);
         entity.set(CLUSTER_NAME, randomString());
 
         sendHookMessage(new EntityCreateRequest(TEST_USER, entity));
+
         waitFor(MAX_WAIT_TIME, new Predicate() {
             @Override
             public boolean evaluate() throws Exception {
                 JSONArray results = searchByDSL(String.format("%s where qualifiedName='%s'", DATABASE_TYPE_BUILTIN, entity.get(QUALIFIED_NAME)));
+
                 return results.length() == 1;
             }
         });
 
         //Assert that user passed in hook message is used in audit
-        Referenceable instance = atlasClientV1.getEntity(DATABASE_TYPE_BUILTIN, QUALIFIED_NAME, (String) entity.get(QUALIFIED_NAME));
-        List<EntityAuditEvent> events = atlasClientV1.getEntityAuditEvents(instance.getId()._getId(), (short) 1);
+        Referenceable          instance = atlasClientV1.getEntity(DATABASE_TYPE_BUILTIN, QUALIFIED_NAME, (String) entity.get(QUALIFIED_NAME));
+        List<EntityAuditEvent> events   = atlasClientV1.getEntityAuditEvents(instance.getId()._getId(), (short) 1);
+
         assertEquals(events.size(), 1);
         assertEquals(events.get(0).getUser(), TEST_USER);
     }
@@ -116,7 +124,8 @@ public class NotificationHookConsumerIT extends BaseResourceIT {
     @Test
     public void testUpdateEntityPartial() throws Exception {
         final Referenceable entity = new Referenceable(DATABASE_TYPE_BUILTIN);
-        final String dbName = "db" + randomString();
+        final String        dbName = "db" + randomString();
+
         entity.set(NAME, dbName);
         entity.set(DESCRIPTION, randomString());
         entity.set(QUALIFIED_NAME, dbName);
@@ -125,25 +134,31 @@ public class NotificationHookConsumerIT extends BaseResourceIT {
         atlasClientV1.createEntity(entity);
 
         final Referenceable newEntity = new Referenceable(DATABASE_TYPE_BUILTIN);
+
         newEntity.set("owner", randomString());
+
         sendHookMessage(new EntityPartialUpdateRequest(TEST_USER, DATABASE_TYPE_BUILTIN, QUALIFIED_NAME, dbName, newEntity));
+
         waitFor(MAX_WAIT_TIME, new Predicate() {
             @Override
             public boolean evaluate() throws Exception {
                 Referenceable localEntity = atlasClientV1.getEntity(DATABASE_TYPE_BUILTIN, QUALIFIED_NAME, dbName);
+
                 return (localEntity.get("owner") != null && localEntity.get("owner").equals(newEntity.get("owner")));
             }
         });
 
         //Its partial update and un-set fields are not updated
         Referenceable actualEntity = atlasClientV1.getEntity(DATABASE_TYPE_BUILTIN, QUALIFIED_NAME, dbName);
+
         assertEquals(actualEntity.get(DESCRIPTION), entity.get(DESCRIPTION));
     }
 
     @Test
     public void testUpdatePartialUpdatingQualifiedName() throws Exception {
         final Referenceable entity = new Referenceable(DATABASE_TYPE_BUILTIN);
-        final String dbName = "db" + randomString();
+        final String        dbName = "db" + randomString();
+
         entity.set(NAME, dbName);
         entity.set(DESCRIPTION, randomString());
         entity.set(QUALIFIED_NAME, dbName);
@@ -152,28 +167,32 @@ public class NotificationHookConsumerIT extends BaseResourceIT {
         atlasClientV1.createEntity(entity);
 
         final Referenceable newEntity = new Referenceable(DATABASE_TYPE_BUILTIN);
-        final String newName = "db" + randomString();
+        final String        newName   = "db" + randomString();
+
         newEntity.set(QUALIFIED_NAME, newName);
 
         sendHookMessage(new EntityPartialUpdateRequest(TEST_USER, DATABASE_TYPE_BUILTIN, QUALIFIED_NAME, dbName, newEntity));
+
         waitFor(MAX_WAIT_TIME, new Predicate() {
             @Override
             public boolean evaluate() throws Exception {
                 JSONArray results = searchByDSL(String.format("%s where qualifiedName='%s'", DATABASE_TYPE_BUILTIN, newName));
+
                 return results.length() == 1;
             }
         });
 
         //no entity with the old qualified name
         JSONArray results = searchByDSL(String.format("%s where qualifiedName='%s'", DATABASE_TYPE_BUILTIN, dbName));
-        assertEquals(results.length(), 0);
 
+        assertEquals(results.length(), 0);
     }
 
     @Test
     public void testDeleteByQualifiedName() throws Exception {
-        Referenceable entity = new Referenceable(DATABASE_TYPE_BUILTIN);
-        final String dbName = "db" + randomString();
+        final Referenceable entity = new Referenceable(DATABASE_TYPE_BUILTIN);
+        final String        dbName = "db" + randomString();
+
         entity.set(NAME, dbName);
         entity.set(DESCRIPTION, randomString());
         entity.set(QUALIFIED_NAME, dbName);
@@ -182,10 +201,12 @@ public class NotificationHookConsumerIT extends BaseResourceIT {
         final String dbId = atlasClientV1.createEntity(entity).get(0);
 
         sendHookMessage(new EntityDeleteRequest(TEST_USER, DATABASE_TYPE_BUILTIN, QUALIFIED_NAME, dbName));
+
         waitFor(MAX_WAIT_TIME, new Predicate() {
             @Override
             public boolean evaluate() throws Exception {
                 Referenceable getEntity = atlasClientV1.getEntity(dbId);
+
                 return getEntity.getId().getState() == Id.EntityState.DELETED;
             }
         });
@@ -193,8 +214,9 @@ public class NotificationHookConsumerIT extends BaseResourceIT {
 
     @Test
     public void testUpdateEntityFullUpdate() throws Exception {
-        Referenceable entity = new Referenceable(DATABASE_TYPE_BUILTIN);
-        final String dbName = "db" + randomString();
+        final Referenceable entity = new Referenceable(DATABASE_TYPE_BUILTIN);
+        final String        dbName = "db" + randomString();
+
         entity.set(NAME, dbName);
         entity.set(DESCRIPTION, randomString());
         entity.set(QUALIFIED_NAME, dbName);
@@ -203,6 +225,7 @@ public class NotificationHookConsumerIT extends BaseResourceIT {
         atlasClientV1.createEntity(entity);
 
         final Referenceable newEntity = new Referenceable(DATABASE_TYPE_BUILTIN);
+
         newEntity.set(NAME, randomString());
         newEntity.set(DESCRIPTION, randomString());
         newEntity.set("owner", randomString());
@@ -211,18 +234,19 @@ public class NotificationHookConsumerIT extends BaseResourceIT {
 
         //updating unique attribute
         sendHookMessage(new EntityUpdateRequest(TEST_USER, newEntity));
+
         waitFor(MAX_WAIT_TIME, new Predicate() {
             @Override
             public boolean evaluate() throws Exception {
                 JSONArray results = searchByDSL(String.format("%s where qualifiedName='%s'", DATABASE_TYPE_BUILTIN, newEntity.get(QUALIFIED_NAME)));
+
                 return results.length() == 1;
             }
         });
 
         Referenceable actualEntity = atlasClientV1.getEntity(DATABASE_TYPE_BUILTIN, QUALIFIED_NAME, dbName);
+
         assertEquals(actualEntity.get(DESCRIPTION), newEntity.get(DESCRIPTION));
         assertEquals(actualEntity.get("owner"), newEntity.get("owner"));
     }
-
-
 }

http://git-wip-us.apache.org/repos/asf/atlas/blob/f01e46d7/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
index 68497e0..4ea13c7 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
@@ -25,9 +25,10 @@ import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.kafka.AtlasKafkaMessage;
 import org.apache.atlas.kafka.KafkaNotification;
 import org.apache.atlas.kafka.NotificationProvider;
-import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
+import org.apache.atlas.model.notification.HookNotification;
 import org.apache.atlas.v1.model.instance.Referenceable;
-import org.apache.atlas.v1.model.notification.HookNotification;
+import org.apache.atlas.v1.model.notification.HookNotificationV1;
 import org.apache.atlas.repository.converters.AtlasInstanceConverter;
 import org.apache.atlas.repository.store.graph.AtlasEntityStore;
 import org.apache.atlas.repository.store.graph.v1.EntityStream;
@@ -41,7 +42,7 @@ import org.testng.Assert;
 import org.testng.annotations.AfterTest;
 import org.testng.annotations.BeforeTest;
 import org.testng.annotations.Test;
-import static org.apache.atlas.v1.model.notification.HookNotification.HookNotificationMessage;
+
 import java.util.List;
 
 import org.apache.atlas.kafka.AtlasKafkaConsumer;
@@ -57,11 +58,11 @@ import static org.testng.Assert.*;
 
 
 public class NotificationHookConsumerKafkaTest {
-
-    public static final String NAME = "name";
-    public static final String DESCRIPTION = "description";
+    public static final String NAME           = "name";
+    public static final String DESCRIPTION    = "description";
     public static final String QUALIFIED_NAME = "qualifiedName";
-    private NotificationInterface notificationInterface = NotificationProvider.get();
+
+    private final NotificationInterface notificationInterface = NotificationProvider.get();
 
 
     @Mock
@@ -81,10 +82,14 @@ public class NotificationHookConsumerKafkaTest {
     @BeforeTest
     public void setup() throws AtlasException, InterruptedException, AtlasBaseException {
         MockitoAnnotations.initMocks(this);
-        AtlasType mockType = mock(AtlasType.class);
+
+        AtlasType                mockType   = mock(AtlasType.class);
+        AtlasEntitiesWithExtInfo mockEntity = mock(AtlasEntitiesWithExtInfo.class);
+
         when(typeRegistry.getType(anyString())).thenReturn(mockType);
-        AtlasEntity.AtlasEntitiesWithExtInfo mockEntity = mock(AtlasEntity.AtlasEntitiesWithExtInfo.class);
+
         when(instanceConverter.toAtlasEntities(anyList())).thenReturn(mockEntity);
+
         kafkaNotification = startKafkaServer();
     }
 
@@ -97,19 +102,20 @@ public class NotificationHookConsumerKafkaTest {
     @Test
     public void testConsumerConsumesNewMessageWithAutoCommitDisabled() throws AtlasException, InterruptedException, AtlasBaseException {
         try {
-            produceMessage(new HookNotification.EntityCreateRequest("test_user1", createEntity()));
+            produceMessage(new HookNotificationV1.EntityCreateRequest("test_user1", createEntity()));
 
-            NotificationConsumer<HookNotificationMessage> consumer = createNewConsumer(kafkaNotification, false);
-            NotificationHookConsumer notificationHookConsumer =
-                    new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
-            NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer);
+            NotificationConsumer<HookNotification> consumer                 = createNewConsumer(kafkaNotification, false);
+            NotificationHookConsumer               notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
+            NotificationHookConsumer.HookConsumer  hookConsumer             = notificationHookConsumer.new HookConsumer(consumer);
 
             consumeOneMessage(consumer, hookConsumer);
+
             verify(atlasEntityStore).createOrUpdate(any(EntityStream.class), anyBoolean());
 
             // produce another message, and make sure it moves ahead. If commit succeeded, this would work.
-            produceMessage(new HookNotification.EntityCreateRequest("test_user2", createEntity()));
+            produceMessage(new HookNotificationV1.EntityCreateRequest("test_user2", createEntity()));
             consumeOneMessage(consumer, hookConsumer);
+
             verify(atlasEntityStore,times(2)).createOrUpdate(any(EntityStream.class), anyBoolean());
             reset(atlasEntityStore);
         }
@@ -121,22 +127,20 @@ public class NotificationHookConsumerKafkaTest {
     @Test(dependsOnMethods = "testConsumerConsumesNewMessageWithAutoCommitDisabled")
     public void testConsumerRemainsAtSameMessageWithAutoCommitEnabled() throws Exception {
         try {
-            produceMessage(new HookNotification.EntityCreateRequest("test_user3", createEntity()));
+            produceMessage(new HookNotificationV1.EntityCreateRequest("test_user3", createEntity()));
 
-            NotificationConsumer<HookNotificationMessage> consumer = createNewConsumer(kafkaNotification, true);
+            NotificationConsumer<HookNotification> consumer = createNewConsumer(kafkaNotification, true);
 
             assertNotNull (consumer);
 
-            NotificationHookConsumer notificationHookConsumer =
-                    new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
-            NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer);
-
+            NotificationHookConsumer              notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
+            NotificationHookConsumer.HookConsumer hookConsumer             = notificationHookConsumer.new HookConsumer(consumer);
 
             consumeOneMessage(consumer, hookConsumer);
             verify(atlasEntityStore).createOrUpdate(any(EntityStream.class), anyBoolean());
 
             // produce another message, but this will not be consumed, as commit code is not executed in hook consumer.
-            produceMessage(new HookNotification.EntityCreateRequest("test_user4", createEntity()));
+            produceMessage(new HookNotificationV1.EntityCreateRequest("test_user4", createEntity()));
 
             consumeOneMessage(consumer, hookConsumer);
             verify(atlasEntityStore,times(2)).createOrUpdate(any(EntityStream.class), anyBoolean());
@@ -146,18 +150,19 @@ public class NotificationHookConsumerKafkaTest {
         }
     }
 
-    AtlasKafkaConsumer<HookNotificationMessage> createNewConsumer(KafkaNotification kafkaNotification, boolean autoCommitEnabled) {
+    AtlasKafkaConsumer<HookNotification> createNewConsumer(KafkaNotification kafkaNotification, boolean autoCommitEnabled) {
         return (AtlasKafkaConsumer) kafkaNotification.createConsumers(NotificationInterface.NotificationType.HOOK, 1, autoCommitEnabled).get(0);
     }
 
-    void consumeOneMessage(NotificationConsumer<HookNotificationMessage> consumer,
+    void consumeOneMessage(NotificationConsumer<HookNotification> consumer,
                            NotificationHookConsumer.HookConsumer hookConsumer) throws InterruptedException {
         try {
             long startTime = System.currentTimeMillis(); //fetch starting time
+
             while ((System.currentTimeMillis() - startTime) < 10000) {
-                List<AtlasKafkaMessage<HookNotificationMessage>> messages = consumer.receive();
+                List<AtlasKafkaMessage<HookNotification>> messages = consumer.receive();
 
-                for (AtlasKafkaMessage<HookNotificationMessage> msg : messages) {
+                for (AtlasKafkaMessage<HookNotification> msg : messages) {
                     hookConsumer.handleMessage(msg);
                 }
 
@@ -172,19 +177,25 @@ public class NotificationHookConsumerKafkaTest {
 
     Referenceable createEntity() {
         final Referenceable entity = new Referenceable(AtlasClient.DATA_SET_SUPER_TYPE);
+
         entity.set(NAME, "db" + randomString());
         entity.set(DESCRIPTION, randomString());
         entity.set(QUALIFIED_NAME, randomString());
+
         return entity;
     }
 
     KafkaNotification startKafkaServer() throws AtlasException, InterruptedException {
         Configuration applicationProperties = ApplicationProperties.get();
+
         applicationProperties.setProperty("atlas.kafka.data", "target/" + RandomStringUtils.randomAlphanumeric(5));
 
         kafkaNotification = new KafkaNotification(applicationProperties);
+
         kafkaNotification.start();
+
         Thread.sleep(2000);
+
         return kafkaNotification;
     }
 
@@ -192,8 +203,7 @@ public class NotificationHookConsumerKafkaTest {
         return RandomStringUtils.randomAlphanumeric(10);
     }
 
-    private void produceMessage(HookNotificationMessage message) throws NotificationException {
+    private void produceMessage(HookNotification message) throws NotificationException {
         kafkaNotification.send(NotificationInterface.NotificationType.HOOK, message);
     }
-
 }

http://git-wip-us.apache.org/repos/asf/atlas/blob/f01e46d7/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
index 2d3d5ba..f8bd9a1 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
@@ -22,10 +22,12 @@ import org.apache.atlas.AtlasServiceException;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.ha.HAConfiguration;
 import org.apache.atlas.kafka.AtlasKafkaMessage;
-import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
 import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.model.notification.HookNotification.HookNotificationType;
+import org.apache.atlas.notification.NotificationInterface.NotificationType;
 import org.apache.atlas.v1.model.instance.Referenceable;
-import org.apache.atlas.v1.model.notification.HookNotification;
+import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest;
 import org.apache.atlas.repository.converters.AtlasInstanceConverter;
 import org.apache.atlas.repository.store.graph.AtlasEntityStore;
 import org.apache.atlas.repository.store.graph.v1.EntityStream;
@@ -43,6 +45,7 @@ import org.testng.annotations.Test;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 
@@ -75,20 +78,24 @@ public class NotificationHookConsumerTest {
     @BeforeMethod
     public void setup() throws AtlasBaseException {
         MockitoAnnotations.initMocks(this);
-        AtlasType mockType = mock(AtlasType.class);
+
+        AtlasType                mockType   = mock(AtlasType.class);
+        AtlasEntitiesWithExtInfo mockEntity = mock(AtlasEntitiesWithExtInfo.class);
+
         when(typeRegistry.getType(anyString())).thenReturn(mockType);
-        AtlasEntity.AtlasEntitiesWithExtInfo mockEntity = mock(AtlasEntity.AtlasEntitiesWithExtInfo.class);
         when(instanceConverter.toAtlasEntities(anyList())).thenReturn(mockEntity);
+
         EntityMutationResponse mutationResponse = mock(EntityMutationResponse.class);
+
         when(atlasEntityStore.createOrUpdate(any(EntityStream.class), anyBoolean())).thenReturn(mutationResponse);
     }
 
     @Test
     public void testConsumerCanProceedIfServerIsReady() throws Exception {
-        NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
-        NotificationHookConsumer.HookConsumer hookConsumer =
-                notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class));
-        NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
+        NotificationHookConsumer              notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
+        NotificationHookConsumer.HookConsumer hookConsumer             = notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class));
+        NotificationHookConsumer.Timer        timer                    = mock(NotificationHookConsumer.Timer.class);
+
         when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.ACTIVE);
 
         assertTrue(hookConsumer.serverAvailable(timer));
@@ -98,10 +105,9 @@ public class NotificationHookConsumerTest {
 
     @Test
     public void testConsumerWaitsNTimesIfServerIsNotReadyNTimes() throws Exception {
-        NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
-        NotificationHookConsumer.HookConsumer hookConsumer =
-                notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class));
-        NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
+        NotificationHookConsumer              notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
+        NotificationHookConsumer.HookConsumer hookConsumer             = notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class));
+        NotificationHookConsumer.Timer        timer                    = mock(NotificationHookConsumer.Timer.class);
 
         when(serviceState.getState())
                 .thenReturn(ServiceState.ServiceStateValue.PASSIVE)
@@ -116,35 +122,30 @@ public class NotificationHookConsumerTest {
 
     @Test
     public void testCommitIsCalledWhenMessageIsProcessed() throws AtlasServiceException, AtlasException {
-        NotificationHookConsumer notificationHookConsumer =
-                new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
-        NotificationConsumer consumer = mock(NotificationConsumer.class);
-        NotificationHookConsumer.HookConsumer hookConsumer =
-                notificationHookConsumer.new HookConsumer(consumer);
-        HookNotification.EntityCreateRequest message = mock(HookNotification.EntityCreateRequest.class);
+        NotificationHookConsumer               notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
+        NotificationConsumer                   consumer                 = mock(NotificationConsumer.class);
+        NotificationHookConsumer.HookConsumer  hookConsumer             = notificationHookConsumer.new HookConsumer(consumer);
+        EntityCreateRequest                    message                  = mock(EntityCreateRequest.class);
+        Referenceable                          mock                     = mock(Referenceable.class);
+
         when(message.getUser()).thenReturn("user");
-        when(message.getType()).thenReturn(HookNotification.HookNotificationType.ENTITY_CREATE);
-        Referenceable mock = mock(Referenceable.class);
+        when(message.getType()).thenReturn(HookNotificationType.ENTITY_CREATE);
         when(message.getEntities()).thenReturn(Arrays.asList(mock));
 
         hookConsumer.handleMessage(new AtlasKafkaMessage(message, -1, -1));
+
         verify(consumer).commit(any(TopicPartition.class), anyInt());
     }
 
     @Test
     public void testCommitIsNotCalledEvenWhenMessageProcessingFails() throws AtlasServiceException, AtlasException, AtlasBaseException {
-        NotificationHookConsumer notificationHookConsumer =
-                new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
-        NotificationConsumer consumer = mock(NotificationConsumer.class);
-        NotificationHookConsumer.HookConsumer hookConsumer =
-                notificationHookConsumer.new HookConsumer(consumer);
-        HookNotification.EntityCreateRequest message = new HookNotification.EntityCreateRequest("user",
-                new ArrayList<Referenceable>() {
-                    {
-                        add(mock(Referenceable.class));
-                    }
-                });
+        NotificationHookConsumer              notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
+        NotificationConsumer                  consumer                 = mock(NotificationConsumer.class);
+        NotificationHookConsumer.HookConsumer hookConsumer             = notificationHookConsumer.new HookConsumer(consumer);
+        EntityCreateRequest                   message                  = new EntityCreateRequest("user", Collections.singletonList(mock(Referenceable.class)));
+
         when(atlasEntityStore.createOrUpdate(any(EntityStream.class), anyBoolean())).thenThrow(new RuntimeException("Simulating exception in processing message"));
+
         hookConsumer.handleMessage(new AtlasKafkaMessage(message, -1, -1));
 
         verifyZeroInteractions(consumer);
@@ -152,10 +153,10 @@ public class NotificationHookConsumerTest {
 
     @Test
     public void testConsumerProceedsWithFalseIfInterrupted() throws Exception {
-        NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
-        NotificationHookConsumer.HookConsumer hookConsumer =
-                notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class));
-        NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
+        NotificationHookConsumer              notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
+        NotificationHookConsumer.HookConsumer hookConsumer             = notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class));
+        NotificationHookConsumer.Timer        timer                    = mock(NotificationHookConsumer.Timer.class);
+
         doThrow(new InterruptedException()).when(timer).sleep(NotificationHookConsumer.SERVER_READY_WAIT_TIME_MS);
         when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.PASSIVE);
 
@@ -164,58 +165,75 @@ public class NotificationHookConsumerTest {
 
     @Test
     public void testConsumersStartedIfHAIsDisabled() throws Exception {
+        List<NotificationConsumer<Object>> consumers = new ArrayList();
+        NotificationConsumer               notificationConsumerMock = mock(NotificationConsumer.class);
+
+        consumers.add(notificationConsumerMock);
+
         when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(false);
         when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
-        List<NotificationConsumer<Object>> consumers = new ArrayList();
-        consumers.add(mock(NotificationConsumer.class));
-        when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)).
-                thenReturn(consumers);
+        when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers);
+
         NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
+
         notificationHookConsumer.startInternal(configuration, executorService);
-        verify(notificationInterface).createConsumers(NotificationInterface.NotificationType.HOOK, 1);
+
+        verify(notificationInterface).createConsumers(NotificationType.HOOK, 1);
         verify(executorService).submit(any(NotificationHookConsumer.HookConsumer.class));
     }
 
     @Test
     public void testConsumersAreNotStartedIfHAIsEnabled() throws Exception {
+        List<NotificationConsumer<Object>> consumers = new ArrayList();
+        NotificationConsumer               notificationConsumerMock = mock(NotificationConsumer.class);
+
+        consumers.add(notificationConsumerMock);
+
         when(configuration.containsKey(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true);
         when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true);
         when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
-        List<NotificationConsumer<Object>> consumers = new ArrayList();
-        consumers.add(mock(NotificationConsumer.class));
-        when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)).
-                thenReturn(consumers);
+        when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers);
+
         NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
+
         notificationHookConsumer.startInternal(configuration, executorService);
+
         verifyZeroInteractions(notificationInterface);
     }
 
     @Test
     public void testConsumersAreStartedWhenInstanceBecomesActive() throws Exception {
+        List<NotificationConsumer<Object>> consumers = new ArrayList();
+        NotificationConsumer               notificationConsumerMock = mock(NotificationConsumer.class);
+
+        consumers.add(notificationConsumerMock);
+
         when(configuration.containsKey(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true);
         when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true);
         when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
-        List<NotificationConsumer<Object>> consumers = new ArrayList();
-        consumers.add(mock(NotificationConsumer.class));
-        when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)).
-                thenReturn(consumers);
+        when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers);
+
         NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
+
         notificationHookConsumer.startInternal(configuration, executorService);
         notificationHookConsumer.instanceIsActive();
-        verify(notificationInterface).createConsumers(NotificationInterface.NotificationType.HOOK, 1);
+
+        verify(notificationInterface).createConsumers(NotificationType.HOOK, 1);
         verify(executorService).submit(any(NotificationHookConsumer.HookConsumer.class));
     }
 
     @Test
     public void testConsumersAreStoppedWhenInstanceBecomesPassive() throws Exception {
+        List<NotificationConsumer<Object>> consumers = new ArrayList();
+        NotificationConsumer               notificationConsumerMock = mock(NotificationConsumer.class);
+
+        consumers.add(notificationConsumerMock);
+
         when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.ACTIVE);
         when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
         when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
-        List<NotificationConsumer<Object>> consumers = new ArrayList();
-        NotificationConsumer notificationConsumerMock = mock(NotificationConsumer.class);
-        consumers.add(notificationConsumerMock);
+        when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers);
 
-        when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)).thenReturn(consumers);
         final NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
 
         doAnswer(new Answer() {
@@ -223,12 +241,14 @@ public class NotificationHookConsumerTest {
             public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                 notificationHookConsumer.consumers.get(0).start();
                 Thread.sleep(500);
+
                 return null;
             }
         }).when(executorService).submit(any(NotificationHookConsumer.HookConsumer.class));
 
         notificationHookConsumer.startInternal(configuration, executorService);
         notificationHookConsumer.instanceIsPassive();
+
         verify(notificationInterface).close();
         verify(executorService).shutdown();
         verify(notificationConsumerMock).wakeup();
@@ -236,18 +256,21 @@ public class NotificationHookConsumerTest {
 
     @Test
     public void consumersStoppedBeforeStarting() throws Exception {
+        List<NotificationConsumer<Object>> consumers                = new ArrayList();
+        NotificationConsumer               notificationConsumerMock = mock(NotificationConsumer.class);
+
+        consumers.add(notificationConsumerMock);
+
         when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.ACTIVE);
         when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
         when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
-        List<NotificationConsumer<Object>> consumers = new ArrayList();
-        NotificationConsumer notificationConsumerMock = mock(NotificationConsumer.class);
-        consumers.add(notificationConsumerMock);
+        when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers);
 
-        when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)).thenReturn(consumers);
         final NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
 
         notificationHookConsumer.startInternal(configuration, executorService);
         notificationHookConsumer.instanceIsPassive();
+
         verify(notificationInterface).close();
         verify(executorService).shutdown();
     }
@@ -261,13 +284,16 @@ public class NotificationHookConsumerTest {
             public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                 notificationHookConsumer.consumers.get(0).start();
                 Thread.sleep(1000);
+
                 return null;
             }
         }).when(executorService).submit(any(NotificationHookConsumer.HookConsumer.class));
 
         notificationHookConsumer.startInternal(configuration, executorService);
         Thread.sleep(1000);
+
         assertTrue(notificationHookConsumer.consumers.get(0).isAlive());
+
         notificationHookConsumer.consumers.get(0).shutdown();
     }
 
@@ -280,27 +306,32 @@ public class NotificationHookConsumerTest {
             public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                 notificationHookConsumer.consumers.get(0).start();
                 Thread.sleep(500);
+
                 return null;
             }
         }).when(executorService).submit(any(NotificationHookConsumer.HookConsumer.class));
 
         notificationHookConsumer.startInternal(configuration, executorService);
         Thread.sleep(500);
+
         notificationHookConsumer.consumers.get(0).shutdown();
         Thread.sleep(500);
+
         assertFalse(notificationHookConsumer.consumers.get(0).isAlive());
     }
 
     private NotificationHookConsumer setupNotificationHookConsumer() throws AtlasException {
+        List<NotificationConsumer<Object>> consumers                = new ArrayList();
+        NotificationConsumer               notificationConsumerMock = mock(NotificationConsumer.class);
+
+        consumers.add(notificationConsumerMock);
+
         when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.ACTIVE);
         when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
         when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
-        List<NotificationConsumer<Object>> consumers = new ArrayList();
-        NotificationConsumer notificationConsumerMock = mock(NotificationConsumer.class);
         when(notificationConsumerMock.receive()).thenThrow(new IllegalStateException());
-        consumers.add(notificationConsumerMock);
+        when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers);
 
-        when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)).thenReturn(consumers);
         return new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
     }
 }


[3/4] atlas git commit: ATLAS-2251: notification module updates (#4)

Posted by ma...@apache.org.
http://git-wip-us.apache.org/repos/asf/atlas/blob/f01e46d7/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
index 3bc4fba..bf6a36c 100644
--- a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
+++ b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
@@ -21,22 +21,20 @@ package org.apache.atlas.hook;
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.kafka.NotificationProvider;
+import org.apache.atlas.model.notification.HookNotification;
 import org.apache.atlas.v1.model.instance.Referenceable;
 import org.apache.atlas.notification.NotificationException;
 import org.apache.atlas.notification.NotificationInterface;
-import org.apache.atlas.v1.model.notification.HookNotification;
+import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest;
 import org.apache.atlas.security.InMemoryJAASConfiguration;
-import org.apache.atlas.type.AtlasType;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.codehaus.jettison.json.JSONArray;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 
 
@@ -102,9 +100,9 @@ public abstract class AtlasHook {
     protected abstract String getNumberOfRetriesPropertyKey();
 
     protected void notifyEntities(String user, List<Referenceable> entities) {
-        List<HookNotification.HookNotificationMessage> hookNotificationMessages = new ArrayList<>();
-        hookNotificationMessages.add(new HookNotification.EntityCreateRequest(user, entities));
-        notifyEntities(hookNotificationMessages);
+        List<HookNotification> hookNotifications = new ArrayList<>();
+        hookNotifications.add(new EntityCreateRequest(user, entities));
+        notifyEntities(hookNotifications);
     }
 
     /**
@@ -116,12 +114,12 @@ public abstract class AtlasHook {
      * @param messages   hook notification messages
      * @param maxRetries maximum number of retries while sending message to messaging system
      */
-    public static void notifyEntities(List<HookNotification.HookNotificationMessage> messages, int maxRetries) {
+    public static void notifyEntities(List<HookNotification> messages, int maxRetries) {
         notifyEntitiesInternal(messages, maxRetries, notificationInterface, logFailedMessages, failedMessagesLogger);
     }
 
     @VisibleForTesting
-    static void notifyEntitiesInternal(List<HookNotification.HookNotificationMessage> messages, int maxRetries,
+    static void notifyEntitiesInternal(List<HookNotification> messages, int maxRetries,
                                        NotificationInterface notificationInterface,
                                        boolean shouldLogFailedMessages, FailedMessagesLogger logger) {
         if (messages == null || messages.isEmpty()) {
@@ -168,7 +166,7 @@ public abstract class AtlasHook {
      *
      * @param messages hook notification messages
      */
-    protected void notifyEntities(List<HookNotification.HookNotificationMessage> messages) {
+    protected void notifyEntities(List<HookNotification> messages) {
         final int maxRetries = atlasProperties.getInt(getNumberOfRetriesPropertyKey(), 3);
         notifyEntities(messages, maxRetries);
     }

http://git-wip-us.apache.org/repos/asf/atlas/blob/f01e46d7/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
index 975967d..6caf7e2 100644
--- a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
+++ b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
@@ -17,16 +17,9 @@
  */
 package org.apache.atlas.notification;
 
-import com.google.gson.reflect.TypeToken;
-import org.apache.atlas.model.notification.AtlasNotificationMessage;
 import org.apache.atlas.notification.entity.EntityMessageDeserializer;
 import org.apache.atlas.notification.hook.HookMessageDeserializer;
-import org.apache.atlas.v1.model.notification.EntityNotification;
-import org.apache.atlas.v1.model.notification.HookNotification;
-import org.codehaus.jackson.type.TypeReference;
-import scala.reflect.internal.Types;
 
-import java.lang.reflect.Type;
 import java.util.List;
 
 /**
@@ -46,19 +39,6 @@ public interface NotificationInterface {
     String PROPERTY_PREFIX = "atlas.notification";
 
     /**
-     * Notification message class types.
-     */
-    Class<HookNotification.HookNotificationMessage> HOOK_NOTIFICATION_CLASS =
-        HookNotification.HookNotificationMessage.class;
-
-    Class<EntityNotification> ENTITY_NOTIFICATION_CLASS = EntityNotification.class;
-
-    /**
-     * Versioned notification message class types.
-     */
-    Type ENTITY_VERSIONED_MESSAGE_TYPE = new TypeToken<AtlasNotificationMessage<EntityNotification>>(){}.getType();
-
-    /**
      * Atlas notification types.
      */
     enum NotificationType {

http://git-wip-us.apache.org/repos/asf/atlas/blob/f01e46d7/notification/src/main/java/org/apache/atlas/notification/entity/EntityMessageDeserializer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/entity/EntityMessageDeserializer.java b/notification/src/main/java/org/apache/atlas/notification/entity/EntityMessageDeserializer.java
index f1e1992..fa160cf 100644
--- a/notification/src/main/java/org/apache/atlas/notification/entity/EntityMessageDeserializer.java
+++ b/notification/src/main/java/org/apache/atlas/notification/entity/EntityMessageDeserializer.java
@@ -19,9 +19,9 @@
 package org.apache.atlas.notification.entity;
 
 import org.apache.atlas.model.notification.AtlasNotificationMessage;
+import org.apache.atlas.model.notification.EntityNotification;
 import org.apache.atlas.notification.AbstractMessageDeserializer;
 import org.apache.atlas.notification.AbstractNotification;
-import org.apache.atlas.v1.model.notification.EntityNotification;
 import org.codehaus.jackson.type.TypeReference;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/atlas/blob/f01e46d7/notification/src/main/java/org/apache/atlas/notification/hook/HookMessageDeserializer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/hook/HookMessageDeserializer.java b/notification/src/main/java/org/apache/atlas/notification/hook/HookMessageDeserializer.java
index 6dff821..cab442f 100644
--- a/notification/src/main/java/org/apache/atlas/notification/hook/HookMessageDeserializer.java
+++ b/notification/src/main/java/org/apache/atlas/notification/hook/HookMessageDeserializer.java
@@ -19,9 +19,9 @@
 package org.apache.atlas.notification.hook;
 
 import org.apache.atlas.model.notification.AtlasNotificationMessage;
+import org.apache.atlas.model.notification.HookNotification;
 import org.apache.atlas.notification.AbstractMessageDeserializer;
 import org.apache.atlas.notification.AbstractNotification;
-import org.apache.atlas.v1.model.notification.HookNotification.HookNotificationMessage;
 import org.codehaus.jackson.type.TypeReference;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -31,7 +31,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Hook notification message deserializer.
  */
-public class HookMessageDeserializer extends AbstractMessageDeserializer<HookNotificationMessage> {
+public class HookMessageDeserializer extends AbstractMessageDeserializer<HookNotification> {
 
     /**
      * Logger for hook notification messages.
@@ -45,14 +45,14 @@ public class HookMessageDeserializer extends AbstractMessageDeserializer<HookNot
      * Create a hook notification message deserializer.
      */
     public HookMessageDeserializer() {
-        super(new TypeReference<HookNotificationMessage>() {},
-              new TypeReference<AtlasNotificationMessage<HookNotificationMessage>>() {},
+        super(new TypeReference<HookNotification>() {},
+              new TypeReference<AtlasNotificationMessage<HookNotification>>() {},
               AbstractNotification.CURRENT_MESSAGE_VERSION, NOTIFICATION_LOGGER);
     }
 
     @Override
-    public HookNotificationMessage deserialize(String messageJson) {
-        final HookNotificationMessage ret = super.deserialize(messageJson);
+    public HookNotification deserialize(String messageJson) {
+        final HookNotification ret = super.deserialize(messageJson);
 
         if (ret != null) {
             ret.normalize();

http://git-wip-us.apache.org/repos/asf/atlas/blob/f01e46d7/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java b/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java
index 9ce2a50..0a0620f 100644
--- a/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java
+++ b/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java
@@ -18,9 +18,10 @@
 
 package org.apache.atlas.hook;
 
+import org.apache.atlas.model.notification.HookNotification;
 import org.apache.atlas.notification.NotificationException;
 import org.apache.atlas.notification.NotificationInterface;
-import org.apache.atlas.v1.model.notification.HookNotification;
+import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.testng.annotations.BeforeMethod;
@@ -51,41 +52,41 @@ public class AtlasHookTest {
 
     @Test (timeOut = 10000)
     public void testNotifyEntitiesDoesNotHangOnException() throws Exception {
-        List<HookNotification.HookNotificationMessage> hookNotificationMessages = new ArrayList<>();
+        List<HookNotification> hookNotifications = new ArrayList<>();
         doThrow(new NotificationException(new Exception())).when(notificationInterface)
-                .send(NotificationInterface.NotificationType.HOOK, hookNotificationMessages);
-        AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 0, notificationInterface, false,
+                .send(NotificationInterface.NotificationType.HOOK, hookNotifications);
+        AtlasHook.notifyEntitiesInternal(hookNotifications, 0, notificationInterface, false,
                 failedMessagesLogger);
         // if we've reached here, the method finished OK.
     }
 
     @Test
     public void testNotifyEntitiesRetriesOnException() throws NotificationException {
-        List<HookNotification.HookNotificationMessage> hookNotificationMessages =
-                new ArrayList<HookNotification.HookNotificationMessage>() {{
-                    add(new HookNotification.EntityCreateRequest("user"));
+        List<HookNotification> hookNotifications =
+                new ArrayList<HookNotification>() {{
+                    add(new EntityCreateRequest("user"));
                 }
             };
         doThrow(new NotificationException(new Exception())).when(notificationInterface)
-                .send(NotificationInterface.NotificationType.HOOK, hookNotificationMessages);
-        AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 2, notificationInterface, false,
+                .send(NotificationInterface.NotificationType.HOOK, hookNotifications);
+        AtlasHook.notifyEntitiesInternal(hookNotifications, 2, notificationInterface, false,
                 failedMessagesLogger);
 
         verify(notificationInterface, times(2)).
-                send(NotificationInterface.NotificationType.HOOK, hookNotificationMessages);
+                send(NotificationInterface.NotificationType.HOOK, hookNotifications);
     }
 
     @Test
     public void testFailedMessageIsLoggedIfRequired() throws NotificationException {
-        List<HookNotification.HookNotificationMessage> hookNotificationMessages =
-                new ArrayList<HookNotification.HookNotificationMessage>() {{
-                    add(new HookNotification.EntityCreateRequest("user"));
+        List<HookNotification> hookNotifications =
+                new ArrayList<HookNotification>() {{
+                    add(new EntityCreateRequest("user"));
                 }
             };
         doThrow(new NotificationException(new Exception(), Arrays.asList("test message")))
                 .when(notificationInterface)
-                .send(NotificationInterface.NotificationType.HOOK, hookNotificationMessages);
-        AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 2, notificationInterface, true,
+                .send(NotificationInterface.NotificationType.HOOK, hookNotifications);
+        AtlasHook.notifyEntitiesInternal(hookNotifications, 2, notificationInterface, true,
                 failedMessagesLogger);
 
         verify(failedMessagesLogger, times(1)).log("test message");
@@ -93,11 +94,11 @@ public class AtlasHookTest {
 
     @Test
     public void testFailedMessageIsNotLoggedIfNotRequired() throws NotificationException {
-        List<HookNotification.HookNotificationMessage> hookNotificationMessages = new ArrayList<>();
+        List<HookNotification> hookNotifications = new ArrayList<>();
         doThrow(new NotificationException(new Exception(), Arrays.asList("test message")))
                 .when(notificationInterface)
-                .send(NotificationInterface.NotificationType.HOOK, hookNotificationMessages);
-        AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 2, notificationInterface, false,
+                .send(NotificationInterface.NotificationType.HOOK, hookNotifications);
+        AtlasHook.notifyEntitiesInternal(hookNotifications, 2, notificationInterface, false,
                 failedMessagesLogger);
 
         verifyZeroInteractions(failedMessagesLogger);
@@ -105,15 +106,15 @@ public class AtlasHookTest {
 
     @Test
     public void testAllFailedMessagesAreLogged() throws NotificationException {
-        List<HookNotification.HookNotificationMessage> hookNotificationMessages =
-                new ArrayList<HookNotification.HookNotificationMessage>() {{
-                    add(new HookNotification.EntityCreateRequest("user"));
+        List<HookNotification> hookNotifications =
+                new ArrayList<HookNotification>() {{
+                    add(new EntityCreateRequest("user"));
                 }
             };
         doThrow(new NotificationException(new Exception(), Arrays.asList("test message1", "test message2")))
                 .when(notificationInterface)
-                .send(NotificationInterface.NotificationType.HOOK, hookNotificationMessages);
-        AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 2, notificationInterface, true,
+                .send(NotificationInterface.NotificationType.HOOK, hookNotifications);
+        AtlasHook.notifyEntitiesInternal(hookNotifications, 2, notificationInterface, true,
                 failedMessagesLogger);
 
         verify(failedMessagesLogger, times(1)).log("test message1");
@@ -122,10 +123,10 @@ public class AtlasHookTest {
 
     @Test
     public void testFailedMessageIsNotLoggedIfNotANotificationException() throws Exception {
-        List<HookNotification.HookNotificationMessage> hookNotificationMessages = new ArrayList<>();
+        List<HookNotification> hookNotifications = new ArrayList<>();
         doThrow(new RuntimeException("test message")).when(notificationInterface)
-                .send(NotificationInterface.NotificationType.HOOK, hookNotificationMessages);
-        AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 2, notificationInterface, true,
+                .send(NotificationInterface.NotificationType.HOOK, hookNotifications);
+        AtlasHook.notifyEntitiesInternal(hookNotifications, 2, notificationInterface, true,
                 failedMessagesLogger);
 
         verifyZeroInteractions(failedMessagesLogger);

http://git-wip-us.apache.org/repos/asf/atlas/blob/f01e46d7/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
index f1fc741..2e8abd7 100644
--- a/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
+++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
@@ -18,13 +18,14 @@
 
 package org.apache.atlas.kafka;
 
-import kafka.message.MessageAndMetadata;
+import org.apache.atlas.model.notification.HookNotification;
 import org.apache.atlas.v1.model.instance.Referenceable;
 import org.apache.atlas.v1.model.instance.Struct;
-import org.apache.atlas.notification.*;
+import org.apache.atlas.notification.IncompatibleVersionException;
+import org.apache.atlas.notification.NotificationInterface.NotificationType;
 import org.apache.atlas.model.notification.AtlasNotificationMessage;
 import org.apache.atlas.notification.entity.EntityNotificationTest;
-import org.apache.atlas.v1.model.notification.HookNotification;
+import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityUpdateRequest;
 import org.apache.atlas.type.AtlasType;
 import org.apache.atlas.model.notification.MessageVersion;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -32,21 +33,15 @@ import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
-import org.codehaus.jettison.json.JSONException;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import java.util.Collections;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.Arrays;
-import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.Map;
 
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -56,7 +51,6 @@ import static org.testng.Assert.*;
  * KafkaConsumer tests.
  */
 public class KafkaConsumerTest {
-
     private static final String TRAIT_NAME = "MyTrait";
 
 
@@ -71,88 +65,62 @@ public class KafkaConsumerTest {
 
     @Test
     public void testReceive() throws Exception {
-
-
-        MessageAndMetadata<String, String> messageAndMetadata = mock(MessageAndMetadata.class);
-
-        Referenceable entity = getEntity(TRAIT_NAME);
-
-        HookNotification.EntityUpdateRequest message =
-            new HookNotification.EntityUpdateRequest("user1", entity);
-
-        String json = AtlasType.toV1Json(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), message));
-
-        kafkaConsumer.assign(Arrays.asList(new TopicPartition("ATLAS_HOOK", 0)));
-        List<ConsumerRecord> klist = new ArrayList<>();
-        klist.add(new ConsumerRecord<String, String>("ATLAS_HOOK",
-                0, 0L, "mykey", json));
-
-        TopicPartition tp = new TopicPartition("ATLAS_HOOK",0);
-        Map mp = new HashMap();
-        mp.put(tp,klist);
-        ConsumerRecords records = new ConsumerRecords(mp);
-
+        Referenceable                        entity  = getEntity(TRAIT_NAME);
+        EntityUpdateRequest                  message = new EntityUpdateRequest("user1", entity);
+        String                               json    = AtlasType.toV1Json(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), message));
+        TopicPartition                       tp      = new TopicPartition("ATLAS_HOOK", 0);
+        List<ConsumerRecord<String, String>> klist   = Collections.singletonList(new ConsumerRecord<>("ATLAS_HOOK", 0, 0L, "mykey", json));
+        Map                                  mp      = Collections.singletonMap(tp, klist);
+        ConsumerRecords                      records = new ConsumerRecords(mp);
 
         when(kafkaConsumer.poll(100)).thenReturn(records);
-        when(messageAndMetadata.message()).thenReturn(json);
 
+        kafkaConsumer.assign(Collections.singletonList(tp));
+
+        AtlasKafkaConsumer                        consumer    = new AtlasKafkaConsumer(NotificationType.HOOK, kafkaConsumer, false, 100L);
+        List<AtlasKafkaMessage<HookNotification>> messageList = consumer.receive();
 
-        AtlasKafkaConsumer consumer = new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK, kafkaConsumer, false, 100L);
-        List<AtlasKafkaMessage<HookNotification.HookNotificationMessage>> messageList = consumer.receive();
         assertTrue(messageList.size() > 0);
 
-        HookNotification.HookNotificationMessage consumedMessage  = messageList.get(0).getMessage();
+        HookNotification consumedMessage  = messageList.get(0).getMessage();
 
         assertMessagesEqual(message, consumedMessage, entity);
-
     }
 
     @Test
     public void testNextVersionMismatch() throws Exception {
+        Referenceable                        entity  = getEntity(TRAIT_NAME);
+        EntityUpdateRequest                  message = new EntityUpdateRequest("user1", entity);
+        String                               json    = AtlasType.toV1Json(new AtlasNotificationMessage<>(new MessageVersion("2.0.0"), message));
+        TopicPartition                       tp      = new TopicPartition("ATLAS_HOOK",0);
+        List<ConsumerRecord<String, String>> klist   = Collections.singletonList(new ConsumerRecord<>("ATLAS_HOOK", 0, 0L, "mykey", json));
+        Map                                  mp      = Collections.singletonMap(tp,klist);
+        ConsumerRecords                      records = new ConsumerRecords(mp);
 
-        MessageAndMetadata<String, String> messageAndMetadata = mock(MessageAndMetadata.class);
-
-        Referenceable entity = getEntity(TRAIT_NAME);
-
-        HookNotification.EntityUpdateRequest message =
-            new HookNotification.EntityUpdateRequest("user1", entity);
-
-        String json = AtlasType.toV1Json(new AtlasNotificationMessage<>(new MessageVersion("2.0.0"), message));
-
-        kafkaConsumer.assign(Arrays.asList(new TopicPartition("ATLAS_HOOK", 0)));
-        List<ConsumerRecord> klist = new ArrayList<>();
-        klist.add(new ConsumerRecord<String, String>("ATLAS_HOOK",
-                0, 0L, "mykey", json));
-
-        TopicPartition tp = new TopicPartition("ATLAS_HOOK",0);
-        Map mp = new HashMap();
-        mp.put(tp,klist);
-        ConsumerRecords records = new ConsumerRecords(mp);
+        kafkaConsumer.assign(Collections.singletonList(tp));
 
         when(kafkaConsumer.poll(100L)).thenReturn(records);
-        when(messageAndMetadata.message()).thenReturn(json);
 
-        AtlasKafkaConsumer consumer =new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK, kafkaConsumer ,false, 100L);
+        AtlasKafkaConsumer consumer =new AtlasKafkaConsumer(NotificationType.HOOK, kafkaConsumer ,false, 100L);
+
         try {
-            List<AtlasKafkaMessage<HookNotification.HookNotificationMessage>> messageList = consumer.receive();
+            List<AtlasKafkaMessage<HookNotification>> messageList = consumer.receive();
+
             assertTrue(messageList.size() > 0);
 
-            HookNotification.HookNotificationMessage consumedMessage  = messageList.get(0).getMessage();
+            HookNotification consumedMessage  = messageList.get(0).getMessage();
 
             fail("Expected VersionMismatchException!");
         } catch (IncompatibleVersionException e) {
             e.printStackTrace();
         }
-
   }
 
 
     @Test
     public void testCommitIsCalledIfAutoCommitDisabled() {
-
-        TopicPartition tp = new TopicPartition("ATLAS_HOOK",0);
-
-        AtlasKafkaConsumer consumer =new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK, kafkaConsumer, false, 100L);
+        TopicPartition     tp       = new TopicPartition("ATLAS_HOOK",0);
+        AtlasKafkaConsumer consumer = new AtlasKafkaConsumer(NotificationType.HOOK, kafkaConsumer, false, 100L);
 
         consumer.commit(tp, 1);
 
@@ -161,10 +129,8 @@ public class KafkaConsumerTest {
 
     @Test
     public void testCommitIsNotCalledIfAutoCommitEnabled() {
-
-        TopicPartition tp = new TopicPartition("ATLAS_HOOK",0);
-
-        AtlasKafkaConsumer consumer =new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK, kafkaConsumer, true , 100L);
+        TopicPartition     tp       = new TopicPartition("ATLAS_HOOK",0);
+        AtlasKafkaConsumer consumer = new AtlasKafkaConsumer(NotificationType.HOOK, kafkaConsumer, true , 100L);
 
         consumer.commit(tp, 1);
 
@@ -172,26 +138,21 @@ public class KafkaConsumerTest {
     }
 
     private Referenceable getEntity(String traitName) {
-        Referenceable entity = EntityNotificationTest.getEntity("id");
-        List<Struct> traitInfo = new LinkedList<>();
-        Struct trait = new Struct(traitName, Collections.<String, Object>emptyMap());
-        traitInfo.add(trait);
-        return entity;
+        return EntityNotificationTest.getEntity("id", new Struct(traitName, Collections.<String, Object>emptyMap()));
     }
 
-    private void assertMessagesEqual(HookNotification.EntityUpdateRequest message,
-                                     HookNotification.HookNotificationMessage consumedMessage,
-                                     Referenceable entity) throws JSONException {
-
+    private void assertMessagesEqual(EntityUpdateRequest message,
+                                     HookNotification    consumedMessage,
+                                     Referenceable       entity) {
         assertEquals(consumedMessage.getType(), message.getType());
         assertEquals(consumedMessage.getUser(), message.getUser());
 
-        assertTrue(consumedMessage instanceof HookNotification.EntityUpdateRequest);
+        assertTrue(consumedMessage instanceof EntityUpdateRequest);
 
-        HookNotification.EntityUpdateRequest deserializedEntityUpdateRequest =
-            (HookNotification.EntityUpdateRequest) consumedMessage;
+        EntityUpdateRequest deserializedEntityUpdateRequest = (EntityUpdateRequest) consumedMessage;
 
         Referenceable deserializedEntity = deserializedEntityUpdateRequest.getEntities().get(0);
+
         assertEquals(deserializedEntity.getId(), entity.getId());
         assertEquals(deserializedEntity.getTypeName(), entity.getTypeName());
         assertEquals(deserializedEntity.getTraits(), entity.getTraits());

http://git-wip-us.apache.org/repos/asf/atlas/blob/f01e46d7/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
index fe019e1..e0655f3 100644
--- a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
+++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
@@ -22,25 +22,25 @@ import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.v1.model.instance.Referenceable;
 import org.apache.atlas.notification.NotificationConsumer;
 import org.apache.atlas.notification.NotificationInterface;
-import org.apache.atlas.v1.model.notification.HookNotification;
+import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.RandomStringUtils;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
-import static org.apache.atlas.v1.model.notification.HookNotification.HookNotificationMessage;
+import org.apache.atlas.model.notification.HookNotification;
 
 import java.util.List;
 
 import static org.testng.Assert.assertEquals;
 
 public class KafkaNotificationTest {
-
     private KafkaNotification kafkaNotification;
 
     @BeforeClass
     public void setup() throws Exception {
         Configuration properties = ApplicationProperties.get();
+
         properties.setProperty("atlas.kafka.data", "target/" + RandomStringUtils.randomAlphanumeric(5));
 
         kafkaNotification = new KafkaNotification(properties);
@@ -55,29 +55,27 @@ public class KafkaNotificationTest {
 
     @Test
     public void testReceiveKafkaMessages() throws Exception {
-        kafkaNotification.send(NotificationInterface.NotificationType.HOOK,
-                new HookNotification.EntityCreateRequest("u1", new Referenceable("type")));
-        kafkaNotification.send(NotificationInterface.NotificationType.HOOK,
-                new HookNotification.EntityCreateRequest("u2", new Referenceable("type")));
-        kafkaNotification.send(NotificationInterface.NotificationType.HOOK,
-                new HookNotification.EntityCreateRequest("u3", new Referenceable("type")));
-        kafkaNotification.send(NotificationInterface.NotificationType.HOOK,
-                new HookNotification.EntityCreateRequest("u4", new Referenceable("type")));
-
-        NotificationConsumer<Object> consumer =
-                kafkaNotification.createConsumers(NotificationInterface.NotificationType.HOOK, 1).get(0);
-        List<AtlasKafkaMessage<Object>> messages = null ;
-        long startTime = System.currentTimeMillis(); //fetch starting time
+        kafkaNotification.send(NotificationInterface.NotificationType.HOOK, new EntityCreateRequest("u1", new Referenceable("type")));
+        kafkaNotification.send(NotificationInterface.NotificationType.HOOK, new EntityCreateRequest("u2", new Referenceable("type")));
+        kafkaNotification.send(NotificationInterface.NotificationType.HOOK, new EntityCreateRequest("u3", new Referenceable("type")));
+        kafkaNotification.send(NotificationInterface.NotificationType.HOOK, new EntityCreateRequest("u4", new Referenceable("type")));
+
+        NotificationConsumer<Object>    consumer  = kafkaNotification.createConsumers(NotificationInterface.NotificationType.HOOK, 1).get(0);
+        List<AtlasKafkaMessage<Object>> messages  = null ;
+        long                            startTime = System.currentTimeMillis(); //fetch starting time
+
         while ((System.currentTimeMillis() - startTime) < 10000) {
              messages = consumer.receive();
+
             if (messages.size() > 0) {
                 break;
             }
         }
 
-        int i=1;
+        int i = 1;
         for (AtlasKafkaMessage<Object> msg :  messages){
-            HookNotification.HookNotificationMessage message =  (HookNotificationMessage) msg.getMessage();
+            HookNotification message =  (HookNotification) msg.getMessage();
+
             assertEquals(message.getUser(), "u"+i++);
         }
 

http://git-wip-us.apache.org/repos/asf/atlas/blob/f01e46d7/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java
index 98d7d2c..94cb70d 100644
--- a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java
@@ -19,13 +19,14 @@
 package org.apache.atlas.notification;
 
 import org.apache.atlas.AtlasException;
+import org.apache.atlas.model.notification.HookNotification;
+import org.apache.atlas.model.notification.HookNotification.HookNotificationType;
+import org.apache.atlas.notification.NotificationInterface.NotificationType;
 import org.apache.atlas.type.AtlasType;
-import org.apache.atlas.v1.model.notification.HookNotification;
 import org.apache.commons.configuration.Configuration;
-import org.testng.annotations.Test;
 
 import java.util.ArrayList;
-import java.util.LinkedList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
@@ -37,62 +38,56 @@ import static org.testng.Assert.*;
  */
 public class AbstractNotificationTest {
 
-    @Test
+    @org.testng.annotations.Test
     public void testSend() throws Exception {
-        Configuration configuration = mock(Configuration.class);
+        Configuration    configuration = mock(Configuration.class);
+        TestNotification notification  = new TestNotification(configuration);
+        Test             message1      = new Test(HookNotificationType.ENTITY_CREATE, "user1");
+        Test             message2      = new Test(HookNotificationType.TYPE_CREATE, "user1");
+        Test             message3      = new Test(HookNotificationType.ENTITY_FULL_UPDATE, "user1");
+        List<String>     messageJson   = new ArrayList<>();
 
-        TestNotification notification = new TestNotification(configuration);
-
-        TestMessage message1 = new TestMessage(HookNotification.HookNotificationType.ENTITY_CREATE, "user1");
-        TestMessage message2 = new TestMessage(HookNotification.HookNotificationType.TYPE_CREATE, "user1");
-        TestMessage message3 = new TestMessage(HookNotification.HookNotificationType.ENTITY_FULL_UPDATE, "user1");
-
-        List<String> messageJson = new ArrayList<>();
         AbstractNotification.createNotificationMessages(message1, messageJson);
         AbstractNotification.createNotificationMessages(message2, messageJson);
         AbstractNotification.createNotificationMessages(message3, messageJson);
 
-        notification.send(NotificationInterface.NotificationType.HOOK, message1, message2, message3);
+        notification.send(NotificationType.HOOK, message1, message2, message3);
 
-        assertEquals(NotificationInterface.NotificationType.HOOK, notification.type);
+        assertEquals(NotificationType.HOOK, notification.type);
         assertEquals(3, notification.messages.size());
+
         for (int i = 0; i < notification.messages.size(); i++) {
             assertEqualsMessageJson(notification.messages.get(i), messageJson.get(i));
         }
     }
 
-    @Test
+    @org.testng.annotations.Test
     public void testSend2() throws Exception {
-        Configuration configuration = mock(Configuration.class);
-
-        TestNotification notification = new TestNotification(configuration);
+        Configuration    configuration = mock(Configuration.class);
+        TestNotification notification  = new TestNotification(configuration);
+        Test             message1      = new Test(HookNotificationType.ENTITY_CREATE, "user1");
+        Test             message2      = new Test(HookNotificationType.TYPE_CREATE, "user1");
+        Test             message3      = new Test(HookNotificationType.ENTITY_FULL_UPDATE, "user1");
+        List<Test>       messages      = Arrays.asList(message1, message2, message3);
+        List<String>     messageJson   = new ArrayList<>();
 
-        TestMessage message1 = new TestMessage(HookNotification.HookNotificationType.ENTITY_CREATE, "user1");
-        TestMessage message2 = new TestMessage(HookNotification.HookNotificationType.TYPE_CREATE, "user1");
-        TestMessage message3 = new TestMessage(HookNotification.HookNotificationType.ENTITY_FULL_UPDATE, "user1");
-
-        List<TestMessage> messages = new LinkedList<>();
-        messages.add(message1);
-        messages.add(message2);
-        messages.add(message3);
-
-        List<String> messageJson = new ArrayList<>();
         AbstractNotification.createNotificationMessages(message1, messageJson);
         AbstractNotification.createNotificationMessages(message2, messageJson);
         AbstractNotification.createNotificationMessages(message3, messageJson);
 
         notification.send(NotificationInterface.NotificationType.HOOK, messages);
 
-        assertEquals(notification.type, NotificationInterface.NotificationType.HOOK);
+        assertEquals(notification.type, NotificationType.HOOK);
         assertEquals(notification.messages.size(), messageJson.size());
+
         for (int i = 0; i < notification.messages.size(); i++) {
             assertEqualsMessageJson(notification.messages.get(i), messageJson.get(i));
         }
     }
 
-    public static class TestMessage extends HookNotification.HookNotificationMessage {
+    public static class Test extends HookNotification {
 
-        public TestMessage(HookNotification.HookNotificationType type, String user) {
+        public Test(HookNotificationType type, String user) {
             super(type, user);
         }
     }
@@ -120,7 +115,7 @@ public class AbstractNotificationTest {
         protected void sendInternal(NotificationType notificationType, List<String> notificationMessages)
             throws NotificationException {
 
-            type = notificationType;
+            type     = notificationType;
             messages = notificationMessages;
         }
 

http://git-wip-us.apache.org/repos/asf/atlas/blob/f01e46d7/notification/src/test/java/org/apache/atlas/notification/entity/EntityMessageDeserializerTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/entity/EntityMessageDeserializerTest.java b/notification/src/test/java/org/apache/atlas/notification/entity/EntityMessageDeserializerTest.java
deleted file mode 100644
index ddb63b5..0000000
--- a/notification/src/test/java/org/apache/atlas/notification/entity/EntityMessageDeserializerTest.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.atlas.notification.entity;
-
-import org.apache.atlas.v1.model.instance.Referenceable;
-import org.apache.atlas.v1.model.instance.Struct;
-import org.apache.atlas.notification.AbstractNotification;
-import org.apache.atlas.v1.model.notification.EntityNotification;
-import org.testng.annotations.Test;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-
-import static org.testng.Assert.assertEquals;
-
-/**
- * EntityMessageDeserializer tests.
- */
-public class EntityMessageDeserializerTest {
-    private EntityMessageDeserializer deserializer = new EntityMessageDeserializer();
-
-    @Test
-    public void testDeserialize() throws Exception {
-        Referenceable entity = EntityNotificationTest.getEntity("id");
-        String traitName = "MyTrait";
-        List<Struct> traitInfo = new LinkedList<>();
-        Struct trait = new Struct(traitName, Collections.<String, Object>emptyMap());
-        traitInfo.add(trait);
-
-        EntityNotification notification =
-            new EntityNotification(entity, EntityNotification.OperationType.TRAIT_ADD, traitInfo);
-
-        List<String> jsonMsgList = new ArrayList<>();
-
-        AbstractNotification.createNotificationMessages(notification, jsonMsgList);
-
-        EntityNotification deserializedNotification = null;
-
-        for (String jsonMsg : jsonMsgList) {
-            deserializedNotification =  deserializer.deserialize(jsonMsg);
-
-            if (deserializedNotification != null) {
-                break;
-            }
-        }
-
-        assertEquals(deserializedNotification.getOperationType(), notification.getOperationType());
-        assertEquals(deserializedNotification.getEntity().getId(), notification.getEntity().getId());
-        assertEquals(deserializedNotification.getEntity().getTypeName(), notification.getEntity().getTypeName());
-        assertEquals(deserializedNotification.getEntity().getTraits(), notification.getEntity().getTraits());
-        assertEquals(deserializedNotification.getEntity().getTrait(traitName),
-            notification.getEntity().getTrait(traitName));
-    }
-}

http://git-wip-us.apache.org/repos/asf/atlas/blob/f01e46d7/notification/src/test/java/org/apache/atlas/notification/entity/EntityNotificationDeserializerTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/entity/EntityNotificationDeserializerTest.java b/notification/src/test/java/org/apache/atlas/notification/entity/EntityNotificationDeserializerTest.java
new file mode 100644
index 0000000..13eafb6
--- /dev/null
+++ b/notification/src/test/java/org/apache/atlas/notification/entity/EntityNotificationDeserializerTest.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification.entity;
+
+import org.apache.atlas.model.notification.EntityNotification;
+import org.apache.atlas.v1.model.instance.Referenceable;
+import org.apache.atlas.v1.model.instance.Struct;
+import org.apache.atlas.notification.AbstractNotification;
+import org.apache.atlas.v1.model.notification.EntityNotificationV1;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+/**
+ * EntityMessageDeserializer tests.
+ */
+public class EntityNotificationDeserializerTest {
+    private EntityMessageDeserializer deserializer = new EntityMessageDeserializer();
+
+    @Test
+    public void testDeserialize() throws Exception {
+        Referenceable        entity       = EntityNotificationTest.getEntity("id");
+        String               traitName    = "MyTrait";
+        List<Struct>         traits       = Collections.singletonList(new Struct(traitName, Collections.<String, Object>emptyMap()));
+        EntityNotificationV1 notification = new EntityNotificationV1(entity, EntityNotificationV1.OperationType.TRAIT_ADD, traits);
+        List<String>         jsonMsgList  = new ArrayList<>();
+
+        AbstractNotification.createNotificationMessages(notification, jsonMsgList);
+
+        EntityNotification deserializedNotification = null;
+
+        for (String jsonMsg : jsonMsgList) {
+            deserializedNotification =  deserializer.deserialize(jsonMsg);
+
+            if (deserializedNotification != null) {
+                break;
+            }
+        }
+
+        assertTrue(deserializedNotification instanceof EntityNotificationV1);
+
+        EntityNotificationV1 entityNotificationV1 = (EntityNotificationV1)deserializedNotification;
+
+        assertEquals(entityNotificationV1.getOperationType(), notification.getOperationType());
+        assertEquals(entityNotificationV1.getEntity().getId(), notification.getEntity().getId());
+        assertEquals(entityNotificationV1.getEntity().getTypeName(), notification.getEntity().getTypeName());
+        assertEquals(entityNotificationV1.getEntity().getTraits(), notification.getEntity().getTraits());
+        assertEquals(entityNotificationV1.getEntity().getTrait(traitName), notification.getEntity().getTrait(traitName));
+    }
+}

http://git-wip-us.apache.org/repos/asf/atlas/blob/f01e46d7/notification/src/test/java/org/apache/atlas/notification/entity/EntityNotificationTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/entity/EntityNotificationTest.java b/notification/src/test/java/org/apache/atlas/notification/entity/EntityNotificationTest.java
index cedfc01..232b21d 100644
--- a/notification/src/test/java/org/apache/atlas/notification/entity/EntityNotificationTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/entity/EntityNotificationTest.java
@@ -22,7 +22,8 @@ import org.apache.atlas.v1.model.instance.Referenceable;
 import org.apache.atlas.v1.model.instance.Struct;
 import org.apache.atlas.type.AtlasClassificationType;
 import org.apache.atlas.type.AtlasTypeRegistry;
-import org.apache.atlas.v1.model.notification.EntityNotification;
+import org.apache.atlas.v1.model.notification.EntityNotificationV1;
+import org.apache.atlas.v1.model.notification.EntityNotificationV1.OperationType;
 import org.testng.annotations.Test;
 
 import java.util.Collections;
@@ -38,62 +39,48 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 
 /**
- * EntityNotification tests.
+ * EntityNotificationV1 tests.
  */
 public class EntityNotificationTest {
 
     @Test
     public void testGetEntity() throws Exception {
-        Referenceable entity = getEntity("id");
-
-        EntityNotification entityNotification =
-            new EntityNotification(entity, EntityNotification.OperationType.ENTITY_CREATE,
-                Collections.<Struct>emptyList());
+        Referenceable        entity             = getEntity("id");
+        EntityNotificationV1 entityNotification = new EntityNotificationV1(entity, OperationType.ENTITY_CREATE, Collections.<Struct>emptyList());
 
         assertEquals(entity, entityNotification.getEntity());
     }
 
     @Test
     public void testGetOperationType() throws Exception {
-        Referenceable entity = getEntity("id");
-
-        EntityNotification entityNotification =
-            new EntityNotification(entity, EntityNotification.OperationType.ENTITY_CREATE,
-                Collections.<Struct>emptyList());
+        Referenceable        entity             = getEntity("id");
+        EntityNotificationV1 entityNotification = new EntityNotificationV1(entity, OperationType.ENTITY_CREATE, Collections.<Struct>emptyList());
 
-        assertEquals(EntityNotification.OperationType.ENTITY_CREATE, entityNotification.getOperationType());
+        assertEquals(EntityNotificationV1.OperationType.ENTITY_CREATE, entityNotification.getOperationType());
     }
 
     @Test
     public void testGetAllTraits() throws Exception {
-        Referenceable entity = getEntity("id");
-        String traitName = "MyTrait";
-        List<Struct> traitInfo = new LinkedList<>();
-        Struct trait = new Struct(traitName, Collections.<String, Object>emptyMap());
-        traitInfo.add(trait);
+        Referenceable entity    = getEntity("id");
+        String        traitName = "MyTrait";
+        List<Struct>  traitInfo = Collections.singletonList(new Struct(traitName, Collections.<String, Object>emptyMap()));
 
-        EntityNotification entityNotification =
-            new EntityNotification(entity, EntityNotification.OperationType.TRAIT_ADD, traitInfo);
+        EntityNotificationV1 entityNotification = new EntityNotificationV1(entity, OperationType.TRAIT_ADD, traitInfo);
 
         assertEquals(traitInfo, entityNotification.getAllTraits());
     }
 
     @Test
     public void testGetAllTraitsSuperTraits() throws Exception {
-        AtlasTypeRegistry typeRegistry = mock(AtlasTypeRegistry.class);
-
-        String traitName = "MyTrait";
-        Struct myTrait = new Struct(traitName);
-
-        String superTraitName = "MySuperTrait";
-
-        AtlasClassificationType traitType = mock(AtlasClassificationType.class);
-        Set<String> superTypeNames = Collections.singleton(superTraitName);
-
-        AtlasClassificationType superTraitType = mock(AtlasClassificationType.class);
-        Set<String> superSuperTypeNames = Collections.emptySet();
-
-        Referenceable entity = getEntity("id", myTrait);
+        AtlasTypeRegistry       typeRegistry        = mock(AtlasTypeRegistry.class);
+        String                  traitName           = "MyTrait";
+        Struct                  myTrait             = new Struct(traitName);
+        String                  superTraitName      = "MySuperTrait";
+        AtlasClassificationType traitType           = mock(AtlasClassificationType.class);
+        Set<String>             superTypeNames      = Collections.singleton(superTraitName);
+        AtlasClassificationType superTraitType      = mock(AtlasClassificationType.class);
+        Set<String>             superSuperTypeNames = Collections.emptySet();
+        Referenceable           entity              = getEntity("id", myTrait);
 
         when(typeRegistry.getClassificationTypeByName(traitName)).thenReturn(traitType);
         when(typeRegistry.getClassificationTypeByName(superTraitName)).thenReturn(superTraitType);
@@ -101,8 +88,7 @@ public class EntityNotificationTest {
         when(traitType.getAllSuperTypes()).thenReturn(superTypeNames);
         when(superTraitType.getAllSuperTypes()).thenReturn(superSuperTypeNames);
 
-        EntityNotification entityNotification =
-            new EntityNotification(entity, EntityNotification.OperationType.TRAIT_ADD, typeRegistry);
+        EntityNotificationV1 entityNotification = new EntityNotificationV1(entity, OperationType.TRAIT_ADD, typeRegistry);
 
         List<Struct> allTraits = entityNotification.getAllTraits();
 
@@ -110,32 +96,25 @@ public class EntityNotificationTest {
 
         for (Struct trait : allTraits) {
             String typeName = trait.getTypeName();
+
             assertTrue(typeName.equals(traitName) || typeName.equals(superTraitName));
         }
     }
 
     @Test
     public void testEquals() throws Exception {
-        Referenceable entity = getEntity("id");
-
-        EntityNotification entityNotification2 =
-            new EntityNotification(entity, EntityNotification.OperationType.ENTITY_CREATE,
-                Collections.<Struct>emptyList());
-
-        EntityNotification entityNotification =
-            new EntityNotification(entity, EntityNotification.OperationType.ENTITY_CREATE,
-                Collections.<Struct>emptyList());
+        Referenceable        entity              = getEntity("id");
+        EntityNotificationV1 entityNotification2 = new EntityNotificationV1(entity, OperationType.ENTITY_CREATE, Collections.<Struct>emptyList());
+        EntityNotificationV1 entityNotification  = new EntityNotificationV1(entity, OperationType.ENTITY_CREATE, Collections.<Struct>emptyList());
 
         assertTrue(entityNotification.equals(entityNotification2));
         assertTrue(entityNotification2.equals(entityNotification));
     }
 
     public static Referenceable getEntity(String id, Struct... traits) {
-        String typeName = "typeName";
-        Map<String, Object> values = new HashMap<>();
-
-        List<String> traitNames = new LinkedList<>();
-        Map<String, Struct> traitMap = new HashMap<>();
+        String              typeName   = "typeName";
+        List<String>        traitNames = new LinkedList<>();
+        Map<String, Struct> traitMap   = new HashMap<>();
 
         for (Struct trait : traits) {
             String traitName = trait.getTypeName();
@@ -143,6 +122,7 @@ public class EntityNotificationTest {
             traitNames.add(traitName);
             traitMap.put(traitName, trait);
         }
-        return new Referenceable(id, typeName, values, traitNames, traitMap);
+
+        return new Referenceable(id, typeName, new HashMap<String, Object>(), traitNames, traitMap);
     }
 }

http://git-wip-us.apache.org/repos/asf/atlas/blob/f01e46d7/notification/src/test/java/org/apache/atlas/notification/hook/HookMessageDeserializerTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/hook/HookMessageDeserializerTest.java b/notification/src/test/java/org/apache/atlas/notification/hook/HookMessageDeserializerTest.java
deleted file mode 100644
index 17facf8..0000000
--- a/notification/src/test/java/org/apache/atlas/notification/hook/HookMessageDeserializerTest.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.atlas.notification.hook;
-
-import org.apache.atlas.notification.entity.EntityNotificationTest;
-import org.apache.atlas.v1.model.instance.Referenceable;
-import org.apache.atlas.v1.model.instance.Struct;
-import org.apache.atlas.notification.AbstractNotification;
-import org.apache.atlas.v1.model.notification.HookNotification.EntityUpdateRequest;
-import org.apache.atlas.v1.model.notification.HookNotification.HookNotificationMessage;
-import org.apache.atlas.type.AtlasType;
-import org.apache.commons.lang3.RandomStringUtils;
-import org.testng.annotations.Test;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertTrue;
-
-/**
- * HookMessageDeserializer tests.
- */
-public class HookMessageDeserializerTest {
-    private HookMessageDeserializer deserializer = new HookMessageDeserializer();
-
-    @Test
-    public void testDeserialize() throws Exception {
-        Referenceable       entity  = generateEntityWithTrait();
-        EntityUpdateRequest message = new EntityUpdateRequest("user1", entity);
-
-        List<String> jsonMsgList = new ArrayList<>();
-
-        AbstractNotification.createNotificationMessages(message, jsonMsgList);
-
-        HookNotificationMessage deserializedMessage = deserialize(jsonMsgList);
-
-        assertEqualMessage(deserializedMessage, message);
-    }
-
-    // validate deserialization of legacy message, which doesn't use MessageVersion
-    @Test
-    public void testDeserializeLegacyMessage() throws Exception {
-        Referenceable       entity  = generateEntityWithTrait();
-        EntityUpdateRequest message = new EntityUpdateRequest("user1", entity);
-
-        String                  jsonMsg             = AtlasType.toV1Json(message);
-        HookNotificationMessage deserializedMessage = deserialize(Collections.singletonList(jsonMsg));
-
-        assertEqualMessage(deserializedMessage, message);
-    }
-
-    @Test
-    public void testDeserializeCompressedMessage() throws Exception {
-        Referenceable       entity  = generateLargeEntityWithTrait();
-        EntityUpdateRequest message = new EntityUpdateRequest("user1", entity);
-
-        List<String> jsonMsgList = new ArrayList<>();
-
-        AbstractNotification.createNotificationMessages(message, jsonMsgList);
-
-        assertTrue(jsonMsgList.size() == 1);
-
-        String compressedMsg   = jsonMsgList.get(0);
-        String uncompressedMsg = AtlasType.toV1Json(message);
-
-        assertTrue(compressedMsg.length() < uncompressedMsg.length(), "Compressed message (" + compressedMsg.length() + ") should be shorter than uncompressed message (" + uncompressedMsg.length() + ")");
-
-        HookNotificationMessage deserializedMessage = deserialize(jsonMsgList);
-
-        assertEqualMessage(deserializedMessage, message);
-    }
-
-    @Test
-    public void testDeserializeSplitMessage() throws Exception {
-        Referenceable       entity  = generateVeryLargeEntityWithTrait();
-        EntityUpdateRequest message = new EntityUpdateRequest("user1", entity);
-
-        List<String> jsonMsgList = new ArrayList<>();
-
-        AbstractNotification.createNotificationMessages(message, jsonMsgList);
-
-        assertTrue(jsonMsgList.size() > 1);
-
-        HookNotificationMessage deserializedMessage = deserialize(jsonMsgList);
-
-        assertEqualMessage(deserializedMessage, message);
-    }
-
-    private Referenceable generateEntityWithTrait() {
-        Referenceable ret = EntityNotificationTest.getEntity("id", new Struct("MyTrait", Collections.<String, Object>emptyMap()));
-
-        return ret;
-    }
-
-    private HookNotificationMessage deserialize(List<String> jsonMsgList) {
-        HookNotificationMessage deserializedMessage = null;
-
-        for (String jsonMsg : jsonMsgList) {
-            deserializedMessage = deserializer.deserialize(jsonMsg);
-
-            if (deserializedMessage != null) {
-                break;
-            }
-        }
-
-        return deserializedMessage;
-    }
-
-    private void assertEqualMessage(HookNotificationMessage deserializedMessage, EntityUpdateRequest message) throws Exception {
-        assertNotNull(deserializedMessage);
-        assertEquals(deserializedMessage.getType(), message.getType());
-        assertEquals(deserializedMessage.getUser(), message.getUser());
-
-        assertTrue(deserializedMessage instanceof EntityUpdateRequest);
-
-        EntityUpdateRequest deserializedEntityUpdateRequest = (EntityUpdateRequest) deserializedMessage;
-        Referenceable       deserializedEntity              = deserializedEntityUpdateRequest.getEntities().get(0);
-        Referenceable       entity                          = message.getEntities().get(0);
-        String              traitName                       = entity.getTraitNames().get(0);
-
-        assertEquals(deserializedEntity.getId(), entity.getId());
-        assertEquals(deserializedEntity.getTypeName(), entity.getTypeName());
-        assertEquals(deserializedEntity.getTraits(), entity.getTraits());
-        assertEquals(deserializedEntity.getTrait(traitName).hashCode(), entity.getTrait(traitName).hashCode());
-
-    }
-
-    private Referenceable generateLargeEntityWithTrait() {
-        Referenceable ret = EntityNotificationTest.getEntity("id", new Struct("MyTrait", Collections.<String, Object>emptyMap()));
-
-        // add 100 attributes, each with value of size 10k
-        // Json Size=1,027,984; GZipped Size=16,387 ==> will compress, but not split
-        String attrValue = RandomStringUtils.randomAlphanumeric(10 * 1024); // use the same value for all attributes - to aid better compression
-        for (int i = 0; i < 100; i++) {
-            ret.set("attr_" + i, attrValue);
-        }
-
-        return ret;
-    }
-
-    private Referenceable generateVeryLargeEntityWithTrait() {
-        Referenceable ret = EntityNotificationTest.getEntity("id", new Struct("MyTrait", Collections.<String, Object>emptyMap()));
-
-        // add 300 attributes, each with value of size 10k
-        // Json Size=3,082,384; GZipped Size=2,313,357 ==> will compress & split
-        for (int i = 0; i < 300; i++) {
-            ret.set("attr_" + i, RandomStringUtils.randomAlphanumeric(10 * 1024));
-        }
-
-        return ret;
-    }
-}

http://git-wip-us.apache.org/repos/asf/atlas/blob/f01e46d7/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationDeserializerTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationDeserializerTest.java b/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationDeserializerTest.java
new file mode 100644
index 0000000..d048170
--- /dev/null
+++ b/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationDeserializerTest.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification.hook;
+
+import org.apache.atlas.model.notification.HookNotification;
+import org.apache.atlas.notification.entity.EntityNotificationTest;
+import org.apache.atlas.v1.model.instance.Referenceable;
+import org.apache.atlas.v1.model.instance.Struct;
+import org.apache.atlas.notification.AbstractNotification;
+import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityUpdateRequest;
+import org.apache.atlas.type.AtlasType;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+/**
+ * HookMessageDeserializer tests.
+ */
+public class HookNotificationDeserializerTest {
+    private HookMessageDeserializer deserializer = new HookMessageDeserializer();
+
+    @Test
+    public void testDeserialize() throws Exception {
+        Referenceable       entity      = generateEntityWithTrait();
+        EntityUpdateRequest message     = new EntityUpdateRequest("user1", entity);
+        List<String>        jsonMsgList = new ArrayList<>();
+
+        AbstractNotification.createNotificationMessages(message, jsonMsgList);
+
+        HookNotification deserializedMessage = deserialize(jsonMsgList);
+
+        assertEqualMessage(deserializedMessage, message);
+    }
+
+    // validate deserialization of legacy message, which doesn't use MessageVersion
+    @Test
+    public void testDeserializeLegacyMessage() throws Exception {
+        Referenceable       entity              = generateEntityWithTrait();
+        EntityUpdateRequest message             = new EntityUpdateRequest("user1", entity);
+        String              jsonMsg             = AtlasType.toV1Json(message);
+        HookNotification    deserializedMessage = deserialize(Collections.singletonList(jsonMsg));
+
+        assertEqualMessage(deserializedMessage, message);
+    }
+
+    @Test
+    public void testDeserializeCompressedMessage() throws Exception {
+        Referenceable       entity     = generateLargeEntityWithTrait();
+        EntityUpdateRequest message    = new EntityUpdateRequest("user1", entity);
+        List<String>       jsonMsgList = new ArrayList<>();
+
+        AbstractNotification.createNotificationMessages(message, jsonMsgList);
+
+        assertTrue(jsonMsgList.size() == 1);
+
+        String compressedMsg   = jsonMsgList.get(0);
+        String uncompressedMsg = AtlasType.toV1Json(message);
+
+        assertTrue(compressedMsg.length() < uncompressedMsg.length(), "Compressed message (" + compressedMsg.length() + ") should be shorter than uncompressed message (" + uncompressedMsg.length() + ")");
+
+        HookNotification deserializedMessage = deserialize(jsonMsgList);
+
+        assertEqualMessage(deserializedMessage, message);
+    }
+
+    @Test
+    public void testDeserializeSplitMessage() throws Exception {
+        Referenceable       entity      = generateVeryLargeEntityWithTrait();
+        EntityUpdateRequest message     = new EntityUpdateRequest("user1", entity);
+        List<String>        jsonMsgList = new ArrayList<>();
+
+        AbstractNotification.createNotificationMessages(message, jsonMsgList);
+
+        assertTrue(jsonMsgList.size() > 1);
+
+        HookNotification deserializedMessage = deserialize(jsonMsgList);
+
+        assertEqualMessage(deserializedMessage, message);
+    }
+
+    private Referenceable generateEntityWithTrait() {
+        Referenceable ret = EntityNotificationTest.getEntity("id", new Struct("MyTrait", Collections.<String, Object>emptyMap()));
+
+        return ret;
+    }
+
+    private HookNotification deserialize(List<String> jsonMsgList) {
+        HookNotification deserializedMessage = null;
+
+        for (String jsonMsg : jsonMsgList) {
+            deserializedMessage = deserializer.deserialize(jsonMsg);
+
+            if (deserializedMessage != null) {
+                break;
+            }
+        }
+
+        return deserializedMessage;
+    }
+
+    private void assertEqualMessage(HookNotification deserializedMessage, EntityUpdateRequest message) throws Exception {
+        assertNotNull(deserializedMessage);
+        assertEquals(deserializedMessage.getType(), message.getType());
+        assertEquals(deserializedMessage.getUser(), message.getUser());
+
+        assertTrue(deserializedMessage instanceof EntityUpdateRequest);
+
+        EntityUpdateRequest deserializedEntityUpdateRequest = (EntityUpdateRequest) deserializedMessage;
+        Referenceable       deserializedEntity              = deserializedEntityUpdateRequest.getEntities().get(0);
+        Referenceable       entity                          = message.getEntities().get(0);
+        String              traitName                       = entity.getTraitNames().get(0);
+
+        assertEquals(deserializedEntity.getId(), entity.getId());
+        assertEquals(deserializedEntity.getTypeName(), entity.getTypeName());
+        assertEquals(deserializedEntity.getTraits(), entity.getTraits());
+        assertEquals(deserializedEntity.getTrait(traitName).hashCode(), entity.getTrait(traitName).hashCode());
+
+    }
+
+    private Referenceable generateLargeEntityWithTrait() {
+        Referenceable ret = EntityNotificationTest.getEntity("id", new Struct("MyTrait", Collections.<String, Object>emptyMap()));
+
+        // add 100 attributes, each with value of size 10k
+        // Json Size=1,027,984; GZipped Size=16,387 ==> will compress, but not split
+        String attrValue = RandomStringUtils.randomAlphanumeric(10 * 1024); // use the same value for all attributes - to aid better compression
+        for (int i = 0; i < 100; i++) {
+            ret.set("attr_" + i, attrValue);
+        }
+
+        return ret;
+    }
+
+    private Referenceable generateVeryLargeEntityWithTrait() {
+        Referenceable ret = EntityNotificationTest.getEntity("id", new Struct("MyTrait", Collections.<String, Object>emptyMap()));
+
+        // add 300 attributes, each with value of size 10k
+        // Json Size=3,082,384; GZipped Size=2,313,357 ==> will compress & split
+        for (int i = 0; i < 300; i++) {
+            ret.set("attr_" + i, RandomStringUtils.randomAlphanumeric(10 * 1024));
+        }
+
+        return ret;
+    }
+}

http://git-wip-us.apache.org/repos/asf/atlas/blob/f01e46d7/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationTest.java b/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationTest.java
index a8d4926..cf691af 100644
--- a/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationTest.java
@@ -17,14 +17,15 @@
  */
 package org.apache.atlas.notification.hook;
 
+import org.apache.atlas.model.notification.HookNotification;
+import org.apache.atlas.model.notification.HookNotification.HookNotificationType;
 import org.apache.atlas.v1.model.instance.Referenceable;
 import org.apache.atlas.type.AtlasType;
-import org.apache.atlas.v1.model.notification.HookNotification.EntityCreateRequest;
-import org.apache.atlas.v1.model.notification.HookNotification.HookNotificationMessage;
-import org.apache.atlas.v1.model.notification.HookNotification.HookNotificationType;
+import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
 
 
 public class HookNotificationTest {
@@ -37,18 +38,21 @@ public class HookNotificationTest {
         entity1.set("complex", new Referenceable("othertype"));
         Referenceable entity2 = new Referenceable("newtype");
         String user = "user";
-        EntityCreateRequest request = new EntityCreateRequest(user, entity1, entity2);
 
-        String notificationJson = AtlasType.toV1Json(request);
-        HookNotificationMessage actualNotification = deserializer.deserialize(notificationJson);
+        EntityCreateRequest request           = new EntityCreateRequest(user, entity1, entity2);
+        String              notificationJson  = AtlasType.toV1Json(request);
+        HookNotification    actualNotification = deserializer.deserialize(notificationJson);
 
         assertEquals(actualNotification.getType(), HookNotificationType.ENTITY_CREATE);
         assertEquals(actualNotification.getUser(), user);
+        assertTrue(actualNotification instanceof EntityCreateRequest);
 
         EntityCreateRequest createRequest = (EntityCreateRequest) actualNotification;
+
         assertEquals(createRequest.getEntities().size(), 2);
 
         Referenceable actualEntity1 = createRequest.getEntities().get(0);
+
         assertEquals(actualEntity1.getTypeName(), "sometype");
         assertEquals(((Referenceable)actualEntity1.get("complex")).getTypeName(), "othertype");
         assertEquals(createRequest.getEntities().get(1).getTypeName(), "newtype");
@@ -59,9 +63,10 @@ public class HookNotificationTest {
         //Code to generate the json, use it for hard-coded json used later in this test
         Referenceable entity = new Referenceable("sometype");
         entity.set("attr", "value");
-        EntityCreateRequest request = new EntityCreateRequest(null, entity);
 
-        String notificationJsonFromCode = AtlasType.toV1Json(request);
+        EntityCreateRequest request                  = new EntityCreateRequest(null, entity);
+        String              notificationJsonFromCode = AtlasType.toV1Json(request);
+
         System.out.println(notificationJsonFromCode);
 
         //Json without user and assert that the string can be deserialised
@@ -88,9 +93,9 @@ public class HookNotificationTest {
                 + "}";
 
 
-        HookNotificationMessage actualNotification = deserializer.deserialize(notificationJson);
+        HookNotification actualNotification = deserializer.deserialize(notificationJson);
 
         assertEquals(actualNotification.getType(), HookNotificationType.ENTITY_CREATE);
-        assertEquals(actualNotification.getUser(), HookNotificationMessage.UNKNOW_USER);
+        assertEquals(actualNotification.getUser(), HookNotification.UNKNOW_USER);
     }
 }

http://git-wip-us.apache.org/repos/asf/atlas/blob/f01e46d7/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java
index acbc996..4633de9 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java
@@ -21,9 +21,11 @@ import com.google.common.annotations.VisibleForTesting;
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.listener.EntityChangeListener;
+import org.apache.atlas.notification.NotificationInterface.NotificationType;
 import org.apache.atlas.v1.model.instance.Referenceable;
 import org.apache.atlas.v1.model.instance.Struct;
-import org.apache.atlas.v1.model.notification.EntityNotification;
+import org.apache.atlas.v1.model.notification.EntityNotificationV1;
+import org.apache.atlas.v1.model.notification.EntityNotificationV1.OperationType;
 import org.apache.atlas.repository.graph.GraphHelper;
 import org.apache.atlas.type.AtlasClassificationType;
 import org.apache.atlas.type.AtlasTypeRegistry;
@@ -69,32 +71,32 @@ public class NotificationEntityChangeListener implements EntityChangeListener {
 
     @Override
     public void onEntitiesAdded(Collection<Referenceable> entities, boolean isImport) throws AtlasException {
-        notifyOfEntityEvent(entities, EntityNotification.OperationType.ENTITY_CREATE);
+        notifyOfEntityEvent(entities, OperationType.ENTITY_CREATE);
     }
 
     @Override
     public void onEntitiesUpdated(Collection<Referenceable> entities, boolean isImport) throws AtlasException {
-        notifyOfEntityEvent(entities, EntityNotification.OperationType.ENTITY_UPDATE);
+        notifyOfEntityEvent(entities, OperationType.ENTITY_UPDATE);
     }
 
     @Override
     public void onTraitsAdded(Referenceable entity, Collection<? extends Struct> traits) throws AtlasException {
-        notifyOfEntityEvent(Collections.singleton(entity), EntityNotification.OperationType.TRAIT_ADD);
+        notifyOfEntityEvent(Collections.singleton(entity), OperationType.TRAIT_ADD);
     }
 
     @Override
     public void onTraitsDeleted(Referenceable entity, Collection<String> traitNames) throws AtlasException {
-        notifyOfEntityEvent(Collections.singleton(entity), EntityNotification.OperationType.TRAIT_DELETE);
+        notifyOfEntityEvent(Collections.singleton(entity), OperationType.TRAIT_DELETE);
     }
 
     @Override
     public void onTraitsUpdated(Referenceable entity, Collection<? extends Struct> traits) throws AtlasException {
-        notifyOfEntityEvent(Collections.singleton(entity), EntityNotification.OperationType.TRAIT_UPDATE);
+        notifyOfEntityEvent(Collections.singleton(entity), OperationType.TRAIT_UPDATE);
     }
 
     @Override
     public void onEntitiesDeleted(Collection<Referenceable> entities, boolean isImport) throws AtlasException {
-        notifyOfEntityEvent(entities, EntityNotification.OperationType.ENTITY_DELETE);
+        notifyOfEntityEvent(entities, OperationType.ENTITY_DELETE);
     }
 
 
@@ -145,8 +147,8 @@ public class NotificationEntityChangeListener implements EntityChangeListener {
 
     // send notification of entity change
     private void notifyOfEntityEvent(Collection<Referenceable> entityDefinitions,
-                                     EntityNotification.OperationType operationType) throws AtlasException {
-        List<EntityNotification> messages = new LinkedList<>();
+                                     OperationType             operationType) throws AtlasException {
+        List<EntityNotificationV1> messages = new ArrayList<>();
 
         for (Referenceable entityDefinition : entityDefinitions) {
             if(GraphHelper.isInternalType(entityDefinition.getTypeName())) {
@@ -165,13 +167,13 @@ public class NotificationEntityChangeListener implements EntityChangeListener {
                 }
             }
 
-            EntityNotification notification = new EntityNotification(entity, operationType, getAllTraits(entity, typeRegistry));
+            EntityNotificationV1 notification = new EntityNotificationV1(entity, operationType, getAllTraits(entity, typeRegistry));
 
             messages.add(notification);
         }
 
         if (!messages.isEmpty()) {
-            notificationInterface.send(NotificationInterface.NotificationType.ENTITIES, messages);
+            notificationInterface.send(NotificationType.ENTITIES, messages);
         }
     }