Posted to commits@atlas.apache.org by ma...@apache.org on 2017/06/28 05:57:19 UTC

[06/25] incubator-atlas git commit: ATLAS-1898: initial commit of ODF

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/ParallelServiceTest.java
----------------------------------------------------------------------
diff --git a/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/ParallelServiceTest.java b/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/ParallelServiceTest.java
new file mode 100755
index 0000000..7a180d2
--- /dev/null
+++ b/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/ParallelServiceTest.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.messaging.kafka;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.analysis.AnalysisRequest;
+import org.apache.atlas.odf.api.analysis.AnalysisResponse;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.analysis.AnalysisManager;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestStatus;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestStatus.State;
+import org.apache.atlas.odf.core.Utils;
+import org.apache.atlas.odf.core.test.ODFTestLogger;
+import org.apache.atlas.odf.core.test.ODFTestcase;
+import org.apache.atlas.odf.core.test.controlcenter.ODFAPITest;
+
+public class ParallelServiceTest extends ODFTestcase {
+	private static final int NUMBER_OF_QUEUED_REQUESTS = 1;
+	Logger log = ODFTestLogger.get();
+
+	@Test
+	public void runDataSetsInParallelSuccess() throws Exception {
+		runDataSetsInParallelAndCheckResult(Arrays.asList(new String[] { "successID1", "successID2" }), State.FINISHED, State.FINISHED);
+	}
+
+	private void runDataSetsInParallelAndCheckResult(List<String> dataSetIDs, State... expectedState) throws Exception {
+		log.info("Running data sets in parallel: " + dataSetIDs);
+		log.info("Expected state: " + expectedState);
+		AnalysisManager analysisManager = new ODFFactory().create().getAnalysisManager();
+
+		List<AnalysisRequest> requests = new ArrayList<AnalysisRequest>();
+		List<AnalysisResponse> responses = new ArrayList<AnalysisResponse>();
+		List<String> idList = new ArrayList<String>();
+
+		for (int no = 0; no < NUMBER_OF_QUEUED_REQUESTS; no++) {
+			for (String dataSet : dataSetIDs) {
+				final AnalysisRequest req = ODFAPITest.createAnalysisRequest(Arrays.asList(dataSet + UUID.randomUUID().toString()));
+				AnalysisResponse resp = analysisManager.runAnalysis(req);
+				req.setId(resp.getId());
+				requests.add(req);
+				idList.add(resp.getId());
+				responses.add(resp);
+			}
+		}
+		log.info("Parallel requests started: " + idList.toString());
+
+		Assert.assertEquals(NUMBER_OF_QUEUED_REQUESTS * dataSetIDs.size(), requests.size());
+		Assert.assertEquals(NUMBER_OF_QUEUED_REQUESTS * dataSetIDs.size(), responses.size());
+
+		// check that requests are processed in parallel: 
+		//   there must be a point in time where both requests are in status "active"
+		log.info("Polling for status of parallel request...");
+		boolean foundPointInTimeWhereBothRequestsAreActive = false;
+		int maxPolls = ODFAPITest.MAX_NUMBER_OF_POLLS;
+		List<State> allSingleStates = new ArrayList<AnalysisRequestStatus.State>();
+		do {
+			int foundActive = 0;
+			allSingleStates.clear();
+			for (AnalysisRequest request : requests) {
+				final State state = analysisManager.getAnalysisRequestStatus(request.getId()).getState();
+				if (state == State.ACTIVE) {
+					log.info("ACTIVE: " + request.getId() + " foundactive: " + foundActive);
+					foundActive++;
+				} else {
+					log.info("NOT ACTIVE " + request.getId() + " _ " + state);
+				}
+				allSingleStates.add(state);
+			}
+			if (foundActive > 1) {
+				foundPointInTimeWhereBothRequestsAreActive = true;
+			}
+
+			maxPolls--;
+			Thread.sleep(ODFAPITest.WAIT_MS_BETWEEN_POLLING);
+		} while (maxPolls > 0 && Utils.containsNone(allSingleStates, new State[] { State.ACTIVE, State.QUEUED }));
+
+		Assert.assertTrue(maxPolls > 0);
+		Assert.assertTrue(foundPointInTimeWhereBothRequestsAreActive);
+		Assert.assertTrue(allSingleStates.containsAll(Arrays.asList(expectedState)));
+	}
+}
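
The polling loop above relies on Utils.containsNone from odf-core, which is not part of this hunk. Assuming the name reflects the semantics, a minimal sketch of the helper the test depends on:

    // Sketch only (assumed semantics, not the odf-core implementation):
    // returns true when `list` contains none of the given values.
    public static <T> boolean containsNone(java.util.List<T> list, T[] values) {
        for (T value : values) {
            if (list.contains(value)) {
                return false; // at least one of the given states is still present
            }
        }
        return true;
    }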

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/TestEnvironmentMessagingInitializer.java
----------------------------------------------------------------------
diff --git a/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/TestEnvironmentMessagingInitializer.java b/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/TestEnvironmentMessagingInitializer.java
new file mode 100755
index 0000000..5e3d97e
--- /dev/null
+++ b/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/TestEnvironmentMessagingInitializer.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.messaging.kafka;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.core.test.TestEnvironmentInitializer;
+
+public class TestEnvironmentMessagingInitializer implements TestEnvironmentInitializer {
+
+	public TestEnvironmentMessagingInitializer() {
+	}
+	
+	public void start() {
+		Logger logger = Logger.getLogger(TestEnvironmentMessagingInitializer.class.getName());
+		try {
+			logger.info("Starting Test-Kafka during initialization...");
+			TestKafkaStarter starter = new TestKafkaStarter();
+			starter.startKafka();
+			logger.info("Test-Kafka initialized");
+		} catch (Exception exc) {
+			logger.log(Level.INFO, "Exception occurred while starting test kafka", exc);
+			throw new RuntimeException(exc);
+		}
+	}
+
+	@Override
+	public void stop() {
+		// Nothing to do: shutdown is intentionally a no-op, see TestKafkaStarter.shutdownKafka().
+	}
+
+	@Override
+	public String getName() {
+		return "Kafka1001";
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/TestKafkaStarter.java
----------------------------------------------------------------------
diff --git a/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/TestKafkaStarter.java b/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/TestKafkaStarter.java
new file mode 100755
index 0000000..1c3025e
--- /dev/null
+++ b/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/TestKafkaStarter.java
@@ -0,0 +1,306 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.messaging.kafka;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.BindException;
+import java.net.DatagramSocket;
+import java.net.ServerSocket;
+import java.rmi.NotBoundException;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.wink.json4j.JSONObject;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooKeeper.States;
+import org.apache.zookeeper.server.ServerConfig;
+import org.apache.zookeeper.server.ZooKeeperServerMain;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+
+import org.apache.atlas.odf.core.Utils;
+
+import kafka.cluster.Broker;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServerStartable;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+import scala.collection.JavaConversions;
+import scala.collection.Seq;
+
+public class TestKafkaStarter {
+
+	public static boolean deleteRecursive(File path) throws FileNotFoundException {
+		if (!path.exists()) {
+			throw new FileNotFoundException(path.getAbsolutePath());
+		}
+		boolean ret = true;
+		if (path.isDirectory()) {
+			for (File f : path.listFiles()) {
+				ret = ret && deleteRecursive(f);
+			}
+		}
+		return ret && path.delete();
+	}
+
+	static Thread zookeeperThread = null;
+	static boolean kafkaStarted = false;
+	static Object lockObject = new Object();
+	static KafkaServerStartable kafkaServer = null;
+	static ZooKeeperServerMainWithShutdown zooKeeperServer = null;
+
+
+	boolean cleanData = true; // when true, all ZooKeeper and Kafka data directories are wiped at server start
+
+	public boolean isCleanData() {
+		return cleanData;
+	}
+
+	public void setCleanData(boolean cleanData) {
+		this.cleanData = cleanData;
+	}
+
+	Logger logger = Logger.getLogger(TestKafkaStarter.class.getName());
+
+	void log(String s) {
+		logger.info(s);
+	}
+
+	int zookeeperStartupTime = 10000;
+	int kafkaStartupTime = 10000;
+
+	static class ZooKeeperServerMainWithShutdown extends ZooKeeperServerMain {
+		public void shutdown() {
+			super.shutdown();
+		}
+	}
+
+	private void startZookeeper() throws Exception {
+		log("Starting zookeeper");
+
+		final Properties zkProps = Utils.readConfigProperties("org/apache/atlas/odf/core/messaging/kafka/test-embedded-zookeeper.properties");
+		final String zkPort = (String) zkProps.get("clientPort");
+		if (zooKeeperServer == null) {
+			log("zookeeper properties: " + zkProps);
+			if (cleanData) {
+				String dataDir = zkProps.getProperty("dataDir");
+				log("Removing all data from zookeeper data dir " + dataDir);
+				File dir = new File(dataDir);
+				if (dir.exists()) {
+					if (!deleteRecursive(dir)) {
+						throw new IOException("Could not delete directory " + dataDir);
+					}
+				}
+			}
+			final ZooKeeperServerMainWithShutdown zk = new ZooKeeperServerMainWithShutdown();
+			final ServerConfig serverConfig = new ServerConfig();
+			log("Loading zookeeper config...");
+			QuorumPeerConfig zkConfig = new QuorumPeerConfig();
+			zkConfig.parseProperties(zkProps);
+			serverConfig.readFrom(zkConfig);
+
+			Runnable zookeeperStarter = new Runnable() {
+
+				@Override
+				public void run() {
+					try {
+						log("Now starting Zookeeper with API...");
+						zk.runFromConfig(serverConfig);
+					} catch (BindException ex) {
+						log("Embedded zookeeper could not be started, port is already in use. Trying to use external zookeeper");
+						ZooKeeper zk = null;
+						try {
+							zk = new ZooKeeper("localhost:" + zkPort, 5000, null);
+							if (zk.getState().equals(States.CONNECTED)) {
+								log("Using existing zookeeper running on port " + zkPort);
+								return;
+							} else {
+								throw new NotBoundException();
+							}
+						} catch (Exception zkEx) {
+							throw new RuntimeException("Could not connect to zookeeper on port " + zkPort + ". Please close all applications listening on this port.");
+						} finally {
+							if (zk != null) {
+								try {
+									zk.close();
+								} catch (InterruptedException e) {
+									logger.log(Level.WARNING, "An error occurred closing the zk connection", e);
+								}
+							}
+						}
+					} catch (Exception e) {
+						e.printStackTrace();
+						throw new RuntimeException(e);
+					}
+
+				}
+			};
+
+			zookeeperThread = new Thread(zookeeperStarter);
+			zookeeperThread.setDaemon(true);
+			zookeeperThread.start();
+			log("Zookeeper start initiated");
+			zooKeeperServer = zk;
+		}
+		ZkConnection conn = new ZkConnection("localhost:" + zkPort);
+		final CountDownLatch latch = new CountDownLatch(1);
+		conn.connect(new Watcher() {
+
+			@Override
+			public void process(WatchedEvent event) {
+				log("Zookeeper event: " + event.getState());
+				if (event.getState().equals(KeeperState.SyncConnected)) {
+					log("Zookeeper server up and running");
+					latch.countDown();
+				}
+			}
+		});
+
+		boolean zkReady = latch.await(zookeeperStartupTime, TimeUnit.MILLISECONDS);
+		if (zkReady) {
+			log("Zookeeper initialized and started");
+
+		} else {
+			logger.severe("Zookeeper could not be initialized within " + (zookeeperStartupTime / 1000) + " sec");
+		}
+		conn.close();
+	}
+
+	public boolean isRunning() {
+		return kafkaStarted;
+	}
+
+	public void startKafka() throws Exception {
+		synchronized (lockObject) {
+			if (kafkaStarted) {
+				log("Kafka already running");
+				return;
+			}
+			this.startZookeeper();
+
+			log("Starting Kafka server...");
+			Properties kafkaProps = Utils.readConfigProperties("org/apache/atlas/odf/core/messaging/kafka/test-embedded-kafka.properties");
+			log("Kafka properties: " + kafkaProps);
+			KafkaConfig kafkaConfig = new KafkaConfig(kafkaProps);
+			int kafkaPort = kafkaConfig.port();
+			if (cleanData && isPortAvailable(kafkaPort)) {
+				String logDir = kafkaProps.getProperty("log.dirs");
+				log("Removing all data from kafka log dir: " + logDir);
+				File dir = new File(logDir);
+				if (dir.exists()) {
+					if (!deleteRecursive(dir)) {
+						throw new IOException("Kafka logDir could not be deleted: " + logDir);
+					}
+				}
+			}
+			if (!isPortAvailable(kafkaPort)) {
+				log("Kafka port " + kafkaPort + " is already in use. "
+						+ "Checking if zookeeper has a registered broker on this port to make sure it is an existing kafka instance using the port.");
+				ZooKeeper zk = new ZooKeeper(kafkaConfig.zkConnect(), 10000, null);
+				try {
+					List<String> ids = zk.getChildren("/brokers/ids", false);
+					if (ids != null && !ids.isEmpty()) {
+						for (String id : ids) {
+							String brokerInfo = new String(zk.getData("/brokers/ids/" + id, false, null), "UTF-8");
+							JSONObject broker = new JSONObject(brokerInfo);
+							int port = Integer.parseInt(String.valueOf(broker.get("port")));
+							if (port == kafkaPort) {
+								log("Using externally started kafka broker on port " + port);
+								kafkaStarted = true;
+								return;
+							}
+						}
+					}
+				} catch (NoNodeException ex) {
+					log("No brokers registered with zookeeper!");
+					throw new RuntimeException("Kafka broker port " + kafkaPort
+							+ " not available and no broker found! Please close all running applications listening on this port");
+				} finally {
+					if (zk != null) {
+						try {
+							zk.close();
+						} catch (InterruptedException e) {
+							logger.log(Level.WARNING, "An error occurred closing the zk connection", e);
+						}
+					}
+				}
+			}
+			KafkaServerStartable kafka  = KafkaServerStartable.fromProps(kafkaProps);
+			kafka.startup();
+			log("Kafka server start initiated");
+
+			kafkaServer = kafka;
+			log("Give Kafka a maximum of " + kafkaStartupTime + " ms to start");
+			ZkClient zk = new ZkClient(kafkaConfig.zkConnect(), 10000, 5000, ZKStringSerializer$.MODULE$);
+			int maxRetryCount = kafkaStartupTime / 1000;
+			int cnt = 0;
+			while (cnt < maxRetryCount) {
+				cnt++;
+				Seq<Broker> allBrokersInCluster = new ZkUtils(zk, new ZkConnection(kafkaConfig.zkConnect()), false).getAllBrokersInCluster();
+				List<Broker> brokers = JavaConversions.seqAsJavaList(allBrokersInCluster);
+				for (Broker broker : brokers) {
+					if (broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT).port() == kafkaPort) {
+						log("Broker is registered, Kafka is available after " + cnt + " seconds");
+						kafkaStarted = true;
+						return;
+					}
+				}
+				Thread.sleep(1000);
+			}
+			logger.severe("Kafka broker was not started after " + kafkaStartupTime + " ms");
+		}
+	}
+
+	public void shutdownKafka() {
+		// do nothing for shutdown
+	}
+
+	boolean isPortAvailable(int port) {
+		ServerSocket ss = null;
+		DatagramSocket ds = null;
+		try {
+			ss = new ServerSocket(port);
+			ss.setReuseAddress(true);
+			ds = new DatagramSocket(port);
+			ds.setReuseAddress(true);
+			return true;
+		} catch (IOException e) {
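+			// Port is already bound by another process; fall through and report it as unavailable.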
+		} finally {
+			if (ds != null) {
+				ds.close();
+			}
+
+			if (ss != null) {
+				try {
+					ss.close();
+				} catch (IOException e) {
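+					// Ignore close failures; the socket is being discarded anyway.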
+				}
+			}
+		}
+
+		return false;
+	}
+}
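
TestEnvironmentMessagingInitializer above drives this starter during test setup, but it can also be used directly from a JUnit test. A minimal usage sketch, using only methods defined in this class (assuming the embedded broker and zookeeper properties are on the classpath as shipped in this commit):

    // Sketch: boot the embedded ZooKeeper/Kafka pair once before tests run.
    TestKafkaStarter starter = new TestKafkaStarter();
    starter.setCleanData(true); // wipe the /tmp data dirs on start
    starter.startKafka();       // no-op if a broker is already registered on the port
    Assert.assertTrue(starter.isRunning());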

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-messaging/src/test/resources/org/apache/atlas/odf/core/messaging/kafka/test-embedded-kafka.properties
----------------------------------------------------------------------
diff --git a/odf/odf-messaging/src/test/resources/org/apache/atlas/odf/core/messaging/kafka/test-embedded-kafka.properties b/odf/odf-messaging/src/test/resources/org/apache/atlas/odf/core/messaging/kafka/test-embedded-kafka.properties
new file mode 100755
index 0000000..4769c95
--- /dev/null
+++ b/odf/odf-messaging/src/test/resources/org/apache/atlas/odf/core/messaging/kafka/test-embedded-kafka.properties
@@ -0,0 +1,136 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# see kafka.server.KafkaConfig for additional details and defaults
+
+############################# Server Basics #############################
+
+# The id of the broker. This must be set to a unique integer for each broker.
+broker.id=0
+
+############################# Socket Server Settings #############################
+
+listeners=PLAINTEXT://:9092
+
+# The port the socket server listens on
+# port=9092
+
+# Hostname the broker will bind to. If not set, the server will bind to all interfaces
+#host.name=localhost
+
+# Hostname the broker will advertise to producers and consumers. If not set, it uses the
+# value for "host.name" if configured.  Otherwise, it will use the value returned from
+# java.net.InetAddress.getCanonicalHostName().
+#advertised.host.name=<hostname routable by clients>
+
+# The port to publish to ZooKeeper for clients to use. If this is not set,
+# it will publish the same port that the broker binds to.
+#advertised.port=<port accessible by clients>
+
+# The number of threads handling network requests
+num.network.threads=3
+ 
+# The number of threads doing disk I/O
+num.io.threads=8
+
+# The send buffer (SO_SNDBUF) used by the socket server
+socket.send.buffer.bytes=102400
+
+# The receive buffer (SO_RCVBUF) used by the socket server
+socket.receive.buffer.bytes=102400
+
+# The maximum size of a request that the socket server will accept (protection against OOM)
+socket.request.max.bytes=104857600
+
+
+############################# Log Basics #############################
+
+# A comma-separated list of directories under which to store log files
+log.dirs=/tmp/odf-embedded-test-kafka/kafka-logs
+
+# The default number of log partitions per topic. More partitions allow greater
+# parallelism for consumption, but this will also result in more files across
+# the brokers.
+num.partitions=1
+
+# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
+# This value is recommended to be increased for installations with data dirs located in a RAID array.
+num.recovery.threads.per.data.dir=1
+
+############################# Log Flush Policy #############################
+
+# Messages are immediately written to the filesystem but by default we only fsync() to sync
+# the OS cache lazily. The following configurations control the flush of data to disk. 
+# There are a few important trade-offs here:
+#    1. Durability: Unflushed data may be lost if you are not using replication.
+#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
+#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
+# The settings below allow one to configure the flush policy to flush data after a period of time or
+# every N messages (or both). This can be done globally and overridden on a per-topic basis.
+
+# The number of messages to accept before forcing a flush of data to disk
+#log.flush.interval.messages=10000
+
+# The maximum amount of time a message can sit in a log before we force a flush
+#log.flush.interval.ms=1000
+
+############################# Log Retention Policy #############################
+
+# The following configurations control the disposal of log segments. The policy can
+# be set to delete segments after a period of time, or after a given size has accumulated.
+# A segment will be deleted whenever *either* of these criteria is met. Deletion always happens
+# from the end of the log.
+
+# The minimum age of a log file to be eligible for deletion
+log.retention.hours=24
+
+# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
+# segments don't drop below log.retention.bytes.
+#log.retention.bytes=1073741824
+
+# The maximum size of a log segment file. When this size is reached a new log segment will be created.
+log.segment.bytes=1073741824
+
+# The interval at which log segments are checked to see if they can be deleted according 
+# to the retention policies
+log.retention.check.interval.ms=300000
+
+# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
+# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
+log.cleaner.enable=false
+
+############################# Zookeeper #############################
+
+# Zookeeper connection string (see zookeeper docs for details).
+# This is a comma-separated list of host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
+# You can also append an optional chroot string to the urls to specify the
+# root directory for all kafka znodes.
+zookeeper.connect=localhost:2181
+
+# Timeout in ms for connecting to zookeeper
+zookeeper.connection.timeout.ms=6000

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-messaging/src/test/resources/org/apache/atlas/odf/core/messaging/kafka/test-embedded-zookeeper.properties
----------------------------------------------------------------------
diff --git a/odf/odf-messaging/src/test/resources/org/apache/atlas/odf/core/messaging/kafka/test-embedded-zookeeper.properties b/odf/odf-messaging/src/test/resources/org/apache/atlas/odf/core/messaging/kafka/test-embedded-zookeeper.properties
new file mode 100755
index 0000000..7234e9c
--- /dev/null
+++ b/odf/odf-messaging/src/test/resources/org/apache/atlas/odf/core/messaging/kafka/test-embedded-zookeeper.properties
@@ -0,0 +1,34 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# the directory where the snapshot is stored.
+dataDir=/tmp/odf-embedded-test-kafka/zookeeper
+# the port at which the clients will connect
+clientPort=2181
+# disable the per-ip limit on the number of connections since this is a non-production config
+maxClientCnxns=0
+

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-messaging/src/test/resources/org/apache/atlas/odf/odf-implementation.properties
----------------------------------------------------------------------
diff --git a/odf/odf-messaging/src/test/resources/org/apache/atlas/odf/odf-implementation.properties b/odf/odf-messaging/src/test/resources/org/apache/atlas/odf/odf-implementation.properties
new file mode 100755
index 0000000..5611c29
--- /dev/null
+++ b/odf/odf-messaging/src/test/resources/org/apache/atlas/odf/odf-implementation.properties
@@ -0,0 +1,18 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+## USE for TESTs only
+
+ODFConfigurationStorage=MockConfigurationStorage
+SparkServiceExecutor=MockSparkServiceExecutor
+NotificationManager=TestNotificationManager
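
These keys map ODF extension points to test doubles. The lookup code itself is not part of this commit; a hypothetical sketch of how such a properties-driven override could be resolved (the class name and package prefix below are assumptions, for illustration only):

    import java.io.InputStream;
    import java.util.Properties;

    public class ImplementationLookup {
        // Resolve e.g. "SparkServiceExecutor" -> "MockSparkServiceExecutor" and instantiate it.
        public static Object newInstance(String extensionPoint, String packagePrefix) throws Exception {
            Properties props = new Properties();
            try (InputStream in = ImplementationLookup.class.getClassLoader()
                    .getResourceAsStream("org/apache/atlas/odf/odf-implementation.properties")) {
                props.load(in);
            }
            String className = props.getProperty(extensionPoint);
            return Class.forName(packagePrefix + className).newInstance();
        }
    }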

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-spark-example-application/.gitignore
----------------------------------------------------------------------
diff --git a/odf/odf-spark-example-application/.gitignore b/odf/odf-spark-example-application/.gitignore
new file mode 100755
index 0000000..d523581
--- /dev/null
+++ b/odf/odf-spark-example-application/.gitignore
@@ -0,0 +1,20 @@
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+.settings
+target
+.classpath
+.project
+.factorypath
+.DS_Store
+/bin/

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-spark-example-application/pom.xml
----------------------------------------------------------------------
diff --git a/odf/odf-spark-example-application/pom.xml b/odf/odf-spark-example-application/pom.xml
new file mode 100755
index 0000000..a2baa9e
--- /dev/null
+++ b/odf/odf-spark-example-application/pom.xml
@@ -0,0 +1,74 @@
+<?xml version="1.0"?>
+<!--
+~
+~ Licensed under the Apache License, Version 2.0 (the "License");
+~ you may not use this file except in compliance with the License.
+~ You may obtain a copy of the License at
+~
+~   http://www.apache.org/licenses/LICENSE-2.0
+~
+~ Unless required by applicable law or agreed to in writing, software
+~ distributed under the License is distributed on an "AS IS" BASIS,
+~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+~ See the License for the specific language governing permissions and
+~ limitations under the License.
+-->
+<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+	<modelVersion>4.0.0</modelVersion>
+	<parent>
+		<groupId>org.apache.atlas.odf</groupId>
+		<artifactId>odf</artifactId>
+		<version>1.2.0-SNAPSHOT</version>
+	</parent>
+	<artifactId>odf-spark-example-application</artifactId>
+	<packaging>jar</packaging>
+	<name>odf-spark-example-application</name>
+	<build>
+		<plugins>
+			<plugin>
+				<artifactId>maven-compiler-plugin</artifactId>
+				<version>3.3</version>
+				<configuration>
+					<source>1.7</source>
+					<target>1.7</target>
+				</configuration>
+			</plugin>
+			<plugin>
+				<artifactId>maven-assembly-plugin</artifactId>
+				<executions>
+					<execution>
+						<phase>package</phase>
+						<goals>
+							<goal>single</goal>
+						</goals>
+					</execution>
+				</executions>
+				<configuration>
+					<descriptorRefs>
+						<descriptorRef>jar-with-dependencies</descriptorRef>
+					</descriptorRefs>
+				</configuration>
+			</plugin>
+		</plugins>
+
+	</build>
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.spark</groupId>
+			<artifactId>spark-sql_2.11</artifactId>
+			<version>2.1.0</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency> <!-- Spark dependency -->
+			<groupId>org.apache.spark</groupId>
+			<artifactId>spark-core_2.11</artifactId>
+			<version>2.1.0</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.atlas.odf</groupId>
+			<artifactId>odf-api</artifactId>
+			<version>1.2.0-SNAPSHOT</version>
+		</dependency>
+	</dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-spark-example-application/src/main/java/org/apache/atlas/odf/core/spark/SparkDiscoveryServiceExample.java
----------------------------------------------------------------------
diff --git a/odf/odf-spark-example-application/src/main/java/org/apache/atlas/odf/core/spark/SparkDiscoveryServiceExample.java b/odf/odf-spark-example-application/src/main/java/org/apache/atlas/odf/core/spark/SparkDiscoveryServiceExample.java
new file mode 100755
index 0000000..f5f7b70
--- /dev/null
+++ b/odf/odf-spark-example-application/src/main/java/org/apache/atlas/odf/core/spark/SparkDiscoveryServiceExample.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.spark;
+
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.discoveryservice.DataSetCheckResult;
+import org.apache.atlas.odf.api.spark.SparkDiscoveryServiceBase;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+
+import org.apache.atlas.odf.api.spark.SparkDiscoveryService;
+import org.apache.atlas.odf.api.spark.SparkUtils;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceResponse.ResponseCode;
+import org.apache.atlas.odf.api.discoveryservice.datasets.DataSetContainer;
+import org.apache.atlas.odf.api.discoveryservice.sync.DiscoveryServiceSyncResponse;
+
+public class SparkDiscoveryServiceExample extends SparkDiscoveryServiceBase implements SparkDiscoveryService {
+	static Logger logger = Logger.getLogger(SparkDiscoveryServiceExample.class.getName());
+
+	@Override
+	public DataSetCheckResult checkDataSet(DataSetContainer dataSetContainer) {
+		logger.log(Level.INFO, "Checking data set access.");
+		DataSetCheckResult checkResult = new DataSetCheckResult();
+		checkResult.setDataAccess(DataSetCheckResult.DataAccess.Possible);
+		Dataset<Row> df = SparkUtils.createDataFrame(this.spark, dataSetContainer, this.mds);
+		// Print first rows to check whether data frame can be accessed
+		df.show(10);
+		return checkResult;
+	}
+
+	@Override
+	public DiscoveryServiceSyncResponse runAnalysis(DiscoveryServiceRequest request) {
+		logger.log(Level.INFO, "Starting discovery service.");
+		Dataset<Row> df = SparkUtils.createDataFrame(spark, request.getDataSetContainer(), this.mds);
+		Map<String,Dataset<Row>> annotationDataFrameMap = SummaryStatistics.processDataFrame(this.spark, df, null);
+		DiscoveryServiceSyncResponse response = new DiscoveryServiceSyncResponse();
+		response.setCode(ResponseCode.OK);
+		response.setDetails("Discovery service successfully completed.");
+		response.setResult(SparkUtils.createAnnotationsFromDataFrameMap(request.getDataSetContainer(), annotationDataFrameMap, this.mds));
+		return response;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-spark-example-application/src/main/java/org/apache/atlas/odf/core/spark/SummaryStatistics.java
----------------------------------------------------------------------
diff --git a/odf/odf-spark-example-application/src/main/java/org/apache/atlas/odf/core/spark/SummaryStatistics.java b/odf/odf-spark-example-application/src/main/java/org/apache/atlas/odf/core/spark/SummaryStatistics.java
new file mode 100755
index 0000000..a7d1542
--- /dev/null
+++ b/odf/odf-spark-example-application/src/main/java/org/apache/atlas/odf/core/spark/SummaryStatistics.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.spark;
+
+import org.apache.atlas.odf.api.spark.SparkUtils;
+import org.apache.spark.SparkFiles;
+
+import java.text.MessageFormat;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+public class SummaryStatistics {
+	static Logger logger = Logger.getLogger(SummaryStatistics.class.getName());
+	private static final String CSV_FILE_PARAMETER = "-dataFile=";
+	// The following constant is defined in class DiscoveryServiceSparkEndpoint but is duplicated here to avoid dependencies on the ODF code:
+	private static final String ANNOTATION_PROPERTY_COLUMN_NAME = "ODF_ANNOTATED_COLUMN";
+
+	// The main method is only available for testing purposes and is not called by ODF
+	public static void main(String[] args) {
+		logger.log(Level.INFO, "Running spark launcher with arguments: " + args[0]);
+		if ((args[0] == null) || (!args[0].startsWith(CSV_FILE_PARAMETER))) {
+			System.out.println(MessageFormat.format("Error: Spark Application Parameter '{0}' is missing.", CSV_FILE_PARAMETER));
+			System.exit(1);
+		}
+		String dataFilePath = SparkFiles.get(args[0].replace(CSV_FILE_PARAMETER, ""));
+		logger.log(Level.INFO, "Data file path is " + dataFilePath);
+
+		// Create Spark session
+		SparkSession spark = SparkSession.builder().master("local").appName("ODF Spark example application").getOrCreate();
+
+		// Read CSV file into data frame
+		Dataset<Row> df = spark.read()
+		    .format("com.databricks.spark.csv")
+		    .option("inferSchema", "true")
+		    .option("header", "true")
+		    .load(dataFilePath);
+
+		// Run actual job and print result
+		Map<String, Dataset<Row>> annotationDataFrameMap = null;
+		try {
+			annotationDataFrameMap = processDataFrame(spark, df, args);
+		} catch (Exception e) {
+			logger.log(Level.WARNING, MessageFormat.format("An error occurred while processing data set {0}:", args[0]), e);
+		} finally {
+			// Close and stop spark context
+			spark.close();
+			spark.stop();
+		}
+		if (annotationDataFrameMap == null) {
+			System.exit(1);
+		} else {
+			// Print all annotationDataFrames for all annotation types to stdout
+			for (Map.Entry<String, Dataset<Row>> entry : annotationDataFrameMap.entrySet()) {
+				logger.log(Level.INFO, "Result data frame for annotation type " + entry.getKey() + ":");
+				entry.getValue().show();
+			}
+		}
+	}
+
+	// The following method contains the actual implementation of the ODF Spark discovery service
+	public static Map<String,Dataset<Row>> processDataFrame(SparkSession spark, Dataset<Row> df, String[] args) {
+		logger.log(Level.INFO, "Started summary statistics Spark application.");
+		Map<String, Dataset<Row>> resultMap = new HashMap<String, Dataset<Row>>();
+
+		// Print input data set
+		df.show();
+
+		// Create column annotation data frame that contains basic data frame statistics
+		Dataset<Row> dfStatistics = df.describe();
+
+		// Rename "summary" column to ANNOTATION_PROPERTY_COLUMN_NAME
+		String[] columnNames = dfStatistics.columns();
+		columnNames[0] = ANNOTATION_PROPERTY_COLUMN_NAME;
+		Dataset<Row> summaryStatistics =  dfStatistics.toDF(columnNames);
+		summaryStatistics.show();
+		String columnAnnotationTypeName = "SparkSummaryStatisticsAnnotation";
+
+		// Transpose table to turn it into format required by ODF
+		Dataset<Row> columnAnnotationDataFrame = SparkUtils.transposeDataFrame(spark, summaryStatistics);
+		columnAnnotationDataFrame.show();
+
+		// Create table annotation that contains the data frame's column count
+		String tableAnnotationTypeName = "SparkTableAnnotation";
+		Dataset<Row> tableAnnotationDataFrame = columnAnnotationDataFrame.select(new Column("count")).limit(1);
+		tableAnnotationDataFrame.show();
+
+		// Add annotation data frames to result map
+		resultMap.put(columnAnnotationTypeName, columnAnnotationDataFrame);
+		resultMap.put(tableAnnotationTypeName, tableAnnotationDataFrame);
+
+		logger.log(Level.INFO, "Spark job finished.");
+		return resultMap;
+	}
+}
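
processDataFrame builds its column annotations on top of Spark's built-in describe(). As a point of reference, a self-contained illustration of the shape describe() produces before the rename and transpose steps above (standard Spark API; the tiny data set is invented):

    import java.util.Arrays;
    import java.util.List;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructType;

    public class DescribeDemo {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder().master("local").appName("describe-demo").getOrCreate();
            StructType schema = new StructType()
                .add("age", DataTypes.IntegerType)
                .add("salary", DataTypes.DoubleType);
            List<Row> rows = Arrays.asList(RowFactory.create(30, 1000.0), RowFactory.create(40, 2000.0));
            Dataset<Row> df = spark.createDataFrame(rows, schema);
            // describe() yields one row per statistic (count, mean, stddev, min, max),
            // with the statistic name in a first column called "summary", which
            // SummaryStatistics renames to ODF_ANNOTATED_COLUMN before transposing.
            df.describe().show();
            spark.stop();
        }
    }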

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-spark/.gitignore
----------------------------------------------------------------------
diff --git a/odf/odf-spark/.gitignore b/odf/odf-spark/.gitignore
new file mode 100755
index 0000000..cde346c
--- /dev/null
+++ b/odf/odf-spark/.gitignore
@@ -0,0 +1,19 @@
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+.settings
+target
+.classpath
+.project
+.factorypath
+.DS_Store

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-spark/pom.xml
----------------------------------------------------------------------
diff --git a/odf/odf-spark/pom.xml b/odf/odf-spark/pom.xml
new file mode 100755
index 0000000..378f280
--- /dev/null
+++ b/odf/odf-spark/pom.xml
@@ -0,0 +1,242 @@
+<?xml version="1.0"?>
+<!--
+~
+~ Licensed under the Apache License, Version 2.0 (the "License");
+~ you may not use this file except in compliance with the License.
+~ You may obtain a copy of the License at
+~
+~   http://www.apache.org/licenses/LICENSE-2.0
+~
+~ Unless required by applicable law or agreed to in writing, software
+~ distributed under the License is distributed on an "AS IS" BASIS,
+~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+~ See the License for the specific language governing permissions and
+~ limitations under the License.
+-->
+<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+	xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+	<modelVersion>4.0.0</modelVersion>
+	<parent>
+		<groupId>org.apache.atlas.odf</groupId>
+		<artifactId>odf</artifactId>
+		<version>1.2.0-SNAPSHOT</version>
+	</parent>
+	<artifactId>odf-spark</artifactId>
+	<packaging>jar</packaging>
+	<name>odf-spark</name>
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.atlas.odf</groupId>
+			<artifactId>odf-api</artifactId>
+			<version>1.2.0-SNAPSHOT</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.atlas.odf</groupId>
+			<artifactId>odf-core</artifactId>
+			<version>1.2.0-SNAPSHOT</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.atlas.odf</groupId>
+			<artifactId>odf-core</artifactId>
+			<version>1.2.0-SNAPSHOT</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.atlas.odf</groupId>
+			<artifactId>odf-messaging</artifactId>
+			<version>1.2.0-SNAPSHOT</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.atlas.odf</groupId>
+			<artifactId>odf-messaging</artifactId>
+			<version>1.2.0-SNAPSHOT</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.atlas.odf</groupId>
+			<artifactId>odf-store</artifactId>
+			<version>1.2.0-SNAPSHOT</version>
+			<scope>test</scope>
+		</dependency>
+		<!-- Workaround: Add odf-spark-example-application because dynamic jar load does not seem to work on IBM JDK -->
+		<dependency>
+			<groupId>org.apache.atlas.odf</groupId>
+			<artifactId>odf-spark-example-application</artifactId>
+			<version>1.2.0-SNAPSHOT</version>
+		</dependency>
+		<dependency>
+			<groupId>junit</groupId>
+			<artifactId>junit</artifactId>
+			<version>4.12</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.spark</groupId>
+			<artifactId>spark-launcher_2.11</artifactId>
+			<version>2.1.0</version>
+		</dependency>
+		<dependency>
+			<groupId>commons-io</groupId>
+			<artifactId>commons-io</artifactId>
+			<version>2.4</version>
+		</dependency>
+		<!-- The following Spark dependencies are needed for testing only. -->
+		<!-- Nevertheless, they have to be added as compile dependencies in order to become available to the SDPFactory. -->
+		<dependency>
+			<groupId>org.apache.spark</groupId>
+			<artifactId>spark-core_2.11</artifactId>
+			<version>2.1.0</version>
+			<exclusions>
+				<exclusion>
+					<groupId>commons-codec</groupId>
+					<artifactId>commons-codec</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.spark</groupId>
+			<artifactId>spark-sql_2.11</artifactId>
+			<version>2.1.0</version>
+			<exclusions>
+				<exclusion>
+					<groupId>commons-codec</groupId>
+					<artifactId>commons-codec</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+	</dependencies>
+	<build>
+		<resources>
+			<resource>
+				<directory>${project.build.directory}/downloads</directory>
+			</resource>
+		</resources>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<version>2.6</version>
+				<executions>
+					<execution>
+						<goals>
+							<goal>test-jar</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-surefire-plugin</artifactId>
+				<version>2.19</version>
+				<configuration>
+					<systemPropertyVariables>
+						<odf.zookeeper.connect>${testZookeepeConnectionString}</odf.zookeeper.connect>
+						<odf.logspec>${odf.unittest.logspec}</odf.logspec>
+						<odf.build.project.name>${project.name}</odf.build.project.name>
+					</systemPropertyVariables>
+				</configuration>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-dependency-plugin</artifactId>
+				<version>2.4</version>
+				<executions>
+					<execution>
+						<id>download-jar-file</id>
+						<phase>validate</phase>
+						<goals>
+							<goal>copy</goal>
+						</goals>
+						<configuration>
+							<artifactItems>
+								<artifactItem>
+									<groupId>org.apache.atlas.odf</groupId>
+									<artifactId>odf-api</artifactId>
+									<version>1.2.0-SNAPSHOT</version>
+									<type>jar</type>
+									<overWrite>true</overWrite>
+									<outputDirectory>${project.build.directory}/downloads/META-INF/spark</outputDirectory>
+								</artifactItem>
+								<artifactItem>
+									<groupId>org.apache.atlas.odf</groupId>
+									<artifactId>odf-spark-example-application</artifactId>
+									<version>1.2.0-SNAPSHOT</version>
+									<type>jar</type>
+									<overWrite>true</overWrite>
+									<outputDirectory>/tmp/odf-spark</outputDirectory>
+								</artifactItem>
+								<artifactItem>
+									<groupId>org.apache.atlas.odf</groupId>
+									<artifactId>odf-spark-example-application</artifactId>
+									<version>1.2.0-SNAPSHOT</version>
+									<type>jar</type>
+									<overWrite>true</overWrite>
+									<outputDirectory>${project.build.directory}/downloads/META-INF/spark</outputDirectory>
+								</artifactItem>
+								<artifactItem>
+									<groupId>org.apache.wink</groupId>
+									<artifactId>wink-json4j</artifactId>
+									<version>1.4</version>
+									<type>jar</type>
+									<overWrite>true</overWrite>
+									<outputDirectory>${project.build.directory}/downloads/META-INF/spark</outputDirectory>
+								</artifactItem>
+							</artifactItems>
+							<includes>**/*</includes>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+
+	<profiles>
+		<profile>
+			<id>integration-tests</id>
+			<activation>
+				<property>
+					<name>reduced-tests</name>
+					<value>!true</value>
+				</property>
+			</activation>
+			<build>
+				<plugins>
+					<plugin>
+						<groupId>org.apache.maven.plugins</groupId>
+						<artifactId>maven-failsafe-plugin</artifactId>
+						<version>2.19</version>
+						<configuration>
+							<systemPropertyVariables>
+								<odf.zookeeper.connect>${testZookeepeConnectionString}</odf.zookeeper.connect>
+								<odf.logspec>${odf.integrationtest.logspec}</odf.logspec>
+							</systemPropertyVariables>
+							<dependenciesToScan>
+								<dependency>org.apache.atlas.odf:odf-core</dependency>
+							</dependenciesToScan>
+							<includes>
+								<include>**/integrationtest/**/SparkDiscoveryServiceLocalTest.java</include>
+							</includes>
+						</configuration>
+						<executions>
+							<execution>
+								<id>integration-test</id>
+								<goals>
+									<goal>integration-test</goal>
+								</goals>
+							</execution>
+							<execution>
+								<id>verify</id>
+								<goals>
+									<goal>verify</goal>
+								</goals>
+							</execution>
+						</executions>
+					</plugin>
+				</plugins>
+			</build>
+		</profile>
+	</profiles>
+
+</project>
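
The integration-tests profile above is active unless -Dreduced-tests=true is passed (the negated property value), so the failsafe run of SparkDiscoveryServiceLocalTest can be toggled from the command line:

    # run unit and integration tests
    mvn verify

    # skip the integration-test profile
    mvn verify -Dreduced-tests=true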

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-spark/src/main/java/org/apache/atlas/odf/core/spark/LocalSparkServiceExecutor.java
----------------------------------------------------------------------
diff --git a/odf/odf-spark/src/main/java/org/apache/atlas/odf/core/spark/LocalSparkServiceExecutor.java b/odf/odf-spark/src/main/java/org/apache/atlas/odf/core/spark/LocalSparkServiceExecutor.java
new file mode 100755
index 0000000..84ae80c
--- /dev/null
+++ b/odf/odf-spark/src/main/java/org/apache/atlas/odf/core/spark/LocalSparkServiceExecutor.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.spark;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.lang.reflect.Constructor;
+import java.text.MessageFormat;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.discoveryservice.DataSetCheckResult;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceResponse;
+import org.apache.atlas.odf.api.metadata.MetadataStore;
+import org.apache.atlas.odf.api.metadata.models.MetaDataObject;
+import org.apache.atlas.odf.api.metadata.models.RelationalDataSet;
+import org.apache.atlas.odf.api.spark.SparkDiscoveryService;
+import org.apache.atlas.odf.api.spark.SparkServiceExecutor;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.wink.json4j.JSONException;
+
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceSparkEndpoint;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceSparkEndpoint.SERVICE_INTERFACE_TYPE;
+import org.apache.atlas.odf.api.discoveryservice.datasets.DataSetContainer;
+import org.apache.atlas.odf.api.discoveryservice.sync.DiscoveryServiceSyncResponse;
+import org.apache.atlas.odf.api.spark.SparkUtils;
+import org.apache.atlas.odf.json.JSONUtils;
+
+/**
+ * This class calls the actual Spark discovery services depending on the type of interface they implement.
+ * The class is used to run a Spark discovery service either on a local Spark cluster ({@link SparkServiceExecutorImpl})
+ * or on a remote Spark cluster ({@link SparkApplicationStub}).
+ */
+public class LocalSparkServiceExecutor implements SparkServiceExecutor {
+	private Logger logger = Logger.getLogger(LocalSparkServiceExecutor.class.getName());
+	private SparkSession spark;
+	private MetadataStore mds;
+
+	void setSparkSession(SparkSession spark) {
+		this.spark = spark;
+	}
+
+	void setMetadataStore(MetadataStore mds) {
+		this.mds = mds;
+	}
+
+	@Override
+	public DataSetCheckResult checkDataSet(DiscoveryServiceProperties dsProp, DataSetContainer container) {
+		DiscoveryServiceSparkEndpoint endpoint;
+		try {
+			endpoint = JSONUtils.convert(dsProp.getEndpoint(), DiscoveryServiceSparkEndpoint.class);
+		} catch (JSONException e1) {
+			throw new RuntimeException(e1);
+		}
+		DataSetCheckResult checkResult = new DataSetCheckResult();
+		try {
+			SERVICE_INTERFACE_TYPE inputMethod = endpoint.getInputMethod();
+			if (inputMethod.equals(SERVICE_INTERFACE_TYPE.DataFrame)) {
+				MetaDataObject dataSet = container.getDataSet();
+				if (!(dataSet instanceof RelationalDataSet)) {
+					checkResult.setDataAccess(DataSetCheckResult.DataAccess.NotPossible);
+					checkResult.setDetails("This service can only process relational data sets.");
+				} else {
+					checkResult.setDataAccess(DataSetCheckResult.DataAccess.Possible);
+					Dataset<Row> df = SparkUtils.createDataFrame(this.spark, container, this.mds);
+					// Print first rows to check whether data frame can be accessed
+					df.show(10);
+				}
+			} else if (inputMethod.equals(SERVICE_INTERFACE_TYPE.Generic)) {
+				Class<?> clazz = Class.forName(endpoint.getClassName());
+				Constructor<?> cons = clazz.getConstructor();
+				SparkDiscoveryService service = (SparkDiscoveryService) cons.newInstance();
+				service.setMetadataStore(this.mds);
+				service.setSparkSession(this.spark);
+				checkResult = service.checkDataSet(container);
+			}
+		} catch (Exception e) {
+			logger.log(Level.WARNING,"Access to data set not possible.", e);
+			checkResult.setDataAccess(DataSetCheckResult.DataAccess.NotPossible);
+			checkResult.setDetails(getExceptionAsString(e));
+		} finally {
+			this.spark.close();
+			this.spark.stop();
+		}
+		return checkResult;
+	}
+
+	@Override
+	public DiscoveryServiceSyncResponse runAnalysis(DiscoveryServiceProperties dsProp, DiscoveryServiceRequest request) {
+		DiscoveryServiceSyncResponse response = new DiscoveryServiceSyncResponse();
+		response.setDetails("Annotations created successfully");
+		response.setCode(DiscoveryServiceResponse.ResponseCode.OK);
+		try {
+			DiscoveryServiceSparkEndpoint endpoint = JSONUtils.convert(dsProp.getEndpoint(), DiscoveryServiceSparkEndpoint.class);
+			Class<?> clazz = Class.forName(endpoint.getClassName());
+			DataSetContainer container = request.getDataSetContainer();
+			String[] optionalArgs = {}; // For future use
+			SERVICE_INTERFACE_TYPE inputMethod = endpoint.getInputMethod();
+
+			if (inputMethod.equals(SERVICE_INTERFACE_TYPE.DataFrame)) {
+				if (!(container.getDataSet() instanceof RelationalDataSet)) {
+					throw new RuntimeException("This service can only process relational data sets (DataFile or Table).");
+				}
+				Dataset<Row> df = SparkUtils.createDataFrame(this.spark, container, this.mds);
+				@SuppressWarnings("unchecked")
+				Map<String, Dataset<Row>> annotationDataFrameMap = (Map<String, Dataset<Row>>) clazz.getMethod("processDataFrame", SparkSession.class, Dataset.class, String[].class).invoke(null, this.spark, df, (Object[]) optionalArgs);
+				response.setResult(SparkUtils.createAnnotationsFromDataFrameMap(container, annotationDataFrameMap, this.mds));
+			} else if (inputMethod.equals(SERVICE_INTERFACE_TYPE.Generic)) {
+				Constructor<?> cons = clazz.getConstructor();
+				SparkDiscoveryService service = (SparkDiscoveryService) cons.newInstance();
+				service.setMetadataStore(this.mds);
+				service.setSparkSession(this.spark);
+				response = service.runAnalysis(request);
+			} else {
+				throw new RuntimeException(MessageFormat.format("Unsupported interface type {0}.", inputMethod));
+			}
+		} catch(Exception e) {
+			logger.log(Level.WARNING,"Error running discovery service.", e);
+			response.setDetails(getExceptionAsString(e));
+			response.setCode(DiscoveryServiceResponse.ResponseCode.UNKNOWN_ERROR);
+		} finally {
+			// SparkSession.close() already calls stop(), so a single call is sufficient
+			this.spark.close();
+		}
+		return response;
+	}
+
+	public static String getExceptionAsString(Throwable exc) {
+		StringWriter sw = new StringWriter();
+		PrintWriter pw = new PrintWriter(sw);
+		exc.printStackTrace(pw);
+		String st = sw.toString();
+		return st;
+	}
+}
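
For illustration, a DataFrame-based service invoked through the reflective processDataFrame call above only needs a single static method. The sketch below is hypothetical (class name, annotation key, and the describe()-based logic are made up for this example); the signature and return type are the ones looked up via getMethod in runAnalysis:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class MySparkDiscoveryService {

        // Static signature expected for SERVICE_INTERFACE_TYPE.DataFrame services:
        // (SparkSession, Dataset<Row>, String[]) -> map of annotation type name to annotation data frame
        public static Map<String, Dataset<Row>> processDataFrame(SparkSession spark, Dataset<Row> df, String[] args) {
            Map<String, Dataset<Row>> annotationDataFrameMap = new HashMap<String, Dataset<Row>>();
            // Hypothetical annotation: summary statistics (count, mean, stddev, min, max) of the input columns
            annotationDataFrameMap.put("MySummaryStatisticsAnnotation", df.describe());
            return annotationDataFrameMap;
        }
    }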

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-spark/src/main/java/org/apache/atlas/odf/core/spark/SparkJars.java
----------------------------------------------------------------------
diff --git a/odf/odf-spark/src/main/java/org/apache/atlas/odf/core/spark/SparkJars.java b/odf/odf-spark/src/main/java/org/apache/atlas/odf/core/spark/SparkJars.java
new file mode 100755
index 0000000..81fea2c
--- /dev/null
+++ b/odf/odf-spark/src/main/java/org/apache/atlas/odf/core/spark/SparkJars.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.spark;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.text.MessageFormat;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+
+import org.apache.atlas.odf.core.Utils;
+
+public class SparkJars {
+	private static Logger logger = Logger.getLogger(SparkJars.class.getName());
+
+	public String getResourceAsJarFile(String resource) {
+		ClassLoader cl = this.getClass().getClassLoader();
+		InputStream inputStream = cl.getResourceAsStream(resource);
+		if (inputStream == null) {
+			String msg = MessageFormat.format("Resource {0} was not found.", resource);
+			logger.log(Level.WARNING, msg);
+			throw new RuntimeException(msg);
+		}
+		String tempFilePath = null;
+		try {
+			File tempFile = File.createTempFile("driver", ".jar");
+			tempFilePath = tempFile.getAbsolutePath();
+			logger.log(Level.INFO, "Creating temporary file " + tempFilePath);
+			// Close the output stream to avoid leaking the file handle
+			try (FileOutputStream outputStream = new FileOutputStream(tempFile)) {
+				IOUtils.copy(inputStream, outputStream);
+			}
+			inputStream.close();
+			Utils.runSystemCommand("chmod 755 " + tempFilePath);
+		} catch (IOException e) {
+			String msg = MessageFormat.format("Error creating temporary file from resource {0}: ", resource);
+			logger.log(Level.WARNING, msg, e);
+			throw new RuntimeException(msg + Utils.getExceptionAsString(e));
+		}
+		return tempFilePath;
+	}
+
+	public String getUrlasJarFile(String urlString) {
+		try {
+			File tempFile = File.createTempFile("driver", ".jar");
+			logger.log(Level.INFO, "Creating temporary file " + tempFile.getAbsolutePath());
+			FileUtils.copyURLToFile(new URL(urlString), tempFile);
+			Utils.runSystemCommand("chmod 755 " + tempFile.getAbsolutePath());
+			return tempFile.getAbsolutePath();
+		} catch (MalformedURLException e) {
+			String msg = MessageFormat.format("An invalid Spark application URL {0} was provided: ", urlString);
+			logger.log(Level.WARNING, msg, e);
+			throw new RuntimeException(msg + Utils.getExceptionAsString(e));
+		} catch (IOException e) {
+			logger.log(Level.WARNING, "Error processing Spark application jar file.", e);
+			throw new RuntimeException("Error processing Spark application jar file: " + Utils.getExceptionAsString(e));
+		}
+	}
+
+	public byte[] getFileAsByteArray(String resourceOrURL) {
+		try {
+			InputStream inputStream;
+			if (isValidUrl(resourceOrURL)) {
+				inputStream = new URL(resourceOrURL).openStream();
+			} else {
+				ClassLoader cl = this.getClass().getClassLoader();
+				inputStream = cl.getResourceAsStream(resourceOrURL);
+				if (inputStream == null) {
+					String msg = MessageFormat.format("Resource {0} was not found.", resourceOrURL);
+					logger.log(Level.WARNING, msg);
+					throw new RuntimeException(msg);
+				}
+			}
+			// Close the input stream to avoid leaking the handle
+			try {
+				return IOUtils.toByteArray(inputStream);
+			} finally {
+				inputStream.close();
+			}
+		} catch (IOException e) {
+			String msg = MessageFormat.format("Error converting jar file {0} into byte array: ", resourceOrURL);
+			logger.log(Level.WARNING, msg, e);
+			throw new RuntimeException(msg + Utils.getExceptionAsString(e));
+		}
+	}
+
+	public static boolean isValidUrl(String urlString) {
+		try {
+			new URL(urlString);
+			return true;
+		} catch (MalformedURLException exc) {
+			// Expected exception if the URL is not valid
+			return false;
+		}
+	}
+}
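
A short usage sketch of this helper (the jar reference is a placeholder): isValidUrl decides whether the jar is fetched from a URL or from the classpath, mirroring how SparkServiceExecutorImpl below resolves endpoint jars:

    SparkJars sparkJars = new SparkJars();
    String jarRef = "https://example.org/my-discovery-service.jar"; // placeholder, may also be a classpath resource
    String localJarPath = SparkJars.isValidUrl(jarRef)
            ? sparkJars.getUrlasJarFile(jarRef)       // download the jar to a temporary file
            : sparkJars.getResourceAsJarFile(jarRef); // copy the classpath resource to a temporary file
    byte[] jarBytes = sparkJars.getFileAsByteArray(jarRef); // e.g. for shipping the jar to a remote cluster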

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-spark/src/main/java/org/apache/atlas/odf/core/spark/SparkServiceExecutorImpl.java
----------------------------------------------------------------------
diff --git a/odf/odf-spark/src/main/java/org/apache/atlas/odf/core/spark/SparkServiceExecutorImpl.java b/odf/odf-spark/src/main/java/org/apache/atlas/odf/core/spark/SparkServiceExecutorImpl.java
new file mode 100755
index 0000000..720343b
--- /dev/null
+++ b/odf/odf-spark/src/main/java/org/apache/atlas/odf/core/spark/SparkServiceExecutorImpl.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.spark;
+
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.text.MessageFormat;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.discoveryservice.DataSetCheckResult;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest;
+import org.apache.atlas.odf.api.settings.SettingsManager;
+import org.apache.atlas.odf.api.spark.SparkServiceExecutor;
+import org.apache.spark.sql.SparkSession;
+import org.apache.wink.json4j.JSONException;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceSparkEndpoint;
+import org.apache.atlas.odf.api.discoveryservice.datasets.DataSetContainer;
+import org.apache.atlas.odf.api.discoveryservice.sync.DiscoveryServiceSyncResponse;
+import org.apache.atlas.odf.api.settings.SparkConfig;
+import org.apache.atlas.odf.json.JSONUtils;
+
+/**
+ * Calls the appropriate implementation (local vs. remote) of the {@link SparkServiceExecutor},
+ * depending on the current {@link SparkConfig}. Also prepares the local Spark cluster that is
+ * used in unit and integration tests.
+ */
+public class SparkServiceExecutorImpl implements SparkServiceExecutor {
+	private Logger logger = Logger.getLogger(SparkServiceExecutorImpl.class.getName());
+
+	@Override
+	public DataSetCheckResult checkDataSet(DiscoveryServiceProperties dsri, DataSetContainer dataSetContainer) {
+		return this.getExecutor(dsri).checkDataSet(dsri, dataSetContainer);
+	}
+
+	@Override
+	public DiscoveryServiceSyncResponse runAnalysis(DiscoveryServiceProperties dsri, DiscoveryServiceRequest request) {
+		return this.getExecutor(dsri).runAnalysis(dsri, request);
+	}
+
+	private SparkServiceExecutor getExecutor(DiscoveryServiceProperties dsri) {
+		SettingsManager config = new ODFFactory().create().getSettingsManager();
+		DiscoveryServiceSparkEndpoint endpoint;
+		try {
+			endpoint = JSONUtils.convert(dsri.getEndpoint(), DiscoveryServiceSparkEndpoint.class);
+		} catch (JSONException e1) {
+			throw new RuntimeException(e1);
+		}
+
+		SparkConfig sparkConfig = config.getODFSettings().getSparkConfig();
+		if (sparkConfig == null) {
+			String msg = "No Spark service is configured. Please manually register a Spark service or bind a Spark service to your ODF Bluemix app.";
+			logger.log(Level.SEVERE, msg);
+			throw new RuntimeException(msg);
+		} else {
+			logger.log(Level.INFO, "Using local Spark cluster with master URL {0}.", sparkConfig.getClusterMasterUrl());
+			SparkSession spark = SparkSession.builder().master(sparkConfig.getClusterMasterUrl()).appName(dsri.getName()).getOrCreate();
+			SparkJars sparkJars = new SparkJars();
+			try {
+				// Load the jar file containing the Spark job to be started. Note: casting the system
+				// class loader to URLClassLoader works on Java 8 but not on Java 9 and later.
+				URLClassLoader classLoader = (URLClassLoader) ClassLoader.getSystemClassLoader();
+				Method method = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
+				method.setAccessible(true);
+				String applicationJarFile;
+				if (SparkJars.isValidUrl(endpoint.getJar())) {
+					applicationJarFile = sparkJars.getUrlasJarFile(endpoint.getJar());
+				} else {
+					applicationJarFile = sparkJars.getResourceAsJarFile(endpoint.getJar());
+				}
+				logger.log(Level.INFO, "Using application jar file {0}.", applicationJarFile);
+				method.invoke(classLoader, new URL("file:" + applicationJarFile));
+			} catch (Exception e) {
+				String msg = MessageFormat.format("Error loading jar file {0} implementing the Spark discovery service: ", endpoint.getJar());
+				logger.log(Level.WARNING, msg, e);
+				// SparkSession.close() already calls stop(), so a single call is sufficient
+				spark.close();
+				throw new RuntimeException(msg, e);
+			}
+			LocalSparkServiceExecutor executor = new LocalSparkServiceExecutor();
+			executor.setSparkSession(spark);
+			executor.setMetadataStore(new ODFFactory().create().getMetadataStore());
+			return executor;
+		}
+	}
+}
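
To tie the pieces together, a hedged usage sketch (building the DiscoveryServiceProperties and DiscoveryServiceRequest objects is omitted; the getter names getDataAccess and getCode are assumed to mirror the setters used above):

    void runSparkService(DiscoveryServiceProperties dsProps, DiscoveryServiceRequest request) {
        SparkServiceExecutor executor = new SparkServiceExecutorImpl();
        // First verify that the data set can be accessed at all
        DataSetCheckResult check = executor.checkDataSet(dsProps, request.getDataSetContainer());
        if (DataSetCheckResult.DataAccess.Possible.equals(check.getDataAccess())) {
            DiscoveryServiceSyncResponse response = executor.runAnalysis(dsProps, request);
            if (DiscoveryServiceResponse.ResponseCode.OK.equals(response.getCode())) {
                // Annotations were created and are available through the response
            }
        }
    }

Note that each call creates and closes its own SparkSession, so checkDataSet and runAnalysis can be invoked independently.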

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-spark/src/main/resources/org/apache/atlas/odf/odf-implementation.properties
----------------------------------------------------------------------
diff --git a/odf/odf-spark/src/main/resources/org/apache/atlas/odf/odf-implementation.properties b/odf/odf-spark/src/main/resources/org/apache/atlas/odf/odf-implementation.properties
new file mode 100755
index 0000000..d6651ee
--- /dev/null
+++ b/odf/odf-spark/src/main/resources/org/apache/atlas/odf/odf-implementation.properties
@@ -0,0 +1,14 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+SparkServiceExecutor=org.apache.atlas.odf.core.spark.SparkServiceExecutorImpl
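
This maps the SparkServiceExecutor interface to its implementation class. Assuming ODF's internal factory resolves implementations by the simple interface name used as the key here (the same ODFInternalFactory lookup pattern appears in ZookeeperConfigurationStorage further below), client code would obtain the executor like this:

    SparkServiceExecutor executor = new ODFInternalFactory().create(SparkServiceExecutor.class);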

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-store/.gitignore
----------------------------------------------------------------------
diff --git a/odf/odf-store/.gitignore b/odf/odf-store/.gitignore
new file mode 100755
index 0000000..ea5ddb8
--- /dev/null
+++ b/odf/odf-store/.gitignore
@@ -0,0 +1,5 @@
+.settings
+target
+.classpath
+.project
+.factorypath
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-store/pom.xml
----------------------------------------------------------------------
diff --git a/odf/odf-store/pom.xml b/odf/odf-store/pom.xml
new file mode 100755
index 0000000..3d0a93d
--- /dev/null
+++ b/odf/odf-store/pom.xml
@@ -0,0 +1,87 @@
+<?xml version="1.0"?>
+<!--
+~
+~ Licensed under the Apache License, Version 2.0 (the "License");
+~ you may not use this file except in compliance with the License.
+~ You may obtain a copy of the License at
+~
+~   http://www.apache.org/licenses/LICENSE-2.0
+~
+~ Unless required by applicable law or agreed to in writing, software
+~ distributed under the License is distributed on an "AS IS" BASIS,
+~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+~ See the License for the specific language governing permissions and
+~ limitations under the License.
+-->
+<project
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+	xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+	<modelVersion>4.0.0</modelVersion>
+	<parent>
+		<groupId>org.apache.atlas.odf</groupId>
+		<artifactId>odf</artifactId>
+		<version>1.2.0-SNAPSHOT</version>
+	</parent>
+	<artifactId>odf-store</artifactId>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.atlas.odf</groupId>
+			<artifactId>odf-core</artifactId>
+			<version>1.2.0-SNAPSHOT</version>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.atlas.odf</groupId>
+			<artifactId>odf-messaging</artifactId>
+			<version>1.2.0-SNAPSHOT</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.zookeeper</groupId>
+			<artifactId>zookeeper</artifactId>
+			<version>3.4.6</version>
+			<scope>compile</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>junit</groupId>
+			<artifactId>junit</artifactId>
+			<version>4.12</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.atlas.odf</groupId>
+			<artifactId>odf-core</artifactId>
+			<version>1.2.0-SNAPSHOT</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-surefire-plugin</artifactId>
+				<version>2.19</version>
+				<configuration>
+					<systemPropertyVariables>
+						<odf.logspec>${odf.unittest.logspec}</odf.logspec>
+						<odf.zookeeper.connect>${testZookeepeConnectionString}</odf.zookeeper.connect>
+						<odf.build.project.name>${project.name}</odf.build.project.name>
+					</systemPropertyVariables>
+					<dependenciesToScan>
+						<dependency>org.apache.atlas.odf:odf-core</dependency>
+					</dependenciesToScan>
+					<includes>
+						<include>**/configuration/**/*.java</include>
+						<include>**/ZookeeperConfigurationStorageTest.java</include>
+					</includes>
+				</configuration>
+			</plugin>
+		</plugins>
+	</build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-store/src/main/java/org/apache/atlas/odf/core/store/zookeeper34/ZookeeperConfigurationStorage.java
----------------------------------------------------------------------
diff --git a/odf/odf-store/src/main/java/org/apache/atlas/odf/core/store/zookeeper34/ZookeeperConfigurationStorage.java b/odf/odf-store/src/main/java/org/apache/atlas/odf/core/store/zookeeper34/ZookeeperConfigurationStorage.java
new file mode 100755
index 0000000..3ea9927
--- /dev/null
+++ b/odf/odf-store/src/main/java/org/apache/atlas/odf/core/store/zookeeper34/ZookeeperConfigurationStorage.java
@@ -0,0 +1,247 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.store.zookeeper34;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.text.MessageFormat;
+import java.util.HashSet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.core.Environment;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.core.configuration.ConfigContainer;
+import org.apache.atlas.odf.json.JSONUtils;
+import org.apache.wink.json4j.JSONException;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+
+import org.apache.atlas.odf.core.store.ODFConfigurationStorage;
+
+public class ZookeeperConfigurationStorage implements ODFConfigurationStorage {
+	private Logger logger = Logger.getLogger(ZookeeperConfigurationStorage.class.getName());
+	static final String ZOOKEEPER_CONFIG_PATH = "/odf/config";
+	static String configCache = null; // cache is a string so that the object is not accidentally modified
+	static Object configCacheLock = new Object();
+	static HashSet<String> pendingConfigChanges = new HashSet<String>();
+
+	String zookeeperString;
+
+	public ZookeeperConfigurationStorage() {
+		zookeeperString = new ODFInternalFactory().create(Environment.class).getZookeeperConnectString();
+	}
+
+	public void clearCache() {
+		synchronized (configCacheLock) {
+			configCache = null;
+		}
+	}
+	
+	@Override
+	public void storeConfig(ConfigContainer config) {
+		synchronized (configCacheLock) {
+			ZooKeeper zk = null;
+			String configTxt = null;
+			try {
+				configTxt = JSONUtils.toJSON(config);
+				zk = getZkConnectionSynchronously();
+				if (zk.exists(getZookeeperConfigPath(), false) == null) {
+					// Config node doesn't exist in zookeeper yet, create it before writing
+					logger.log(Level.WARNING, "Zookeeper config not found - creating it before writing: {0}", configTxt);
+					initializeConfiguration(zk, configTxt);
+				}
+				zk.setData(getZookeeperConfigPath(), configTxt.getBytes("UTF-8"), -1);
+				configCache = configTxt;
+			} catch (InterruptedException e) {
+				throw new RuntimeException("A zookeeper connection could not be established in time to write the settings.", e);
+			} catch (KeeperException e) {
+				if (Code.NONODE.equals(e.code())) {
+					logger.info("Settings could not be written because the required node does not exist yet, creating it now.");
+					initializeConfiguration(zk, configTxt);
+					configCache = configTxt;
+					return;
+				}
+				// This should never happen: only NoNode or BadVersion codes are possible here, and
+				// since the node version is ignored, a BadVersion should never occur.
+				throw new RuntimeException("The settings could not be written because of an unexpected Zookeeper exception.", e);
+			} catch (UnsupportedEncodingException e) {
+				// Should not happen since UTF-8 is always available
+				throw new RuntimeException(e);
+			} catch (JSONException e) {
+				throw new RuntimeException("Configuration is not valid.", e);
+			} finally {
+				if (zk != null) {
+					try {
+						zk.close();
+					} catch (InterruptedException e) {
+						logger.log(Level.WARNING, "Error closing zookeeper connection.", e);
+					}
+				}
+			}
+		}
+	}
+
+	@Override
+	public ConfigContainer getConfig(ConfigContainer defaultConfiguration) {
+		synchronized (configCacheLock) {
+			if (configCache == null) {
+				ZooKeeper zk = getZkConnectionSynchronously();
+				try {
+					if (zk.exists(getZookeeperConfigPath(), false) == null) {
+						//config file doesn't exist in zookeeper yet, write default config
+						String defaultConfigString = JSONUtils.toJSON(defaultConfiguration);
+						logger.log(Level.WARNING, "Zookeeper config not found - creating now with default: {0}", defaultConfigString);
+						initializeConfiguration(zk, defaultConfigString);
+					}
+					byte[] configBytes = zk.getData(getZookeeperConfigPath(), true, new Stat());
+					if (configBytes != null) {
+						String configString = new String(configBytes, "UTF-8");
+						configCache = configString;
+					} else {
+						// should never happen
+						throw new RuntimeException("Zookeeper configuration was not stored");
+					}
+				} catch (KeeperException e) {
+					throw new RuntimeException(MessageFormat.format("Zookeeper config could not be read, a Zookeeper exception of type {0} occurred!", e.code().name()), e);
+				} catch (InterruptedException e) {
+					throw new RuntimeException("Zookeeper config could not be read, the connection was interrupted!", e);
+				} catch (IOException | JSONException e) {
+					throw new RuntimeException("Zookeeper config could not be read, the file could not be parsed correctly!", e);
+				} finally {
+					if (zk != null) {
+						try {
+							zk.close();
+						} catch (InterruptedException e) {
+							logger.log(Level.WARNING, "Error closing zookeeper connection.", e);
+						}
+					}
+				}
+			}
+			try {
+				return JSONUtils.fromJSON(configCache, ConfigContainer.class);
+			} catch (JSONException e) {
+				throw new RuntimeException("Cached configuration was not valid", e);
+			}
+		}
+	}
+
+	private void initializeConfiguration(ZooKeeper zk, String config) {
+		try {
+			if (getZookeeperConfigPath().contains("/")) {
+				String[] nodes = getZookeeperConfigPath().split("/");
+				StringBuilder path = new StringBuilder();
+				for (String node : nodes) {
+					if (node.trim().equals("")) {
+						//ignore empty paths
+						continue;
+					}
+					path.append("/" + node);
+					try {
+						zk.create(path.toString(), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+					} catch (NodeExistsException ex) {
+						//ignore if node already exists and continue with next node
+					}
+				}
+			}
+
+			// Use version -1 to ignore versioning conflicts
+			try {
+				zk.setData(getZookeeperConfigPath(), config.getBytes("UTF-8"), -1);
+			} catch (UnsupportedEncodingException e) {
+				// should not happen
+				throw new RuntimeException(e);
+			}
+		} catch (KeeperException e) {
+			throw new RuntimeException(MessageFormat.format("The zookeeper config could not be initialized, a Zookeeper exception of type {0} occurred!", e.code().name()), e);
+		} catch (InterruptedException e) {
+			throw new RuntimeException("The zookeeper config could not be initialized, the connection got interrupted!", e);
+		}
+	}
+
+	private ZooKeeper getZkConnectionSynchronously() {
+		final CountDownLatch latch = new CountDownLatch(1);
+		logger.log(Level.FINE, "Trying to connect to zookeeper at {0}", zookeeperString);
+		ZooKeeper zk = null;
+		try {
+			int timeout = 5;
+			zk = new ZooKeeper(zookeeperString, timeout * 1000, new Watcher() {
+
+				@Override
+				public void process(WatchedEvent event) {
+					if (event.getState().equals(Watcher.Event.KeeperState.ConnectedReadOnly) || event.getState().equals(Watcher.Event.KeeperState.SyncConnected)) {
+						//count down latch, connected successfully to zk
+						latch.countDown();
+					}
+				}
+			});
+			// Block the thread until the connection is established, waiting at most 5 * timeout seconds
+			latch.await(5 * timeout, TimeUnit.SECONDS);
+			if (latch.getCount() > 0) {
+				zk.close();
+				throw new RuntimeException("The zookeeper connection could not be established in time!");
+			}
+			}
+			return zk;
+		} catch (IOException e1) {
+			throw new RuntimeException("The zookeeper connection could not be retrieved, the connection failed!", e1);
+		} catch (InterruptedException e) {
+			throw new RuntimeException("Zookeeper connection could not be retrieved, the thread was interrupted!", e);
+		}
+	}
+
+	public String getZookeeperConfigPath() {
+		return ZOOKEEPER_CONFIG_PATH;
+	}
+
+	@Override
+	public void onConfigChange(ConfigContainer container) {
+		synchronized (configCacheLock) {
+			try {
+				configCache = JSONUtils.toJSON(container);
+			} catch (JSONException e) {
+				throw new RuntimeException("Config could not be cloned!", e);
+			}
+		}
+	}
+
+	@Override
+	public void addPendingConfigChange(String changeId) {
+		synchronized (configCacheLock) {
+			pendingConfigChanges.add(changeId);
+		}
+	}
+
+	@Override
+	public void removePendingConfigChange(String changeId) {
+		synchronized (configCacheLock) {
+			pendingConfigChanges.remove(changeId);
+		}
+	}
+
+	@Override
+	public boolean isConfigChangePending(String changeId) {
+		synchronized (configCacheLock) {
+			return pendingConfigChanges.contains(changeId);
+		}
+	}
+}
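
For debugging, the stored configuration can be read back with the same latch-based connection pattern used above; a minimal sketch, assuming a ZooKeeper reachable at localhost:2181 (the client port of the embedded test instance below):

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;
    import org.apache.zookeeper.data.Stat;

    public class ReadOdfConfig {
        public static void main(String[] args) throws Exception {
            final CountDownLatch latch = new CountDownLatch(1);
            ZooKeeper zk = new ZooKeeper("localhost:2181", 5000, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    if (Watcher.Event.KeeperState.SyncConnected.equals(event.getState())) {
                        latch.countDown(); // connected successfully
                    }
                }
            });
            if (!latch.await(25, TimeUnit.SECONDS)) {
                zk.close();
                throw new RuntimeException("Could not connect to ZooKeeper in time.");
            }
            byte[] configBytes = zk.getData("/odf/config", false, new Stat());
            System.out.println(new String(configBytes, "UTF-8"));
            zk.close();
        }
    }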

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-store/src/main/resources/org/apache/atlas/odf/core/internal/zookeeper/test-embedded-zookeeper.properties
----------------------------------------------------------------------
diff --git a/odf/odf-store/src/main/resources/org/apache/atlas/odf/core/internal/zookeeper/test-embedded-zookeeper.properties b/odf/odf-store/src/main/resources/org/apache/atlas/odf/core/internal/zookeeper/test-embedded-zookeeper.properties
new file mode 100755
index 0000000..7234e9c
--- /dev/null
+++ b/odf/odf-store/src/main/resources/org/apache/atlas/odf/core/internal/zookeeper/test-embedded-zookeeper.properties
@@ -0,0 +1,34 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# the directory where the snapshot is stored.
+dataDir=/tmp/odf-embedded-test-kafka/zookeeper
+# the port at which the clients will connect
+clientPort=2181
+# disable the per-ip limit on the number of connections since this is a non-production config
+maxClientCnxns=0
+

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-store/src/main/resources/org/apache/atlas/odf/odf-implementation.properties
----------------------------------------------------------------------
diff --git a/odf/odf-store/src/main/resources/org/apache/atlas/odf/odf-implementation.properties b/odf/odf-store/src/main/resources/org/apache/atlas/odf/odf-implementation.properties
new file mode 100755
index 0000000..65a7b5d
--- /dev/null
+++ b/odf/odf-store/src/main/resources/org/apache/atlas/odf/odf-implementation.properties
@@ -0,0 +1,14 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+ODFConfigurationStorage=org.apache.atlas.odf.core.store.zookeeper34.ZookeeperConfigurationStorage