You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2017/06/28 05:57:30 UTC

[17/25] incubator-atlas git commit: ATLAS-1898: initial commit of ODF

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/ODFImplementations.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/ODFImplementations.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/ODFImplementations.java
new file mode 100755
index 0000000..4021049
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/ODFImplementations.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.text.MessageFormat;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public class ODFImplementations {
+
+	Logger logger = Logger.getLogger(ODFImplementations.class.getName());
+
+	private Map<String, String> implementations = new HashMap<String, String>();
+
+	public ODFImplementations(String path, ClassLoader cl) {
+		Enumeration<URL> resources;
+		try {
+			resources = cl.getResources(path);
+		} catch (IOException exc) {
+			logger.log(Level.WARNING, MessageFormat.format("An error occurred while reading properties ''0'' could not be loaded", path), exc);
+			return;
+		}
+		while (resources.hasMoreElements()) {
+			URL url = resources.nextElement();
+			try {
+				InputStream is = url.openStream();
+				if (is != null) {
+					Properties props = new Properties();
+					props.load(is);
+					for (Object key : props.keySet()) {
+						String keyString = (String) key;
+						try {
+							if (implementations.containsKey(key)) {
+								String existingClassString = implementations.get(keyString);
+								String newClassString = props.getProperty(keyString);
+								if (!existingClassString.equals(newClassString)) {
+									Class<?> existingClass = cl.loadClass(existingClassString);
+									Class<?> newClass = cl.loadClass(newClassString);
+									String superClass = null;
+									String subClass = null;
+									// select the class lowest in the class hierarchy 
+									if (existingClass.isAssignableFrom(newClass)) {
+										superClass = existingClassString;
+										subClass = newClassString;
+									} else if (newClass.isAssignableFrom(existingClass)) {
+										superClass = newClassString;
+										subClass = existingClassString;
+									}
+									if (superClass != null) {
+										logger.log(Level.INFO, "Implementation for interface ''{0}'' was found more than once, using subclass ''{1}'' (found superclass ''{2}'')",
+												new Object[] { key, subClass, superClass });
+										implementations.put(keyString, subClass);
+									} else {
+										logger.log(Level.WARNING, "Implementation for interface ''{0}'' was found more than once, using ''{1}''. (Conflict between ''{1}'' and ''{2}'')",
+												new Object[] { key, existingClassString, newClassString });
+									}
+								}
+							} else {
+								cl.loadClass(props.getProperty(keyString));
+								implementations.put(keyString, props.getProperty(keyString));
+							}
+						} catch (ClassNotFoundException exc) {
+							logger.log(Level.SEVERE, "Class found in odf-implementation.properties file could not be loaded", exc);
+						}
+					}
+				}
+			} catch (IOException e) {
+				logger.log(Level.WARNING, MessageFormat.format("Properties ''0'' could not be loaded", url), e);
+			}
+		}
+	}
+
+	public Map<String, String> getImplementations() {
+		return implementations;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/ODFInitializer.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/ODFInitializer.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/ODFInitializer.java
new file mode 100755
index 0000000..64e54ad
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/ODFInitializer.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core;
+
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.core.controlcenter.AnalysisRequestTrackerStore;
+import org.apache.atlas.odf.core.controlcenter.ThreadManager;
+import org.apache.atlas.odf.core.messaging.DiscoveryServiceQueueManager;
+
+public class ODFInitializer {
+
+	static Logger logger = Logger.getLogger(ODFInitializer.class.getName());
+
+	static Object initLock = new Object();
+
+	private static boolean running = false;
+	private static long lastStopTimestamp = 0;
+	private static long lastStartTimestamp = 0;
+	private static boolean startStopInProgress = false;
+	
+
+	public static long getLastStopTimestamp() {
+		synchronized (initLock) {
+			return lastStopTimestamp;
+		}
+	}
+
+	public static long getLastStartTimestamp() {
+		synchronized (initLock) {
+			return lastStartTimestamp;
+		}
+	}
+
+	public static boolean isRunning() {
+		synchronized (initLock) {
+			return running;
+		}
+	}
+	
+	public static boolean isStartStopInProgress() {
+		return startStopInProgress;
+	}
+
+	public static void start() {
+		synchronized (initLock) {
+			if (!running) {
+				startStopInProgress = true;
+				DiscoveryServiceQueueManager qm = new ODFInternalFactory().create(DiscoveryServiceQueueManager.class);
+				try {
+					qm.start();
+				} catch (Exception e) {
+					logger.log(Level.WARNING, "Timeout occurred while starting ODF", e);
+				}
+				lastStartTimestamp = System.currentTimeMillis();
+				running = true;
+				startStopInProgress = false;
+			}
+		}
+	}
+
+	public static void stop() {
+		synchronized (initLock) {
+			if (running) {
+				startStopInProgress = true;
+				ODFInternalFactory f = new ODFInternalFactory();
+				DiscoveryServiceQueueManager qm = f.create(DiscoveryServiceQueueManager.class);
+				try {
+					qm.stop();
+				} catch (TimeoutException e) {
+					logger.log(Level.WARNING, "Timeout occurred while stopping ODF", e);
+				}
+				ThreadManager tm = f.create(ThreadManager.class);
+				tm.shutdownAllUnmanagedThreads();
+				AnalysisRequestTrackerStore arts = f.create(AnalysisRequestTrackerStore.class);
+				arts.clearCache();
+				lastStopTimestamp = System.currentTimeMillis();
+				running = false;
+				startStopInProgress = false;
+			}
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/ODFInternalFactory.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/ODFInternalFactory.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/ODFInternalFactory.java
new file mode 100755
index 0000000..4fd09a7
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/ODFInternalFactory.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.utils.ODFLogConfig;
+
+public class ODFInternalFactory {
+
+	private static Properties defaultImplemenetations = Utils.readConfigProperties("org/apache/atlas/odf/core/internal/odf-default-implementation.properties");
+	private static ODFImplementations overwrittenImplementations = null;
+	private static Map<Class<?>, Object> singletons = new HashMap<>();
+
+	public static String SINGLETON_MARKER = "@singleton";
+
+	static {
+		ODFLogConfig.run();
+
+		Logger logger = Logger.getLogger(ODFInternalFactory.class.getName());
+		ClassLoader cl = ODFInternalFactory.class.getClassLoader();
+		String overwriteConfig = "org/apahe/atlas/odf/odf-implementation.properties";
+		overwrittenImplementations = new ODFImplementations(overwriteConfig, cl);
+		if (overwrittenImplementations.getImplementations().isEmpty()) {
+			overwrittenImplementations = null;
+		} else {
+			logger.log(Level.INFO, "Found overwritten implementation config: {0}", overwrittenImplementations.getImplementations());
+		}
+		if (overwrittenImplementations == null) {
+			logger.log(Level.INFO, "Default implementations are used");
+		}
+	}
+
+	private Object createObject(Class<?> cl) throws ClassNotFoundException, IllegalAccessException, InstantiationException {
+		String clazz = null;
+		if (overwrittenImplementations != null) {
+			clazz = overwrittenImplementations.getImplementations().get(cl.getName());
+		}
+		if (clazz == null) {
+			clazz = defaultImplemenetations.getProperty(cl.getName());
+		}
+		if (clazz == null) {
+			// finally try to instantiate the class as such
+			clazz = cl.getName();
+		}
+		boolean isSingleton = false;
+		if (clazz.endsWith(SINGLETON_MARKER)) {
+			clazz = clazz.substring(0, clazz.length() - SINGLETON_MARKER.length());
+			isSingleton = true;
+		}
+		Object o = null;
+		Class<?> implClass = this.getClass().getClassLoader().loadClass(clazz);
+		if (isSingleton) {
+			o = singletons.get(implClass);
+			if (o == null) {
+				o = implClass.newInstance();
+				singletons.put(implClass, o);
+			}
+		} else {
+			o = implClass.newInstance();
+		}
+		return o;
+	}
+
+	@SuppressWarnings("unchecked")
+	public <T> T create(Class<T> cl) {
+		try {
+			return (T) createObject(cl);
+		} catch (ClassNotFoundException e) {
+			throw new RuntimeException(e);
+		} catch (IllegalAccessException e) {
+			throw new RuntimeException(e);
+		} catch (InstantiationException e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/ODFUtils.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/ODFUtils.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/ODFUtils.java
new file mode 100755
index 0000000..623a727
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/ODFUtils.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core;
+
+import java.text.MessageFormat;
+import java.util.List;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.analysis.AnalysisManager;
+import org.apache.atlas.odf.api.analysis.AnalysisRequest;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestStatus;
+import org.apache.atlas.odf.api.analysis.AnalysisResponse;
+
+public class ODFUtils {
+	public static int DEFAULT_TIMEOUT_SECS = 10 * 60; // 10 minutes
+
+	public static AnalysisRequestStatus runSynchronously(AnalysisManager analysisManager, AnalysisRequest request) {
+		return runSynchronously(analysisManager, request, DEFAULT_TIMEOUT_SECS); // default is 
+	}
+
+	public static AnalysisRequestStatus runSynchronously(AnalysisManager analysisManager, AnalysisRequest request, int timeoutInSeconds) {
+		Logger logger = Logger.getLogger(ODFUtils.class.getName());
+		AnalysisResponse response = analysisManager.runAnalysis(request);
+		if (response.isInvalidRequest()) {
+			AnalysisRequestStatus status = new AnalysisRequestStatus();
+			status.setState(AnalysisRequestStatus.State.ERROR);
+			status.setDetails(MessageFormat.format("Request was invalid. Details: {0}", response.getDetails()));
+			status.setRequest(request);
+			return status;
+		}
+		AnalysisRequestStatus status = null;
+		long startTime = System.currentTimeMillis();
+		boolean timeOutReached = false;
+		do {
+			logger.fine("Polling for result...");
+			status = analysisManager.getAnalysisRequestStatus(response.getId());
+			try {
+				Thread.sleep(1000);
+			} catch (InterruptedException e) {
+				e.printStackTrace();
+			}
+			long currentTime = System.currentTimeMillis();
+			timeOutReached = (currentTime - startTime) > (timeoutInSeconds * 1000);
+		} while ((AnalysisRequestStatus.State.ACTIVE.equals(status.getState()) || AnalysisRequestStatus.State.QUEUED.equals(status.getState()) //
+				&& !timeOutReached));
+		return status;
+
+	}
+
+	public static AnalysisRequestStatus.State combineStates(List<AnalysisRequestStatus.State> allStates) {
+		// if one of the requests is in error, so is the complete request
+		if (allStates.contains(AnalysisRequestStatus.State.ERROR)) {
+			return AnalysisRequestStatus.State.ERROR;
+		}
+		// if no request could be found -> not found
+		if (Utils.containsOnly(allStates, new AnalysisRequestStatus.State[] { AnalysisRequestStatus.State.NOT_FOUND })) {
+			return AnalysisRequestStatus.State.NOT_FOUND;
+		}
+		// if all request are either not found or finished -> finished
+		if (Utils.containsOnly(allStates, new AnalysisRequestStatus.State[] { AnalysisRequestStatus.State.NOT_FOUND, AnalysisRequestStatus.State.FINISHED })) {
+			return AnalysisRequestStatus.State.FINISHED;
+		}
+		// else always return active
+		return AnalysisRequestStatus.State.ACTIVE;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/OpenDiscoveryFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/OpenDiscoveryFrameworkImpl.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/OpenDiscoveryFrameworkImpl.java
new file mode 100755
index 0000000..e8361fd
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/OpenDiscoveryFrameworkImpl.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.OpenDiscoveryFramework;
+import org.apache.atlas.odf.api.metadata.MetadataStore;
+import org.apache.atlas.odf.api.metadata.importer.JDBCMetadataImporter;
+import org.apache.atlas.odf.api.settings.SettingsManager;
+import org.apache.wink.json4j.JSONException;
+
+import org.apache.atlas.odf.api.analysis.AnalysisManager;
+import org.apache.atlas.odf.api.annotation.AnnotationStore;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceManager;
+import org.apache.atlas.odf.api.engine.EngineManager;
+import org.apache.atlas.odf.api.engine.ServiceRuntimesInfo;
+import org.apache.atlas.odf.core.controlcenter.ServiceRuntimes;
+import org.apache.atlas.odf.json.JSONUtils;
+
+public class OpenDiscoveryFrameworkImpl implements OpenDiscoveryFramework {
+
+	private Logger logger = Logger.getLogger(OpenDiscoveryFrameworkImpl.class.getName());
+
+	public OpenDiscoveryFrameworkImpl() {
+		if (!ODFInitializer.isRunning() && !ODFInitializer.isStartStopInProgress()) {
+			logger.log(Level.INFO, "Initializing Open Discovery Platform");
+			ODFInitializer.start();
+			getEngineManager().checkHealthStatus(); // This implicitly initializes the control center and the message queues
+			
+			logger.log(Level.INFO, "Open Discovery Platform successfully initialized.");
+			
+			// log active runtimes
+			ServiceRuntimesInfo activeRuntimesInfo = ServiceRuntimes.getRuntimesInfo(ServiceRuntimes.getActiveRuntimes());
+			try {
+				logger.log(Level.INFO, "Active runtimes: ''{0}''", JSONUtils.toJSON(activeRuntimesInfo));
+			} catch (JSONException e) {
+				logger.log(Level.WARNING, "Active runtime info has wrong format", e);
+			}
+		}
+	}
+
+	public AnalysisManager getAnalysisManager() {
+		return new ODFInternalFactory().create(AnalysisManager.class);
+	}
+
+	public DiscoveryServiceManager getDiscoveryServiceManager() {
+		return new ODFInternalFactory().create(DiscoveryServiceManager.class);
+	}
+
+	public EngineManager getEngineManager() {
+		return new ODFInternalFactory().create(EngineManager.class);
+	}
+
+	public SettingsManager getSettingsManager() {
+		return new ODFInternalFactory().create(SettingsManager.class);
+	}
+
+	public AnnotationStore getAnnotationStore() {
+		return new ODFInternalFactory().create(AnnotationStore.class);
+	}
+
+	public MetadataStore getMetadataStore() {
+		return new ODFInternalFactory().create(MetadataStore.class);
+	}
+
+	public JDBCMetadataImporter getJDBCMetadataImporter() {
+		return new ODFInternalFactory().create(JDBCMetadataImporter.class);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/StandaloneEnvironment.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/StandaloneEnvironment.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/StandaloneEnvironment.java
new file mode 100755
index 0000000..e58dd37
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/StandaloneEnvironment.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.atlas.odf.core.configuration.ConfigContainer;
+
+public class StandaloneEnvironment implements Environment {
+
+	@Override
+	public String getProperty(String propertyName) {
+		return System.getProperty(propertyName);
+	}
+
+	@Override
+	public String getCurrentUser() {
+		return System.getProperty("user.name");
+	}
+
+	@Override
+	public String getZookeeperConnectString() {
+		return getProperty("odf.zookeeper.connect");
+	}
+
+	@Override
+	public ConfigContainer getDefaultConfiguration() {
+		return Utils.readConfigurationFromClasspath("org/apache/atlas/odf/core/internal/odf-initial-configuration.json");
+	}
+
+	@Override
+	public Map<String, String> getPropertiesWithPrefix(String prefix) {
+		Map<String, String> foundProps = new HashMap<>();
+		Properties props = System.getProperties();
+		for (String key : props.stringPropertyNames()) {
+			if (key.startsWith(prefix)) {
+				foundProps.put(key, props.getProperty(key));
+			}
+		}
+		return foundProps;
+	}
+
+	@Override
+	public List<String> getActiveRuntimeNames() {
+		String p = getProperty("odf.active.runtimes");
+		if (p == null || p.equals("ALL")) {
+			return null;
+		}
+		if (p.equals("NONE")) {
+			return new ArrayList<>();
+		}
+		return Arrays.asList(p.split(","));
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/Utils.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/Utils.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/Utils.java
new file mode 100755
index 0000000..060f9fb
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/Utils.java
@@ -0,0 +1,314 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.StringTokenizer;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.settings.KafkaConsumerConfig;
+import org.apache.atlas.odf.api.settings.ODFSettings;
+import org.apache.atlas.odf.api.settings.SettingsManager;
+import org.apache.atlas.odf.core.configuration.ConfigContainer;
+import org.apache.atlas.odf.json.JSONUtils;
+import org.apache.wink.json4j.JSONObject;
+
+import org.apache.atlas.odf.api.discoveryservice.AnalysisRequestTracker;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties;
+
+public class Utils {
+
+	static Logger logger = Logger.getLogger(Utils.class.getName());
+
+	private static final List<Class<? extends Object>> MERGABLE_CLASSES = Arrays.asList(ConfigContainer.class, KafkaConsumerConfig.class, ODFSettings.class, DiscoveryServiceProperties.class);
+
+	public static void mergeODFPOJOs(Object source, Object update) {
+		if (!source.getClass().isAssignableFrom(update.getClass())) {
+			return;
+		}
+
+		Method[] sourceMethods = source.getClass().getDeclaredMethods();
+
+		for (Method getterMethod : sourceMethods) {
+			if (getterMethod.getName().startsWith("get") || getterMethod.getName().startsWith("is")) {
+				String setterMethodName = getterMethod.getName().replaceFirst("get", "set");
+				if (getterMethod.getName().startsWith("is")) {
+					setterMethodName = setterMethodName.replaceFirst("is", "set");
+				}
+				try {
+					Method setterMethod = source.getClass().getDeclaredMethod(setterMethodName, getterMethod.getReturnType());
+					Object updateValue = getterMethod.invoke(update);
+					if (updateValue != null) {
+						Object sourceValue = getterMethod.invoke(source);
+
+						if (sourceValue != null && MERGABLE_CLASSES.contains(updateValue.getClass())) {
+							//Value is another POJO, must also try merging these instead of overwriting
+							mergeODFPOJOs(sourceValue, updateValue);
+							setterMethod.invoke(source, sourceValue);
+						} else if (sourceValue instanceof Map && updateValue instanceof Map) {
+							Map updateJSON = (Map) updateValue;
+							Map sourceJSON = (Map) sourceValue;
+							for (Object key : updateJSON.keySet()) {
+								sourceJSON.put(key, updateJSON.get(key));
+							}
+							setterMethod.invoke(source, sourceJSON);
+						} else {
+							setterMethod.invoke(source, updateValue);
+						}
+					}
+
+				} catch (NoSuchMethodException e) {
+					throw new RuntimeException(MessageFormat.format("Objects of type {0}  and {1} could not be merged, no matching method found for {2}!", source.getClass().getName(), update
+							.getClass().getName(), getterMethod.getName()), e);
+				} catch (SecurityException e) {
+					throw new RuntimeException(MessageFormat.format("Objects of type {0}  and {1} could not be merged, method {2} could not be accessed (SecurityException)!", source.getClass()
+							.getName(), update.getClass().getName(), setterMethodName), e);
+				} catch (IllegalAccessException e) {
+					throw new RuntimeException(MessageFormat.format("Objects of type {0}  and {1} could not be merged, method {2} could not be accessed! (IllegalAccessException)", source.getClass()
+							.getName(), update.getClass().getName(), getterMethod.getName()), e);
+				} catch (IllegalArgumentException e) {
+					throw new RuntimeException(MessageFormat.format("Objects of type {0}  and {1} could not be merged, method {2} does not accept the right parameters!", source.getClass().getName(),
+							update.getClass().getName(), setterMethodName), e);
+				} catch (InvocationTargetException e) {
+					e.printStackTrace();
+					throw new RuntimeException(MessageFormat.format("Objects of type {0}  and {1} could not be merged, method {2} or {3} could not be invoked!", source.getClass().getName(), update
+							.getClass().getName(), getterMethod.getName(), setterMethodName), e);
+				}
+
+			}
+		}
+	}
+
+	public static Properties readConfigProperties(String path) {
+		// TODO cache this in static variables, it doesn't change at runtime 
+		InputStream is = Utils.class.getClassLoader().getResourceAsStream(path);
+		if (is == null) {
+			return null;
+		}
+		Properties props = new Properties();
+		try {
+			props.load(is);
+		} catch (IOException e) {
+			throw new RuntimeException(e);
+		}
+		return props;
+	}
+
+	public static void setCurrentTimeAsLastModified(AnalysisRequestTracker tracker) {
+		tracker.setLastModified(System.currentTimeMillis());
+	}
+
+	public static String getExceptionAsString(Throwable exc) {
+		StringWriter sw = new StringWriter();
+		PrintWriter pw = new PrintWriter(sw);
+		exc.printStackTrace(pw);
+		String st = sw.toString();
+		return st;
+	}
+
+	public static String collectionToString(Collection<?> coll, String separator) {
+		StringBuffer buf = null;
+		for (Object o : coll) {
+			if (buf == null) {
+				buf = new StringBuffer("[ ");
+			} else {
+				buf.append(separator);
+			}
+			buf.append(o.toString());
+		}
+		buf.append(" ]");
+		return buf.toString();
+	}
+
+	public static <T> boolean containsOnly(List<T> l, T[] elements) {
+		for (T t : l) {
+			boolean containsOnlyElements = false;
+			for (T el : elements) {
+				if (t.equals(el)) {
+					containsOnlyElements = true;
+					break;
+				}
+			}
+			if (!containsOnlyElements) {
+				return false;
+			}
+		}
+		return true;
+	}
+
+	public static <T> boolean containsNone(List<T> l, T[] elements) {
+		for (T t : l) {
+			boolean containsAnyElement = false;
+			for (T el : elements) {
+				if (t.equals(el)) {
+					containsAnyElement = true;
+					break;
+				}
+			}
+			if (containsAnyElement) {
+				return true;
+			}
+		}
+		return false;
+	}
+
+	public static List<String> splitString(String s, char separator) {
+		List<String> l = new ArrayList<String>();
+		if (s != null) {
+			StringTokenizer tok = new StringTokenizer(s, String.valueOf(separator));
+			while (tok.hasMoreTokens()) {
+				l.add(tok.nextToken());
+			}
+		}
+		return l;
+	}
+
+	public static String getInputStreamAsString(InputStream is, String encoding) {
+		try {
+			final int n = 2048;
+			byte[] b = new byte[0];
+			byte[] temp = new byte[n];
+			int bytesRead;
+			while ((bytesRead = is.read(temp)) != -1) {
+				byte[] newB = new byte[b.length + bytesRead];
+				System.arraycopy(b, 0, newB, 0, b.length);
+				System.arraycopy(temp, 0, newB, b.length, bytesRead);
+				b = newB;
+			}
+			String s = new String(b, encoding);
+			return s;
+		} catch (IOException exc) {
+			return getExceptionAsString(exc);
+		}
+	}
+
+	public static void mergeJSONObjects(JSONObject source, JSONObject target) {
+		if (source != null && target != null) {
+			target.putAll(source);
+		}
+	}
+
+	public static <T> T getValue(T value, T defaultValue) {
+		if (value == null) {
+			return defaultValue;
+		}
+		return value;
+	}
+
+	public static String getSystemPropertyExceptionIfMissing(String propertyName) {
+		Environment env = new ODFInternalFactory().create(Environment.class);
+		String value = env.getProperty(propertyName);
+		if (value == null) {
+			String msg = MessageFormat.format("System property ''{0}'' is not set", propertyName);
+			logger.log(Level.SEVERE, msg);
+			throw new RuntimeException(msg);
+		}
+		return value;
+	}
+	
+	public static int getIntEnvironmentProperty(String propertyName, int defaultValue) {
+		Environment env = new ODFInternalFactory().create(Environment.class);
+		String value = env.getProperty(propertyName);
+		if (value == null) {
+			return defaultValue;
+		}
+		try {
+			return Integer.parseInt(value);
+		} catch(NumberFormatException exc) {
+			return defaultValue;
+		}
+	}
+
+
+	public static void runSystemCommand(String command) {
+		logger.log(Level.INFO, "Running system command: " + command);
+		try {
+			Runtime r = Runtime.getRuntime();
+			Process p = r.exec(command);
+			p.waitFor();
+			BufferedReader b = new BufferedReader(new InputStreamReader(p.getInputStream()));
+			String line = "";
+			while ((line = b.readLine()) != null) {
+				logger.log(Level.INFO, "System command out: " + line);
+			}
+			b.close();
+		} catch(IOException | InterruptedException e) {
+			logger.log(Level.INFO, "Error executing system command.", e);
+		}
+	}
+	
+	public static ConfigContainer readConfigurationFromClasspath(String jsonFileInClasspath) {
+		InputStream is = SettingsManager.class.getClassLoader().getResourceAsStream(jsonFileInClasspath);
+		try {
+			JSONObject configJSON = new JSONObject(is);
+			ConfigContainer config = JSONUtils.fromJSON(configJSON.write(), ConfigContainer.class);
+			return config;
+		} catch (Exception exc) {
+			throw new RuntimeException(exc);
+		}
+	}
+
+	public static String joinStrings(List<String> l, char separator) {
+		String result = null;
+		if ((l != null) && !l.isEmpty()) {
+			StringBuilder buf = null;
+			for (String s : l) {
+				if (buf == null) {
+					buf = new StringBuilder();
+				} else {
+					buf.append(separator);
+				}
+				buf.append(s);
+			}
+			result = buf.toString();
+		}
+		return result;
+	}
+	
+	public static String getEnvironmentProperty(String name, String defaultValue) {
+		Environment env = new ODFInternalFactory().create(Environment.class);
+		String s = env.getProperty(name);
+		return s != null ? s : defaultValue;		
+	}
+	
+	public static long getEnvironmentProperty(String name, long defaultValue) {
+		Environment env = new ODFInternalFactory().create(Environment.class);
+		String s = env.getProperty(name);
+		if (s == null) {
+			return defaultValue;
+		}
+		try {
+			return Long.parseLong(s);
+		} catch(NumberFormatException exc) {
+			String msg = MessageFormat.format("Property ''{0}'' could not be converted to an integer", new Object[]{name});
+			logger.log(Level.WARNING, msg);
+			return defaultValue;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/analysis/AnalysisManagerImpl.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/analysis/AnalysisManagerImpl.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/analysis/AnalysisManagerImpl.java
new file mode 100755
index 0000000..8f7fab2
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/analysis/AnalysisManagerImpl.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.analysis;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.analysis.AnalysisCancelResult;
+import org.apache.atlas.odf.api.analysis.AnalysisManager;
+import org.apache.atlas.odf.api.analysis.AnalysisRequest;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestStatus;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestStatus.State;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestSummary;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestTrackers;
+import org.apache.atlas.odf.api.analysis.AnalysisResponse;
+import org.apache.atlas.odf.api.metadata.MetaDataObjectReference;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.core.ODFUtils;
+import org.apache.atlas.odf.core.Utils;
+import org.apache.atlas.odf.core.controlcenter.AnalysisRequestTrackerStore;
+import org.apache.atlas.odf.core.controlcenter.ControlCenter;
+import org.apache.atlas.odf.json.JSONUtils;
+
+/**
+ *
+ * External Java API for creating and managing analysis requests
+ *
+ */
+public class AnalysisManagerImpl implements AnalysisManager {
+
+	public final static char COMPOUND_REQUEST_SEPARATOR = ',';
+	private Logger logger = Logger.getLogger(AnalysisManagerImpl.class.getName());
+	private ControlCenter controlCenter;
+
+	public AnalysisManagerImpl() {
+		controlCenter = new ODFInternalFactory().create(ControlCenter.class);
+	}
+
+	/**
+	 * Issues a new ODF analysis request
+	 *
+	 * @param request Analysis request
+	 * @return Response containing the request id and status information
+	 */
+	public AnalysisResponse runAnalysis(AnalysisRequest request) {
+		if (((request.getDiscoveryServiceSequence() == null) || request.getDiscoveryServiceSequence().isEmpty())
+			&& ((request.getAnnotationTypes() == null) || request.getAnnotationTypes().isEmpty())) {
+			AnalysisResponse response = new AnalysisResponse();
+			response.setId(request.getId());
+			response.setDetails("Either a sequence of discovery service ids or a list of annotation types must be specified to initiate an analysis request.");
+			response.setInvalidRequest(true);
+			return response;
+		}
+
+		if ((request.getDataSets().size() == 1) || request.isProcessDataSetsSequentially()) {
+			logger.log(Level.INFO, "Using sequential request processing (maybe because there is only a single data set)");
+			AnalysisResponse response = controlCenter.startRequest(request);
+			logger.log(Level.INFO, "Request with ID ''{0}'' started on data sets ''{1}''. Complete request: {2}.",
+					new Object[] { response.getId(), request.getDataSets(), JSONUtils.lazyJSONSerializer(request) });
+			return response;
+		}
+
+		List<String> requestIDs = new ArrayList<String>();
+		List<String> detailsMessages = new ArrayList<String>();
+		boolean invalidRequest = true;
+		logger.log(Level.INFO, "Running requests for ''{0}'' data sets in parallel", request.getDataSets().size());
+		logger.log(Level.FINE, "Splitting request into multiple request for each data set. Data Sets: {0}", request.getDataSets());
+		for (MetaDataObjectReference dataSet : request.getDataSets()) {
+			AnalysisRequest partRequest = new AnalysisRequest();
+			partRequest.setDiscoveryServiceSequence(request.getDiscoveryServiceSequence());
+			partRequest.setAdditionalProperties(request.getAdditionalProperties());
+			partRequest.setDataSets(Collections.singletonList(dataSet));
+			AnalysisResponse partResponse = controlCenter.startRequest(partRequest);
+			if (!partResponse.isInvalidRequest()) {
+				String partRequestID = partResponse.getId();
+				requestIDs.add(partRequestID);
+				detailsMessages.add(partResponse.getDetails());
+				// as soon as one request is valid, we make the compound request valid
+				invalidRequest = false;
+			}
+		}
+		AnalysisResponse response = new AnalysisResponse();
+		response.setId(Utils.joinStrings(requestIDs, COMPOUND_REQUEST_SEPARATOR));
+		response.setDetails(Utils.joinStrings(detailsMessages, COMPOUND_REQUEST_SEPARATOR));
+		response.setInvalidRequest(invalidRequest);
+		return response;
+	}
+
+	/**
+	 * Retrieve status of an ODF analysis request
+	 *
+	 * @param requestId Unique id of the analysis request
+	 * @return Status of the analysis request
+	 */
+	public AnalysisRequestStatus getAnalysisRequestStatus(String requestId) {
+		List<String> singleRequestIds = Utils.splitString(requestId, COMPOUND_REQUEST_SEPARATOR);
+		if (singleRequestIds.size() == 1) {
+			AnalysisRequestStatus status = controlCenter.getRequestStatus(requestId);
+			return status;
+		}
+		AnalysisRequestStatus compoundStatus = new AnalysisRequestStatus();
+		compoundStatus.setState(State.QUEUED);
+		AnalysisRequest compoundRequest = new AnalysisRequest(); // assemble a compound request 
+		compoundRequest.setId(requestId);
+		List<String> allMessages = new ArrayList<String>();
+		List<MetaDataObjectReference> allDataSets = new ArrayList<>();
+		List<State> allStates = new ArrayList<>();
+		for (String singleRequestId : singleRequestIds) {	
+			AnalysisRequestStatus singleStatus = controlCenter.getRequestStatus(singleRequestId);
+			if (compoundRequest.getDiscoveryServiceSequence() == null) {
+				// assume all fields of the single requests are the same
+				// since they were created through runAnalysis()
+				compoundRequest.setDiscoveryServiceSequence(singleStatus.getRequest().getDiscoveryServiceSequence());
+				compoundRequest.setAdditionalProperties(singleStatus.getRequest().getAdditionalProperties());
+			}
+			if (singleStatus.getRequest().getDataSets() != null) {
+				allDataSets.addAll(singleStatus.getRequest().getDataSets());
+			}
+			allStates.add(singleStatus.getState());
+			allMessages.add(singleStatus.getDetails());
+		}
+		compoundRequest.setDataSets(allDataSets);
+
+		compoundStatus.setState(ODFUtils.combineStates(allStates));
+		compoundStatus.setRequest(compoundRequest);
+		compoundStatus.setDetails(Utils.joinStrings(allMessages, COMPOUND_REQUEST_SEPARATOR));
+		return compoundStatus;
+	}
+
+	/**
+	 * Retrieve statistics about all previous ODF analysis requests
+	 *
+	 * @return Request summary
+	 */
+	public AnalysisRequestSummary getAnalysisStats() {
+		AnalysisRequestTrackerStore store = new ODFInternalFactory().create(AnalysisRequestTrackerStore.class);
+		return store.getRequestSummary();
+	}
+
+	/**
+	 * Retrieve status details of recent ODF analysis requests
+	 *
+	 * @param offset Starting offset (use 0 to start with the latest request)
+	 * @param limit Maximum number of analysis requests to be returned (use -1 to retrieve all requests)
+	 * @return Status details for each discovery request
+	 */
+	public AnalysisRequestTrackers getAnalysisRequests(int offset, int limit) {
+		AnalysisRequestTrackerStore store = new ODFInternalFactory().create(AnalysisRequestTrackerStore.class);
+		AnalysisRequestTrackers analysisrequestTrackers = new AnalysisRequestTrackers();
+		analysisrequestTrackers.setAnalysisRequestTrackers(store.getRecentTrackers(offset, limit));
+		return analysisrequestTrackers;
+	}
+
+	/**
+	 * Request a specific ODF discovery request to be canceled
+	 *
+	 * @param requestId Unique id of the analysis request
+	 * @return Status of the cancellation attempt
+	 */
+	public AnalysisCancelResult cancelAnalysisRequest(String requestId) {
+		return controlCenter.cancelRequest(requestId);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/annotation/InternalAnnotationStoreUtils.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/annotation/InternalAnnotationStoreUtils.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/annotation/InternalAnnotationStoreUtils.java
new file mode 100755
index 0000000..798b2d3
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/annotation/InternalAnnotationStoreUtils.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.annotation;
+
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.metadata.models.Annotation;
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.analysis.AnalysisRequest;
+import org.apache.atlas.odf.api.annotation.AnnotationStore;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceResult;
+
+public class InternalAnnotationStoreUtils {
+
+	public static void storeDiscoveryServiceResult(DiscoveryServiceResult result, AnalysisRequest req) {
+		Logger logger = Logger.getLogger(InternalAnnotationStoreUtils.class.getName());
+		AnnotationStore mds = new ODFFactory().create().getAnnotationStore();
+		mds.setAnalysisRun(req.getId());
+		if (result != null) {
+			logger.log(Level.FINE, "Persisting annotations returned by discovery service");
+			List<Annotation> annotations = result.getAnnotations();
+			if (annotations != null) {
+				for (Annotation annot : annotations) {
+					// only persist if reference was not set
+					if (annot.getReference() == null) {
+						mds.store(annot);
+					} else {
+						logger.log(Level.WARNING, "Returned annotation object has a non-null reference set and will not be persisted (reference: {0})", annot.getReference().toString());
+					}
+				}
+			}
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/configuration/ConfigContainer.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/configuration/ConfigContainer.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/configuration/ConfigContainer.java
new file mode 100755
index 0000000..f779155
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/configuration/ConfigContainer.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.configuration;
+
+
+import java.util.List;
+
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties;
+import org.apache.atlas.odf.api.settings.ODFSettings;
+import org.apache.atlas.odf.api.settings.validation.ValidationException;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+
+/**
+ * { 
+ *  	"odf" : {...},
+ *  	"userDefined" : {...}
+ * }
+ *
+ *
+ * This class is final, because reflection is used to access getters / setters in order to merge. This doesn't work with inherited methods
+ */
+@ApiModel(description="All ODF configuration options.")
+public final class ConfigContainer {
+
+	@ApiModelProperty(value="General ODF configuration options along with details about available discovery services", required=true)
+	private ODFSettings odf;
+
+	@ApiModelProperty(value="Details about available discovery services")
+	private List<DiscoveryServiceProperties> registeredServices = null;
+
+	public List<DiscoveryServiceProperties> getRegisteredServices() {
+		return registeredServices;
+	}
+
+	public void setRegisteredServices(List<DiscoveryServiceProperties> registeredServices) {
+		this.registeredServices = registeredServices;
+	}
+
+	public ODFSettings getOdf() {
+		return odf;
+	}
+
+	public void setOdf(ODFSettings odfSettings) {
+		this.odf = odfSettings;
+	}
+
+	public void validate() throws ValidationException {
+		if (this.odf != null) {
+			odf.validate();
+		}
+		if (this.registeredServices != null) {
+			new ServiceValidator().validate("ODFConfig.registeredServices", this.registeredServices);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/configuration/ConfigManager.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/configuration/ConfigManager.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/configuration/ConfigManager.java
new file mode 100755
index 0000000..7ad90e6
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/configuration/ConfigManager.java
@@ -0,0 +1,235 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.configuration;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.UUID;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.analysis.AnalysisRequest;
+import org.apache.atlas.odf.api.metadata.MetadataStore;
+import org.apache.atlas.odf.api.settings.SparkConfig;
+import org.apache.atlas.odf.api.settings.validation.ValidationException;
+import org.apache.atlas.odf.core.Encryption;
+import org.apache.atlas.odf.core.Environment;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.json.JSONUtils;
+import org.apache.wink.json4j.JSONException;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.core.Utils;
+import org.apache.atlas.odf.core.controlcenter.ControlCenter;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties;
+import org.apache.atlas.odf.core.store.ODFConfigurationStorage;
+
+public class ConfigManager {
+	private Logger logger = Logger.getLogger(ConfigManager.class.getName());
+	public static final String HIDDEN_PASSWORD_IDENTIFIER = "***hidden***";
+	public static final long CONFIG_UPDATE_SLEEP_BETWEEN_POLLS = 20;
+	public static final int CONFIG_UPDATE_MAX_POLLS = 1500;
+	private static final String DEFAULT_ENCRYPTED_SPARK_CONFIGS = "spark.authenticate.secret,spark.ssl.keyPassword,spark.ssl.keyStorePassword,spark.ssl.trustStorePassword";
+
+	protected ODFConfigurationStorage configurationStore;
+	protected ODFConfigNotificationPublisher notificationManager;
+
+	public ConfigManager() {
+		ODFInternalFactory f = new ODFInternalFactory();
+		this.configurationStore = f.create(ODFConfigurationStorage.class);
+		this.notificationManager = f.create(ODFConfigNotificationPublisher.class);
+	}
+
+	public ConfigContainer getConfigContainer() {
+		ConfigContainer config = configurationStore.getConfig(getDefaultConfigContainer());
+		return config;
+	}
+
+	public ConfigContainer getConfigContainerHidePasswords() {
+		ConfigContainer config = configurationStore.getConfig(getDefaultConfigContainer());
+		hidePasswords(config);
+		return config;
+	}
+
+	public void updateConfigContainer(ConfigContainer update) throws ValidationException {
+		try {
+			update = JSONUtils.cloneJSONObject(update);
+		} catch (JSONException e) {
+			throw new RuntimeException(e);
+		}
+		update.validate();
+		ConfigContainer source = getConfigContainer();
+		unhideAndEncryptPasswords(update, source);
+
+		List<DiscoveryServiceProperties> newServicesToRun = new ArrayList<DiscoveryServiceProperties>();
+		if (update.getRegisteredServices() != null
+				&& source.getRegisteredServices().size() < update.getRegisteredServices().size()) {
+			// store added services if update registers new ones
+			List<DiscoveryServiceProperties> newRegisteredServices = new ArrayList<DiscoveryServiceProperties>();
+			newRegisteredServices.addAll(update.getRegisteredServices());
+			for (DiscoveryServiceProperties oldService : source.getRegisteredServices()) {
+				for (int no = 0; no < newRegisteredServices.size(); no++) {
+					if (newRegisteredServices.get(no).getId().equals(oldService.getId())) {
+						newRegisteredServices.remove(no);
+						break;
+					}
+				}
+			}
+
+			newServicesToRun.addAll(newRegisteredServices);
+		}
+
+		Utils.mergeODFPOJOs(source, update);
+		configurationStore.storeConfig(source);
+
+		if (source.getOdf().getRunNewServicesOnRegistration() && !newServicesToRun.isEmpty()) {
+			runNewServices(newServicesToRun);
+		}
+
+		String changeId = UUID.randomUUID().toString();
+		configurationStore.addPendingConfigChange(changeId);
+		this.notificationManager.publishConfigChange(source, changeId);
+		for (int i=0; i < CONFIG_UPDATE_MAX_POLLS; i++) {
+			if (!configurationStore.isConfigChangePending(changeId)) {
+				logger.log(Level.INFO, MessageFormat.format("Config change id ''{0}'' successfully completed after {1} msec.", new Object[] { changeId, i * CONFIG_UPDATE_SLEEP_BETWEEN_POLLS } ));
+				return;
+			}
+			try {
+				Thread.sleep(CONFIG_UPDATE_SLEEP_BETWEEN_POLLS);
+			} catch (InterruptedException e) {
+				// Ignore interrupt
+				logger.log(Level.WARNING, "Sleep period was interrupted", e);
+			}
+		}
+		logger.log(Level.WARNING, MessageFormat.format("Config change did not complete after {0} msec.", CONFIG_UPDATE_SLEEP_BETWEEN_POLLS * CONFIG_UPDATE_MAX_POLLS));
+	}
+
+	public void resetConfigContainer() {
+		logger.warning("resetting ODF configuration!");
+		configurationStore.storeConfig(getDefaultConfigContainer());
+	}
+
+	private static String defaultConfig = null;
+
+	List<DiscoveryServiceProperties> getServicesFoundOnClassPath() throws IOException, JSONException {
+		ClassLoader cl = this.getClass().getClassLoader();
+		Enumeration<URL> services = cl.getResources("META-INF/odf/odf-services.json");
+		List<DiscoveryServiceProperties> result = new ArrayList<>();
+		while (services.hasMoreElements()) {
+			URL url = services.nextElement();
+			InputStream is = url.openStream();
+			String json = Utils.getInputStreamAsString(is, "UTF-8");
+			logger.log(Level.INFO, "Service found on the classpath at {0}: {1}", new Object[] { url, json });
+			result.addAll(JSONUtils.fromJSONList(json, DiscoveryServiceProperties.class));
+		}
+		logger.log(Level.INFO, "Number of classpath services found: {0}", result.size());
+		return result;
+	}
+
+	private ConfigContainer getDefaultConfigContainer() {
+		if (defaultConfig == null) {			
+			try {
+				ConfigContainer config = new ODFInternalFactory().create(Environment.class).getDefaultConfiguration();
+				// now look for services found on the classpath
+				config.getRegisteredServices().addAll(getServicesFoundOnClassPath());
+				defaultConfig = JSONUtils.toJSON(config);
+			} catch (IOException | JSONException e) {
+				String msg = "Default config could not be loaded or parsed!";
+				logger.severe(msg);
+				throw new RuntimeException(msg, e);
+			}
+		}
+		try {
+			return JSONUtils.fromJSON(defaultConfig, ConfigContainer.class);
+		} catch (JSONException e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+	private void runNewServices(List<DiscoveryServiceProperties> newServices) {
+		ControlCenter cc = new ODFInternalFactory().create(ControlCenter.class);
+		List<String> servicesToRun = new ArrayList<String>();
+		for (DiscoveryServiceProperties info : newServices) {
+			servicesToRun.add(info.getId());
+		}
+
+		AnalysisRequest req = new AnalysisRequest();
+		MetadataStore mds = new ODFFactory().create().getMetadataStore();
+		req.setDiscoveryServiceSequence(servicesToRun);
+		req.setDataSets(mds.search(mds.newQueryBuilder().objectType("DataSet").build()));
+		req.setIgnoreDataSetCheck(true);
+		cc.startRequest(req);
+	}
+
+	private void unhideAndEncryptPasswords(ConfigContainer updatedConfigContainer,
+			ConfigContainer originalConfiguration) {
+		if (updatedConfigContainer.getOdf() != null) {
+			String odfPassword = updatedConfigContainer.getOdf().getOdfPassword();
+			if (odfPassword != null) {
+				if (odfPassword.equals(HIDDEN_PASSWORD_IDENTIFIER)) {
+					// Password was not changed, therefore keep original
+					// encrypted password
+					updatedConfigContainer.getOdf().setOdfPassword(originalConfiguration.getOdf().getOdfPassword());
+				} else if (!Encryption.isEncrypted(odfPassword)) {
+					updatedConfigContainer.getOdf().setOdfPassword(Encryption.encryptText(odfPassword));
+				}
+			}
+			if (updatedConfigContainer.getOdf().getSparkConfig() != null) {
+				SparkConfig updatedSparkConfig = updatedConfigContainer.getOdf().getSparkConfig();
+				if (updatedSparkConfig.getConfigs() != null) {
+					List<String> encryptedSparkConfigs = Arrays.asList(DEFAULT_ENCRYPTED_SPARK_CONFIGS.split(","));
+					for (String configName : updatedSparkConfig.getConfigs().keySet()) {
+						if (encryptedSparkConfigs.contains(configName)) {
+							String updatedConfigValue = (String) updatedSparkConfig.getConfigs().get(configName);
+							if (updatedConfigValue.equals(HIDDEN_PASSWORD_IDENTIFIER)) {
+								// Encrypted value was not changed, therefore keep original
+								// Encrypted value
+								SparkConfig originalSparkConfig = originalConfiguration.getOdf().getSparkConfig();
+								updatedSparkConfig.setConfig(configName, originalSparkConfig.getConfigs().get(configName));
+							} else if (!Encryption.isEncrypted(updatedConfigValue)) {
+								updatedSparkConfig.setConfig(configName, Encryption.encryptText(updatedConfigValue));
+							}
+						}
+					}
+				}
+			}
+		}
+	}
+
+	private void hidePasswords(ConfigContainer configContainer) {
+		if (configContainer.getOdf() != null) {
+			if (configContainer.getOdf().getOdfPassword() != null) {
+				configContainer.getOdf().setOdfPassword(HIDDEN_PASSWORD_IDENTIFIER);
+			}
+			if ((configContainer.getOdf().getSparkConfig() != null)){
+				SparkConfig sparkConfig = configContainer.getOdf().getSparkConfig();
+				if (sparkConfig.getConfigs() != null) {
+					List<String> encryptedSparkConfigs = Arrays.asList(DEFAULT_ENCRYPTED_SPARK_CONFIGS.split(","));
+					for (String configName : sparkConfig.getConfigs().keySet()) {
+						if (((encryptedSparkConfigs.contains(configName)) && (sparkConfig.getConfigs().get(configName)) != null)) {
+							sparkConfig.setConfig(configName, HIDDEN_PASSWORD_IDENTIFIER);
+						}
+					}
+				}
+			}
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/configuration/ODFConfigNotificationPublisher.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/configuration/ODFConfigNotificationPublisher.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/configuration/ODFConfigNotificationPublisher.java
new file mode 100755
index 0000000..a7f822f
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/configuration/ODFConfigNotificationPublisher.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.configuration;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.core.controlcenter.AdminMessage;
+import org.apache.atlas.odf.core.controlcenter.AdminMessage.Type;
+import org.apache.atlas.odf.core.messaging.DiscoveryServiceQueueManager;
+import org.apache.atlas.odf.json.JSONUtils;
+
+public class ODFConfigNotificationPublisher {
+
+	Logger logger = Logger.getLogger(ODFConfigNotificationPublisher.class.getName());
+
+	public void publishConfigChange(ConfigContainer update, String changeId) {
+		try {
+			logger.log(Level.FINE, "publishing config change: {0}", JSONUtils.toJSON(update));
+			ConfigContainer clone = JSONUtils.fromJSON(JSONUtils.toJSON(update), ConfigContainer.class);
+			AdminMessage amsg = new AdminMessage();
+			amsg.setId(changeId);
+			amsg.setAdminMessageType(Type.CONFIGCHANGE);
+			amsg.setConfigUpdateDetails(clone);
+			amsg.setDetails("Configuration update");
+			DiscoveryServiceQueueManager qm = new ODFInternalFactory().create(DiscoveryServiceQueueManager.class);
+			qm.enqueueInAdminQueue(amsg);
+		} catch (Exception exc) {
+			logger.log(Level.WARNING, "An unexpected exception occurres when writing to admin queue. Ignoring it", exc);
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/configuration/ServiceValidator.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/configuration/ServiceValidator.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/configuration/ServiceValidator.java
new file mode 100755
index 0000000..011d728
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/configuration/ServiceValidator.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.configuration;
+
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties;
+import org.apache.atlas.odf.api.settings.validation.PropertyValidator;
+import org.apache.atlas.odf.api.settings.validation.ValidationException;
+import org.apache.atlas.odf.core.controlcenter.ServiceRuntime;
+import org.apache.atlas.odf.core.controlcenter.ServiceRuntimes;
+
+public class ServiceValidator implements PropertyValidator {
+
+	public void validate(String property, Object value) throws ValidationException {
+		validate(property, value, true);
+	}
+
+	private void validate(String property, Object value, boolean topLevel) throws ValidationException {
+		if (value == null) {
+			throw new ValidationException("Null values are not allowed for this property");
+		}
+
+		if (value instanceof List) {
+			List<DiscoveryServiceProperties> newServices = (List<DiscoveryServiceProperties>) value;
+			List<String> ids = new ArrayList<String>();
+			for (int no = 0; no < newServices.size(); no++) {
+				DiscoveryServiceProperties service = (DiscoveryServiceProperties) newServices.get(no);
+				validate(property, service, false);
+				String serviceId = service.getId();
+				if (ids.contains(serviceId)) {
+					throw new ValidationException(property, MessageFormat.format("you cannot register multiple services with the same id {0}!", serviceId));
+				} else {
+					ids.add(serviceId);
+				}
+			}
+		} else if (value instanceof DiscoveryServiceProperties) {
+			DiscoveryServiceProperties service = (DiscoveryServiceProperties) value;
+			if (service.getId() == null || service.getId().trim().isEmpty() || service.getName() == null || service.getName().trim().isEmpty() || service.getEndpoint() == null) {
+				throw new ValidationException(property, MessageFormat.format("A service requires {0}", "id, name and an endpoint"));
+			}
+
+			if (topLevel) {
+				List<String> regServices = new ArrayList<String>();
+				List<DiscoveryServiceProperties> services = new ODFFactory().create().getDiscoveryServiceManager().getDiscoveryServicesProperties();
+				for (DiscoveryServiceProperties regService : services) {
+					regServices.add(regService.getId());
+				}
+
+				if (regServices.contains(service.getId())) {
+					throw new ValidationException(property, MessageFormat.format("a service with id {0} already exists!", service.getId()));
+				}
+			}
+
+			ServiceRuntime runtime = ServiceRuntimes.getRuntimeForDiscoveryService(service);
+			runtime.validate(service);
+		} else {
+			throw new ValidationException(property, "only DiscoveryServiceRegistrationInfo objects or list of such objects are allowed for this property");
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/AdminMessage.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/AdminMessage.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/AdminMessage.java
new file mode 100755
index 0000000..fffff6f
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/AdminMessage.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.controlcenter;
+
+import org.apache.atlas.odf.core.configuration.ConfigContainer;
+
+// JSON
+public class AdminMessage {
+	public static enum Type {
+		SHUTDOWN, RESTART, CONFIGCHANGE
+	}
+
+	private Type adminMessageType;
+	private String details;
+	private ConfigContainer configUpdateDetails;
+	private String messageId;
+
+	public Type getAdminMessageType() {
+		return adminMessageType;
+	}
+
+	public void setAdminMessageType(Type adminMessageType) {
+		this.adminMessageType = adminMessageType;
+	}
+
+	public String getDetails() {
+		return details;
+	}
+
+	public void setDetails(String details) {
+		this.details = details;
+	}
+
+	public ConfigContainer getConfigUpdateDetails() {
+		return configUpdateDetails;
+	}
+
+	public void setConfigUpdateDetails(ConfigContainer configUpdateDetails) {
+		this.configUpdateDetails = configUpdateDetails;
+	}
+
+	public String getId() {
+		return this.messageId;
+	}
+
+	public void setId(String messageId) {
+		this.messageId = messageId;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/AdminQueueProcessor.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/AdminQueueProcessor.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/AdminQueueProcessor.java
new file mode 100755
index 0000000..874e061
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/AdminQueueProcessor.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.controlcenter;
+
+import java.util.concurrent.ExecutorService;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.wink.json4j.JSONException;
+
+import org.apache.atlas.odf.core.ODFInitializer;
+import org.apache.atlas.odf.json.JSONUtils;
+
+public class AdminQueueProcessor implements QueueMessageProcessor {
+
+	private Logger logger = Logger.getLogger(AdminQueueProcessor.class.getName());
+
+	@Override
+	public void process(ExecutorService executorService, String msg, int partition, long offset) {
+		AdminMessage adminMessage;
+		try {
+			adminMessage = JSONUtils.fromJSON(msg, AdminMessage.class);
+		} catch (JSONException e) {
+			throw new RuntimeException(e);
+		}
+		switch (adminMessage.getAdminMessageType()) {
+		case SHUTDOWN:
+			initiateShutdown(executorService, false);
+			break;
+		case RESTART:
+			initiateShutdown(executorService, true);
+			break;
+		default:
+			// do nothing
+		}
+	}
+
+	static Object restartLockObject = new Object();
+
+	private void initiateShutdown(ExecutorService executorService, final boolean restart) {
+		logger.log(Level.INFO, "Shutdown of ODF was requested...");
+		Runnable shutDownRunnable = new Runnable() {
+
+			@Override
+			public void run() {
+				logger.log(Level.INFO, "Initiating shutdown");
+
+				// sleep some time before initiating the actual shutdown to give the process() a chance to return
+				// before it is itself shut down
+				long sleepTimeBeforeShutdown = 1000;
+				try {
+					Thread.sleep(sleepTimeBeforeShutdown);
+				} catch (InterruptedException e) {
+					// do nothing
+					e.printStackTrace();
+				}
+
+				synchronized (restartLockObject) {
+					logger.log(Level.INFO, "Shutting down ODF...");
+					try {
+						ODFInitializer.stop();
+						logger.log(Level.INFO, "ODF was shutdown");
+											
+						if (restart) {
+							logger.log(Level.INFO, "Restarting ODF");
+							ODFInitializer.start();
+							logger.log(Level.INFO, "ODF restarted");
+						}
+					}  catch (Exception e) {
+						logger.log(Level.SEVERE, "An unexpected error occurred when shutting down ODF", e);
+					}
+				}
+
+			}
+
+		};
+
+		executorService.submit(shutDownRunnable);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/AnalysisRequestTrackerStore.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/AnalysisRequestTrackerStore.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/AnalysisRequestTrackerStore.java
new file mode 100755
index 0000000..e43bd45
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/AnalysisRequestTrackerStore.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.controlcenter;
+
+import java.util.List;
+
+import org.apache.atlas.odf.api.analysis.AnalysisRequest;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestSummary;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestTrackerStatus.STATUS;
+import org.apache.atlas.odf.api.discoveryservice.AnalysisRequestTracker;
+
+
+public interface AnalysisRequestTrackerStore {
+	
+	/**
+	 * set the status of old requests which were last modified before the cutOffTimestamp
+	 * with an optional detailsMessage
+	 */
+	void setStatusOfOldRequest(long cutOffTimestamp, STATUS status, String detailsMessage);
+	
+	// store / update the passed tracker
+	void store(AnalysisRequestTracker tracker);
+	
+	AnalysisRequestTracker query(String analysisRequestId);
+
+	AnalysisRequestTracker findSimilarQueuedRequest(AnalysisRequest request);
+	
+	/**
+	 * @param number - number of trackers to retrieve, -1 for all
+	 * @return
+	 */
+	List<AnalysisRequestTracker> getRecentTrackers(int offset, int limit);
+	
+	/**
+	 * Clear any internal caches, if any.
+	 */
+	void clearCache(); 
+
+	int getSize();
+
+	AnalysisRequestSummary getRequestSummary();
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/AsyncDiscoveryServiceWrapper.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/AsyncDiscoveryServiceWrapper.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/AsyncDiscoveryServiceWrapper.java
new file mode 100755
index 0000000..8100f18
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/AsyncDiscoveryServiceWrapper.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.controlcenter;
+
+import java.util.concurrent.ExecutorService;
+
+import org.apache.atlas.odf.api.annotation.AnnotationStore;
+import org.apache.atlas.odf.api.discoveryservice.DataSetCheckResult;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceResponse.ResponseCode;
+import org.apache.atlas.odf.api.discoveryservice.async.AsyncDiscoveryService;
+import org.apache.atlas.odf.api.discoveryservice.async.DiscoveryServiceAsyncRunStatus;
+import org.apache.atlas.odf.api.discoveryservice.async.DiscoveryServiceAsyncStartResponse;
+import org.apache.atlas.odf.api.discoveryservice.datasets.DataSetContainer;
+import org.apache.atlas.odf.api.discoveryservice.sync.DiscoveryServiceSyncResponse;
+import org.apache.atlas.odf.api.discoveryservice.sync.SyncDiscoveryService;
+import org.apache.atlas.odf.api.metadata.MetadataStore;
+import org.apache.atlas.odf.core.Utils;
+
+public class AsyncDiscoveryServiceWrapper implements SyncDiscoveryService {
+
+	AsyncDiscoveryService wrappedService = null;
+
+	public AsyncDiscoveryServiceWrapper(AsyncDiscoveryService wrappedService) {
+		this.wrappedService = wrappedService;
+	}
+
+	@Override
+	public DiscoveryServiceSyncResponse runAnalysis(DiscoveryServiceRequest request) {
+		try {
+			DiscoveryServiceAsyncStartResponse asyncResponse = wrappedService.startAnalysis(request);
+			ResponseCode code = asyncResponse.getCode();
+			if (code != ResponseCode.OK) {
+				DiscoveryServiceSyncResponse response = new DiscoveryServiceSyncResponse();
+				response.setCode(code);
+				response.setDetails(asyncResponse.getDetails());
+				return response;
+			}
+			// poll the async service
+			final long maxWaitTimeSecs = Utils.getEnvironmentProperty("odf.async.max.wait.secs", 10 * 60); // default: 10 minutes
+			final long pollingIntervalMS = Utils.getEnvironmentProperty("odf.async.poll.interval.ms", 1000);
+			long maxPolls = (maxWaitTimeSecs * 1000) / pollingIntervalMS;
+			int pollCounter = 0;
+
+			DiscoveryServiceSyncResponse response = new DiscoveryServiceSyncResponse();
+			String runId = asyncResponse.getRunId();
+			while (pollCounter < maxPolls) {
+				Thread.sleep(pollingIntervalMS);
+				DiscoveryServiceAsyncRunStatus status = wrappedService.getStatus(runId);
+				switch (status.getState()) {
+				case NOT_FOUND:
+					// should not happen
+					response.setCode(ResponseCode.UNKNOWN_ERROR);
+					response.setDetails("Run ID " + runId + " was not found. This should not have happened.");
+					return response;
+				case ERROR:
+					response.setCode(ResponseCode.UNKNOWN_ERROR);
+					response.setDetails(status.getDetails());
+					return response;
+				case FINISHED:
+					response.setCode(ResponseCode.OK);
+					response.setDetails(status.getDetails());
+					response.setResult(status.getResult());
+					return response;
+				default:
+					// continue polling
+					pollCounter++;
+				}
+			}
+			response.setCode(ResponseCode.UNKNOWN_ERROR);
+			response.setDetails("Polled Async service for " + maxWaitTimeSecs + " seconds without positive result");
+			return response;
+		} catch (Exception exc) {
+			DiscoveryServiceSyncResponse response = new DiscoveryServiceSyncResponse();
+			response.setCode(ResponseCode.UNKNOWN_ERROR);
+			response.setDetails("An unknown error occurred: " + Utils.getExceptionAsString(exc));
+			return response;
+		}
+	}
+
+	public void setExecutorService(ExecutorService executorService) {
+		wrappedService.setExecutorService(executorService);
+	}
+
+	public void setMetadataStore(MetadataStore metadataStore) {
+		wrappedService.setMetadataStore(metadataStore);
+	}
+
+	public void setAnnotationStore(AnnotationStore annotationStore) {
+		wrappedService.setAnnotationStore(annotationStore);
+	}
+
+	public DataSetCheckResult checkDataSet(DataSetContainer dataSetContainer) {
+		return wrappedService.checkDataSet(dataSetContainer);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ConfigChangeQueueProcessor.java
----------------------------------------------------------------------
diff --git a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ConfigChangeQueueProcessor.java b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ConfigChangeQueueProcessor.java
new file mode 100755
index 0000000..bcd2965
--- /dev/null
+++ b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ConfigChangeQueueProcessor.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.controlcenter;
+
+import java.util.concurrent.ExecutorService;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.core.controlcenter.AdminMessage.Type;
+import org.apache.atlas.odf.core.store.ODFConfigurationStorage;
+import org.apache.atlas.odf.json.JSONUtils;
+
+public class ConfigChangeQueueProcessor implements QueueMessageProcessor {
+
+	Logger logger = Logger.getLogger(ConfigChangeQueueProcessor.class.getName());
+	
+	@Override
+	public void process(ExecutorService executorService, String msg, int partition, long offset) {
+		try {
+			AdminMessage amsg = JSONUtils.fromJSON(msg, AdminMessage.class);
+			if (Type.CONFIGCHANGE.equals(amsg.getAdminMessageType())) {
+				logger.info("Received config change: " + JSONUtils.toJSON(amsg));
+				ODFInternalFactory f = new ODFInternalFactory();
+				ODFConfigurationStorage configStorage = f.create(ODFConfigurationStorage.class);
+				configStorage.onConfigChange(amsg.getConfigUpdateDetails());
+				configStorage.removePendingConfigChange(amsg.getId());
+			}
+		} catch(Exception exc) {
+			logger.log(Level.WARNING, "An exception occurred while processing admin message", exc);
+		}
+	}
+
+}