You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by sh...@apache.org on 2015/08/05 11:51:09 UTC

incubator-atlas git commit: ATLAS-74 Create notification framework (shwethags)

Repository: incubator-atlas
Updated Branches:
  refs/heads/master 751b4c876 -> b627a681e


ATLAS-74 Create notification framework (shwethags)


Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/b627a681
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/b627a681
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/b627a681

Branch: refs/heads/master
Commit: b627a681edc795c71ccf13e75a79ef102e75a916
Parents: 751b4c8
Author: Shwetha GS <ss...@hortonworks.com>
Authored: Wed Aug 5 15:21:00 2015 +0530
Committer: Shwetha GS <ss...@hortonworks.com>
Committed: Wed Aug 5 15:21:00 2015 +0530

----------------------------------------------------------------------
 addons/hive-bridge/pom.xml                      |   4 -
 .../src/test/resources/application.properties   |  64 -----
 .../org/apache/atlas/ApplicationProperties.java |  10 +-
 client/src/main/resources/client.properties     |  37 +++
 notification/pom.xml                            |  75 +++++
 .../org/apache/atlas/kafka/KafkaConsumer.java   |  50 ++++
 .../apache/atlas/kafka/KafkaNotification.java   | 284 +++++++++++++++++++
 .../notification/NotificationConsumer.java      |  32 +++
 .../notification/NotificationException.java     |  26 ++
 .../notification/NotificationHookConsumer.java  |  87 ++++++
 .../notification/NotificationInterface.java     |  77 +++++
 .../atlas/notification/NotificationModule.java  |  28 ++
 .../atlas/kafka/KafkaNotificationTest.java      |  68 +++++
 pom.xml                                         |  29 +-
 release-log.txt                                 |   1 +
 .../src/test/resources/application.properties   |  65 -----
 src/conf/application.properties                 |   4 +
 src/conf/client.properties                      |  10 +-
 .../src/main/resources/application.properties   |  47 ++-
 typesystem/src/main/resources/log4j.xml         |  26 +-
 webapp/src/main/java/org/apache/atlas/Main.java |  27 +-
 .../atlas/web/service/EmbeddedServer.java       |  20 +-
 .../src/main/resources/application.properties   |  60 ----
 webapp/src/main/resources/log4j.xml             |   2 +-
 .../java/org/apache/atlas/web/TestUtils.java    |   5 +
 .../atlas/web/security/BaseSecurityTest.java    |  10 +-
 .../web/service/SecureEmbeddedServerIT.java     |   5 +-
 .../web/service/SecureEmbeddedServerITBase.java |  33 +--
 28 files changed, 906 insertions(+), 280 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/addons/hive-bridge/pom.xml
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/pom.xml b/addons/hive-bridge/pom.xml
index 66b5f74..914d8c6 100755
--- a/addons/hive-bridge/pom.xml
+++ b/addons/hive-bridge/pom.xml
@@ -256,10 +256,6 @@
                             <name>atlas.log.dir</name>
                             <value>${project.build.directory}/logs</value>
                         </systemProperty>
-                        <systemProperty>
-                            <name>atlas.conf</name>
-                            <value>${project.build.directory}/test-classes</value>
-                        </systemProperty>
                     </systemProperties>
                     <stopKey>atlas-stop</stopKey>
                     <stopPort>41001</stopPort>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/addons/hive-bridge/src/test/resources/application.properties
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/test/resources/application.properties b/addons/hive-bridge/src/test/resources/application.properties
deleted file mode 100644
index dda9a18..0000000
--- a/addons/hive-bridge/src/test/resources/application.properties
+++ /dev/null
@@ -1,64 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-#########  Graph Database Configs  #########
-#Refer http://s3.thinkaurelius.com/docs/titan/0.5.1/titan-config-ref.html
-# Graph Storage
-atlas.graph.storage.backend=${titan.storage.backend}
-
-#Berkeley storage directory
-atlas.graph.storage.directory=target/data/berkley
-
-#hbase
-#For standalone mode , specify localhost
-#for distributed mode, specify zookeeper quorum here - For more information refer http://s3.thinkaurelius.com/docs/titan/current/hbase.html#_remote_server_mode_2
-atlas.graph.storage.hostname=${titan.storage.hostname}
-
-# Graph Search Index Backend
-atlas.graph.index.search.backend=${titan.index.backend}
-
-#lucene
-#atlas.graph.index.search.directory=target/data/lucene
-
-#elasticsearch
-atlas.graph.index.search.directory=./target/data/es
-atlas.graph.index.search.elasticsearch.client-only=false
-atlas.graph.index.search.elasticsearch.local-mode=true
-atlas.graph.index.search.elasticsearch.create.sleep=2000
-
-#solr in cloud mode
-atlas.graph.index.search.solr.mode=cloud
-atlas.graph.index.search.solr.zookeeper-url=${solr.zk.address}
-
-#solr in http mode
-atlas.graph.index.search.solr.http-urls=http://localhost:8983/solr
-
-#########  Hive Lineage Configs  #########
-#atlas.lineage.hive.table.type.name=DataSet
-#atlas.lineage.hive.process.type.name=Process
-#atlas.lineage.hive.process.inputs.name=inputs
-#atlas.lineage.hive.process.outputs.name=outputs
-
-## Schema
-#atlas.lineage.hive.table.schema.query.hive_table=hive_table where name='%s'\, columns
-
-
-#########  Security Properties  #########
-
-# SSL config
-atlas.enableTLS=false

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/client/src/main/java/org/apache/atlas/ApplicationProperties.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/atlas/ApplicationProperties.java b/client/src/main/java/org/apache/atlas/ApplicationProperties.java
index 15cca47..738ec53 100644
--- a/client/src/main/java/org/apache/atlas/ApplicationProperties.java
+++ b/client/src/main/java/org/apache/atlas/ApplicationProperties.java
@@ -17,19 +17,17 @@
 
 package org.apache.atlas;
 
-import org.apache.commons.configuration.AbstractConfiguration;
+import org.apache.commons.configuration.CompositeConfiguration;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.ConfigurationUtils;
 import org.apache.commons.configuration.PropertiesConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.net.URL;
