You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by sh...@apache.org on 2015/08/05 11:51:09 UTC
incubator-atlas git commit: ATLAS-74 Create notification framework
(shwethags)
Repository: incubator-atlas
Updated Branches:
refs/heads/master 751b4c876 -> b627a681e
ATLAS-74 Create notification framework (shwethags)
Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/b627a681
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/b627a681
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/b627a681
Branch: refs/heads/master
Commit: b627a681edc795c71ccf13e75a79ef102e75a916
Parents: 751b4c8
Author: Shwetha GS <ss...@hortonworks.com>
Authored: Wed Aug 5 15:21:00 2015 +0530
Committer: Shwetha GS <ss...@hortonworks.com>
Committed: Wed Aug 5 15:21:00 2015 +0530
----------------------------------------------------------------------
addons/hive-bridge/pom.xml | 4 -
.../src/test/resources/application.properties | 64 -----
.../org/apache/atlas/ApplicationProperties.java | 10 +-
client/src/main/resources/client.properties | 37 +++
notification/pom.xml | 75 +++++
.../org/apache/atlas/kafka/KafkaConsumer.java | 50 ++++
.../apache/atlas/kafka/KafkaNotification.java | 284 +++++++++++++++++++
.../notification/NotificationConsumer.java | 32 +++
.../notification/NotificationException.java | 26 ++
.../notification/NotificationHookConsumer.java | 87 ++++++
.../notification/NotificationInterface.java | 77 +++++
.../atlas/notification/NotificationModule.java | 28 ++
.../atlas/kafka/KafkaNotificationTest.java | 68 +++++
pom.xml | 29 +-
release-log.txt | 1 +
.../src/test/resources/application.properties | 65 -----
src/conf/application.properties | 4 +
src/conf/client.properties | 10 +-
.../src/main/resources/application.properties | 47 ++-
typesystem/src/main/resources/log4j.xml | 26 +-
webapp/src/main/java/org/apache/atlas/Main.java | 27 +-
.../atlas/web/service/EmbeddedServer.java | 20 +-
.../src/main/resources/application.properties | 60 ----
webapp/src/main/resources/log4j.xml | 2 +-
.../java/org/apache/atlas/web/TestUtils.java | 5 +
.../atlas/web/security/BaseSecurityTest.java | 10 +-
.../web/service/SecureEmbeddedServerIT.java | 5 +-
.../web/service/SecureEmbeddedServerITBase.java | 33 +--
28 files changed, 906 insertions(+), 280 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/addons/hive-bridge/pom.xml
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/pom.xml b/addons/hive-bridge/pom.xml
index 66b5f74..914d8c6 100755
--- a/addons/hive-bridge/pom.xml
+++ b/addons/hive-bridge/pom.xml
@@ -256,10 +256,6 @@
<name>atlas.log.dir</name>
<value>${project.build.directory}/logs</value>
</systemProperty>
- <systemProperty>
- <name>atlas.conf</name>
- <value>${project.build.directory}/test-classes</value>
- </systemProperty>
</systemProperties>
<stopKey>atlas-stop</stopKey>
<stopPort>41001</stopPort>
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/addons/hive-bridge/src/test/resources/application.properties
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/test/resources/application.properties b/addons/hive-bridge/src/test/resources/application.properties
deleted file mode 100644
index dda9a18..0000000
--- a/addons/hive-bridge/src/test/resources/application.properties
+++ /dev/null
@@ -1,64 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-######### Graph Database Configs #########
-#Refer http://s3.thinkaurelius.com/docs/titan/0.5.1/titan-config-ref.html
-# Graph Storage
-atlas.graph.storage.backend=${titan.storage.backend}
-
-#Berkeley storage directory
-atlas.graph.storage.directory=target/data/berkley
-
-#hbase
-#For standalone mode , specify localhost
-#for distributed mode, specify zookeeper quorum here - For more information refer http://s3.thinkaurelius.com/docs/titan/current/hbase.html#_remote_server_mode_2
-atlas.graph.storage.hostname=${titan.storage.hostname}
-
-# Graph Search Index Backend
-atlas.graph.index.search.backend=${titan.index.backend}
-
-#lucene
-#atlas.graph.index.search.directory=target/data/lucene
-
-#elasticsearch
-atlas.graph.index.search.directory=./target/data/es
-atlas.graph.index.search.elasticsearch.client-only=false
-atlas.graph.index.search.elasticsearch.local-mode=true
-atlas.graph.index.search.elasticsearch.create.sleep=2000
-
-#solr in cloud mode
-atlas.graph.index.search.solr.mode=cloud
-atlas.graph.index.search.solr.zookeeper-url=${solr.zk.address}
-
-#solr in http mode
-atlas.graph.index.search.solr.http-urls=http://localhost:8983/solr
-
-######### Hive Lineage Configs #########
-#atlas.lineage.hive.table.type.name=DataSet
-#atlas.lineage.hive.process.type.name=Process
-#atlas.lineage.hive.process.inputs.name=inputs
-#atlas.lineage.hive.process.outputs.name=outputs
-
-## Schema
-#atlas.lineage.hive.table.schema.query.hive_table=hive_table where name='%s'\, columns
-
-
-######### Security Properties #########
-
-# SSL config
-atlas.enableTLS=false
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/client/src/main/java/org/apache/atlas/ApplicationProperties.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/atlas/ApplicationProperties.java b/client/src/main/java/org/apache/atlas/ApplicationProperties.java
index 15cca47..738ec53 100644
--- a/client/src/main/java/org/apache/atlas/ApplicationProperties.java
+++ b/client/src/main/java/org/apache/atlas/ApplicationProperties.java
@@ -17,19 +17,17 @@
package org.apache.atlas;
-import org.apache.commons.configuration.AbstractConfiguration;
+import org.apache.commons.configuration.CompositeConfiguration;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.ConfigurationUtils;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.net.URL;
+import java.util.Arrays;
import java.util.Iterator;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
public class ApplicationProperties extends PropertiesConfiguration {
private static final Logger LOG = LoggerFactory.getLogger(ApplicationProperties.class);
@@ -47,7 +45,9 @@ public class ApplicationProperties extends PropertiesConfiguration {
if (INSTANCE == null) {
synchronized (ApplicationProperties.class) {
if (INSTANCE == null) {
- INSTANCE = get(APPLICATION_PROPERTIES);
+ Configuration applicationProperties = get(APPLICATION_PROPERTIES);
+ Configuration clientProperties = get(CLIENT_PROPERTIES);
+ INSTANCE = new CompositeConfiguration(Arrays.asList(applicationProperties, clientProperties));
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/client/src/main/resources/client.properties
----------------------------------------------------------------------
diff --git a/client/src/main/resources/client.properties b/client/src/main/resources/client.properties
new file mode 100755
index 0000000..722d029
--- /dev/null
+++ b/client/src/main/resources/client.properties
@@ -0,0 +1,37 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+######### Security Properties #########
+
+# SSL config
+
+atlas.enableTLS=false
+#truststore.file=/path/to/truststore.jks
+#cert.stores.credential.provider.path=jceks://file/path/to/credentialstore.jceks
+
+#following only required for 2-way SSL
+#keystore.file=/path/to/keystore.jks
+
+# Authentication config
+
+# enabled: true or false
+atlas.http.authentication.enabled=false
+# type: simple or kerberos
+atlas.http.authentication.type=simple
+
+######### Security Properties #########
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/notification/pom.xml
----------------------------------------------------------------------
diff --git a/notification/pom.xml b/notification/pom.xml
new file mode 100644
index 0000000..b036855
--- /dev/null
+++ b/notification/pom.xml
@@ -0,0 +1,75 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns="http://maven.apache.org/POM/4.0.0"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>apache-atlas</artifactId>
+ <groupId>org.apache.atlas</groupId>
+ <version>0.6-incubating-SNAPSHOT</version>
+ </parent>
+ <artifactId>atlas-notification</artifactId>
+ <description>Apache Atlas Notification</description>
+ <name>Apache Atlas Notification</name>
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.atlas</groupId>
+ <artifactId>atlas-client</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.atlas</groupId>
+ <artifactId>atlas-typesystem</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_${scala.binary.version}</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>commons-configuration</groupId>
+ <artifactId>commons-configuration</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.testng</groupId>
+ <artifactId>testng</artifactId>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java
new file mode 100644
index 0000000..70bb5d6
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.kafka;
+
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.KafkaStream;
+import kafka.message.MessageAndMetadata;
+import org.apache.atlas.notification.NotificationConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link NotificationConsumer} that reads String messages from a single
+ * {@link KafkaStream}.
+ */
+public class KafkaConsumer implements NotificationConsumer {
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumer.class);
+
+    private final int consumerId;
+    private final ConsumerIterator<String, String> iterator;
+
+    /**
+     * @param stream     stream whose iterator supplies the messages
+     * @param consumerId identifier that is only used to tag the debug log lines
+     */
+    public KafkaConsumer(KafkaStream<String, String> stream, int consumerId) {
+        this.iterator = stream.iterator();
+        this.consumerId = consumerId;
+    }
+
+    /**
+     * Delegates to the underlying stream iterator.
+     *
+     * @return whatever the stream iterator's {@code hasNext()} returns
+     */
+    @Override
+    public boolean hasNext() {
+        return iterator.hasNext();
+    }
+
+    /**
+     * Reads the next message from the stream iterator and logs its metadata at debug level.
+     *
+     * @return the message payload
+     */
+    @Override
+    public String next() {
+        // Typed iterator: the payload is already a String, so no cast is needed.
+        MessageAndMetadata<String, String> message = iterator.next();
+        LOG.debug("Read message: consumerId: {}, topic - {}, partition - {}, offset - {}, message - {}",
+                consumerId, message.topic(), message.partition(), message.offset(), message.message());
+        return message.message();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
new file mode 100644
index 0000000..9978275
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.kafka;
+
+import com.google.inject.Singleton;
+import kafka.consumer.Consumer;
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.consumer.ConsumerConnector;
+import kafka.serializer.StringDecoder;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.Time;
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.notification.NotificationConsumer;
+import org.apache.atlas.notification.NotificationException;
+import org.apache.atlas.notification.NotificationInterface;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationConverter;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Future;
+
+/**
+ * Kafka based implementation of {@link NotificationInterface}.
+ *
+ * <p>Messages are published with a {@link KafkaProducer} and read through
+ * {@link ConsumerConnector} streams. When the notification service is embedded,
+ * {@link #_startService()} starts an in-process ZooKeeper server on port
+ * {@value #ATLAS_ZK_PORT} and a Kafka broker on port {@value #ATLAS_KAFKA_PORT}.
+ */
+@Singleton
+public class KafkaNotification extends NotificationInterface {
+    public static final Logger LOG = LoggerFactory.getLogger(KafkaNotification.class);
+
+    public static final String PROPERTY_PREFIX = NotificationInterface.PROPERTY_PREFIX + ".kafka";
+
+    private static final int ATLAS_ZK_PORT = 9026;
+    private static final int ATLAS_KAFKA_PORT = 9027;
+    private static final String ATLAS_KAFKA_DATA = "data";
+
+    public static final String ATLAS_HOOK_TOPIC = "ATLAS_HOOK";
+    public static final String ATLAS_ENTITIES_TOPIC = "ATLAS_ENTITIES";
+    public static final String ATLAS_TYPES_TOPIC = "ATLAS_TYPES";
+
+    private static final String ATLAS_GROUP = "atlas";
+    private KafkaServer kafkaServer;
+    private ServerCnxnFactory factory;
+    private Properties properties;
+
+    // volatile: send() reads this field without holding the lock that createProducer() takes
+    private volatile KafkaProducer producer = null;
+    private List<ConsumerConnector> consumerConnectors = new ArrayList<>();
+
+    private KafkaConsumer consumer;
+
+    // Notification type -> Kafka topic name
+    private static final Map<NotificationType, String> topicMap = new HashMap<>();
+
+    static {
+        topicMap.put(NotificationType.HOOK, ATLAS_HOOK_TOPIC);
+        topicMap.put(NotificationType.ENTITIES, ATLAS_ENTITIES_TOPIC);
+        topicMap.put(NotificationType.TYPES, ATLAS_TYPES_TOPIC);
+    }
+
+    /**
+     * Creates the producer on first use; later calls are no-ops.
+     */
+    private synchronized void createProducer() {
+        if (producer == null) {
+            producer = new KafkaProducer(properties);
+        }
+    }
+
+    /**
+     * Builds the Kafka client properties from the {@code atlas.notification.kafka} subset of the
+     * given configuration and then forces the serializer, deserializer, group id, auto commit,
+     * partition assignment and offset reset settings. In embedded mode the bootstrap servers and
+     * the ZooKeeper connect string are pointed at localhost.
+     *
+     * @param applicationProperties configuration to read the settings from
+     * @throws AtlasException declared by the overridden method
+     */
+    @Override
+    public void initialize(Configuration applicationProperties) throws AtlasException {
+        super.initialize(applicationProperties);
+        Configuration subsetConfiguration =
+                ApplicationProperties.getSubsetConfiguration(applicationProperties, PROPERTY_PREFIX);
+        properties = ConfigurationConverter.getProperties(subsetConfiguration);
+        //override to store offset in kafka
+        //todo do we need ability to replay?
+
+        //Override default configs
+        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+                "org.apache.kafka.common.serialization.StringSerializer");
+        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+                "org.apache.kafka.common.serialization.StringSerializer");
+
+        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
+
+        //todo take group id as argument to allow multiple consumers??
+        properties.put(ConsumerConfig.GROUP_ID_CONFIG, ATLAS_GROUP);
+        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+                "org.apache.kafka.common.serialization.StringDeserializer");
+        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+                "org.apache.kafka.common.serialization.StringDeserializer");
+        properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY, "roundrobin");
+        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "smallest");
+
+        if (isEmbedded()) {
+            properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + ATLAS_KAFKA_PORT);
+            properties.setProperty("zookeeper.connect", "localhost:" + ATLAS_ZK_PORT);
+        }
+
+        //todo new APIs not available yet
+//        consumer = new KafkaConsumer(properties);
+//        consumer.subscribe(ATLAS_HOOK_TOPIC);
+    }
+
+    /**
+     * Starts the embedded ZooKeeper server first and the embedded Kafka broker after it.
+     *
+     * @throws IOException if ZooKeeper cannot be started
+     */
+    @Override
+    protected void _startService() throws IOException {
+        startZk();
+        startKafka();
+    }
+
+    /**
+     * Starts a ZooKeeper server listening on 0.0.0.0:{@value #ATLAS_ZK_PORT}.
+     *
+     * @return the local address of the connection factory, as a string
+     * @throws IOException if the factory cannot be created or the startup is interrupted
+     */
+    private String startZk() throws IOException {
+        //todo read zk endpoint from config
+        this.factory = NIOServerCnxnFactory.createFactory(new InetSocketAddress("0.0.0.0", ATLAS_ZK_PORT), 1024);
+        // NOTE(review): the variable names and the directory names look swapped ("zk/txn" is used
+        // as the first constructor argument, "zk/snap" as the second). Left as is because changing
+        // it would alter the on-disk layout - confirm the intent.
+        File snapshotDir = constructDir("zk/txn");
+        File logDir = constructDir("zk/snap");
+
+        try {
+            factory.startup(new ZooKeeperServer(snapshotDir, logDir, 500));
+        } catch (InterruptedException e) {
+            // keep the interrupt visible to the caller's thread
+            Thread.currentThread().interrupt();
+            throw new IOException(e);
+        }
+        return factory.getLocalAddress().getAddress().toString();
+    }
+
+    /**
+     * Starts a Kafka broker (broker id 1) listening on 0.0.0.0:{@value #ATLAS_KAFKA_PORT}.
+     */
+    private void startKafka() {
+        // Work on a copy so that the broker-only settings below do not end up in the
+        // properties that are handed to the producer and the consumers.
+        Properties brokerConfig = new Properties();
+        brokerConfig.putAll(properties);
+        brokerConfig.setProperty("broker.id", "1");
+        //todo read kafka endpoint from config
+        brokerConfig.setProperty("host.name", "0.0.0.0");
+        brokerConfig.setProperty("port", String.valueOf(ATLAS_KAFKA_PORT));
+        brokerConfig.setProperty("log.dirs", constructDir("kafka").getAbsolutePath());
+        brokerConfig.setProperty("log.flush.interval.messages", String.valueOf(1));
+
+        kafkaServer = new KafkaServer(new KafkaConfig(brokerConfig), new SystemTime());
+        kafkaServer.startup();
+        LOG.debug("Embedded kafka server started with broker config {}", brokerConfig);
+    }
+
+    /**
+     * {@link Time} implementation backed by the system clock.
+     */
+    private static class SystemTime implements Time {
+        @Override
+        public long milliseconds() {
+            return System.currentTimeMillis();
+        }
+
+        @Override
+        public long nanoseconds() {
+            return System.nanoTime();
+        }
+
+        @Override
+        public void sleep(long arg0) {
+            try {
+                Thread.sleep(arg0);
+            } catch (InterruptedException e) {
+                // restore the interrupt flag before converting to an unchecked exception
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    /**
+     * Returns the directory {@code dirPrefix} below the configured data directory, creating it
+     * when it does not exist.
+     *
+     * @param dirPrefix path relative to the data directory
+     * @return the existing or newly created directory
+     * @throws RuntimeException if the directory cannot be created
+     */
+    private File constructDir(String dirPrefix) {
+        File file = new File(properties.getProperty(ATLAS_KAFKA_DATA), dirPrefix);
+        if (!file.exists() && !file.mkdirs()) {
+            throw new RuntimeException("could not create temp directory: " + file.getAbsolutePath());
+        }
+        return file;
+    }
+
+    /**
+     * Closes the producer, the consumer and all consumer connectors, then shuts down the embedded
+     * Kafka broker and ZooKeeper connection factory if they were started.
+     */
+    @Override
+    public void _shutdown() {
+        if (producer != null) {
+            producer.close();
+        }
+
+        if (consumer != null) {
+            consumer.close();
+        }
+
+        for (ConsumerConnector consumerConnector : consumerConnectors) {
+            consumerConnector.shutdown();
+        }
+
+        if (kafkaServer != null) {
+            kafkaServer.shutdown();
+        }
+
+        if (factory != null) {
+            factory.shutdown();
+        }
+    }
+
+    /**
+     * Creates a new consumer connector for the topic of the given type and wraps each of its
+     * message streams in a {@link NotificationConsumer}.
+     *
+     * @param type         notification type that selects the topic
+     * @param numConsumers number of streams requested for the topic
+     * @return one consumer per stream returned by the connector
+     */
+    @Override
+    public List<NotificationConsumer> createConsumers(NotificationType type, int numConsumers) {
+        String topic = topicMap.get(type);
+
+        ConsumerConnector consumerConnector =
+                Consumer.createJavaConsumerConnector(new kafka.consumer.ConsumerConfig(properties));
+        Map<String, Integer> topicCountMap = new HashMap<>();
+        topicCountMap.put(topic, numConsumers);
+        StringDecoder decoder = new StringDecoder(null);
+        Map<String, List<KafkaStream<String, String>>> streamsMap =
+                consumerConnector.createMessageStreams(topicCountMap, decoder, decoder);
+        List<KafkaStream<String, String>> kafkaConsumers = streamsMap.get(topic);
+        List<NotificationConsumer> consumers = new ArrayList<>(numConsumers);
+        int consumerId = 0;
+        for (KafkaStream<String, String> stream : kafkaConsumers) {
+            consumers.add(new org.apache.atlas.kafka.KafkaConsumer(stream, consumerId++));
+        }
+        // remembered so that _shutdown() can close it
+        consumerConnectors.add(consumerConnector);
+
+        return consumers;
+    }
+
+    /**
+     * Sends all messages to the topic of the given type and then waits for every send to be
+     * acknowledged.
+     *
+     * @param type     notification type that selects the topic
+     * @param messages messages to send
+     * @throws NotificationException if waiting for any of the sends fails
+     */
+    @Override
+    public void send(NotificationType type, String... messages) throws NotificationException {
+        if (producer == null) {
+            createProducer();
+        }
+
+        String topic = topicMap.get(type);
+        List<Future<RecordMetadata>> futures = new ArrayList<>();
+        for (String message : messages) {
+            ProducerRecord record = new ProducerRecord(topic, message);
+            LOG.debug("Sending message for topic {}: {}", topic, message);
+            futures.add(producer.send(record));
+        }
+
+        for (Future<RecordMetadata> future : futures) {
+            try {
+                RecordMetadata response = future.get();
+                LOG.debug("Sent message for topic - {}, partition - {}, offset - {}", response.topic(),
+                        response.partition(), response.offset());
+            } catch (InterruptedException e) {
+                // restore the interrupt flag; callers still see the same exception type
+                Thread.currentThread().interrupt();
+                throw new NotificationException(e);
+            } catch (Exception e) {
+                throw new NotificationException(e);
+            }
+        }
+    }
+
+    //New API, not used now
+    private List<String> receive(long timeout) throws NotificationException {
+        Map<String, ConsumerRecords> recordsMap = consumer.poll(timeout);
+        List<String> messages = new ArrayList<>();
+        if (recordsMap != null) {
+            for (ConsumerRecords records : recordsMap.values()) {
+                List<ConsumerRecord> recordList = records.records();
+                for (ConsumerRecord record : recordList) {
+                    try {
+                        String message = (String) record.value();
+                        LOG.debug("Received message from topic {}: {}", ATLAS_HOOK_TOPIC, message);
+                        messages.add(message);
+                    } catch (Exception e) {
+                        throw new NotificationException(e);
+                    }
+                }
+            }
+        }
+        return messages;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java b/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
new file mode 100644
index 0000000..c3ac23b
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification;
+
+/**
+ * Iterator-style access to notification messages.
+ */
+public interface NotificationConsumer {
+ /**
+ * If there are more messages
+ * @return true if there are more messages
+ */
+ boolean hasNext();
+
+ /**
+ * Next message - blocking call
+ * @return the next message
+ */
+ String next();
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/notification/src/main/java/org/apache/atlas/notification/NotificationException.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationException.java b/notification/src/main/java/org/apache/atlas/notification/NotificationException.java
new file mode 100644
index 0000000..e6b02fb
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/NotificationException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification;
+
+import org.apache.atlas.AtlasException;
+
+/**
+ * {@link AtlasException} subtype used by the notification code; it wraps another exception.
+ */
+public class NotificationException extends AtlasException {
+ /**
+ * @param e the exception that is passed on to the {@link AtlasException} constructor
+ */
+ public NotificationException(Exception e) {
+ super(e);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
new file mode 100644
index 0000000..36a62f0
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification;
+
+import com.google.inject.Inject;
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasClient;
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.commons.configuration.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Reads messages of type {@link NotificationInterface.NotificationType#HOOK} and passes each one
+ * to {@link AtlasClient#createEntity}.
+ */
+public class NotificationHookConsumer {
+    private static final Logger LOG = LoggerFactory.getLogger(NotificationHookConsumer.class);
+
+    public static final String CONSUMER_THREADS_PROPERTY = "atlas.notification.hook.numthreads";
+    public static final String ATLAS_ENDPOINT_PROPERTY = "atlas.rest.address";
+
+    @Inject
+    private static NotificationInterface notificationInterface;
+
+    private static ExecutorService executors;
+    private static AtlasClient atlasClient;
+
+    /**
+     * Initializes the notification interface, creates the Atlas client and submits one
+     * {@link HookConsumer} per created consumer to a fixed thread pool.
+     *
+     * <p>The endpoint is read from {@value #ATLAS_ENDPOINT_PROPERTY} (default
+     * {@code http://localhost:21000}) and the requested number of consumers from
+     * {@value #CONSUMER_THREADS_PROPERTY} (default 2).
+     *
+     * @throws AtlasException if the configuration cannot be loaded or initialization fails
+     */
+    public static void start() throws AtlasException {
+        Configuration applicationProperties = ApplicationProperties.get();
+        notificationInterface.initialize(applicationProperties);
+
+        String atlasEndpoint = applicationProperties.getString(ATLAS_ENDPOINT_PROPERTY, "http://localhost:21000");
+        atlasClient = new AtlasClient(atlasEndpoint);
+        int numThreads = applicationProperties.getInt(CONSUMER_THREADS_PROPERTY, 2);
+        List<NotificationConsumer> consumers =
+                notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, numThreads);
+        executors = Executors.newFixedThreadPool(consumers.size());
+
+        for (final NotificationConsumer consumer : consumers) {
+            executors.submit(new HookConsumer(consumer));
+        }
+    }
+
+    /**
+     * Shuts down the notification interface and then the thread pool. Safe to call when
+     * {@link #start()} was never called or failed part way through.
+     */
+    public static void stop() {
+        if (notificationInterface != null) {
+            notificationInterface.shutdown();
+        }
+        // executors is assigned only near the end of start()
+        if (executors != null) {
+            executors.shutdown();
+        }
+    }
+
+    /**
+     * Task that drains one {@link NotificationConsumer}.
+     */
+    static class HookConsumer implements Runnable {
+        private final NotificationConsumer consumer;
+
+        public HookConsumer(NotificationConsumer consumerInterface) {
+            this.consumer = consumerInterface;
+        }
+
+        /**
+         * Loops while the consumer reports more messages and creates an entity from each one.
+         * A failed entity creation is logged and the loop continues with the next message.
+         */
+        @Override
+        public void run() {
+            while (consumer.hasNext()) {
+                String entityJson = consumer.next();
+                LOG.debug("Processing message {}", entityJson);
+                try {
+                    atlasClient.createEntity(entityJson);
+                } catch (AtlasServiceException e) {
+                    //todo handle failures
+                    // pass the exception as the last argument so the cause and stack trace are logged
+                    LOG.warn("Error handling message {}", entityJson, e);
+                }
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
new file mode 100644
index 0000000..0951124
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification;
+
+import org.apache.atlas.AtlasException;
+import org.apache.commons.configuration.Configuration;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Base class for notification back-ends. It holds the "embedded" flag read from the
+ * application properties and exposes final entry points that delegate to the
+ * implementation-specific {@code _startService()} and {@code _shutdown()} hooks.
+ */
+public abstract class NotificationInterface {
+    public static final String PROPERTY_PREFIX = "atlas.notification";
+    private static final String PROPERTY_EMBEDDED = PROPERTY_PREFIX + ".embedded";
+
+    /**
+     * Notification categories accepted by {@link #createConsumers} and {@link #send}.
+     */
+    public enum NotificationType {
+        HOOK,
+        ENTITIES,
+        TYPES
+    }
+
+    private boolean embedded;
+
+    /**
+     * Reads the embedded flag ({@code atlas.notification.embedded}, default {@code false})
+     * from the given configuration.
+     *
+     * @param applicationProperties configuration to read the flag from
+     * @throws AtlasException declared for implementations that extend initialisation
+     */
+    public void initialize(Configuration applicationProperties) throws AtlasException {
+        final boolean embeddedFlag = applicationProperties.getBoolean(PROPERTY_EMBEDDED, false);
+        this.embedded = embeddedFlag;
+    }
+
+    /**
+     * Starts the notification service, but only when it is configured as embedded;
+     * otherwise this is a no-op.
+     *
+     * @throws IOException if the implementation fails to start the service
+     */
+    public final void startService() throws IOException {
+        if (!embedded) {
+            return;
+        }
+        _startService();
+    }
+
+    /**
+     * Tells whether the notification service is configured as embedded.
+     *
+     * @return the flag read by {@link #initialize(Configuration)}
+     */
+    protected final boolean isEmbedded() {
+        return embedded;
+    }
+
+    /**
+     * Implementation hook invoked by {@link #startService()} when embedded.
+     *
+     * @throws IOException if the service cannot be started
+     */
+    protected abstract void _startService() throws IOException;
+
+    /**
+     * Shuts the notification layer down by delegating to {@link #_shutdown()}.
+     */
+    public final void shutdown() {
+        _shutdown();
+    }
+
+    /**
+     * Implementation hook invoked by {@link #shutdown()}.
+     */
+    protected abstract void _shutdown();
+
+    /**
+     * Creates consumers for the given notification type.
+     *
+     * @param type         notification type to consume
+     * @param numConsumers number of consumers requested
+     * @return the created consumers
+     */
+    public abstract List<NotificationConsumer> createConsumers(NotificationType type, int numConsumers);
+
+    /**
+     * Sends one or more messages for the given notification type.
+     *
+     * @param type     notification type to publish under
+     * @param messages messages to send
+     * @throws NotificationException if sending fails
+     */
+    public abstract void send(NotificationType type, String... messages) throws NotificationException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/notification/src/main/java/org/apache/atlas/notification/NotificationModule.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationModule.java b/notification/src/main/java/org/apache/atlas/notification/NotificationModule.java
new file mode 100644
index 0000000..db17e35
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/NotificationModule.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification;
+
+import com.google.inject.AbstractModule;
+import org.apache.atlas.kafka.KafkaNotification;
+
+/**
+ * Guice module that wires {@link NotificationInterface} to its Kafka implementation.
+ */
+public class NotificationModule extends AbstractModule {
+    /**
+     * Binds {@link NotificationInterface} to {@link KafkaNotification} as an eager singleton.
+     */
+    @Override
+    protected void configure() {
+        bind(NotificationInterface.class)
+                .to(KafkaNotification.class)
+                .asEagerSingleton();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
new file mode 100644
index 0000000..02752dc
--- /dev/null
+++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.kafka;
+
+import com.google.inject.Inject;
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.notification.NotificationConsumer;
+import org.apache.atlas.notification.NotificationInterface;
+import org.apache.atlas.notification.NotificationModule;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang.RandomStringUtils;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+/**
+ * Round-trip test for the notification interface bound by {@link NotificationModule}:
+ * messages sent on the HOOK type must come back from a HOOK consumer in the same order.
+ */
+@Guice(modules = NotificationModule.class)
+public class KafkaNotificationTest {
+
+    @Inject
+    private NotificationInterface kafka;
+
+    /**
+     * Points the data directory property at a fresh random location under target/,
+     * then initialises and starts the notification service.
+     */
+    @BeforeClass
+    public void setUp() throws Exception {
+        Configuration conf = ApplicationProperties.get();
+        conf.setProperty(KafkaNotification.PROPERTY_PREFIX + ".data", "target/data/kafka" + random());
+        kafka.initialize(conf);
+        kafka.startService();
+    }
+
+    /**
+     * Sends two random messages and checks that a single consumer returns them in order.
+     */
+    @Test
+    public void testSendMessage() throws AtlasException {
+        String msg1 = "message" + random();
+        String msg2 = "message" + random();
+        kafka.send(NotificationInterface.NotificationType.HOOK, msg1, msg2);
+        NotificationConsumer consumer = kafka.createConsumers(NotificationInterface.NotificationType.HOOK, 1).get(0);
+        // TestNG's assertEquals takes (actual, expected); the reverse order produces a
+        // misleading "expected ... but found ..." message when the assertion fails.
+        Assert.assertTrue(consumer.hasNext());
+        Assert.assertEquals(consumer.next(), msg1);
+        Assert.assertTrue(consumer.hasNext());
+        Assert.assertEquals(consumer.next(), msg2);
+    }
+
+    /** Returns a random 5-character alphanumeric suffix. */
+    private String random() {
+        return RandomStringUtils.randomAlphanumeric(5);
+    }
+
+    /** Shuts the notification service down once all tests in the class have run. */
+    @AfterClass
+    public void teardown() throws Exception {
+        kafka.shutdown();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 66182e7..facd539 100755
--- a/pom.xml
+++ b/pom.xml
@@ -329,7 +329,8 @@
<titan.version>0.5.4</titan.version>
<hadoop.version>2.7.0</hadoop.version>
<hbase.version>0.98.9-hadoop2</hbase.version>
-
+ <kafka.version>0.8.2.0</kafka.version>
+
<!-- scala versions -->
<scala.version>2.10.4</scala.version>
<scala.binary.version>2.10</scala.binary.version>
@@ -420,6 +421,7 @@
</profiles>
<modules>
<module>typesystem</module>
+ <module>notification</module>
<module>client</module>
<module>repository</module>
<module>webapp</module>
@@ -933,6 +935,12 @@
<dependency>
<groupId>org.apache.atlas</groupId>
+ <artifactId>atlas-notification</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.atlas</groupId>
<artifactId>atlas-client</artifactId>
<version>${project.version}</version>
</dependency>
@@ -1114,6 +1122,25 @@
<artifactId>commons-lang3</artifactId>
<version>3.4</version>
</dependency>
+
+ <!-- kafka -->
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>${kafka.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_${scala.binary.version}</artifactId>
+ <version>${kafka.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
</dependencies>
</dependencyManagement>
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 4e53ad5..a2d2b68 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -8,6 +8,7 @@ ATLAS-54 Rename configs in hive hook (shwethags)
ATLAS-3 Mixed Index creation fails with Date types (suma.shivaprasad via shwethags)
ALL CHANGES:
+ATLAS-74 Create notification framework (shwethags)
ATLAS-93 import-hive.sh reports FileNotFoundException (shwethags)
ATLAS-92 import-hive.sh failed to find HiveMetaStoreBridge (airbots via shwethags)
ATLAS-16 jersey jaxb exception (shwethags)
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/repository/src/test/resources/application.properties
----------------------------------------------------------------------
diff --git a/repository/src/test/resources/application.properties b/repository/src/test/resources/application.properties
deleted file mode 100755
index d0eaa8c..0000000
--- a/repository/src/test/resources/application.properties
+++ /dev/null
@@ -1,65 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-######### Graph Database Configs #########
-#Refer http://s3.thinkaurelius.com/docs/titan/0.5.1/titan-config-ref.html
-# Graph Storage
-
-atlas.graph.storage.backend=${titan.storage.backend}
-
-#Berkeley storage directory
-atlas.graph.storage.directory=target/data/berkley
-
-#hbase
-#For standalone mode , specify localhost
-#for distributed mode, specify zookeeper quorum here - For more information refer http://s3.thinkaurelius.com/docs/titan/current/hbase.html#_remote_server_mode_2
-atlas.graph.storage.hostname=${titan.storage.hostname}
-
-# Graph Search Index Backend
-atlas.graph.index.search.backend=${titan.index.backend}
-
-#lucene
-#atlas.graph.index.search.directory=target/data/lucene
-
-#elasticsearch
-atlas.graph.index.search.directory=./target/data/es
-atlas.graph.index.search.elasticsearch.client-only=false
-atlas.graph.index.search.elasticsearch.local-mode=true
-atlas.graph.index.search.elasticsearch.create.sleep=2000
-
-#solr in cloud mode
-atlas.graph.index.search.solr.mode=cloud
-atlas.graph.index.search.solr.zookeeper-url=${solr.zk.address}
-
-#solr in http mode
-atlas.graph.index.search.solr.http-urls=http://localhost:8983/solr
-
-######### Hive Lineage Configs #########
-#atlas.lineage.hive.table.type.name=DataSet
-#atlas.lineage.hive.process.type.name=Process
-#atlas.lineage.hive.process.inputs.name=inputs
-#atlas.lineage.hive.process.outputs.name=outputs
-
-## Schema
-atlas.lineage.hive.table.schema.query.hive_table=hive_table where name='%s'\, columns
-
-
-######### Security Properties #########
-
-# SSL config
-atlas.enableTLS=false
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/src/conf/application.properties
----------------------------------------------------------------------
diff --git a/src/conf/application.properties b/src/conf/application.properties
index 5487749..bf323a7 100755
--- a/src/conf/application.properties
+++ b/src/conf/application.properties
@@ -45,6 +45,10 @@ atlas.graph.index.search.elasticsearch.client-only=false
atlas.graph.index.search.elasticsearch.local-mode=true
atlas.graph.index.search.elasticsearch.create.sleep=2000
+######### Notification Configs #########
+atlas.notification.embedded=true
+atlas.notification.kafka.data=${sys:atlas.home}/data/kafka
+
######### Hive Lineage Configs #########
# This models reflects the base super types for Data and Process
#atlas.lineage.hive.table.type.name=DataSet
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/src/conf/client.properties
----------------------------------------------------------------------
diff --git a/src/conf/client.properties b/src/conf/client.properties
index b64755f..ab6ef2f 100755
--- a/src/conf/client.properties
+++ b/src/conf/client.properties
@@ -21,10 +21,12 @@
# SSL config
atlas.enableTLS=false
-truststore.file=/path/to/truststore.jks
-cert.stores.credential.provider.path=jceks://file/path/to/credentialstore.jceks
-# following only required for 2-way SSL
-keystore.file=/path/to/keystore.jks
+
+#truststore.file=/path/to/truststore.jks
+#cert.stores.credential.provider.path=jceks://file/path/to/credentialstore.jceks
+
+#following only required for 2-way SSL
+#keystore.file=/path/to/keystore.jks
# Authentication config
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/typesystem/src/main/resources/application.properties
----------------------------------------------------------------------
diff --git a/typesystem/src/main/resources/application.properties b/typesystem/src/main/resources/application.properties
index 29c933f..f7e2774 100644
--- a/typesystem/src/main/resources/application.properties
+++ b/typesystem/src/main/resources/application.properties
@@ -18,8 +18,47 @@
######### Graph Database Configs #########
# Graph Storage
-atlas.graph.storage.backend=inmemory
+atlas.graph.storage.backend=${titan.storage.backend}
-# Graph Search Index
-atlas.graph.index.search.backend=lucene
-atlas.graph.index.search.directory=target/data/lucene
+# Graph Search Index Backend
+atlas.graph.index.search.backend=${titan.index.backend}
+
+#Berkeley storage directory
+atlas.graph.storage.directory=target/data/berkley
+
+#hbase
+#For standalone mode , specify localhost
+#for distributed mode, specify zookeeper quorum here - For more information refer http://s3.thinkaurelius.com/docs/titan/current/hbase.html#_remote_server_mode_2
+atlas.graph.storage.hostname=${titan.storage.hostname}
+
+#ElasticSearch
+atlas.graph.index.search.directory=target/data/es
+atlas.graph.index.search.elasticsearch.client-only=false
+atlas.graph.index.search.elasticsearch.local-mode=true
+atlas.graph.index.search.elasticsearch.create.sleep=2000
+
+# Solr cloud mode properties
+atlas.graph.index.search.solr.mode=cloud
+atlas.graph.index.search.solr.zookeeper-url=${solr.zk.address}
+
+######### Hive Lineage Configs #########
+# This model reflects the base super types for Data and Process
+#atlas.lineage.hive.table.type.name=DataSet
+#atlas.lineage.hive.process.type.name=Process
+#atlas.lineage.hive.process.inputs.name=inputs
+#atlas.lineage.hive.process.outputs.name=outputs
+
+## Schema
+atlas.lineage.hive.table.schema.query.hive_table=hive_table where name='%s'\, columns
+
+######### Notification Configs #########
+atlas.notification.embedded=true
+atlas.notification.implementation=org.apache.atlas.kafka.KafkaNotification
+atlas.notification.kafka.data=target/data/kafka
+
+######### Security Properties #########
+
+# SSL config
+atlas.enableTLS=false
+
+######### Security Properties #########
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/typesystem/src/main/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/typesystem/src/main/resources/log4j.xml b/typesystem/src/main/resources/log4j.xml
index 999caad..528881a 100755
--- a/typesystem/src/main/resources/log4j.xml
+++ b/typesystem/src/main/resources/log4j.xml
@@ -27,15 +27,6 @@
</layout>
</appender>
- <appender name="FILE" class="org.apache.log4j.DailyRollingFileAppender">
- <param name="File" value="${user.dir}/target/logs/application.log"/>
- <param name="Append" value="true"/>
- <param name="Threshold" value="debug"/>
- <layout class="org.apache.log4j.PatternLayout">
- <param name="ConversionPattern" value="%d %-5p - [%t:%x] ~ %m (%c{1}:%L)%n"/>
- </layout>
- </appender>
-
<appender name="AUDIT" class="org.apache.log4j.DailyRollingFileAppender">
<param name="File" value="${user.dir}/target/logs/audit.log"/>
<param name="Append" value="true"/>
@@ -55,23 +46,8 @@
<appender-ref ref="console"/>
</logger>
- <logger name="com.thinkaurelius.titan" additivity="false">
- <level value="warn"/>
- <appender-ref ref="console"/>
- </logger>
-
- <logger name="org.elasticsearch" additivity="false">
- <level value="warn"/>
- <appender-ref ref="console"/>
- </logger>
-
- <logger name="org.apache.lucene" additivity="false">
- <level value="warn"/>
- <appender-ref ref="console"/>
- </logger>
-
<root>
- <priority value="info"/>
+ <priority value="warn"/>
<appender-ref ref="console"/>
</root>
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/webapp/src/main/java/org/apache/atlas/Main.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/Main.java b/webapp/src/main/java/org/apache/atlas/Main.java
index b71ca82..7b13f3d 100755
--- a/webapp/src/main/java/org/apache/atlas/Main.java
+++ b/webapp/src/main/java/org/apache/atlas/Main.java
@@ -41,10 +41,27 @@ public final class Main {
private static final String APP_PORT = "port";
private static final String ATLAS_HOME = "atlas.home";
private static final String ATLAS_LOG_DIR = "atlas.log.dir";
- public static final String ATLAS_SERVER_HTTPS_PORT =
- "atlas.server.https.port";
- public static final String ATLAS_SERVER_HTTP_PORT =
- "atlas.server.http.port";
+ public static final String ATLAS_SERVER_HTTPS_PORT = "atlas.server.https.port";
+ public static final String ATLAS_SERVER_HTTP_PORT = "atlas.server.http.port";
+
+ private static EmbeddedServer server;
+
+ static {
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ try {
+ shutdown();
+ } catch (Exception e) {
+ LOG.debug("Failed to shutdown", e);
+ }
+ }
+ });
+ }
+
+ /**
+  * Stops the embedded server if one has been created.
+  */
+ private static void shutdown() {
+     // The shutdown hook is registered in the static initializer, so it can fire before
+     // main() has assigned the server; in that case there is nothing to stop.
+     if (server != null) {
+         server.stop();
+     }
+ }
/**
* Prevent users from constructing this.
@@ -84,7 +101,7 @@ public final class Main {
configuration.setProperty("atlas.enableTLS", String.valueOf(enableTLS));
showStartupInfo(buildConfiguration, enableTLS, appPort);
- EmbeddedServer server = EmbeddedServer.newServer(appPort, appPath, enableTLS);
+ server = EmbeddedServer.newServer(appPort, appPath, enableTLS);
server.start();
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/webapp/src/main/java/org/apache/atlas/web/service/EmbeddedServer.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/service/EmbeddedServer.java b/webapp/src/main/java/org/apache/atlas/web/service/EmbeddedServer.java
index 80df87d..871d857 100755
--- a/webapp/src/main/java/org/apache/atlas/web/service/EmbeddedServer.java
+++ b/webapp/src/main/java/org/apache/atlas/web/service/EmbeddedServer.java
@@ -18,14 +18,16 @@
package org.apache.atlas.web.service;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.atlas.ApplicationProperties;
+import org.apache.commons.configuration.Configuration;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.webapp.WebAppContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
@@ -33,6 +35,8 @@ import java.io.IOException;
* This class embeds a Jetty server and a connector.
*/
public class EmbeddedServer {
+ public static final Logger LOG = LoggerFactory.getLogger(EmbeddedServer.class);
+
private static final int DEFAULT_BUFFER_SIZE = 16192;
protected final Server server = new Server();
@@ -71,9 +75,9 @@ public class EmbeddedServer {
protected Integer getBufferSize() {
try {
- PropertiesConfiguration configuration = new PropertiesConfiguration("application.properties");
+ Configuration configuration = ApplicationProperties.get();
return configuration.getInt("atlas.jetty.request.buffer.size", DEFAULT_BUFFER_SIZE);
- } catch (ConfigurationException e) {
+ } catch (Exception e) {
// do nothing
}
@@ -85,7 +89,11 @@ public class EmbeddedServer {
server.join();
}
- public void stop() throws Exception {
- server.stop();
+ public void stop() { // stops the embedded Jetty server without declaring a checked exception
+ try {
+ server.stop();
+ } catch (Exception e) {
+ LOG.warn("Error during shutdown", e); // best effort: a failure to stop is logged, not propagated
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/webapp/src/main/resources/application.properties
----------------------------------------------------------------------
diff --git a/webapp/src/main/resources/application.properties b/webapp/src/main/resources/application.properties
deleted file mode 100755
index ecfdc38..0000000
--- a/webapp/src/main/resources/application.properties
+++ /dev/null
@@ -1,60 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-######### Graph Database Configs #########
-# Graph Storage
-atlas.graph.storage.backend=${titan.storage.backend}
-
-# Graph Search Index Backend
-atlas.graph.index.search.backend=${titan.index.backend}
-
-#Berkeley storage directory
-atlas.graph.storage.directory=target/data/berkley
-
-#hbase
-#For standalone mode , specify localhost
-#for distributed mode, specify zookeeper quorum here - For more information refer http://s3.thinkaurelius.com/docs/titan/current/hbase.html#_remote_server_mode_2
-atlas.graph.storage.hostname=${titan.storage.hostname}
-
-#ElasticSearch
-atlas.graph.index.search.directory=target/data/es
-atlas.graph.index.search.elasticsearch.client-only=false
-atlas.graph.index.search.elasticsearch.local-mode=true
-atlas.graph.index.search.elasticsearch.create.sleep=2000
-
-# Solr cloud mode properties
-atlas.graph.index.search.solr.mode=cloud
-atlas.graph.index.search.solr.zookeeper-url=${solr.zk.address}
-
-######### Hive Lineage Configs #########
-# This models reflects the base super types for Data and Process
-#atlas.lineage.hive.table.type.name=DataSet
-#atlas.lineage.hive.process.type.name=Process
-#atlas.lineage.hive.process.inputs.name=inputs
-#atlas.lineage.hive.process.outputs.name=outputs
-
-## Schema
-atlas.lineage.hive.table.schema.query.hive_table=hive_table where name='%s'\, columns
-
-
-######### Security Properties #########
-
-# SSL config
-atlas.enableTLS=false
-
-######### Security Properties #########
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/webapp/src/main/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/webapp/src/main/resources/log4j.xml b/webapp/src/main/resources/log4j.xml
index 5c42c98..7827c1a 100755
--- a/webapp/src/main/resources/log4j.xml
+++ b/webapp/src/main/resources/log4j.xml
@@ -57,7 +57,7 @@
</logger>
<root>
- <priority value="info"/>
+ <priority value="warn"/>
<appender-ref ref="FILE"/>
</root>
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/webapp/src/test/java/org/apache/atlas/web/TestUtils.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/web/TestUtils.java b/webapp/src/test/java/org/apache/atlas/web/TestUtils.java
index ede041e..47bea1f 100644
--- a/webapp/src/test/java/org/apache/atlas/web/TestUtils.java
+++ b/webapp/src/test/java/org/apache/atlas/web/TestUtils.java
@@ -46,4 +46,9 @@ public class TestUtils {
public static String getTempDirectory() {
return System.getProperty("projectBaseDir") + "/webapp/target/" + random();
}
+
+ /**
+  * Builds the path of the exploded webapp under the project's target directory from the
+  * {@code projectBaseDir} and {@code project.version} system properties.
+  *
+  * @return {@code <projectBaseDir>/webapp/target/atlas-webapp-<project.version>}
+  */
+ public static String getWarPath() {
+     final String baseDir = System.getProperty("projectBaseDir");
+     final String version = System.getProperty("project.version");
+     return baseDir + String.format("/webapp/target/atlas-webapp-%s", version);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/webapp/src/test/java/org/apache/atlas/web/security/BaseSecurityTest.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/web/security/BaseSecurityTest.java b/webapp/src/test/java/org/apache/atlas/web/security/BaseSecurityTest.java
index 614638c..8af4a7e 100644
--- a/webapp/src/test/java/org/apache/atlas/web/security/BaseSecurityTest.java
+++ b/webapp/src/test/java/org/apache/atlas/web/security/BaseSecurityTest.java
@@ -16,9 +16,9 @@
*/
package org.apache.atlas.web.security;
+import org.apache.atlas.web.TestUtils;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.minikdc.MiniKdc;
import org.apache.hadoop.security.ssl.SSLFactory;
import org.apache.hadoop.security.ssl.SSLHostnameVerifier;
@@ -35,10 +35,7 @@ import java.nio.file.Files;
import java.util.Locale;
import java.util.Properties;
-import static org.apache.atlas.security.SecurityProperties.CERT_STORES_CREDENTIAL_PROVIDER_PATH;
-import static org.apache.atlas.security.SecurityProperties.KEYSTORE_FILE_KEY;
-import static org.apache.atlas.security.SecurityProperties.TLS_ENABLED;
-import static org.apache.atlas.security.SecurityProperties.TRUSTSTORE_FILE_KEY;
+import static org.apache.atlas.security.SecurityProperties.*;
/**
*
@@ -110,8 +107,7 @@ public class BaseSecurityTest {
}
protected String getWarPath() {
- return System.getProperty("projectBaseDir") + String.format("/webapp/target/atlas-webapp-%s",
- System.getProperty("project.version"));
+ return TestUtils.getWarPath();
}
protected PropertiesConfiguration getSSLConfiguration(String providerUrl) {
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/webapp/src/test/java/org/apache/atlas/web/service/SecureEmbeddedServerIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/web/service/SecureEmbeddedServerIT.java b/webapp/src/test/java/org/apache/atlas/web/service/SecureEmbeddedServerIT.java
index 7eb36d8..e1f9b54 100644
--- a/webapp/src/test/java/org/apache/atlas/web/service/SecureEmbeddedServerIT.java
+++ b/webapp/src/test/java/org/apache/atlas/web/service/SecureEmbeddedServerIT.java
@@ -17,6 +17,7 @@
package org.apache.atlas.web.service;
+import org.apache.atlas.web.TestUtils;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -37,9 +38,7 @@ public class SecureEmbeddedServerIT extends SecureEmbeddedServerITBase {
SecureEmbeddedServer secureEmbeddedServer = null;
try {
- String appPath = System.getProperty("user.dir") + getWarPath();
-
- secureEmbeddedServer = new SecureEmbeddedServer(21443, appPath) {
+ secureEmbeddedServer = new SecureEmbeddedServer(21443, TestUtils.getWarPath()) {
@Override
protected PropertiesConfiguration getConfiguration() {
return configuration;
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b627a681/webapp/src/test/java/org/apache/atlas/web/service/SecureEmbeddedServerITBase.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/web/service/SecureEmbeddedServerITBase.java b/webapp/src/test/java/org/apache/atlas/web/service/SecureEmbeddedServerITBase.java
index 9a5b8ad..f7c3625 100755
--- a/webapp/src/test/java/org/apache/atlas/web/service/SecureEmbeddedServerITBase.java
+++ b/webapp/src/test/java/org/apache/atlas/web/service/SecureEmbeddedServerITBase.java
@@ -19,6 +19,7 @@ package org.apache.atlas.web.service;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.api.client.config.DefaultClientConfig;
+import org.apache.atlas.web.TestUtils;
import org.apache.atlas.web.resources.AdminJerseyResourceIT;
import org.apache.atlas.web.resources.BaseResourceIT;
import org.apache.atlas.web.resources.EntityJerseyResourceIT;
@@ -31,7 +32,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.alias.CredentialProvider;
import org.apache.hadoop.security.alias.CredentialProviderFactory;
import org.apache.hadoop.security.alias.JavaKeyStoreProvider;
-import org.eclipse.jetty.webapp.WebAppContext;
import org.testng.Assert;
import org.testng.TestListenerAdapter;
import org.testng.TestNG;
@@ -45,11 +45,7 @@ import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
-import static org.apache.atlas.security.SecurityProperties.CERT_STORES_CREDENTIAL_PROVIDER_PATH;
-import static org.apache.atlas.security.SecurityProperties.DEFAULT_KEYSTORE_FILE_LOCATION;
-import static org.apache.atlas.security.SecurityProperties.KEYSTORE_PASSWORD_KEY;
-import static org.apache.atlas.security.SecurityProperties.SERVER_CERT_PASSWORD_KEY;
-import static org.apache.atlas.security.SecurityProperties.TRUSTSTORE_PASSWORD_KEY;
+import static org.apache.atlas.security.SecurityProperties.*;
/**
* Secure Test class for jersey resources.
@@ -106,18 +102,13 @@ public class SecureEmbeddedServerITBase {
public void testNoConfiguredCredentialProvider() throws Exception {
try {
- secureEmbeddedServer = new SecureEmbeddedServer(21443, "webapp/target/apache-atlas");
- WebAppContext webapp = new WebAppContext();
- webapp.setContextPath("/");
- webapp.setWar(System.getProperty("user.dir") + getWarPath());
- secureEmbeddedServer.server.setHandler(webapp);
-
+ secureEmbeddedServer = new SecureEmbeddedServer(21443, TestUtils.getWarPath());
secureEmbeddedServer.server.start();
Assert.fail("Should have thrown an exception");
} catch (IOException e) {
- Assert.assertEquals("No credential provider path configured for storage of certificate store passwords",
- e.getMessage());
+ Assert.assertEquals(e.getMessage(),
+ "No credential provider path configured for storage of certificate store passwords");
} finally {
secureEmbeddedServer.server.stop();
}
@@ -130,7 +121,7 @@ public class SecureEmbeddedServerITBase {
configuration.setProperty(CERT_STORES_CREDENTIAL_PROVIDER_PATH, providerUrl);
try {
- secureEmbeddedServer = new SecureEmbeddedServer(21443, "webapp/target/apache-atlas") {
+ secureEmbeddedServer = new SecureEmbeddedServer(21443, TestUtils.getWarPath()) {
@Override
protected PropertiesConfiguration getConfiguration() {
return configuration;
@@ -157,17 +148,12 @@ public class SecureEmbeddedServerITBase {
setupCredentials();
try {
- secureEmbeddedServer = new SecureEmbeddedServer(21443, "webapp/target/apache-atlas") {
+ secureEmbeddedServer = new SecureEmbeddedServer(21443, TestUtils.getWarPath()) {
@Override
protected PropertiesConfiguration getConfiguration() {
return configuration;
}
};
- WebAppContext webapp = new WebAppContext();
- webapp.setContextPath("/");
- webapp.setWar(System.getProperty("user.dir") + getWarPath());
- secureEmbeddedServer.server.setHandler(webapp);
-
secureEmbeddedServer.server.start();
TestListenerAdapter tla = new TestListenerAdapter();
@@ -184,11 +170,6 @@ public class SecureEmbeddedServerITBase {
}
- protected String getWarPath() {
- return String
- .format("/target/atlas-webapp-%s", System.getProperty("project.version"));
- }
-
protected void setupCredentials() throws Exception {
Configuration conf = new Configuration(false);