+import java.util.Arrays;
 import java.util.Iterator;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
 public class ApplicationProperties extends PropertiesConfiguration {
     private static final Logger LOG = LoggerFactory.getLogger(ApplicationProperties.class);
@@ -47,7 +45,9 @@ public class ApplicationProperties extends PropertiesConfiguration {
         if (INSTANCE == null) {
             synchronized (ApplicationProperties.class) {
                 if (INSTANCE == null) {
-                   INSTANCE = get(APPLICATION_PROPERTIES);
+                    Configuration applicationProperties = get(APPLICATION_PROPERTIES);
+                    Configuration clientProperties = get(CLIENT_PROPERTIES);
+                    INSTANCE = new CompositeConfiguration(Arrays.asList(applicationProperties, clientProperties));
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/client/src/main/resources/client.properties
----------------------------------------------------------------------
diff --git a/client/src/main/resources/client.properties b/client/src/main/resources/client.properties
new file mode 100755
index 0000000..722d029
--- /dev/null
+++ b/client/src/main/resources/client.properties
@@ -0,0 +1,37 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#########  Security Properties  #########
+
+# SSL config
+
+atlas.enableTLS=false
+#truststore.file=/path/to/truststore.jks
+#cert.stores.credential.provider.path=jceks://file/path/to/credentialstore.jceks
+
+#following only required for 2-way SSL
+#keystore.file=/path/to/keystore.jks
+
+# Authentication config
+
+# enabled:  true or false
+atlas.http.authentication.enabled=false
+# type:  simple or kerberos
+atlas.http.authentication.type=simple
+
+#########  Security Properties  #########

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/notification/pom.xml
----------------------------------------------------------------------
diff --git a/notification/pom.xml b/notification/pom.xml
new file mode 100644
index 0000000..b036855
--- /dev/null
+++ b/notification/pom.xml
@@ -0,0 +1,75 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xmlns="http://maven.apache.org/POM/4.0.0"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <artifactId>apache-atlas</artifactId>
+        <groupId>org.apache.atlas</groupId>
+        <version>0.6-incubating-SNAPSHOT</version>
+    </parent>
+    <artifactId>atlas-notification</artifactId>
+    <description>Apache Atlas Client</description>
+    <name>Apache Atlas Notification</name>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.atlas</groupId>
+            <artifactId>atlas-client</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.atlas</groupId>
+            <artifactId>atlas-typesystem</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_${scala.binary.version}</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>commons-configuration</groupId>
+            <artifactId>commons-configuration</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.testng</groupId>
+            <artifactId>testng</artifactId>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java
new file mode 100644
index 0000000..70bb5d6
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.kafka;
+
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.KafkaStream;
+import kafka.message.MessageAndMetadata;
+import org.apache.atlas.notification.NotificationConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KafkaConsumer implements NotificationConsumer {
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumer.class);
+
+    private final int consumerId;
+    private final ConsumerIterator iterator;
+
+    public KafkaConsumer(KafkaStream<String, String> stream, int consumerId) {
+        this.iterator = stream.iterator();
+        this.consumerId = consumerId;
+    }
+
+    @Override
+    public boolean hasNext() {
+        return iterator.hasNext();
+    }
+
+    @Override
+    public String next() {
+        MessageAndMetadata message = iterator.next();
+        LOG.debug("Read message: conumerId: {}, topic - {}, partition - {}, offset - {}, message - {}",
+                consumerId, message.topic(), message.partition(), message.offset(), message.message());
+        return (String) message.message();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
new file mode 100644
index 0000000..9978275
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.kafka;
+
+import com.google.inject.Singleton;
+import kafka.consumer.Consumer;
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.consumer.ConsumerConnector;
+import kafka.serializer.StringDecoder;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.Time;
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.notification.NotificationConsumer;
+import org.apache.atlas.notification.NotificationException;
+import org.apache.atlas.notification.NotificationInterface;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationConverter;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Future;
+
+@Singleton
+public class KafkaNotification extends NotificationInterface {
+    public static final Logger LOG = LoggerFactory.getLogger(KafkaNotification.class);
+
+    public static final String PROPERTY_PREFIX = NotificationInterface.PROPERTY_PREFIX + ".kafka";
+
+    private static final int ATLAS_ZK_PORT = 9026;
+    private static final int ATLAS_KAFKA_PORT = 9027;
+    private static final String ATLAS_KAFKA_DATA = "data";
+
+    public static final String ATLAS_HOOK_TOPIC = "ATLAS_HOOK";
+    public static final String ATLAS_ENTITIES_TOPIC = "ATLAS_ENTITIES";
+    public static final String ATLAS_TYPES_TOPIC = "ATLAS_TYPES";
+
+    private static final String ATLAS_GROUP = "atlas";
+    private KafkaServer kafkaServer;
+    private ServerCnxnFactory factory;
+    private Properties properties;
+
+    private KafkaProducer producer = null;
+    private List<ConsumerConnector> consumerConnectors = new ArrayList<>();
+
+    private KafkaConsumer consumer;
+
+    private static final Map<NotificationType, String> topicMap = new HashMap<NotificationType, String>() {{
+        put(NotificationType.HOOK, ATLAS_HOOK_TOPIC);
+        put(NotificationType.ENTITIES, ATLAS_ENTITIES_TOPIC);
+        put(NotificationType.TYPES, ATLAS_TYPES_TOPIC);
+    }};
+
+    private synchronized void createProducer() {
+        if (producer == null) {
+            producer = new KafkaProducer(properties);
+        }
+    }
+
+    @Override
+    public void initialize(Configuration applicationProperties) throws AtlasException {
+        super.initialize(applicationProperties);
+        Configuration subsetConfiguration =
+                ApplicationProperties.getSubsetConfiguration(applicationProperties, PROPERTY_PREFIX);
+        properties = ConfigurationConverter.getProperties(subsetConfiguration);
+        //override to store offset in kafka
+        //todo do we need ability to replay?
+
+        //Override default configs
+        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+                "org.apache.kafka.common.serialization.StringSerializer");
+        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+                "org.apache.kafka.common.serialization.StringSerializer");
+
+        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
+
+        //todo take group id as argument to allow multiple consumers??
+        properties.put(ConsumerConfig.GROUP_ID_CONFIG, ATLAS_GROUP);
+        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+                "org.apache.kafka.common.serialization.StringDeserializer");
+        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+                "org.apache.kafka.common.serialization.StringDeserializer");
+        properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY, "roundrobin");
+        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "smallest");
+
+        if (isEmbedded()) {
+            properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + ATLAS_KAFKA_PORT);
+            properties.setProperty("zookeeper.connect", "localhost:" + ATLAS_ZK_PORT);
+        }
+
+        //todo new APIs not available yet
+//        consumer = new KafkaConsumer(properties);
+//        consumer.subscribe(ATLAS_HOOK_TOPIC);
+    }
+
+    @Override
+    protected void _startService() throws IOException {
+        startZk();
+        startKafka();
+    }
+
+    private String startZk() throws IOException {
+        //todo read zk endpoint from config
+        this.factory = NIOServerCnxnFactory.createFactory(new InetSocketAddress("0.0.0.0", ATLAS_ZK_PORT), 1024);
+        File snapshotDir = constructDir("zk/txn");
+        File logDir = constructDir("zk/snap");
+
+        try {
+            factory.startup(new ZooKeeperServer(snapshotDir, logDir, 500));
+        } catch (InterruptedException e) {
+            throw new IOException(e);
+        }
+        return factory.getLocalAddress().getAddress().toString();
+    }
+
+    private void startKafka() {
+        Properties brokerConfig = properties;
+        brokerConfig.setProperty("broker.id", "1");
+        //todo read kafka endpoint from config
+        brokerConfig.setProperty("host.name", "0.0.0.0");
+        brokerConfig.setProperty("port", String.valueOf(ATLAS_KAFKA_PORT));
+        brokerConfig.setProperty("log.dirs", constructDir("kafka").getAbsolutePath());
+        brokerConfig.setProperty("log.flush.interval.messages", String.valueOf(1));
+
+        kafkaServer = new KafkaServer(new KafkaConfig(brokerConfig), new SystemTime());
+        kafkaServer.startup();
+        LOG.debug("Embedded kafka server started with broker config {}", brokerConfig);
+    }
+
+    private static class SystemTime implements Time {
+        @Override
+        public long milliseconds() {
+            return System.currentTimeMillis();
+        }
+
+        @Override
+        public long nanoseconds() {
+            return System.nanoTime();
+        }
+
+        @Override
+        public void sleep(long arg0) {
+            try {
+                Thread.sleep(arg0);
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    private File constructDir(String dirPrefix) {
+        File file = new File(properties.getProperty(ATLAS_KAFKA_DATA), dirPrefix);
+        if (!file.exists() && !file.mkdirs()) {
+            throw new RuntimeException("could not create temp directory: " + file.getAbsolutePath());
+        }
+        return file;
+    }
+
+    @Override
+    public void _shutdown() {
+        if (producer != null) {
+            producer.close();
+        }
+
+        if (consumer != null) {
+            consumer.close();
+        }
+
+        for (ConsumerConnector consumerConnector : consumerConnectors) {
+            consumerConnector.shutdown();
+        }
+
+        if (kafkaServer != null) {
+            kafkaServer.shutdown();
+        }
+
+        if (factory != null) {
+            factory.shutdown();
+        }
+    }
+
+    @Override
+    public List<NotificationConsumer> createConsumers(NotificationType type, int numConsumers) {
+        String topic = topicMap.get(type);
+
+        ConsumerConnector consumerConnector =
+                Consumer.createJavaConsumerConnector(new kafka.consumer.ConsumerConfig(properties));
+        Map<String, Integer> topicCountMap = new HashMap<>();
+        topicCountMap.put(topic, numConsumers);
+        StringDecoder decoder = new StringDecoder(null);
+        Map<String, List<KafkaStream<String, String>>> streamsMap =
+                consumerConnector.createMessageStreams(topicCountMap, decoder, decoder);
+        List<KafkaStream<String, String>> kafkaConsumers = streamsMap.get(topic);
+        List<NotificationConsumer> consumers = new ArrayList<>(numConsumers);
+        int consumerId = 0;
+        for (KafkaStream stream : kafkaConsumers) {
+            consumers.add(new org.apache.atlas.kafka.KafkaConsumer(stream, consumerId++));
+        }
+        consumerConnectors.add(consumerConnector);
+
+        return consumers;
+    }
+
+    @Override
+    public void send(NotificationType type, String... messages) throws NotificationException {
+        if (producer == null) {
+            createProducer();
+        }
+
+        String topic = topicMap.get(type);
+        List<Future<RecordMetadata>> futures = new ArrayList<>();
+        for (String message : messages) {
+            ProducerRecord record = new ProducerRecord(topic, message);
+            LOG.debug("Sending message for topic {}: {}", topic, message);
+            futures.add(producer.send(record));
+        }
+
+        for (Future<RecordMetadata> future : futures) {
+            try {
+                RecordMetadata response = future.get();
+                LOG.debug("Sent message for topic - {}, partition - {}, offset - {}", response.topic(),
+                        response.partition(), response.offset());
+            } catch (Exception e) {
+                throw new NotificationException(e);
+            }
+        }
+    }
+
+    //New API, not used now
+    private List<String> receive(long timeout) throws NotificationException {
+        Map<String, ConsumerRecords> recordsMap = consumer.poll(timeout);
+        List<String> messages = new ArrayList<>();
+        if (recordsMap != null) {
+            for (ConsumerRecords records : recordsMap.values()) {
+                List<ConsumerRecord> recordList = records.records();
+                for (ConsumerRecord record : recordList) {
+                    try {
+                        String message = (String) record.value();
+                        LOG.debug("Received message from topic {}: {}", ATLAS_HOOK_TOPIC, message);
+                        messages.add(message);
+                    } catch (Exception e) {
+                        throw new NotificationException(e);
+                    }
+                }
+            }
+        }
+        return messages;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java b/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
new file mode 100644
index 0000000..c3ac23b
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification;
+
+public interface NotificationConsumer {
+    /**
+     * If there are more messages
+     * @return
+     */
+    boolean hasNext();
+
+    /**
+     * Next message - blocking call
+     * @return
+     */
+    String next();
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/notification/src/main/java/org/apache/atlas/notification/NotificationException.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationException.java b/notification/src/main/java/org/apache/atlas/notification/NotificationException.java
new file mode 100644
index 0000000..e6b02fb
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/NotificationException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification;
+
+import org.apache.atlas.AtlasException;
+
+public class NotificationException extends AtlasException {
+    public NotificationException(Exception e) {
+        super(e);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
new file mode 100644
index 0000000..36a62f0
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification;
+
+import com.google.inject.Inject;
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasClient;
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.commons.configuration.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+public class NotificationHookConsumer {
+    private static final Logger LOG = LoggerFactory.getLogger(NotificationHookConsumer.class);
+
+    public static final String CONSUMER_THREADS_PROPERTY = "atlas.notification.hook.numthreads";
+    public static final String ATLAS_ENDPOINT_PROPERTY = "atlas.rest.address";
+
+    @Inject
+    private static NotificationInterface notificationInterface;
+
+    private static ExecutorService executors;
+    private static AtlasClient atlasClient;
+
+    public static void start() throws AtlasException {
+        Configuration applicationProperties = ApplicationProperties.get();
+        notificationInterface.initialize(applicationProperties);
+
+        String atlasEndpoint = applicationProperties.getString(ATLAS_ENDPOINT_PROPERTY, "http://localhost:21000");
+        atlasClient = new AtlasClient(atlasEndpoint);
+        int numThreads = applicationProperties.getInt(CONSUMER_THREADS_PROPERTY, 2);
+        List<NotificationConsumer> consumers =
+                notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, numThreads);
+        executors = Executors.newFixedThreadPool(consumers.size());
+
+        for (final NotificationConsumer consumer : consumers) {
+            executors.submit(new HookConsumer(consumer));
+        }
+    }
+
+    public static void stop() {
+        notificationInterface.shutdown();
+        executors.shutdown();
+    }
+
+    static class HookConsumer implements Runnable {
+        private final NotificationConsumer consumer;
+
+        public HookConsumer(NotificationConsumer consumerInterface) {
+            this.consumer = consumerInterface;
+        }
+
+        @Override
+        public void run() {
+            while(consumer.hasNext()) {
+                String entityJson = consumer.next();
+                LOG.debug("Processing message {}", entityJson);
+                try {
+                    atlasClient.createEntity(entityJson);
+                } catch (AtlasServiceException e) {
+                    //todo handle failures
+                    LOG.warn("Error handling message {}", entityJson);
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
new file mode 100644
index 0000000..0951124
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification;
+
+import org.apache.atlas.AtlasException;
+import org.apache.commons.configuration.Configuration;
+
+import java.io.IOException;
+import java.util.List;
+
+public abstract class NotificationInterface {
+    public static final String PROPERTY_PREFIX = "atlas.notification";
+    private static final String PROPERTY_EMBEDDED = PROPERTY_PREFIX + ".embedded";
+    private boolean embedded;
+
+
+    public enum NotificationType {
+        HOOK, ENTITIES, TYPES
+    }
+
+    /**
+     * Initialise
+     * @param applicationProperties
+     * @throws AtlasException
+     */
+    public void initialize(Configuration applicationProperties) throws AtlasException {
+        this.embedded = applicationProperties.getBoolean(PROPERTY_EMBEDDED, false);
+    }
+
+    /**
+     * Start embedded notification service on atlast server
+     * @throws IOException
+     */
+    public final void startService() throws IOException {
+        if (embedded) {
+            _startService();
+        }
+    }
+
+    /**
+     * Is the notification service embedded in atlas server
+     * @return
+     */
+    protected final boolean isEmbedded() {
+        return embedded;
+    }
+
+    protected abstract void _startService() throws IOException;
+
+    /**
+     * Shutdown - close all the connections
+     */
+    public final void shutdown() {
+        _shutdown();
+    }
+
+    protected abstract void _shutdown();
+
+    public abstract List<NotificationConsumer> createConsumers(NotificationType type, int numConsumers);
+
+    public abstract void send(NotificationType type, String... messages) throws NotificationException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/notification/src/main/java/org/apache/atlas/notification/NotificationModule.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationModule.java b/notification/src/main/java/org/apache/atlas/notification/NotificationModule.java
new file mode 100644
index 0000000..db17e35
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/NotificationModule.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification;
+
+import com.google.inject.AbstractModule;
+import org.apache.atlas.kafka.KafkaNotification;
+
+public class NotificationModule extends AbstractModule {
+    @Override
+    protected void configure() {
+        bind(NotificationInterface.class).to(KafkaNotification.class).asEagerSingleton();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
new file mode 100644
index 0000000..02752dc
--- /dev/null
+++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.kafka;
+
+import com.google.inject.Inject;
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.notification.NotificationConsumer;
+import org.apache.atlas.notification.NotificationInterface;
+import org.apache.atlas.notification.NotificationModule;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang.RandomStringUtils;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+@Guice(modules = NotificationModule.class)
+public class KafkaNotificationTest {
+
+    @Inject
+    private NotificationInterface kafka;
+
+    @BeforeClass
+    public void setUp() throws Exception {
+        Configuration conf = ApplicationProperties.get();
+        conf.setProperty(KafkaNotification.PROPERTY_PREFIX + ".data", "target/data/kafka" + random());
+        kafka.initialize(conf);
+        kafka.startService();
+    }
+
+    @Test
+    public void testSendMessage() throws AtlasException {
+        String msg1 = "message" + random();
+        String msg2 = "message" + random();
+        kafka.send(NotificationInterface.NotificationType.HOOK, msg1, msg2);
+        NotificationConsumer consumer = kafka.createConsumers(NotificationInterface.NotificationType.HOOK, 1).get(0);
+        Assert.assertTrue(consumer.hasNext());
+        Assert.assertEquals(msg1, consumer.next());
+        Assert.assertTrue(consumer.hasNext());
+        Assert.assertEquals(msg2, consumer.next());
+    }
+
+    private String random() {
+        return RandomStringUtils.randomAlphanumeric(5);
+    }
+
+    @AfterClass
+    public void teardown() throws Exception {
+        kafka.shutdown();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 66182e7..facd539 100755
--- a/pom.xml
+++ b/pom.xml
@@ -329,7 +329,8 @@
         <titan.version>0.5.4</titan.version>
         <hadoop.version>2.7.0</hadoop.version>
         <hbase.version>0.98.9-hadoop2</hbase.version>
-
+        <kafka.version>0.8.2.0</kafka.version>
+        
         <!-- scala versions -->
         <scala.version>2.10.4</scala.version>
         <scala.binary.version>2.10</scala.binary.version>
@@ -420,6 +421,7 @@
     </profiles>
     <modules>
         <module>typesystem</module>
+        <module>notification</module>
         <module>client</module>
         <module>repository</module>
         <module>webapp</module>
@@ -933,6 +935,12 @@
 
             <dependency>
                 <groupId>org.apache.atlas</groupId>
+                <artifactId>atlas-notification</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+
+            <dependency>
+                <groupId>org.apache.atlas</groupId>
                 <artifactId>atlas-client</artifactId>
                 <version>${project.version}</version>
             </dependency>
@@ -1114,6 +1122,25 @@
                 <artifactId>commons-lang3</artifactId>
                 <version>3.4</version>
             </dependency>
+
+            <!-- kafka -->
+            <dependency>
+                <groupId>org.apache.kafka</groupId>
+                <artifactId>kafka-clients</artifactId>
+                <version>${kafka.version}</version>
+            </dependency>
+
+            <dependency>
+                <groupId>org.apache.kafka</groupId>
+                <artifactId>kafka_${scala.binary.version}</artifactId>
+                <version>${kafka.version}</version>
+                <exclusions>
+                    <exclusion>
+                        <groupId>junit</groupId>
+                        <artifactId>junit</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 4e53ad5..a2d2b68 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -8,6 +8,7 @@ ATLAS-54 Rename configs in hive hook (shwethags)
 ATLAS-3 Mixed Index creation fails with Date types (suma.shivaprasad via shwethags)
 
 ALL CHANGES:
+ATLAS-74 Create notification framework (shwethags)
 ATLAS-93 import-hive.sh reports FileNotFoundException (shwethags)
 ATLAS-92 import-hive.sh failed to find HiveMetaStoreBridge (airbots via shwethags)
 ATLAS-16 jersey jaxb exception (shwethags)

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/repository/src/test/resources/application.properties
----------------------------------------------------------------------
diff --git a/repository/src/test/resources/application.properties b/repository/src/test/resources/application.properties
deleted file mode 100755
index d0eaa8c..0000000
--- a/repository/src/test/resources/application.properties
+++ /dev/null
@@ -1,65 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-#########  Graph Database Configs  #########
-#Refer http://s3.thinkaurelius.com/docs/titan/0.5.1/titan-config-ref.html
-# Graph Storage
-
-atlas.graph.storage.backend=${titan.storage.backend}
-
-#Berkeley storage directory
-atlas.graph.storage.directory=target/data/berkley
-
-#hbase
-#For standalone mode , specify localhost
-#for distributed mode, specify zookeeper quorum here - For more information refer http://s3.thinkaurelius.com/docs/titan/current/hbase.html#_remote_server_mode_2
-atlas.graph.storage.hostname=${titan.storage.hostname}
-
-# Graph Search Index Backend
-atlas.graph.index.search.backend=${titan.index.backend}
-
-#lucene
-#atlas.graph.index.search.directory=target/data/lucene
-
-#elasticsearch
-atlas.graph.index.search.directory=./target/data/es
-atlas.graph.index.search.elasticsearch.client-only=false
-atlas.graph.index.search.elasticsearch.local-mode=true
-atlas.graph.index.search.elasticsearch.create.sleep=2000
-
-#solr in cloud mode
-atlas.graph.index.search.solr.mode=cloud
-atlas.graph.index.search.solr.zookeeper-url=${solr.zk.address}
-
-#solr in http mode
-atlas.graph.index.search.solr.http-urls=http://localhost:8983/solr
-
-#########  Hive Lineage Configs  #########
-#atlas.lineage.hive.table.type.name=DataSet
-#atlas.lineage.hive.process.type.name=Process
-#atlas.lineage.hive.process.inputs.name=inputs
-#atlas.lineage.hive.process.outputs.name=outputs
-
-## Schema
-atlas.lineage.hive.table.schema.query.hive_table=hive_table where name='%s'\, columns
-
-
-#########  Security Properties  #########
-
-# SSL config
-atlas.enableTLS=false

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/src/conf/application.properties
----------------------------------------------------------------------
diff --git a/src/conf/application.properties b/src/conf/application.properties
index 5487749..bf323a7 100755
--- a/src/conf/application.properties
+++ b/src/conf/application.properties
@@ -45,6 +45,10 @@ atlas.graph.index.search.elasticsearch.client-only=false
 atlas.graph.index.search.elasticsearch.local-mode=true
 atlas.graph.index.search.elasticsearch.create.sleep=2000
 
+#########  Notification Configs  #########
+atlas.notification.embedded=true
+atlas.notification.kafka.data=${sys:atlas.home}/data/kafka
+
 #########  Hive Lineage Configs  #########
 # This models reflects the base super types for Data and Process
 #atlas.lineage.hive.table.type.name=DataSet

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/src/conf/client.properties
----------------------------------------------------------------------
diff --git a/src/conf/client.properties b/src/conf/client.properties
index b64755f..ab6ef2f 100755
--- a/src/conf/client.properties
+++ b/src/conf/client.properties
@@ -21,10 +21,12 @@
 # SSL config
 
 atlas.enableTLS=false
-truststore.file=/path/to/truststore.jks
-cert.stores.credential.provider.path=jceks://file/path/to/credentialstore.jceks
-# following only required for 2-way SSL
-keystore.file=/path/to/keystore.jks
+
+#truststore.file=/path/to/truststore.jks
+#cert.stores.credential.provider.path=jceks://file/path/to/credentialstore.jceks
+
+#following only required for 2-way SSL
+#keystore.file=/path/to/keystore.jks
 
 # Authentication config
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/typesystem/src/main/resources/application.properties
----------------------------------------------------------------------
diff --git a/typesystem/src/main/resources/application.properties b/typesystem/src/main/resources/application.properties
index 29c933f..f7e2774 100644
--- a/typesystem/src/main/resources/application.properties
+++ b/typesystem/src/main/resources/application.properties
@@ -18,8 +18,47 @@
 
 #########  Graph Database Configs  #########
 # Graph Storage
-atlas.graph.storage.backend=inmemory
+atlas.graph.storage.backend=${titan.storage.backend}
 
-# Graph Search Index
-atlas.graph.index.search.backend=lucene
-atlas.graph.index.search.directory=target/data/lucene
+# Graph Search Index Backend
+atlas.graph.index.search.backend=${titan.index.backend}
+
+#Berkeley storage directory
+atlas.graph.storage.directory=target/data/berkley
+
+#hbase
+#For standalone mode , specify localhost
+#for distributed mode, specify zookeeper quorum here - For more information refer http://s3.thinkaurelius.com/docs/titan/current/hbase.html#_remote_server_mode_2
+atlas.graph.storage.hostname=${titan.storage.hostname}
+
+#ElasticSearch
+atlas.graph.index.search.directory=target/data/es
+atlas.graph.index.search.elasticsearch.client-only=false
+atlas.graph.index.search.elasticsearch.local-mode=true
+atlas.graph.index.search.elasticsearch.create.sleep=2000
+
+# Solr cloud mode properties
+atlas.graph.index.search.solr.mode=cloud
+atlas.graph.index.search.solr.zookeeper-url=${solr.zk.address}
+
+#########  Hive Lineage Configs  #########
+# This models reflects the base super types for Data and Process
+#atlas.lineage.hive.table.type.name=DataSet
+#atlas.lineage.hive.process.type.name=Process
+#atlas.lineage.hive.process.inputs.name=inputs
+#atlas.lineage.hive.process.outputs.name=outputs
+
+## Schema
+atlas.lineage.hive.table.schema.query.hive_table=hive_table where name='%s'\, columns
+
+#########  Notification Configs  #########
+atlas.notification.embedded=true
+atlas.notification.implementation=org.apache.atlas.kafka.KafkaNotification
+atlas.notification.kafka.data=target/data/kafka
+
+#########  Security Properties  #########
+
+# SSL config
+atlas.enableTLS=false
+
+#########  Security Properties  #########

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/typesystem/src/main/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/typesystem/src/main/resources/log4j.xml b/typesystem/src/main/resources/log4j.xml
index 999caad..528881a 100755
--- a/typesystem/src/main/resources/log4j.xml
+++ b/typesystem/src/main/resources/log4j.xml
@@ -27,15 +27,6 @@
         </layout>
     </appender>
 
-    <appender name="FILE" class="org.apache.log4j.DailyRollingFileAppender">
-        <param name="File" value="${user.dir}/target/logs/application.log"/>
-        <param name="Append" value="true"/>
-        <param name="Threshold" value="debug"/>
-        <layout class="org.apache.log4j.PatternLayout">
-            <param name="ConversionPattern" value="%d %-5p - [%t:%x] ~ %m (%c{1}:%L)%n"/>
-        </layout>
-    </appender>
-
     <appender name="AUDIT" class="org.apache.log4j.DailyRollingFileAppender">
         <param name="File" value="${user.dir}/target/logs/audit.log"/>
         <param name="Append" value="true"/>
@@ -55,23 +46,8 @@
         <appender-ref ref="console"/>
     </logger>
 
-    <logger name="com.thinkaurelius.titan" additivity="false">
-        <level value="warn"/>
-        <appender-ref ref="console"/>
-    </logger>
-
-    <logger name="org.elasticsearch" additivity="false">
-        <level value="warn"/>
-        <appender-ref ref="console"/>
-    </logger>
-
-    <logger name="org.apache.lucene" additivity="false">
-        <level value="warn"/>
-        <appender-ref ref="console"/>
-    </logger>
-
     <root>
-        <priority value="info"/>
+        <priority value="warn"/>
         <appender-ref ref="console"/>
     </root>
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/webapp/src/main/java/org/apache/atlas/Main.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/Main.java b/webapp/src/main/java/org/apache/atlas/Main.java
index b71ca82..7b13f3d 100755
--- a/webapp/src/main/java/org/apache/atlas/Main.java
+++ b/webapp/src/main/java/org/apache/atlas/Main.java
@@ -41,10 +41,27 @@ public final class Main {
     private static final String APP_PORT = "port";
     private static final String ATLAS_HOME = "atlas.home";
     private static final String ATLAS_LOG_DIR = "atlas.log.dir";
-    public static final String ATLAS_SERVER_HTTPS_PORT =
-        "atlas.server.https.port";
-    public static final String ATLAS_SERVER_HTTP_PORT =
-        "atlas.server.http.port";
+    public static final String ATLAS_SERVER_HTTPS_PORT = "atlas.server.https.port";
+    public static final String ATLAS_SERVER_HTTP_PORT = "atlas.server.http.port";
+
+    private static EmbeddedServer server;
+
+    static {
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                try {
+                    shutdown();
+                } catch (Exception e) {
+                    LOG.debug("Failed to shutdown", e);
+                }
+            }
+        });
+    }
+
+    private static void shutdown() {
+        server.stop();
+    }
 
     /**
      * Prevent users from constructing this.
@@ -84,7 +101,7 @@ public final class Main {
         configuration.setProperty("atlas.enableTLS", String.valueOf(enableTLS));
 
         showStartupInfo(buildConfiguration, enableTLS, appPort);
-        EmbeddedServer server = EmbeddedServer.newServer(appPort, appPath, enableTLS);
+        server = EmbeddedServer.newServer(appPort, appPath, enableTLS);
         server.start();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/webapp/src/main/java/org/apache/atlas/web/service/EmbeddedServer.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/service/EmbeddedServer.java b/webapp/src/main/java/org/apache/atlas/web/service/EmbeddedServer.java
index 80df87d..871d857 100755
--- a/webapp/src/main/java/org/apache/atlas/web/service/EmbeddedServer.java
+++ b/webapp/src/main/java/org/apache/atlas/web/service/EmbeddedServer.java
@@ -18,14 +18,16 @@
 
 package org.apache.atlas.web.service;
 
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.atlas.ApplicationProperties;
+import org.apache.commons.configuration.Configuration;
 import org.eclipse.jetty.server.Connector;
 import org.eclipse.jetty.server.HttpConfiguration;
 import org.eclipse.jetty.server.HttpConnectionFactory;
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.server.ServerConnector;
 import org.eclipse.jetty.webapp.WebAppContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 
@@ -33,6 +35,8 @@ import java.io.IOException;
  * This class embeds a Jetty server and a connector.
  */
 public class EmbeddedServer {
+    public static final Logger LOG = LoggerFactory.getLogger(EmbeddedServer.class);
+
     private static final int DEFAULT_BUFFER_SIZE = 16192;
 
     protected final Server server = new Server();
@@ -71,9 +75,9 @@ public class EmbeddedServer {
 
     protected Integer getBufferSize() {
         try {
-            PropertiesConfiguration configuration = new PropertiesConfiguration("application.properties");
+            Configuration configuration = ApplicationProperties.get();
             return configuration.getInt("atlas.jetty.request.buffer.size", DEFAULT_BUFFER_SIZE);
-        } catch (ConfigurationException e) {
+        } catch (Exception e) {
             // do nothing
         }
 
@@ -85,7 +89,11 @@ public class EmbeddedServer {
         server.join();
     }
 
-    public void stop() throws Exception {
-        server.stop();
+    public void stop() {
+        try {
+            server.stop();
+        } catch (Exception e) {
+            LOG.warn("Error during shutdown", e);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/webapp/src/main/resources/application.properties
----------------------------------------------------------------------
diff --git a/webapp/src/main/resources/application.properties b/webapp/src/main/resources/application.properties
deleted file mode 100755
index ecfdc38..0000000
--- a/webapp/src/main/resources/application.properties
+++ /dev/null
@@ -1,60 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-#########  Graph Database Configs  #########
-# Graph Storage
-atlas.graph.storage.backend=${titan.storage.backend}
-
-# Graph Search Index Backend
-atlas.graph.index.search.backend=${titan.index.backend}
-
-#Berkeley storage directory
-atlas.graph.storage.directory=target/data/berkley
-
-#hbase
-#For standalone mode , specify localhost
-#for distributed mode, specify zookeeper quorum here - For more information refer http://s3.thinkaurelius.com/docs/titan/current/hbase.html#_remote_server_mode_2
-atlas.graph.storage.hostname=${titan.storage.hostname}
-
-#ElasticSearch
-atlas.graph.index.search.directory=target/data/es
-atlas.graph.index.search.elasticsearch.client-only=false
-atlas.graph.index.search.elasticsearch.local-mode=true
-atlas.graph.index.search.elasticsearch.create.sleep=2000
-
-# Solr cloud mode properties
-atlas.graph.index.search.solr.mode=cloud
-atlas.graph.index.search.solr.zookeeper-url=${solr.zk.address}
-
-#########  Hive Lineage Configs  #########
-# This models reflects the base super types for Data and Process
-#atlas.lineage.hive.table.type.name=DataSet
-#atlas.lineage.hive.process.type.name=Process
-#atlas.lineage.hive.process.inputs.name=inputs
-#atlas.lineage.hive.process.outputs.name=outputs
-
-## Schema
-atlas.lineage.hive.table.schema.query.hive_table=hive_table where name='%s'\, columns
-
-
-#########  Security Properties  #########
-
-# SSL config
-atlas.enableTLS=false
-
-#########  Security Properties  #########

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/webapp/src/main/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/webapp/src/main/resources/log4j.xml b/webapp/src/main/resources/log4j.xml
index 5c42c98..7827c1a 100755
--- a/webapp/src/main/resources/log4j.xml
+++ b/webapp/src/main/resources/log4j.xml
@@ -57,7 +57,7 @@
     </logger>
 
     <root>
-        <priority value="info"/>
+        <priority value="warn"/>
         <appender-ref ref="FILE"/>
     </root>
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/webapp/src/test/java/org/apache/atlas/web/TestUtils.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/web/TestUtils.java b/webapp/src/test/java/org/apache/atlas/web/TestUtils.java
index ede041e..47bea1f 100644
--- a/webapp/src/test/java/org/apache/atlas/web/TestUtils.java
+++ b/webapp/src/test/java/org/apache/atlas/web/TestUtils.java
@@ -46,4 +46,9 @@ public class TestUtils {
     public static String getTempDirectory() {
         return System.getProperty("projectBaseDir") + "/webapp/target/" + random();
     }
+
+    public static String getWarPath() {
+        return System.getProperty("projectBaseDir") + String.format("/webapp/target/atlas-webapp-%s",
+                System.getProperty("project.version"));
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/webapp/src/test/java/org/apache/atlas/web/security/BaseSecurityTest.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/web/security/BaseSecurityTest.java b/webapp/src/test/java/org/apache/atlas/web/security/BaseSecurityTest.java
index 614638c..8af4a7e 100644
--- a/webapp/src/test/java/org/apache/atlas/web/security/BaseSecurityTest.java
+++ b/webapp/src/test/java/org/apache/atlas/web/security/BaseSecurityTest.java
@@ -16,9 +16,9 @@
  */
 package org.apache.atlas.web.security;
 
+import org.apache.atlas.web.TestUtils;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.commons.lang.RandomStringUtils;
 import org.apache.hadoop.minikdc.MiniKdc;
 import org.apache.hadoop.security.ssl.SSLFactory;
 import org.apache.hadoop.security.ssl.SSLHostnameVerifier;
@@ -35,10 +35,7 @@ import java.nio.file.Files;
 import java.util.Locale;
 import java.util.Properties;
 
-import static org.apache.atlas.security.SecurityProperties.CERT_STORES_CREDENTIAL_PROVIDER_PATH;
-import static org.apache.atlas.security.SecurityProperties.KEYSTORE_FILE_KEY;
-import static org.apache.atlas.security.SecurityProperties.TLS_ENABLED;
-import static org.apache.atlas.security.SecurityProperties.TRUSTSTORE_FILE_KEY;
+import static org.apache.atlas.security.SecurityProperties.*;
 
 /**
  *
@@ -110,8 +107,7 @@ public class BaseSecurityTest {
     }
 
     protected String getWarPath() {
-        return System.getProperty("projectBaseDir") + String.format("/webapp/target/atlas-webapp-%s",
-                System.getProperty("project.version"));
+        return TestUtils.getWarPath();
     }
 
     protected PropertiesConfiguration getSSLConfiguration(String providerUrl) {

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/webapp/src/test/java/org/apache/atlas/web/service/SecureEmbeddedServerIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/web/service/SecureEmbeddedServerIT.java b/webapp/src/test/java/org/apache/atlas/web/service/SecureEmbeddedServerIT.java
index 7eb36d8..e1f9b54 100644
--- a/webapp/src/test/java/org/apache/atlas/web/service/SecureEmbeddedServerIT.java
+++ b/webapp/src/test/java/org/apache/atlas/web/service/SecureEmbeddedServerIT.java
@@ -17,6 +17,7 @@
 
 package org.apache.atlas.web.service;
 
+import org.apache.atlas.web.TestUtils;
 import org.apache.commons.configuration.PropertiesConfiguration;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -37,9 +38,7 @@ public class SecureEmbeddedServerIT extends SecureEmbeddedServerITBase {
 
         SecureEmbeddedServer secureEmbeddedServer = null;
         try {
-            String appPath = System.getProperty("user.dir") + getWarPath();
-
-            secureEmbeddedServer = new SecureEmbeddedServer(21443, appPath) {
+            secureEmbeddedServer = new SecureEmbeddedServer(21443, TestUtils.getWarPath()) {
                 @Override
                 protected PropertiesConfiguration getConfiguration() {
                     return configuration;

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/webapp/src/test/java/org/apache/atlas/web/service/SecureEmbeddedServerITBase.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/web/service/SecureEmbeddedServerITBase.java b/webapp/src/test/java/org/apache/atlas/web/service/SecureEmbeddedServerITBase.java
index 9a5b8ad..f7c3625 100755
--- a/webapp/src/test/java/org/apache/atlas/web/service/SecureEmbeddedServerITBase.java
+++ b/webapp/src/test/java/org/apache/atlas/web/service/SecureEmbeddedServerITBase.java
@@ -19,6 +19,7 @@ package org.apache.atlas.web.service;
 import com.sun.jersey.api.client.Client;
 import com.sun.jersey.api.client.WebResource;
 import com.sun.jersey.api.client.config.DefaultClientConfig;
+import org.apache.atlas.web.TestUtils;
 import org.apache.atlas.web.resources.AdminJerseyResourceIT;
 import org.apache.atlas.web.resources.BaseResourceIT;
 import org.apache.atlas.web.resources.EntityJerseyResourceIT;
@@ -31,7 +32,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.alias.CredentialProvider;
 import org.apache.hadoop.security.alias.CredentialProviderFactory;
 import org.apache.hadoop.security.alias.JavaKeyStoreProvider;
-import org.eclipse.jetty.webapp.WebAppContext;
 import org.testng.Assert;
 import org.testng.TestListenerAdapter;
 import org.testng.TestNG;
@@ -45,11 +45,7 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
 
-import static org.apache.atlas.security.SecurityProperties.CERT_STORES_CREDENTIAL_PROVIDER_PATH;
-import static org.apache.atlas.security.SecurityProperties.DEFAULT_KEYSTORE_FILE_LOCATION;
-import static org.apache.atlas.security.SecurityProperties.KEYSTORE_PASSWORD_KEY;
-import static org.apache.atlas.security.SecurityProperties.SERVER_CERT_PASSWORD_KEY;
-import static org.apache.atlas.security.SecurityProperties.TRUSTSTORE_PASSWORD_KEY;
+import static org.apache.atlas.security.SecurityProperties.*;
 
 /**
  * Secure Test class for jersey resources.
@@ -106,18 +102,13 @@ public class SecureEmbeddedServerITBase {
     public void testNoConfiguredCredentialProvider() throws Exception {
 
         try {
-            secureEmbeddedServer = new SecureEmbeddedServer(21443, "webapp/target/apache-atlas");
-            WebAppContext webapp = new WebAppContext();
-            webapp.setContextPath("/");
-            webapp.setWar(System.getProperty("user.dir") + getWarPath());
-            secureEmbeddedServer.server.setHandler(webapp);
-
+            secureEmbeddedServer = new SecureEmbeddedServer(21443, TestUtils.getWarPath());
             secureEmbeddedServer.server.start();
 
             Assert.fail("Should have thrown an exception");
         } catch (IOException e) {
-            Assert.assertEquals("No credential provider path configured for storage of certificate store passwords",
-                    e.getMessage());
+            Assert.assertEquals(e.getMessage(),
+                    "No credential provider path configured for storage of certificate store passwords");
         } finally {
             secureEmbeddedServer.server.stop();
         }
@@ -130,7 +121,7 @@ public class SecureEmbeddedServerITBase {
         configuration.setProperty(CERT_STORES_CREDENTIAL_PROVIDER_PATH, providerUrl);
 
         try {
-            secureEmbeddedServer = new SecureEmbeddedServer(21443, "webapp/target/apache-atlas") {
+            secureEmbeddedServer = new SecureEmbeddedServer(21443, TestUtils.getWarPath()) {
                 @Override
                 protected PropertiesConfiguration getConfiguration() {
                     return configuration;
@@ -157,17 +148,12 @@ public class SecureEmbeddedServerITBase {
         setupCredentials();
 
         try {
-            secureEmbeddedServer = new SecureEmbeddedServer(21443, "webapp/target/apache-atlas") {
+            secureEmbeddedServer = new SecureEmbeddedServer(21443, TestUtils.getWarPath()) {
                 @Override
                 protected PropertiesConfiguration getConfiguration() {
                     return configuration;
                 }
             };
-            WebAppContext webapp = new WebAppContext();
-            webapp.setContextPath("/");
-            webapp.setWar(System.getProperty("user.dir") + getWarPath());
-            secureEmbeddedServer.server.setHandler(webapp);
-
             secureEmbeddedServer.server.start();
 
             TestListenerAdapter tla = new TestListenerAdapter();
@@ -184,11 +170,6 @@ public class SecureEmbeddedServerITBase {
 
     }
 
-    protected String getWarPath() {
-        return String
-                .format("/target/atlas-webapp-%s", System.getProperty("project.version"));
-    }
-
     protected void setupCredentials() throws Exception {
         Configuration conf = new Configuration(false);